// Written by Oren Ellenbogen (06/10/2008)
#region using
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using log4net;
using Microsoft.Ccr.Core;
#endregion
namespace Semingo.Common.Async
{
///
/// Spawn multiple threads and execute the given items to process, keeping the size of the internal queue with some given boundaries.
/// Use this class when you have a HUGE enumerator of *independent* items to execute.
///
public sealed class SpawnEnumerator : IDisposable
{
#region fields
private const double DefaultLowThresholdPrcentage = 0.1; // TODO: expose
private static readonly ILog _logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private event Action _enumeratorDepleted = delegate { };
private event Action _allTasksAreCompleted = delegate { };
private readonly Dispatcher _dispatcher;
private readonly DispatcherQueue _dispatcherQueue;
private readonly Port _itemsToProcessPort;
private readonly Port _itemCompleteNotificationPort;
private readonly Port _initializePort;
private readonly IEnumerator _enumerator;
private int _failures;
private int _successes;
private int _numberOfDrawnItems;
private int _numberOfCompletedItems;
private bool _disposed = false;
private bool _isEnumeratorDepleted = false;
private DateTime _whenEnumeratorDepleted;
private DateTime _whenAllTasksAreCompleted;
#endregion
#region ctors
private SpawnEnumerator(int threadsCount, string threadsPoolName, IEnumerator filler)
{
_dispatcher = new Dispatcher(threadsCount, ThreadPriority.Normal, DispatcherOptions.UseBackgroundThreads | DispatcherOptions.UseProcessorAffinity, threadsPoolName);
_dispatcherQueue = new DispatcherQueue(threadsPoolName + "_Queue", _dispatcher);
_enumerator = filler;
_itemsToProcessPort = new Port();
_itemCompleteNotificationPort = new Port();
_initializePort = new Port();
_failures = _successes = 0;
}
///
/// Initialize new instance of SpawnEnumerator.
///
/// Number of threads to spawn (=worker threads)
/// The thread pool name (for debugging)
/// The upper limit of the internal queue size to maintain in memory, assuming that the enumerator contains so many items that holding them all will be an actual problem.
/// Enumerator of items to execute
/// Handler of a given item
public static SpawnEnumerator Start(int threadsCount, string threadsPoolName, int upperQueueSize, IEnumerable filler, Action handler)
{
#region validate parameters
if (threadsCount < 1)
throw new ArgumentException("threadsCount must be >= 1", "threadsCount");
if (upperQueueSize < 1)
throw new ArgumentException("upperQueueSize must be >= 1", "upperQueueSize");
if (filler == null)
throw new ArgumentNullException("filler");
if (handler == null)
throw new ArgumentNullException("handler");
#endregion
SpawnEnumerator sw = new SpawnEnumerator(threadsCount, threadsPoolName, filler.GetEnumerator());
sw.Initialize(handler, upperQueueSize);
return sw;
}
#endregion
#region public API
///
/// Number of items successfully executed so far
///
public int Successes
{
get { return _successes; }
}
///
/// Number of items that failed during execution so far
///
public int Failures
{
get { return _failures; }
}
///
/// Number of items drawn from the enumerator so far
///
public int DrawnItems
{
get { return _numberOfDrawnItems; }
}
///
/// Suspend current work.
/// Stop taking items from the "filler" (=enumerator) and executing pending items.
///
public void Suspend()
{
_dispatcherQueue.Suspend();
}
///
/// Resume work as usual
///
public void Resume()
{
if (!_dispatcherQueue.IsSuspended)
return;
_dispatcherQueue.Resume();
}
///
/// Recieve a list of items that were pending for execution
///
public IEnumerable DequeuePendingItems()
{
T item;
while (_itemsToProcessPort.Test(out item))
yield return item;
// dev: CCR cannot retrieve items from suspended DispatcherQueue
if (!_dispatcherQueue.IsSuspended)
{
ITask task;
while (_dispatcherQueue.TryDequeue(out task))
{
for (int i = 0; i < task.PortElementCount; i++)
{
if (task[i]!=null && task[i].Item is T)
yield return (T)task[i].Item;
}
}
}
}
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
///
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
///
/// Get notified when the enumerator depleted.
/// If you register AFTER this event was raised, you'll get notified immediatly!
///
public event Action EnumeratorDepleted
{
add
{
if (_isEnumeratorDepleted)
value(_whenEnumeratorDepleted);
_enumeratorDepleted += value;
}
remove
{
_enumeratorDepleted -= value;
}
}
///
/// Get notified when all of the tasks are completed and the enumerator is depleted.
/// If you register AFTER this event was raised, you'll get notified immediatly!
///
public event Action AllTasksAreCompleted
{
add
{
if (AllTasksWereCompleted())
value(_whenAllTasksAreCompleted);
_allTasksAreCompleted += value;
}
remove
{
_allTasksAreCompleted -= value;
}
}
#endregion
#region private API
///
/// Register workers and trigger filling of the 1st bulk of items
///
private void Initialize(Action handler, int upperQueueSize)
{
RegisterRecievers(handler, upperQueueSize);
_initializePort.Post(EmptyValue.SharedInstance);
}
private void RegisterRecievers(Action handler, int upperQueueSize)
{
int numberOfItemsToDepleteBeforePushingNewBulk = (int)Math.Ceiling((1 - DefaultLowThresholdPrcentage) * upperQueueSize);
Arbiter.Activate(_dispatcherQueue,
Arbiter.Interleave(
new TeardownReceiverGroup(),
new ExclusiveReceiverGroup(
// :: fill next bulk of items to process exclusivly (_enumerator is NOT thread-safe) ::
// 1st bulk:
Arbiter.Receive(false, _initializePort, delegate { FillItemsToProcessQueueWithNextBulk(upperQueueSize); }),
// enough items were completed which means "items to process" queue reached lower limit:
Arbiter.MultipleItemReceive(true, _itemCompleteNotificationPort, numberOfItemsToDepleteBeforePushingNewBulk, delegate { FillItemsToProcessQueueWithNextBulk(upperQueueSize); })
),
new ConcurrentReceiverGroup(
// process items concurrently
Arbiter.Receive(true, _itemsToProcessPort,
delegate(T item)
{
try
{
handler(item);
Interlocked.Increment(ref _successes);
}
catch (Exception err)
{
Interlocked.Increment(ref _failures);
_logger.Error("Unable to execute item: " + item + " due to: " + err.Message, err);
}
finally
{
HandleCompletedItem();
}
})
)
)
);
}
private void HandleCompletedItem()
{
Interlocked.Increment(ref _numberOfCompletedItems);
_itemCompleteNotificationPort.Post(EmptyValue.SharedInstance);
CheckIfAllTasksCompleted();
}
private void CheckIfAllTasksCompleted()
{
if (!AllTasksWereCompleted())
return;
_whenAllTasksAreCompleted = DateTime.Now;
_allTasksAreCompleted(_whenAllTasksAreCompleted); // trigger event. TODO: raise event safely
}
private bool AllTasksWereCompleted()
{
return _isEnumeratorDepleted && (_numberOfDrawnItems == _numberOfCompletedItems);
}
private void FillItemsToProcessQueueWithNextBulk(int bulkSize)
{
for (int i = 0; i < bulkSize; i++)
{
if (!_enumerator.MoveNext())
{
TriggerEnumeratorDepleted();
break;
}
// enqueue "item to process"
Interlocked.Increment(ref _numberOfDrawnItems);
T itemToProcess = _enumerator.Current;
_itemsToProcessPort.Post(itemToProcess);
}
}
private void TriggerEnumeratorDepleted()
{
_whenEnumeratorDepleted = DateTime.Now;
_enumeratorDepleted(_whenEnumeratorDepleted); // trigger event. TODO: raise event safely
_isEnumeratorDepleted = true;
CheckIfAllTasksCompleted(); // avoid race, might be that all tasks were completed and now we found out that the enumerator is depleted.
}
private void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
Suspend();
_dispatcherQueue.Dispose();
_dispatcher.Dispose();
}
_disposed = true;
}
#endregion
}
}