// Written by Oren Ellenbogen (06/10/2008)

#region using

using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using log4net;
using Microsoft.Ccr.Core;

#endregion

namespace Semingo.Common.Async
{
    /// <summary>
    /// Spawn multiple threads and execute the given items to process, keeping the size of the internal queue within some given boundaries.
    /// Use this class when you have a HUGE enumerator of *independent* items to execute.
    /// </summary>
    /// <typeparam name="T">Type of the items drawn from the enumerator and passed to the handler</typeparam>
    public sealed class SpawnEnumerator<T> : IDisposable
    {
        #region fields

        private const double DefaultLowThresholdPrcentage = 0.1; // TODO: expose
        private static readonly ILog _logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        // Both events start with an empty subscriber so they can be invoked without a null check.
        private event Action<DateTime> _enumeratorDepleted = delegate { };
        private event Action<DateTime> _allTasksAreCompleted = delegate { };

        private readonly Dispatcher _dispatcher;
        private readonly DispatcherQueue _dispatcherQueue;
        private readonly Port<T> _itemsToProcessPort;
        private readonly Port<EmptyValue> _itemCompleteNotificationPort;
        private readonly Port<EmptyValue> _initializePort;
        private readonly IEnumerator<T> _enumerator;

        private int _failures;
        private int _successes;
        private int _numberOfDrawnItems;
        private int _numberOfCompletedItems;
        private int _allTasksCompletedRaised; // 0 = "all tasks completed" not raised yet, 1 = already raised
        private bool _disposed = false;
        private bool _isEnumeratorDepleted = false;
        private DateTime _whenEnumeratorDepleted;
        private DateTime _whenAllTasksAreCompleted;

        #endregion

        #region ctors

        private SpawnEnumerator(int threadsCount, string threadsPoolName, IEnumerator<T> filler)
        {
            _dispatcher = new Dispatcher(
                threadsCount,
                ThreadPriority.Normal,
                DispatcherOptions.UseBackgroundThreads | DispatcherOptions.UseProcessorAffinity,
                threadsPoolName);
            _dispatcherQueue = new DispatcherQueue(threadsPoolName + "_Queue", _dispatcher);
            _enumerator = filler;

            _itemsToProcessPort = new Port<T>();
            _itemCompleteNotificationPort = new Port<EmptyValue>();
            _initializePort = new Port<EmptyValue>();

            _failures = _successes = 0;
        }

        /// <summary>
        /// Initialize new instance of SpawnEnumerator.
        /// </summary>
        /// <param name="threadsCount">Number of threads to spawn (=worker threads)</param>
        /// <param name="threadsPoolName">The thread pool name (for debugging)</param>
        /// <param name="upperQueueSize">The upper limit of the internal queue size to maintain in memory, assuming that the enumerator contains so many items that holding them all will be an actual problem.</param>
        /// <param name="filler">Enumerator of items to execute</param>
        /// <param name="handler">Handler of a given item</param>
        /// <returns>An instance whose receivers are registered and whose first bulk fill has been posted</returns>
        /// <exception cref="ArgumentException"><paramref name="threadsCount"/> or <paramref name="upperQueueSize"/> is smaller than 1</exception>
        /// <exception cref="ArgumentNullException"><paramref name="filler"/> or <paramref name="handler"/> is null</exception>
        public static SpawnEnumerator<T> Start(int threadsCount, string threadsPoolName, int upperQueueSize, IEnumerable<T> filler, Action<T> handler)
        {
            #region validate parameters

            if (threadsCount < 1)
                throw new ArgumentException("threadsCount must be >= 1", "threadsCount");
            if (upperQueueSize < 1)
                throw new ArgumentException("upperQueueSize must be >= 1", "upperQueueSize");
            if (filler == null)
                throw new ArgumentNullException("filler");
            if (handler == null)
                throw new ArgumentNullException("handler");

            #endregion

            SpawnEnumerator<T> sw = new SpawnEnumerator<T>(threadsCount, threadsPoolName, filler.GetEnumerator());
            sw.Initialize(handler, upperQueueSize);
            return sw;
        }

        #endregion

        #region public API

