What's hot ? (and I mean really ...) - scroll down for more
1).  Code Templating - advanced usage of delegates & generics: my slides & demos are available for download! CodeProject article is also available.

2).  My series "TDD in the eyes of a simpleminded" is in progress (including code!): preface, part1, part2, Q&A 1, Manual Stub vs. Mock Stub

3).  TDD Workshop: SeeCompass v0.1 and v0.2 are out.
# Tuesday, December 30, 2008

It seems that someone uploaded a 10-minute video a while ago (thanks TristanIce!).
This is far from a well-prepared lecture (alt.net is all about "everyone can get up and talk"); I just tried to show the charm of CCR after people started sharing their pain developing multi-threaded applications. I'm talking crazy-fast and the material is not well organized, so I had to think while talking/writing. The result is hard to watch. Multi-threading is a bitch, eh?

All in all, the vibe was positive and people looked intrigued by it. Anyway, here it is:

Oren explains how to use CCR framework (MSN video)

 

Makes me want to prepare a lecture on the subject some day...
Posted by Oren Ellenbogen 
30/12/2008 09:29, Israel time UTC-07:00,     Comments [0]  | 

Requirements:

  1. Be able to read all the lines in a given file.
  2. Be able to do so even if the file is HUGE ( == don't load it all at once).
  3. Control the number of items I want to receive and whether or not the enumerator ignores empty lines. Always nice to have.
  4. Thread-safety should be supported easily. Think about 50 threads, each reading the next line and processing it.
  5. Nice performance is a plus.


Playing with the API with my teammate Ron gave the following (code written in notepad, stupidity won't compile):

"Common foreach" usage:

foreach (string line in FileStreamer.GetLines(@"c:\temp\myfile.txt", true, 1000)) { /* .. code */ } // read 1000 items from the file while ignoring empty lines.


Reading from multiple threads usage:

using (FileStreamer streamer = new FileStreamer(@"c:\temp\myfile.txt", true, -1)) // -1 means no limit, read all non-empty lines
{
    Thread[] threads = new Thread[10];
    for(int i=0; i<threads.Length; i++)
    {
        threads[i] = new Thread((ThreadStart)delegate {
            string line;
            while (streamer.TryGetNextLine(out line)) // thread safe!
            {
                // do work ...
            }
            // TryGetNextLine returned false: end of file, this thread can exit
        });

        threads[i].Start();
    }

    foreach (Thread thread in threads)
        thread.Join(); // wait for all workers before the streamer is disposed
}

 

After reading a few ideas on Stack Overflow, I thought I'd share my solution:

// written by bogen (30/12/2008)
 
#region using
 
using System;
using System.Collections.Generic;
using System.IO;
 
#endregion
 
namespace Semingo.Common.Utils
{
    /// <summary>
    /// Return a stream of lines for the specified file.
    /// This class is thread safe by design!
    /// Use the static method FileStreamer.GetLines for non-thread-safe usage (via foreach)
    /// </summary>
    public class FileStreamer : IDisposable
    {
        #region fields
 
        private readonly object _locker = new object();
        private readonly string _path;
        private readonly bool _ignoreEmptyLines;
        private readonly int _limit;
        private readonly IEnumerator<string> _enumerator;
        private int _linesGiven;
        private bool _disposed;
 
        #endregion
 
        #region ctors
 
        /// <summary>
        /// Create a file streamer instance
        /// </summary>
        /// <param name="path">File path</param>
        public FileStreamer(string path) : this(path, false, -1)
        {
        }
 
        /// <summary>
        /// Create a file streamer instance
        /// </summary>
        /// <param name="path">File path</param>
        /// <param name="ignoreEmptyLines">Should the streamer avoid empty lines</param>
        /// <param name="limit">Number of maximum lines the streamer should return. Send -1 for no limit</param>
        public FileStreamer(string path, bool ignoreEmptyLines, int limit)
        {
            if (!File.Exists(path))
                throw new ArgumentException("Cannot find the file: " + path);
            if (limit != -1 && limit <=0 )
                throw new ArgumentException("Limit must be bigger than 0 (or -1 for no limit) but was: " + limit + ". File given was: " + path);
 
            _path = path;
            _ignoreEmptyLines = ignoreEmptyLines;
            _limit = limit;
            
            _enumerator = CreateStream().GetEnumerator();
        }
 
        #endregion
 
        #region public API
 
        public bool TryGetNextLine(out string nextItem)
        {
            lock (_locker)
            {
                return TryGetNextLineAssumingInsideLock(out nextItem);
            }
        }
 
        public bool TryGetNextLines(out ICollection<string> nextItems, int howMany)
        {
            if (howMany <= 0)
                throw new ArgumentException("'howMany' parameter must be > 0 but was " + howMany, "howMany");
 
            nextItems = new List<string>(howMany);
            lock (_locker)
            {
                string nextItem;
                for(int i=0; i<howMany; i++)
                {
                    if (!TryGetNextLineAssumingInsideLock(out nextItem))
                        break; // no more lines (EOF)
                    
                    nextItems.Add(nextItem);
                }
            }
 
            return nextItems.Count > 0;
        }
       
        public static IEnumerable<string> GetLines(string path)
        {
            return GetLines(path, false, -1);
        }
 
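        /// <summary>
        /// Return a lazy stream of lines for the specified file (not thread safe; meant for foreach usage)
        /// </summary>
        /// <param name="path">File path</param>
        /// <param name="ignoreEmptyLines">Should the streamer avoid empty lines</param>
        /// <param name="limit">Number of maximum lines to return. Send -1 for no limit</param>
        /// <returns>The file's lines, read one at a time</returns>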
        public static IEnumerable<string> GetLines(string path, bool ignoreEmptyLines, int limit)
        {
            using (FileStreamer streamer = new FileStreamer(path, ignoreEmptyLines, limit))
            {
                string nextItem;
                while (streamer.TryGetNextLine(out nextItem))
                    yield return nextItem;
 
                yield break; // EOF
            }
        }
 
        ///<summary>
        ///Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        ///</summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
 
        #endregion
 
        #region private API
 
        /// <summary>
        /// Get the next line in the file.
        /// dev: assumes the caller already holds the lock (this is why it's a private method)
        /// </summary>
        private bool TryGetNextLineAssumingInsideLock(out string nextItem)
        {
            nextItem = null;
            if (_linesGiven == _limit)
                return false; // we reached the limit, no more please.
 
            if (!_enumerator.MoveNext())
                return false; // end of stream (EOF)
 
            nextItem = _enumerator.Current;
            _linesGiven++;
            return true;
        }
 
        private IEnumerable<string> CreateStream()
        {
            using (FileStream fs = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.Read, 1024, FileOptions.SequentialScan))
            using (StreamReader reader = new StreamReader(fs))
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    if (_ignoreEmptyLines && line == string.Empty)
                        continue; // skip empty lines if needed
 
                    yield return line;
                }
 
                yield break;
            }
        }
 
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
                return;
 
            if (disposing)
            {
                _enumerator.Dispose();
            }
 
            _disposed = true;
        }
 
        #endregion
    }
}
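The threaded usage earlier needs each worker to keep pulling lines until TryGetNextLine returns false, and the threads must be joined before the streamer is disposed. Below is a minimal, self-contained sketch of that pattern; LockedEnumerator and WorkerDemo are hypothetical names, and an in-memory sequence stands in for the file so it runs anywhere:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: same idea as FileStreamer.TryGetNextLine, but over any sequence.
public sealed class LockedEnumerator<T> : IDisposable
{
    private readonly object _locker = new object();
    private readonly IEnumerator<T> _enumerator;

    public LockedEnumerator(IEnumerable<T> source)
    {
        _enumerator = source.GetEnumerator();
    }

    // Only MoveNext/Current run under the lock; the caller does the real work outside it.
    public bool TryGetNext(out T item)
    {
        lock (_locker)
        {
            if (_enumerator.MoveNext())
            {
                item = _enumerator.Current;
                return true;
            }
            item = default(T);
            return false;
        }
    }

    public void Dispose()
    {
        lock (_locker) { _enumerator.Dispose(); }
    }
}

public static class WorkerDemo
{
    // Starts 'threadCount' workers; each keeps pulling items until the source is exhausted.
    public static long SumWithWorkers(IEnumerable<int> source, int threadCount)
    {
        long total = 0;
        using (LockedEnumerator<int> items = new LockedEnumerator<int>(source))
        {
            Thread[] threads = new Thread[threadCount];
            for (int i = 0; i < threads.Length; i++)
            {
                threads[i] = new Thread(() =>
                {
                    int item;
                    while (items.TryGetNext(out item))     // loop, don't stop after one item
                        Interlocked.Add(ref total, item);  // stand-in for "do work"
                });
                threads[i].Start();
            }

            foreach (Thread t in threads)
                t.Join(); // all workers must finish before the enumerator is disposed
        }
        return total;
    }
}
```

Swapping the in-memory sequence for FileStreamer (or calling TryGetNextLines for batches) should keep the same shape: lock only around fetching the next item, and process it outside the lock.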
Posted by Oren Ellenbogen 
30/12/2008 09:17, Israel time UTC-07:00,     Comments [0]  | 
# Friday, November 07, 2008

"Why do you love working as X so much? so much that you're willing to spend that many hours of your life at?"

Pause for a second. Close your eyes and think: what would your answer to this question be?


For me, the answer is obvious: it's the people and the challenges that make my brain tick and my motivation SKY high. It's the feeling that I can really make things better by investing everything I've got into it that makes me proud of my work. I get huge satisfaction from installing TFS 2008, trying to make our Integration Tests run 6x faster, practicing some Agile principles I've read about, or taking on any other "dirty yet important work" no one would like to touch. I'm not scared of hard work, and if I can feel, down there in my stomach, that it would make my teammates more productive, I'll do anything I can to make it happen. Oh, and I'm trying to build one of the most complex search engines the world has to offer with a bunch of b-r-i-l-l-i-a-n-t guys! Can you blame me for working so hard and enjoying every minute of it?

Sure, getting a few bucks more would be great, but that will not make me proud of what I'm doing. One of the main things I've learned in my 8 years of developing software is that highly motivated teams will always make the best products. Leave aside for a moment the productivity boost these teams enjoy and imagine their daily work, their lunches together, their working environment, their joy of talking with one another about day-to-day stuff. Imagine how they dream about their goals together, discussing ways to make things better and more enjoyable. It's the buzz these companies have that draws the best guys to them so "effortlessly". The commitment to one another will make sure you build quality systems, that you try your best to deliver on time, to make it better, smarter, BIGGER, every single day. It will allow you to grow in ways you could never have anticipated. Trying to grow this culture in your team is one of the hardest things in the world, way harder than any logical puzzle thrown at you. Believe me.

It's just so damn hard to get it right.


"There are many men who feel a kind of twisted pride in cynicism" (Theodore Roosevelt, The Man In The Arena speech).
Excessive cynicism means death for any joint effort. No matter how strong your team is, negativity and cynicism will break your team spirit. It always does.
Stop being so negative, so cynical about your actions and your dreams. You can do great things by answering the question above and remembering that it's all about the people around you. It's all about you! You can actually make everyone around you better by taking action. Stop listening to people who think they know best and mock you with "you're only a tiny nail in a giant machine". Don't be afraid to keep trying to make a difference, even if you lose here and there. Read books, talk about them and your ideas, share and try, try, try, and try again!

This attitude will probably make you a winner, someone that others will enjoy working with, being with, taking inspiration from.
I know that these are the guys I love working with, or going with to a bar close by, drinking some beer and talking about how to change the world.

Best people simply do for each other.

Posted by Oren Ellenbogen 
07/11/2008 05:40, Israel time UTC-07:00,     Comments [1]  | 
# Friday, October 31, 2008

We're doing a lot of thinking these days about which features will give us the best ROI, trying to prioritize existing features and asking ourselves "did we miss something? is there a new feature out there we left behind?". It's not easy to think about great one-of-a-kind ideas. It is easy though to make it almost impossible.

Why? It's all because of the "parallel" right hemisphere of our brain. Imagine the following brain storming conversation:

me(Left brain): Alright I've got one! The user will enter the screen, do X and Y (we'll do some Z behind the scene) to receive ...
   me(Right brain kicks in): but, doing Z will take me two weeks to develop..
   me(R): gosh, we'll need to build a dictionary and hold it in memory if we want it to scale..
   me(R): reminder, use ReaderWriterLockSlim this time. It's much faster than ReaderWriterLock!
   me(R): I guess that this feature is not as important as feature F1, maybe I'm wasting my time thinking about this feature??!
   me(R): I'm so hungry!
   me(R): Oh, we can use [some service name] to do Z. Cool, so now this feature is feasible.
   me(R): crap, [some service name] costs money, I think.

[To those around me, it looks like I'm saying one fluent sentence, of course. Meanwhile, their right brains are busy with "why not" / "how" / "when"]
 
   joe(R): gosh, is he for real? this is the lamest idea I've heard!
   jack(R): hmm, maybe he has a point there. This feature reminds me of something I've always wanted to do... what was it now??...
   jack(R): naa.. this guy is crazy. for sure.
   joe(R): oh wait! we can use something I wrote to implement this feature! might be cool to use this code finally. It's been lying there for ages.
   sarah(R): wonderful idea! I wonder if I'll be assigned to work on it?
   jack(R): I've been listening to his babbling for 20 minutes now. self reminder: talk about a bonus with the CEO.
   sarah(R): I need coffee! God, if you end this meeting now I promise to donate $10 to charity! coffee... please...

me(L): .. a brilliant search result !!


Is it any wonder that most brain storming meetings are futile?


Brain storming is a process that should be mastered, and I suggest you jump to the nearest browser to find books on the topic; it's a skill worth investing time in.
Before you do so, here are some rules I use to silence my right brain while doing Left Brain Storming:

  1. Never ever prioritize your ideas during brain storming. I can't stress enough how important this rule is. Don't worry about it now, you'll have time later. 
  2. Listen to others.
  3. Be patient = don't judge quality of ideas.
  4. Write everything down. I really mean everything! There are no "stupid ideas" now.
  5. You are not going to execute these ideas. At least that is what you should tell your right brain during that time.
  6. Understand the meaning behind the feature, imagine how it will work, not how it will be executed!
  7. Don't invest more than 2 hours in a single brain storming meeting. If you feel you've missed some ideas, rest a few hours (or even better, a few days) and then give it another shot. "Burned out quickly, left brain does. Burned out leads to impatience. Impatience kicks the right brain into action. Right brain means trouble for your brain storming meeting" -- so says Master Yoda (well, sort of)
  8. 80/20: after you're done throwing out ideas (or the 2h gong), go over the features you've raised and mark features you think are interesting and feasible with 80 and features that are not with 20. This should take no longer than 2 minutes, so please use only 80 and 20 as numbers.
  9. Set a separate meeting to prioritize features against the existing backlog you've got. Important: don't do it on the same day; you'll probably want to sleep on things.


Happy hunting!

Posted by Oren Ellenbogen 
31/10/2008 03:14, Israel time UTC-07:00,     Comments [1]  | 
# Sunday, October 19, 2008

[ Part 1, Part 2, this is Part 3 ]

In part 1 I've talked about the general notion behind SpawnEnumerator. In part 2 I've implemented it and talked a bit about CCR and how it works under the hood.
In the third and last part I'll try to show how we can actually execute billions of tasks very easily.

Iterating over billions of tasks:

In my previous post I tried to show one of the greatest yet most underused features in C# 2.0: the yield statement. I closed the post with:
"Prefer using the yield statement as long as calculating values doesn't require holding an expensive resource like a DB connection, or a FileHandler for long period of time."

Now, let's assume that we have 100,000,000 Tasks to pull from a repository (DB, files, doesn't really matter) and execute them all via SpawnEnumerator.
Should we define a list of Tasks, fill it with 100,000,000 items and then iterate over it? Of course not! We couldn't even if we wanted to.
Assuming that reading 1000 (for example) Tasks is much faster than executing them (it usually is), and it's safe to hold them in memory, we can "bulk read & yield" the entire thing:

public delegate T Func<T>();

public static class Yield
{
    public static IEnumerable<T> Bulked<T>(Func<IEnumerable<T>> bulkYielder)
    {
        while (true)
        {
            IEnumerable<T> yielder = bulkYielder();
            if (yielder == null)
                throw new ArgumentException("bulkYielder cannot return a null enumerable", "bulkYielder");

            int itemsGiven = 0;
            foreach (T t in yielder)
            {
                itemsGiven++;
                yield return t;
            }

            if (itemsGiven == 0)
                break;
        }
    }
}

This can be used to return a stream of "tasks bulk" :

Yield.Bulked<Task>(delegate { return tasksRepository.Dequeue(1000); }) // return the next 1000 from the 100,000,000 items queue until no more tasks exists in the repository.

Important note:
In this scenario, we should return 1000 items from Dequeue method without using yield to avoid holding expensive resources for long period of time (as discussed).
Once we have those 1000 items, we can yield each one of them to the caller (very cheap, no resources are used).
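To see Bulked drain a repository in fixed-size batches, here is a self-contained sketch. FakeTaskRepository is a hypothetical in-memory stand-in for tasksRepository whose Dequeue returns an already-filled List (no yield inside), as the note above recommends. The helper is the same loop as Yield.Bulked, but it uses System.Func<T> (available from .NET 3.5) instead of the custom delegate:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for tasksRepository: Dequeue(n) returns up to n items as a
// fully materialized list, so nothing stays open while the caller iterates.
public sealed class FakeTaskRepository
{
    private readonly Queue<int> _queue = new Queue<int>();
    public int DequeueCalls { get; private set; }

    public FakeTaskRepository(int taskCount)
    {
        for (int i = 1; i <= taskCount; i++)
            _queue.Enqueue(i);
    }

    public List<int> Dequeue(int howMany)
    {
        DequeueCalls++;
        List<int> bulk = new List<int>(howMany);
        while (bulk.Count < howMany && _queue.Count > 0)
            bulk.Add(_queue.Dequeue());
        return bulk;
    }
}

public static class BulkedSketch
{
    // Same logic as Yield.Bulked above: keep asking for bulks until one comes back empty.
    public static IEnumerable<T> Bulked<T>(Func<IEnumerable<T>> bulkYielder)
    {
        while (true)
        {
            IEnumerable<T> bulk = bulkYielder();
            if (bulk == null)
                throw new ArgumentException("bulkYielder cannot return a null enumerable", "bulkYielder");

            int itemsGiven = 0;
            foreach (T t in bulk)
            {
                itemsGiven++;
                yield return t;
            }

            if (itemsGiven == 0)
                yield break; // empty bulk: the repository is drained
        }
    }
}
```

With 2,500 tasks and bulks of 1,000, Dequeue is called four times: 1000, 1000 and 500 items, then an empty bulk that ends the stream.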

Finally, we can execute billions of tasks in parallel via SpawnEnumerator:

IEnumerable<Task> tasksEnumerator = Yield.Bulked<Task>(delegate { return tasksRepository.Dequeue(1000); });
Action<Task> taskHandler = delegate(Task t) { /* execute single task */ };

// execute the tasks in parallel, using 50 threads and holding ~1000 items in memory.
_tasksExecutor = SpawnEnumerator<Task>.Start(50, "mypool", 1000, tasksEnumerator, taskHandler);

// note: don't forget to dispose _tasksExecutor when killing/stopping the app!


Using Yield.Bulked guarantees that we won't hold expensive resources for too long while allowing us to generate a "stream" of Tasks to run in parallel.
The code is easy to read and follow (I hope) and we gain a simple method for executing billions of tasks very effectively (CPU & memory wise).

Posted by Oren Ellenbogen 
19/10/2008 01:39, Israel time UTC-07:00,     Comments [0]  | 

How many of you played with C# yield statement ? I guess that most of you did.
Anyway, like most of the features shown in MSDN examples out there, when used incorrectly it can introduce very bad behavior into your code. Consider the following:

public IEnumerable<User> GetUsers(int count)
{
    using (MySqlConnection connection = new MySqlConnection("..."))
    {
        // MySqlDataReader reader = create a MySqlCommand and execute it
        while (reader.Read())
            yield return new User(/*... fill parameters from the reader ... */);
    }
}

Looks pretty harmless, right? Not quite. The compiler transforms the yield statement into a "state machine", which means that every time we yield a result back to the client (the caller of GetUsers in our example), we wait for the client to ask for the next item (via IEnumerator<T>.MoveNext()). The code above will hold the connection open until the client is done iterating over all of the User items. This will lead to major scalability issues very quickly! You should always keep your DB connections open for short periods to prevent connection exhaustion (threads waiting a long time for an available DB connection in the pool, until they time out). Because yield returns control to the caller, the caller may "take his time", thus leading to connection exhaustion.
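A quick way to see the problem without a database: replace the connection with a hypothetical FakeConnection that only counts how many instances are open, and compare the yield-based version with one that buffers the rows and closes the connection before returning. The static OpenCount counter exists only for this demo:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a DB connection: it only tracks how many are currently open.
public sealed class FakeConnection : IDisposable
{
    public static int OpenCount; // demo-only global counter

    public FakeConnection() { OpenCount++; }
    public void Dispose() { OpenCount--; }
}

public static class UsersSketch
{
    // Lazy version: the connection stays open until the caller finishes (or disposes) the loop.
    public static IEnumerable<string> GetUsersLazy(int count)
    {
        using (new FakeConnection())
        {
            for (int i = 0; i < count; i++)
                yield return "user" + i;
        }
    }

    // Buffered version: read everything, close the connection, then hand the list back.
    public static List<string> GetUsersBuffered(int count)
    {
        List<string> users = new List<string>(count);
        using (new FakeConnection())
        {
            for (int i = 0; i < count; i++)
                users.Add("user" + i);
        }
        return users;
    }
}
```

Calling GetUsersLazy opens nothing until the first MoveNext; from then on the "connection" stays open for as long as the caller holds the enumerator, which is exactly the exposure described above.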


On the other hand, used wisely and yield yields (lame joke, sorry) HUGE benefits:
[note: code written in notepad, stupidity won't compile]

  • Avoid useless memory allocations

#1:
How many times have you ended up writing something like this:

    public List<T> Filter<T>(List<T> input, Predicate<T> predicate)
    {
       List<T> output = new List<T>(input.Count / 2);
       foreach (T item in input)
          if (predicate(item))
             output.Add(item);

       return output;
    }

We allocate much more memory than we need, only to hold the output during the calculation. A better approach would be:

    public IEnumerable<T> Filter<T>(IEnumerable<T> input, Predicate<T> predicate)
    {
       foreach (T item in input)
          if (predicate(item))
             yield return item;
    }

This way no output list is allocated at all: the generated state machine only holds the current item. In addition, the client could produce each input item via yield as well, thus saving the need to create the "input" collection before calling our Filter method.
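Here is a runnable sketch of the lazy Filter chained with a lazy producer, so neither the input nor the output is ever materialized as a list (FilterSketch and Numbers are illustrative names, not part of any library):

```csharp
using System;
using System.Collections.Generic;

public static class FilterSketch
{
    // Lazily yields the matching items; no output list is allocated.
    public static IEnumerable<T> Filter<T>(IEnumerable<T> input, Predicate<T> predicate)
    {
        foreach (T item in input)
            if (predicate(item))
                yield return item;
    }

    // Lazy producer: numbers are generated on demand, so no input list is needed either.
    public static IEnumerable<int> Numbers(int from, int to)
    {
        for (int i = from; i <= to; i++)
            yield return i;
    }
}
```

For example, FilterSketch.Filter(FilterSketch.Numbers(1, 10), n => n % 2 == 0) produces 2, 4, 6, 8, 10 one at a time, pulling each number from Numbers only when the consumer asks for the next result.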


#2:

Another oh-(gosh-why)-so-common example is the following:

public void Save(T item)
{
    Save(new T[] { item });
}

public void Save(ICollection<T> items)
{
    // do your magic here to save items
}

Assuming you call Save with a single item quite a lot, you're allocating A LOT of memory to create one-item arrays. A better approach will be:

public void Save(T item)
{
    Save(Yield.One(item));
}

public void Save(IEnumerable<T> items)
{
    // do your magic here to save items
}

public static class Yield
{
    public static IEnumerable<T> One<T>(T item)
    {
        yield return item;
    }
}

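Caveat: this isn't truly allocation-free. The compiler-generated iterator is itself a small heap object, comparable in size to a one-item array, so measure before counting on a memory win here.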

  • Avoid "impossible" memory allocation

Let's say you want to read a 20GB file of emails where every line holds a single email. Declaring a List<string> and filling it up will obviously make your memory blow up; you simply can't hold that much in memory. Instead, you can use StreamReader.ReadLine and yield back each line to your client until all of the emails are taken care of. Yes, you could read the file in chunks yourself (keeping a pointer), but this is essentially what yield does for you under the hood. Reminder: the DRY principle is gold (or Don't Repeat .Net Framework, in our case).
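A minimal sketch of the line-by-line approach (EmailFileSketch is a hypothetical name). The TextReader overload makes it easy to try with a StringReader; for what it's worth, .NET 4.0 later added File.ReadLines, which streams lines in the same lazy way:

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class EmailFileSketch
{
    // Streams lines from a file path: only the current line (plus the reader's buffer)
    // is in memory, and the file is closed once enumeration completes or is disposed.
    public static IEnumerable<string> ReadLines(string path)
    {
        using (StreamReader reader = new StreamReader(path))
        {
            foreach (string line in ReadLines(reader))
                yield return line;
        }
    }

    // Same idea over any TextReader.
    public static IEnumerable<string> ReadLines(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
            yield return line;
    }
}
```

A foreach over EmailFileSketch.ReadLines(@"c:\temp\emails.txt") (an illustrative path) then handles one email per iteration, regardless of the file's size.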

  • Execute synchronous code asynchronously (nicely achieved via CCR)

A bit advanced, but you can read all about it here. The great benefit is you can transform (almost) any "yield based" code to run async, if needed/wanted.


Recap:
Prefer using the yield statement as long as calculating values doesn't require holding an expensive resource like a DB connection, or a FileHandler for long period of time.

Posted by Oren Ellenbogen 
19/10/2008 12:35, Israel time UTC-07:00,     Comments [3]  | 
# Tuesday, October 14, 2008

It seems that there are MANY ways to perform http web request poorly. This is a huge problem in today's world where web-services are more common than bankrupt banks. Here is a quick pattern of how to do it right:

public string Fetch(Uri requestUri)
{
    HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(requestUri);

    webRequest.Timeout = requestConnectTimeoutInMs; // take timeout from config
    webRequest.ReadWriteTimeout = requestReadWriteTimeoutInMs; // take timeout from config

    using (WebResponse webResponse = webRequest.GetResponse())
    using (StreamReader streamReader = new StreamReader(new TimeoutStream(webResponse.GetResponseStream(), fetchTimeoutInMs))) // take timeout from config
        return streamReader.ReadToEnd();
}

Details:

  1. Setting Timeout property: to make sure we don't wait the default 100 seconds for "ACK" from the server. WAY too much.
  2. Setting ReadWriteTimeout: this is crucial to understand. StreamReader reads data in chunks under the hood, and this timeout determines how long you'll wait for a single chunk to be read. The default value, 300 seconds (5 minutes), is again WAY too much.
  3. Using TimeoutStream (you need to implement your own, or let me know if you're interested and I'll send it to you): Alright, let's say you're willing to wait 500ms for the ACK (Timeout) and up to 500ms for reading every chunk (ReadWriteTimeout), but not more than 5 seconds for the entire read to complete. There is no way to achieve that without TimeoutStream. It starts a timer internally and overrides Seek/Read/Write (etc.), checking the timer before calling the inner stream's method. TimeoutStream is a very simple wrapper around Stream. For example:

         public override int Read(byte[] buffer, int offset, int count)
         {
             CheckTimeout(); // throw TimeoutException if timeout was reached
             return _stream.Read(buffer, offset, count);
         }
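Here is one possible TimeoutStream, a sketch under the assumptions above rather than the author's implementation: it starts a Stopwatch when constructed and checks it before every Read/Write/Seek. Note that it can only fail between calls; a single blocking Read is still bounded by the inner stream's own timeout (ReadWriteTimeout for a response stream):

```csharp
using System;
using System.Diagnostics;
using System.IO;

// Sketch: one overall deadline for the whole read, enforced between calls.
public sealed class TimeoutStream : Stream
{
    private readonly Stream _stream;
    private readonly Stopwatch _timer = Stopwatch.StartNew();
    private readonly long _timeoutInMs;

    public TimeoutStream(Stream stream, long timeoutInMs)
    {
        if (stream == null) throw new ArgumentNullException("stream");
        _stream = stream;
        _timeoutInMs = timeoutInMs;
    }

    private void CheckTimeout()
    {
        if (_timer.ElapsedMilliseconds > _timeoutInMs)
            throw new TimeoutException("Stream operations exceeded " + _timeoutInMs + "ms in total.");
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        CheckTimeout();
        return _stream.Read(buffer, offset, count);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        CheckTimeout();
        _stream.Write(buffer, offset, count);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        CheckTimeout();
        return _stream.Seek(offset, origin);
    }

    // The remaining abstract members simply delegate to the inner stream.
    public override void Flush() { _stream.Flush(); }
    public override void SetLength(long value) { _stream.SetLength(value); }
    public override bool CanRead { get { return _stream.CanRead; } }
    public override bool CanSeek { get { return _stream.CanSeek; } }
    public override bool CanWrite { get { return _stream.CanWrite; } }
    public override long Length { get { return _stream.Length; } }
    public override long Position
    {
        get { return _stream.Position; }
        set { _stream.Position = value; }
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) _stream.Dispose();
        base.Dispose(disposing);
    }
}
```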

Multiple HttpWebRequest limitation:
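By default, a client application can open only 2 concurrent connections per remote host (ServicePointManager.DefaultConnectionLimit; ASP.NET may configure a different value), so additional async HttpWebRequests to the same host simply wait in line. In order to override it (the easiest way, IMHO), don't forget to add this under the <configuration> section in the application's config file: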

<system.net>
  <connectionManagement>
     <add address="*" maxconnection="65000" />
  </connectionManagement>
</system.net>
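If you'd rather set the limit in code, the same knob is exposed as ServicePointManager.DefaultConnectionLimit. Changing it only affects ServicePoint objects created afterwards, so it belongs at application startup. A minimal sketch (ConnectionLimitSketch is just an illustrative wrapper):

```csharp
using System.Net;

public static class ConnectionLimitSketch
{
    // Call once at startup, before the first request to a host creates its ServicePoint;
    // existing ServicePoint objects keep the limit they were created with.
    public static void Apply(int maxConnectionsPerHost)
    {
        ServicePointManager.DefaultConnectionLimit = maxConnectionsPerHost;
    }
}
```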

Why should you follow these guidelines:

  1. Never trust 3rd party components: avoid excuses like "my site is not responsive because 1000 threads are waiting for web-service-X to respond". By setting those parameters you're safe to make your own choices of how much time to wait. Log and monitor these things to adjust your application and alert your suppliers.
  2. Be able to determine your own SLA for the world: again, if internally you need to call a web-service, make sure you're able to control the time you're willing to spend. You've got clients to serve and they want you to meet the SLA as you promised!

Important note about recycling HttpWebRequest.GetResponse()
Simply put, it doesn't work, by design. If you fail to get a response in time (due to 1, 2 or 3), don't call webRequest.GetResponse() again, as the response is cached internally (you'll get the same HttpWebResponse). Instead, re-create the HttpWebRequest and try again. I don't agree with the design Microsoft chose for this method, but at least it's good to be aware of it.

   from MSDN:

   " Multiple calls to GetResponse return the same response object; the request is not reissued. "

Final note:
You should obviously consider writing an HttpWebRequestHelper class (or extension method) and using it instead of copy & pasting this code all over your codebase.
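A small sketch of the retry part of such a helper. WithFreshAttempt is a hypothetical name; the key assumption is that the delegate builds a brand-new HttpWebRequest on every call (as Fetch above does), since a second GetResponse() on the same request returns the cached response. Treating only WebException and TimeoutException as retryable is also an assumption:

```csharp
using System;
using System.Net;

public static class RetrySketch
{
    // Calls 'attempt' up to maxAttempts times. 'attempt' is assumed to create a NEW
    // HttpWebRequest each time it runs.
    public static T WithFreshAttempt<T>(Func<T> attempt, int maxAttempts)
    {
        if (attempt == null)
            throw new ArgumentNullException("attempt");
        if (maxAttempts <= 0)
            throw new ArgumentException("maxAttempts must be > 0", "maxAttempts");

        for (int i = 1; ; i++)
        {
            try
            {
                return attempt();
            }
            catch (Exception err)
            {
                bool retryable = err is WebException || err is TimeoutException;
                if (!retryable || i >= maxAttempts)
                    throw; // not worth retrying, or out of attempts
                // log 'err' here, then loop and build a new request
            }
        }
    }
}
```

Usage would look like: string body = RetrySketch.WithFreshAttempt(() => Fetch(requestUri), 3); where each call to Fetch creates its own HttpWebRequest.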

Posted by Oren Ellenbogen 
14/10/2008 05:32, Israel time UTC-07:00,     Comments [1]  | 

[ Part 1, this is Part 2, Part 3 ]

In Part 1 I talked about the general notions behind SpawnEnumerator and played with the API. If you're not familiar with Microsoft's CCR, this post might require a 2nd & 3rd read to understand completely. CCR changes the way you should think about and address async code. It's a game worth playing, and studying the rules only works to your advantage. Alright, enough chit chat, let's make it happen (complete code attached at the end).

Class definition:

  public sealed class SpawnEnumerator<T> : IDisposable
  {
       private SpawnEnumerator(int threadsCount, string threadsPoolName, IEnumerator<T> filler) { /* initialize fields by parameters, nothing more */ }
       // ...
  }

Now, let's have a look at some of the fields:

  private const double DefaultLowThresholdPrcentage = 0.1; // When "items to process" queue reach 10%, we want to re-fill. Should be exposed of course.

  private event Action<DateTime> _enumeratorDepleted = delegate { }; // trigger when the enumerator is empty
  private event Action<DateTime> _allTasksAreCompleted = delegate { }; // trigger when all tasks are completed

  private readonly Dispatcher _dispatcher; // our "threadpool"
  private readonly DispatcherQueue _dispatcherQueue; // hold actual ITask, waiting for the dispatcher (aka worker threads) to handle them. more about it soon.

  private readonly Port<T> _itemsToProcessPort;  // hold wannabe tasks, currently a queue of items of T we want to process.
  private readonly Port<EmptyValue> _itemCompleteNotificationPort; // soon...
  private readonly Port<EmptyValue> _initializePort;  // soon...

  private readonly IEnumerator<T> _enumerator; // the enumerator we'll use to fill the _itemsToProcessPort

A deeper look at what we have so far:

  • _itemsToProcessPort: will act as the queue of items we want to process. In CCR's world, Port<T> is actually a "smart queue" (more on it later).
  • _itemCompleteNotificationPort: will be used to notify on every completed item. Assuming that we need to fill 100 items to the port, and our lower limit is 10%, we want to re-fill the _itemsToProcessPort every 90 completed items. Notice we're using EmptyValue as T. EmptyValue is a CCR type that holds EmptyValue.SharedInstance to avoid memory allocation.
  • _initializePort: will be used to initialize the _itemsToProcessPort with the 1st bulk of items.

Step back, what's going on?!!? Why so many Ports?
Well, the idea behind CCR is all about messaging. You can post messages to ports (Port<T> is thread-safe, of course) and by doing so take advantage of the "smart queue" implementation behind Port<T>. When a message is posted to a port, the CCR tries to apply some "predicates" on the port, and if a "predicate" returns true, it dequeues the item(s) from the Port that matched that "predicate", creates an ITask from them and pushes it into the DispatcherQueue as an "actual task".

Using Ports makes it easier for us to define complex async code. Instead of putting locks all over the place, I can simply post a message to _itemCompleteNotificationPort and, after X messages are posted to this port, ask to re-fill the queue when a thread is available. This is much easier than counting each completed item and, if ((counter % X) == 0), locking some object and re-filling. Both will work, but in the CCR world you don't have to think about technical async problems/solutions, only about the logical async operations you want to perform. You'll write much less code, zero locks of your own, and mostly think about "this could run concurrently", "this must run exclusively", and let the CCR schedule everything for you.
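For comparison, here is roughly what the "count and lock" alternative looks like without CCR. It's a hypothetical sketch (RefillCounter is not part of SpawnEnumerator): Interlocked.Increment gives every completion a unique number, and only the thread that hits a multiple of X takes the lock and re-fills. With a 10% lower limit on a 100-item queue, X would be 90:

```csharp
using System;
using System.Threading;

// Hypothetical non-CCR version of "re-fill every X completed items".
public sealed class RefillCounter
{
    private readonly object _refillLock = new object();
    private readonly int _refillEvery;
    private readonly Action _refill;
    private int _completed;

    public RefillCounter(int refillEvery, Action refill)
    {
        if (refillEvery <= 0) throw new ArgumentException("refillEvery must be > 0", "refillEvery");
        if (refill == null) throw new ArgumentNullException("refill");
        _refillEvery = refillEvery;
        _refill = refill;
    }

    // Called by a worker thread after it finishes one item.
    public void ItemCompleted()
    {
        int completed = Interlocked.Increment(ref _completed); // unique per call
        if (completed % _refillEvery != 0)
            return;

        // Only one thread at a time may touch the (non thread-safe) source enumerator.
        lock (_refillLock)
        {
            _refill();
        }
    }
}
```

It works, but the counter, the lock and the "who re-fills" decision are now your responsibility, which is exactly what the Arbiter-based registration hands over to the CCR.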

Start method: (as discussed in Part 1)
public static SpawnEnumerator<T> Start(int threadsCount, string threadsPoolName, int upperQueueSize, IEnumerable<T> filler, Action<T> handler)
{
  // .. validate parameters, nothing interesting...
  SpawnEnumerator<T> sw = new SpawnEnumerator<T>(threadsCount, threadsPoolName, filler.GetEnumerator());
  sw.Initialize(handler, upperQueueSize);
  return sw;
}

API Design: Why Start method with private constructor instead of public constructor alone:
The client of this method should understand that once she supplies the arguments, things will start happening: we'll immediately start processing items from the enumerator. You'll soon find out that Start is a non-blocking method. A "Start" method, I feel, makes this more explicit, as it should be.

Initialize method:
private void Initialize(Action<T> handler, int upperQueueSize)
{
  RegisterRecievers(handler, upperQueueSize); // where CCR *magic* happens. soon...
  _initializePort.Post(EmptyValue.SharedInstance); // post a message to let "someone" know we want to fill the 1st bulk of items
}

RegisterRecievers method:
Before we look at the code, here is a reminder of the main things we want to accomplish:

  1. We should fill the _itemsToProcessPort for the 1st bulk or once we reach the lower limit of the queue, by counting how many items were completed. Keeping in mind that _enumerator is not thread-safe and we don't want to start locking access to it on our own, we should make sure that re-filling is done exclusively from 1 thread only.
  2. We want to handle each one of the items posted to _itemsToProcessPort with the supplied "handler" (given in Start method). Each item is independent so obviously we want to process each item concurrently, according to the amount of threads in the Dispatcher.

private void RegisterRecievers(Action<T> handler, int upperQueueSize)
{
  int numberOfItemsToDepleteBeforePushingNewBulk = (int)Math.Ceiling((1 - DefaultLowThresholdPrcentage) * upperQueueSize);

  Arbiter.Activate(_dispatcherQueue,

     Arbiter.Interleave(
         new TeardownReceiverGroup(),// nothing here
         new ExclusiveReceiverGroup(
                // 1st bulk:
                Arbiter.Receive(false, _initializePort, delegate { FillItemsToProcessQueueWithNextBulk(upperQueueSize); }), // only once, that's why the "false" is here.

                // enough items were completed which means "items to process" queue reached lower limit:
                Arbiter.MultipleItemReceive(true, _itemCompleteNotificationPort, numberOfItemsToDepleteBeforePushingNewBulk,
                                          delegate { FillItemsToProcessQueueWithNextBulk(upperQueueSize); })
            ),
         new ConcurrentReceiverGroup(
                // process items concurrently
                Arbiter.Receive(true, _itemsToProcessPort, // listen to every post, that's why the "true"
                     delegate(T item)
                     {
                         try
                         {
                             handler(item);
                         }
                         catch (Exception err) { /* log error */ }
                         finally
                         {
                             HandleCompletedItem();
                         }
                     })
            )
        )
    );
}
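To make the threshold formula concrete: the value of DefaultLowThresholdPrcentage isn't shown in this post, so the sketch below assumes 0.25 purely for illustration. With upperQueueSize = 100, (1 - 0.25) * 100 = 75, so a new bulk is requested after every 75 completed items, i.e. when roughly 25 items are still queued. Math.Ceiling rounds fractional results up (an upper size of 10 gives 7.5, hence 8). RefillMath is a hypothetical wrapper around the same expression used in RegisterRecievers.

```csharp
using System;

public static class RefillMath
{
    // Same expression as in RegisterRecievers: how many completed items
    // should trigger the next bulk, given the upper queue size and the
    // low-threshold percentage (0.25 is an assumed example value).
    public static int ItemsToDepleteBeforeNextBulk(int upperQueueSize, double lowThresholdPercentage)
    {
        return (int)Math.Ceiling((1 - lowThresholdPercentage) * upperQueueSize);
    }
}
```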

Alright, this is a bit harder to read, but let's simplify it by reading it from the inside out.

  • Under ExclusiveReceiverGroup you'll see 2 receivers. The first listens to _initializePort, and once a message is posted it creates an ITask that (when a thread is available) calls the FillItemsToProcessQueueWithNextBulk method. The second receiver listens to _itemCompleteNotificationPort and does the same as the first one for every numberOfItemsToDepleteBeforePushingNewBulk items posted to that Port.
  • Under ConcurrentReceiverGroup you'll see a receiver listening to _itemsToProcessPort; for every message it creates an ITask that (again, once a thread is available) runs the given "item handler" with the item dequeued from the Port.
  • We use Arbiter.Activate(_dispatcherQueue, ...) so that every ITask created by these receivers is enqueued to _dispatcherQueue, a queue of actual tasks.
  • This method is non-blocking: we only register receivers that know how to create and enqueue an ITask from messages posted to some Port. That's it.
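The second exclusive receiver behaves like a counter that fires every N messages. A rough BCL-only stand-in for a persistent Arbiter.MultipleItemReceive could look like this; BatchingPort is a hypothetical name, and unlike the CCR it runs the handler inline on the posting thread instead of scheduling an ITask on the dispatcher.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in: collects posted messages and invokes the handler
// with a batch of exactly 'batchSize' messages each time that many have arrived.
public class BatchingPort<T>
{
    private readonly object _sync = new object();
    private readonly List<T> _buffer = new List<T>();
    private readonly int _batchSize;
    private readonly Action<T[]> _handler;

    public BatchingPort(int batchSize, Action<T[]> handler)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException("batchSize");
        _batchSize = batchSize;
        _handler = handler;
    }

    public void Post(T message)
    {
        T[] batch = null;
        lock (_sync)
        {
            _buffer.Add(message);
            if (_buffer.Count == _batchSize)
            {
                batch = _buffer.ToArray();
                _buffer.Clear();
            }
        }
        // Run the handler outside the lock so a slow handler doesn't block other posters.
        if (batch != null) _handler(batch);
    }
}
```

Posting 7 messages to a BatchingPort with a batch size of 3 invokes the handler twice, with {1, 2, 3} and {4, 5, 6}; the 7th message waits for two more.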

Recap:
We post messages to different Port<T> instances and write receivers that "listen" to those messages based on some rules (one receiver listens to every message, one listens to only a single message, one listens to X messages, etc.). When a receiver's rule applies, the CCR dequeues all the relevant messages (the ones matching the rule) from the Port<T> and wraps them, together with the supplied delegate, as an ITask. This ITask instance is enqueued to the DispatcherQueue until the Dispatcher has a free thread to handle it. The Dispatcher then executes those tasks according to the requested scheduling (some ITasks must run exclusively, some can run concurrently, as we've seen).
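The recap can be reduced to a toy model: a port turns each posted message plus a handler into a task, and a fixed set of threads drains the task queue. This is a simplified sketch with hypothetical ToyPort/ToyDispatcher classes built on BlockingCollection, not the CCR's implementation: the receiver is bound when the port is created, and every message is handled concurrently, with no exclusive group or batching.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Toy "dispatcher": a fixed number of named threads draining a queue of tasks.
public sealed class ToyDispatcher : IDisposable
{
    private readonly BlockingCollection<Action> _taskQueue = new BlockingCollection<Action>();
    private readonly Thread[] _threads;

    public ToyDispatcher(int threadCount, string name)
    {
        _threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            _threads[i] = new Thread(Run) { Name = name + "-" + i, IsBackground = true };
            _threads[i].Start();
        }
    }

    public void Enqueue(Action task) { _taskQueue.Add(task); }

    private void Run()
    {
        foreach (Action task in _taskQueue.GetConsumingEnumerable())
        {
            try { task(); }
            catch (Exception) { /* a real dispatcher would log this; the worker thread survives */ }
        }
    }

    public void Dispose()
    {
        _taskQueue.CompleteAdding();           // workers finish the remaining tasks, then exit
        foreach (Thread t in _threads) t.Join();
        _taskQueue.Dispose();
    }
}

// Toy port with one persistent receiver: every posted message becomes
// a task (message + handler) enqueued on the dispatcher.
public sealed class ToyPort<T>
{
    private readonly ToyDispatcher _dispatcher;
    private readonly Action<T> _receiver;

    public ToyPort(ToyDispatcher dispatcher, Action<T> receiver)
    {
        _dispatcher = dispatcher;
        _receiver = receiver;
    }

    public void Post(T message)
    {
        _dispatcher.Enqueue(() => _receiver(message));
    }
}
```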


You can download the complete code (with some extra features) here: SpawnEnumerator.txt (12KB). You'll need the CCR and log4net DLLs in order to compile it.

Posted by Oren Ellenbogen 
14/10/2008 04:07, Israel time UTC-07:00,     Comments [0]  | 
# Monday, October 13, 2008

[ this is Part 1, Part 2, Part 3 ]

I promised to write more about CCR and how to use it in practice.
There is no better way to explain how an infrastructure works than writing some code and playing with it, so let's toy with a simple utility, based on CCR under the hood.

Scenario: you've got billions of *independent* tasks you need to execute as fast as possible.
Requirements:

  1. The number of "worker threads" should be easy to define. Obviously.
  2. Memory consumption - don't bite off more than you can chew. We don't want to hold everything in memory, and the limit should be easy to configure.
  3. The process should *not* shut down due to an unhandled exception in one of the worker threads.
  4. We want to know when the enumerator is depleted and/or all items were completed. This is crucial for testing and adjusting parameters.

 

The general notion behind SpawnEnumerator is something like this:

[Figure: SpawnEnumerator, the general idea]

 

We pull a bulk of items from the enumerator; each one of them is actually a "wannabe" task. Combining each T value with the given "item handler" turns it into an actual task, waiting for its turn in the "thread pool". Once the port/queue of "items to execute" reaches its lower limit, we pull another bulk of items from the enumerator, and so on until the enumerator is depleted.
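Pulling "a bulk" from a huge, lazily evaluated enumerator can be as simple as the helper below (a hypothetical TakeNext, sketched with the BCL). The order of the checks in the loop matters: testing the count before MoveNext() avoids advancing the enumerator past an item that would then be lost.

```csharp
using System.Collections.Generic;

public static class Bulk
{
    // Pulls up to 'count' items from a (possibly huge, lazily evaluated) enumerator.
    // Returns fewer than 'count' items only when the enumerator is depleted.
    public static List<T> TakeNext<T>(IEnumerator<T> source, int count)
    {
        var bulk = new List<T>(count);
        // Check the count first: calling MoveNext() when the bulk is already full
        // would advance past an item we never store.
        while (bulk.Count < count && source.MoveNext())
            bulk.Add(source.Current);
        return bulk;
    }
}
```

With an enumerator over 1..10 and a bulk size of 4, successive calls return {1..4}, {5..8}, {9, 10}, and then an empty list, which is the "enumerator depleted" signal.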


API playground:

SpawnEnumerator<T>.Start(int numberOfThreads, string threadPoolName, int numberOfItemsToHoldInMemory, IEnumerator<T> enumerator, Action<T> singleItemHandler)

T = type of the items we would like to run. If we have 10,000,000 items of type int, we'll supply "int" as T.
numberOfThreads = how many worker threads do we want to run?
threadPoolName = name for the worker threads, for easier debugging.
numberOfItemsToHoldInMemory = rough number of items we want to hold in memory, to make sure memory consumption won't blow up in our face.
enumerator = the enumerator of "items" to execute, assuming it will hold a *huge* amount of tasks.
singleItemHandler = delegate that receives 1 item and handles it.

event Action<DateTime> EnumeratorDepleted

event Action<DateTime> AllTasksAreCompleted
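One way requirement 4 could be met is to count outstanding items and raise AllTasksAreCompleted only when the enumerator is depleted and that count drops back to zero. A minimal sketch, assuming the caller invokes ItemStarted for every item it pulls from the enumerator before it calls SourceDepleted; CompletionTracker is a hypothetical helper (it uses Volatile.Read, so .NET 4.5 or later), not part of the SpawnEnumerator download.

```csharp
using System;
using System.Threading;

// Hypothetical helper: raises a callback once the source is depleted
// AND every item that was handed out has completed.
public sealed class CompletionTracker
{
    private int _outstanding;      // items pulled but not yet completed
    private int _depleted;         // 0 = source may still have items, 1 = depleted
    private int _completedRaised;  // guards against raising the callback twice
    private readonly Action<DateTime> _allTasksAreCompleted;

    public CompletionTracker(Action<DateTime> allTasksAreCompleted)
    {
        _allTasksAreCompleted = allTasksAreCompleted;
    }

    public void ItemStarted() { Interlocked.Increment(ref _outstanding); }

    public void ItemCompleted()
    {
        if (Interlocked.Decrement(ref _outstanding) == 0) TryRaise();
    }

    public void SourceDepleted()
    {
        Interlocked.Exchange(ref _depleted, 1);
        TryRaise();
    }

    private void TryRaise()
    {
        // Raise only when both conditions hold, and only from the first thread that gets here.
        if (Volatile.Read(ref _depleted) == 1 && Volatile.Read(ref _outstanding) == 0
            && Interlocked.CompareExchange(ref _completedRaised, 1, 0) == 0)
        {
            _allTasksAreCompleted(DateTime.Now);
        }
    }
}
```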


Test playground:

(written in notepad, sorry for stupid mistakes)

[Test]
public void Execute_LargeEnumeratorOfNumbers_ExecuteAllItems()
{
    // arrange:
    int numberOfThreads = 10;
    string threadPoolName = "mypool";
    int numberOfItemsToHoldInMemory = 100;

    IEnumerator<int> items = Yield.For(1, 10000); // helper (not shown) that yield-returns the numbers 1-10000
    long sum = 0;
    Action<int> singleItemHandler = delegate(int item) { Interlocked.Add(ref sum, item); };

    ManualResetEvent trigger = new ManualResetEvent(false);

    // act:
    _executor = SpawnEnumerator<int>.Start(numberOfThreads, threadPoolName, numberOfItemsToHoldInMemory, items, singleItemHandler);
    _executor.AllTasksAreCompleted += delegate { trigger.Set(); };

    // assert:
    bool signaled = trigger.WaitOne(TimeSpan.FromSeconds(1), false);
    Assert.IsTrue(signaled, "timeout reached, that shouldn't happen!");
    // all items were handled if the collected sum equals 1 + 2 + ... + N = N(N+1)/2
    Assert.AreEqual(10000L * 10001 / 2, Interlocked.Read(ref sum));
}


* In the test teardown, we can check whether _executor is not null and, if so, Dispose it to close the "worker threads".

 

Next post - Implementing SpawnEnumerator via CCR.

Posted by Oren Ellenbogen 
13/10/2008 04:50, Israel time UTC-07:00,     Comments [2]  | 
# Monday, October 06, 2008

This is mostly a note to self, but heck, maybe someone will reach this post via Delver (or Google :)), so I'm all about sharing.

Anyway, we're using Bugzilla (for now), and I tried to get all the "open" bugs for my team with status equal to X, Y or Z.

Sounds easy, right? Well, you are... sadly mistaken.
After investing 15 minutes banging my head against the nearest wall with “Bugzilla Advanced Search” (well, I'll be polite and say it's "advanced" alright), I gave up on that one.
Instead, I hacked the URL a bit to understand the dark voodoo of Bugzilla and voila, 2 minutes later:

http://qabugz/cgi-bin/bugzilla/buglist.cgi
?bug_status=NEW
&bug_status=ASSIGNED
&bug_status=REOPENED
&assigned_to=Joe
&assigned_to=Joe2
&assigned_to=Joe3
&query_format=specific
&field0-0-0=bug_status
&field0-0-1=assigned_to
&type0-0-0=anyexact
&type0-0-1=anyexact
&order=bugs.bug_severity


Just remove the line breaks, of course (the split version is easier to edit), and replace "Joe" with your favorite developer's name.
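If you run this query often, generating the one-line URL beats editing it by hand. A small sketch that rebuilds exactly the URL above from lists of statuses and assignees; the BugzillaQuery class is hypothetical, and only the query parameters already shown in this post are used.

```csharp
using System;
using System.Collections.Generic;

public static class BugzillaQuery
{
    // Builds the single-line buglist.cgi URL: one bug_status / assigned_to
    // parameter per value, plus the fixed parameters from the hand-edited URL.
    public static string Build(string baseUrl, IEnumerable<string> statuses, IEnumerable<string> assignees)
    {
        var parts = new List<string>();
        foreach (string s in statuses) parts.Add("bug_status=" + Uri.EscapeDataString(s));
        foreach (string a in assignees) parts.Add("assigned_to=" + Uri.EscapeDataString(a));
        parts.Add("query_format=specific");
        parts.Add("field0-0-0=bug_status");
        parts.Add("field0-0-1=assigned_to");
        parts.Add("type0-0-0=anyexact");
        parts.Add("type0-0-1=anyexact");
        parts.Add("order=bugs.bug_severity");
        return baseUrl + "?" + string.Join("&", parts);
    }
}
```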

If you want to make it a bit more complex, here is a nice site with the supported field names:
http://pkp.sfu.ca/bugzilla/page.cgi?id=quicksearchhack.html

 

Anyway, I hope it will help someone.

* Geeky BTW – notice the funny “binary index” of the fields (0-0-0, 0-0-1)! It made me laugh quite a bit, being the geek that I am.

Posted by Oren Ellenbogen 
06/10/2008 03:20, Israel time UTC-07:00,     Comments [0]  |