# Sunday, June 17, 2007

In one of my tasks, I had to create a simple mechanism that would let my team enqueue tasks while multiple threads work on them in the background. I used the Monitor.Wait/Monitor.Pulse pattern to signal the workers when a new task is available. Here it goes:

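```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class TasksQueue : IDisposable
{
   private readonly object _locker = new object();
   private readonly Thread[] _workers;
   private readonly Queue<string> _tasksQueue = new Queue<string>();
```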

Nothing fancy so far. Just notice that we're holding a private lock object and an array of threads that represents our workers.
The constructor simply initializes the threads and starts them:

```csharp
public TasksQueue(int workerCount)
{
   if (workerCount <= 0) throw new ArgumentException("The number of worker threads must be greater than 0", "workerCount");

   _workers = new Thread[workerCount];
   for (int i = 0; i < workerCount; i++) { // Create and start a separate thread for each worker
      Thread workerThread = new Thread(ConsumeTask);
      workerThread.Name = "WorkerThread #" + i;
      _workers[i] = workerThread;
      workerThread.Start();
   }
}
```

Notice the ConsumeTask method that each thread runs. Let's see what we have there:

```csharp
private void ConsumeTask()
{
   while (true)
   {
      string task;
      lock (_locker)
      {
         // If no tasks are available, wait until something is enqueued.
         while (_tasksQueue.Count == 0) {
            // Monitor.Wait is equivalent to:
            // (1) Monitor.Exit(_locker); (give others a chance to lock _locker)
            // (2) wait for a pulse
            // (3) on pulse, Monitor.Enter(_locker);
            // The condition is re-checked after waking, because another worker
            // may have grabbed the lock and dequeued the task first.
            Monitor.Wait(_locker);
         }

         // A pulsed worker must reacquire the lock on _locker before it can dequeue,
         // so two workers can never dequeue the same task.
         task = _tasksQueue.Dequeue();
      }

      if (task == null) return; // A null task signals this worker to exit

      try
      {
         Console.WriteLine(DateTime.Now + " Start processing task: " + task + " on thread: " + Thread.CurrentThread.Name);
         Thread.Sleep(1000); // Simulate a time-consuming task
         Console.WriteLine(DateTime.Now + " Done processing task: " + task + " on thread: " + Thread.CurrentThread.Name);
      }
      catch (Exception err)
      {
         // Log the error and swallow it; we still want to keep consuming other tasks.
         Console.Error.WriteLine("Exception occurred while consuming the task (thread: " + Thread.CurrentThread.Name + "): " + err.Message);
      }
   }
}
```

We loop forever, checking whether the queue is empty; if it is, we wait for a new task to arrive, and until then the thread is blocked (inactive). Once the queue is not empty, we dequeue a task and "process" it. A null task tells the worker to exit. Let's see the EnqueueTask method:

```csharp
public void EnqueueTask(string task)
{
   lock (_locker)
   {
      if (task != null)
         Console.WriteLine(DateTime.Now + " Enqueue task: " + task);

      _tasksQueue.Enqueue(task);
      Monitor.Pulse(_locker); // wake one worker (thanks Alan) to handle the new task
   }
}
```

This leaves us with disposing of the whole thing (shutting our threads down gracefully):

```csharp
public void Dispose()
{
   EndCurrentWorkingThreads();
}

private void EndCurrentWorkingThreads()
{
   if (_workers == null || _workers.Length == 0)
      return;

   // Signal the threads to shut down gracefully: one null task per worker
   for (int i = 0; i < _workers.Length; i++)
      EnqueueTask(null); // each worker exits when it receives a null task

   // Wait for every worker to finish the remaining tasks and exit
   foreach (Thread worker in _workers)
      worker.Join();
}
```

That's it. But how do we use it, you might ask? Here it goes:

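```csharp
public static class Program
{
   public static void Main()
   {
      // Leaving the using block calls Dispose, which waits for the workers to finish.
      using (TasksQueue q = new TasksQueue(2))
      {
         for (int i = 0; i < 10; i++)
            q.EnqueueTask("Task " + i);

         Console.WriteLine("Waiting for completion...");
      }

      Console.WriteLine("All tasks done!");
   }
}
```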

Wednesday, June 20, 2007 1:52:28 PM (Jerusalem Standard Time, UTC+02:00)
Why are you using PulseAll rather than Pulse? In EnqueueTask you've added only one item to the queue, so at most one worker needs to be woken up to handle it.

Waking them all up just means you do a whole bunch of useless context switches, which in the worst case can really hurt your performance.

Sometimes you need PulseAll for correctness, but Pulse should be preferred otherwise.
Alan
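Alan's last point can be made concrete with a small, hypothetical `Gate` class. It is not part of the TasksQueue code. When one state change must release every waiting thread, `Monitor.Pulse` is not enough, because it wakes at most one waiter. The sketch assumes a one-shot gate that never closes again.

```csharp
using System.Threading;

// Hypothetical one-shot gate (not from the post): threads block in WaitOpen()
// until some thread calls Open(). Every waiter must be released, so Open()
// has to use Monitor.PulseAll.
public sealed class Gate
{
    private readonly object _locker = new object();
    private bool _isOpen;

    public void WaitOpen()
    {
        lock (_locker)
        {
            // Re-check the condition after every wake-up.
            while (!_isOpen)
                Monitor.Wait(_locker);
        }
    }

    public void Open()
    {
        lock (_locker)
        {
            _isOpen = true;
            // A single state change that all waiters care about: wake them all.
            Monitor.PulseAll(_locker);
        }
    }
}
```

Suppose `Open()` called `Monitor.Pulse` instead. Only one of several threads blocked in `WaitOpen()` would be released, and the others would keep waiting for a pulse that never comes. In `EnqueueTask`, by contrast, each call adds a single task, so waking one worker is enough.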
Wednesday, June 20, 2007 5:13:25 PM (Jerusalem Standard Time, UTC+02:00)
Hey Alan,
you're right, Pulse is a smarter choice than PulseAll in this scenario. I only presented the skeleton of my solution here (I'm not queuing strings in my real code). The real code enqueues multiple items at once (an IList<T>). A further micro-optimization is to check the size of the list:
- if it's null or empty, handle it (throw an exception or do nothing);
- if it holds 1 item, call Pulse;
- if it holds X > 1 items, call PulseAll (or call Pulse X times, if X is really small and you have a lot of workers).

Thanks for your comment; it was a really important remark about this code.
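The batch variant described in this reply might look roughly like the following. This is only a sketch. The real code is not shown in the post, so the `BatchQueue<T>` class and its `EnqueueRange` method are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical batch-enqueue sketch; the author's real code is not shown in the post.
public class BatchQueue<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();

    // Adds a batch and wakes only as many waiting consumers as the batch needs.
    public void EnqueueRange(IList<T> items)
    {
        if (items == null) throw new ArgumentNullException("items");
        if (items.Count == 0) return; // nothing to do

        lock (_locker)
        {
            foreach (T item in items)
                _items.Enqueue(item);

            if (items.Count == 1)
                Monitor.Pulse(_locker);    // one item: one consumer is enough
            else
                Monitor.PulseAll(_locker); // several items: let all consumers compete
        }
    }

    // Blocks until an item is available, then removes and returns it.
    public T Dequeue()
    {
        lock (_locker)
        {
            while (_items.Count == 0)
                Monitor.Wait(_locker);
            return _items.Dequeue();
        }
    }
}
```

The "call Pulse X times" alternative would replace the `if`/`else` with a loop that calls `Monitor.Pulse(_locker)` once per item. A pulse with no waiting thread simply has no effect.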
Monday, July 02, 2007 1:52:44 PM (Jerusalem Standard Time, UTC+02:00)
Some observations:
1. The TasksQueue encapsulates the task processing (the ConsumeTask method), which makes it hard to reuse as a general queue.
2. Although there is a lower limit that blocks consumers (i.e. when the queue is empty), there is no upper limit. One is needed if your producer produces tasks at a higher rate than the consumers can consume them. In a large application that has to be scalable, this can be a problem.
3. The queue has intimate knowledge of the number of consumers. It needs to know it because, when it has to stop all threads, it has to enqueue a "dummy" task for each thread.
4. The tasks have intimate knowledge of the "dummy" task (i.e. a null value represents a "stop" event).
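Observation 2 (the missing upper limit) is commonly handled with a bounded blocking queue, where producers also wait. Below is a minimal sketch. It assumes a hypothetical generic `BoundedQueue<T>` with a fixed capacity and is not code from either the post or the comment.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical bounded blocking queue: producers block while the queue is full,
// consumers block while it is empty.
public class BoundedQueue<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;

    public BoundedQueue(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            // Upper limit: wait for a consumer to make room.
            while (_items.Count >= _capacity)
                Monitor.Wait(_locker);

            _items.Enqueue(item);
            Monitor.PulseAll(_locker); // producers and consumers share this lock
        }
    }

    public T Dequeue()
    {
        lock (_locker)
        {
            // Lower limit: wait for a producer to add an item.
            while (_items.Count == 0)
                Monitor.Wait(_locker);

            T item = _items.Dequeue();
            Monitor.PulseAll(_locker); // wake producers blocked on a full queue
            return item;
        }
    }

    public int Count
    {
        get { lock (_locker) return _items.Count; }
    }
}
```

This sketch uses `PulseAll` because producers and consumers wait on the same lock. A single `Pulse` can wake a thread of the wrong kind (for example, a consumer while the queue is still empty), and in an unlucky interleaving every thread ends up waiting. In .NET 4.0 and later, `BlockingCollection<T>` created with a bounded capacity provides this behavior out of the box.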

I once implemented a class called WorkQueue that provides the same functionality but leaves the threading code out. This gives you more control over the number of consumers/producers and also lets you change that number at runtime without the queue caring about it.
(Drop me a note if you'd like to review it. And send my regards to Pasha :)
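The decoupling described here can be sketched as follows. This is not Eyal's WorkQueue, whose code isn't in the post; `ClosableQueue<T>`, `Close` and `TryDequeue` are hypothetical names. The queue creates no threads, so callers decide how many consumers and producers to run. An explicit `Close()` replaces the null "dummy" task per worker, which addresses observations 1, 3 and 4.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: the queue owns no threads and does not know how many
// consumers exist. Shutdown is an explicit Close() instead of a null task per worker.
public class ClosableQueue<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private bool _closed;

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            if (_closed) throw new InvalidOperationException("The queue is closed.");
            _items.Enqueue(item);
            Monitor.Pulse(_locker); // one new item: one consumer is enough
        }
    }

    // Marks the queue as finished; consumers drain what is left and then stop.
    public void Close()
    {
        lock (_locker)
        {
            _closed = true;
            Monitor.PulseAll(_locker); // every waiting consumer must see the flag
        }
    }

    // Blocks until an item is available (returns true),
    // or until the queue is closed and empty (returns false).
    public bool TryDequeue(out T item)
    {
        lock (_locker)
        {
            while (_items.Count == 0 && !_closed)
                Monitor.Wait(_locker);

            if (_items.Count == 0)
            {
                item = default(T);
                return false; // closed and drained
            }
            item = _items.Dequeue();
            return true;
        }
    }
}
```

A consumer thread then reduces to `string task; while (q.TryDequeue(out task)) Process(task);`, where `Process` is a hypothetical placeholder for the caller's own work.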
Monday, July 02, 2007 10:47:24 PM (Jerusalem Standard Time, UTC+02:00)
Hey Eyal,
I made a mistake by not mentioning that the code shown here is NOT the actual implementation. It is just a simplified presentation of it (so it will be easier to understand). In my original code, I have addressed all the observations you've mentioned. I also added a few extra bonuses for better control over the queue, the producers (I have a few in the real code) and the consumers.

Oh, one last thing: the upper limit is important, but in my scenario we have a lot of "slaves" and one "master" that is responsible for that upper limit. This way I don't really have to take care of it. Each slave sends the master a report ("how busy am I, what have I analyzed since we last talked, etc."). In return it receives a batch of new work items according to the limit set by the master. This way, the master can rearrange the work items when a new slave joins the system.

Thanks for your input (and Pasha will get your regards ;))!