        /// <summary>
        /// Number of items successfully executed so far
        /// </summary>
        public int Successes
        {
            get { return _successes; }
        }

        /// <summary>
        /// Number of items that failed during execution so far
        /// </summary>
        public int Failures
        {
            get { return _failures; }
        }

        /// <summary>
        /// Number of items drawn from the enumerator so far
        /// </summary>
        public int DrawnItems
        {
            get { return _numberOfDrawnItems; }
        }

        /// <summary>
        /// Suspend current work.
        /// Stop taking items from the "filler" (=enumerator) and executing pending items.
        /// </summary>
        public void Suspend()
        {
            _dispatcherQueue.Suspend();
        }

        /// <summary>
        /// Resume work as usual. Does nothing when the queue is not suspended.
        /// </summary>
        public void Resume()
        {
            if (!_dispatcherQueue.IsSuspended)
                return;

            _dispatcherQueue.Resume();
        }

        /// <summary>
        /// Receive a list of items that were pending for execution.
        /// The sequence is lazy: items are removed from the port / dispatcher queue as the caller enumerates it.
        /// </summary>
        /// <returns>Items still waiting in the "items to process" port, followed by items found in tasks dequeued from the dispatcher queue</returns>
        public IEnumerable<T> DequeuePendingItems()
        {
            T item;
            while (_itemsToProcessPort.Test(out item))
                yield return item;

            // dev: CCR cannot retrieve items from suspended DispatcherQueue
            if (!_dispatcherQueue.IsSuspended)
            {
                ITask task;
                while (_dispatcherQueue.TryDequeue(out task))
                {
                    for (int i = 0; i < task.PortElementCount; i++)
                    {
                        // Tasks may carry other payloads (e.g. notification values); only items of type T are returned.
                        if (task[i] != null && task[i].Item is T)
                            yield return (T)task[i].Item;
                    }
                }
            }
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Get notified when the enumerator depleted.
        /// If you register AFTER this event was raised, you'll get notified immediately!
        /// </summary>
        public event Action<DateTime> EnumeratorDepleted
        {
            add
            {
                if (_isEnumeratorDepleted)
                    value(_whenEnumeratorDepleted);

                _enumeratorDepleted += value;
            }
            remove
            {
                _enumeratorDepleted -= value;
            }
        }

        /// <summary>
        /// Get notified when all of the tasks are completed and the enumerator is depleted.
        /// If you register AFTER this event was raised, you'll get notified immediately!
        /// </summary>
        public event Action<DateTime> AllTasksAreCompleted
        {
            add
            {
                if (AllTasksWereCompleted())
                    value(_whenAllTasksAreCompleted);

                _allTasksAreCompleted += value;
            }
            remove
            {
                _allTasksAreCompleted -= value;
            }
        }

        #endregion

        #region private API

        /// <summary>
        /// Register workers and trigger filling of the 1st bulk of items
        /// </summary>
        private void Initialize(Action<T> handler, int upperQueueSize)
        {
            RegisterRecievers(handler, upperQueueSize);
            _initializePort.Post(EmptyValue.SharedInstance);
        }

        /// <summary>
        /// Activate the interleave that wires the ports to their handlers:
        /// bulk filling runs in the exclusive group, item processing runs in the concurrent group.
        /// </summary>
        private void RegisterRecievers(Action<T> handler, int upperQueueSize)
        {
            // A new bulk is pushed once this many completion notifications were collected,
            // i.e. ceil((1 - DefaultLowThresholdPrcentage) * upperQueueSize).
            int numberOfItemsToDepleteBeforePushingNewBulk =
                (int)Math.Ceiling((1 - DefaultLowThresholdPrcentage) * upperQueueSize);

            Arbiter.Activate(_dispatcherQueue,
                Arbiter.Interleave(
                    new TeardownReceiverGroup(),
                    new ExclusiveReceiverGroup(
                        // :: fill next bulk of items to process exclusively (_enumerator is NOT thread-safe) ::

                        // 1st bulk (one-time receiver):
                        Arbiter.Receive(false, _initializePort,
                            delegate
                            {
                                FillItemsToProcessQueueWithNextBulk(upperQueueSize);
                            }),

                        // enough items were completed which means "items to process" queue reached lower limit:
                        Arbiter.MultipleItemReceive(true, _itemCompleteNotificationPort, numberOfItemsToDepleteBeforePushingNewBulk,
                            delegate
                            {
                                FillItemsToProcessQueueWithNextBulk(upperQueueSize);
                            })
                    ),
                    new ConcurrentReceiverGroup(
                        // process items concurrently
                        Arbiter.Receive(true, _itemsToProcessPort,
                            delegate(T item)
                            {
                                try
                                {
                                    handler(item);
                                    Interlocked.Increment(ref _successes);
                                }
                                catch (Exception err)
                                {
                                    Interlocked.Increment(ref _failures);
                                    _logger.Error("Unable to execute item: " + item + " due to: " + err.Message, err);
                                }
                                finally
                                {
                                    // Success or failure, the item counts as completed.
                                    HandleCompletedItem();
                                }
                            })
                    )
                )
            );
        }

        /// <summary>
        /// Count a finished item, notify the bulk filler and check whether the whole work is done.
        /// </summary>
        private void HandleCompletedItem()
        {
            Interlocked.Increment(ref _numberOfCompletedItems);
            _itemCompleteNotificationPort.Post(EmptyValue.SharedInstance);

            CheckIfAllTasksCompleted();
        }

        /// <summary>
        /// Raise the "all tasks are completed" event once the enumerator is depleted and every drawn item was completed.
        /// The event is raised at most once per instance.
        /// </summary>
        private void CheckIfAllTasksCompleted()
        {
            if (!AllTasksWereCompleted())
                return;

            // Several callers can observe the completed state (workers finishing at the same moment,
            // or the depletion path running after the last worker); only the first one raises the event.
            if (Interlocked.CompareExchange(ref _allTasksCompletedRaised, 1, 0) != 0)
                return;

            _whenAllTasksAreCompleted = DateTime.Now;
            _allTasksAreCompleted(_whenAllTasksAreCompleted); // trigger event. TODO: raise event safely
        }

        private bool AllTasksWereCompleted()
        {
            return _isEnumeratorDepleted && (_numberOfDrawnItems == _numberOfCompletedItems);
        }

        /// <summary>
        /// Draw up to <paramref name="bulkSize"/> items from the enumerator and post them for processing.
        /// Triggers the "enumerator depleted" notification when the enumerator runs out of items.
        /// </summary>
        private void FillItemsToProcessQueueWithNextBulk(int bulkSize)
        {
            // The persistent completion receiver keeps calling this method after the enumerator ran dry;
            // without this guard MoveNext() would be called again and the depletion event re-raised.
            if (_isEnumeratorDepleted)
                return;

            for (int i = 0; i < bulkSize; i++)
            {
                if (!_enumerator.MoveNext())
                {
                    TriggerEnumeratorDepleted();
                    break;
                }

                // enqueue "item to process"
                Interlocked.Increment(ref _numberOfDrawnItems);
                T itemToProcess = _enumerator.Current;
                _itemsToProcessPort.Post(itemToProcess);
            }
        }

        /// <summary>
        /// Record the depletion time, notify subscribers and mark the enumerator as depleted.
        /// </summary>
        private void TriggerEnumeratorDepleted()
        {
            _whenEnumeratorDepleted = DateTime.Now;
            _enumeratorDepleted(_whenEnumeratorDepleted); // trigger event. TODO: raise event safely
            _isEnumeratorDepleted = true;

            CheckIfAllTasksCompleted(); // avoid race, might be that all tasks were completed and now we found out that the enumerator is depleted.
        }

        /// <summary>
        /// Suspend the queue and release the dispatcher queue and the dispatcher. Safe to call more than once.
        /// </summary>
        private void Dispose(bool disposing)
        {
            if (_disposed)
                return;

            if (disposing)
            {
                Suspend();
                _dispatcherQueue.Dispose();
                _dispatcher.Dispose();
                // NOTE(review): _enumerator (created by Start via filler.GetEnumerator()) is never disposed here.
                // Confirm whether worker threads have stopped at this point before adding _enumerator.Dispose().
            }

            _disposed = true;
        }

        #endregion
    }
}