ManagedThreadPool
In this post I try to implement a fully managed thread pool. In the previous post I implemented a synchronized queue; here I reuse that class with some modifications.
using System.Collections.Generic;
using System.Threading;
namespace ManagedThreadPoolLib
{
/// <summary>
/// Blocking FIFO queue used by <see cref="ManagedThreadPool"/> workers.
/// Items are stored in an internal <see cref="Queue{T}"/>; a semaphore that
/// receives one permit per enqueued item makes <see cref="Dequeue"/> block
/// while no permit is available.
/// </summary>
class SynchronyzedQueueForThreadPool<T>
{
    private readonly ManagedThreadPool _threadPool;

    /// <summary>
    /// Sync for the internal queue.
    /// </summary>
    private readonly object _rootSync = new object();

    /// <summary>
    /// Sync for setting the "sleeping" tag of a worker. Used only by the
    /// thread pool when it kills sleeping threads.
    /// </summary>
    private readonly object _tagRootSync = new object();

    private readonly Queue<T> _queue = new Queue<T>();

    /// <summary>
    /// Starts with zero permits; Enqueue releases one permit per item.
    /// </summary>
    private readonly Semaphore _semaphore = new Semaphore(0, int.MaxValue);

    /// <summary>
    /// Lock object the thread pool takes while it inspects or changes the
    /// sleeping tags.
    /// </summary>
    internal object TagRootSync
    {
        get { return _tagRootSync; }
    }

    /// <summary>
    /// Creates a queue bound to the pool whose workers will consume it.
    /// </summary>
    /// <param name="threadPool">Pool that is told when the calling thread goes to sleep or wakes up.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="threadPool"/> is null.</exception>
    public SynchronyzedQueueForThreadPool(ManagedThreadPool threadPool)
    {
        // Fail here instead of with a NullReferenceException inside Dequeue.
        if (threadPool == null)
        {
            throw new System.ArgumentNullException("threadPool");
        }

        _threadPool = threadPool;
    }

    /// <summary>
    /// Adds an element to the queue and releases one semaphore permit.
    /// </summary>
    /// <param name="item">Element to append.</param>
    public void Enqueue(T item)
    {
        lock (_rootSync)
        {
            _queue.Enqueue(item);
        }

        // Released after the item is stored, so a consumer that obtains this
        // permit finds the item in the queue.
        _semaphore.Release();
    }

    /// <summary>
    /// Waits on the semaphore, then removes and returns the element at the
    /// head of the queue. The calling thread is tagged as sleeping in the
    /// pool before the wait and untagged after it.
    /// </summary>
    /// <returns>The dequeued element.</returns>
    /// <exception cref="System.InvalidOperationException">
    /// The queue is empty after the wait, which can happen after <see cref="Clear"/>.
    /// </exception>
    public T Dequeue()
    {
        // Both tag updates are made under the same lock so that they do not
        // run concurrently with each other or with a holder of TagRootSync.
        lock (_tagRootSync)
        {
            _threadPool.SetThreadSleepState(Thread.CurrentThread, true);
        }

        _semaphore.WaitOne();

        // NOTE(review): a thread blocked on the lock below has already taken a
        // permit but is still tagged as sleeping - confirm what the pool does
        // with such a thread.
        lock (_tagRootSync)
        {
            _threadPool.SetThreadSleepState(Thread.CurrentThread, false);
        }

        lock (_rootSync)
        {
            return _queue.Dequeue();
        }
    }

    /// <summary>
    /// Empties the queue and releases one semaphore permit per removed item.
    /// A consumer that wakes up and finds the queue empty gets an
    /// InvalidOperationException from its Dequeue call.
    /// </summary>
    public void Clear()
    {
        lock (_rootSync)
        {
            int releaseCount = _queue.Count;
            _queue.Clear();

            // Semaphore.Release(int) rejects counts below 1, so clearing an
            // already empty queue must not call it.
            if (releaseCount > 0)
            {
                _semaphore.Release(releaseCount);
            }
        }
    }
}
}
All of the changes are in Dequeue:
public T Dequeue()
{
_threadPool.SetThreadSleepState(Thread.CurrentThread,true);
_semaphore.WaitOne();
lock (_tagRootSync) _threadPool.SetThreadSleepState(Thread.CurrentThread, false);
….
To remove threads from the pool we must know that a thread is sleeping on our semaphore; otherwise we could drop a task that is still executing. That is why the sleep flag is set and cleared with special synchronization.
And here is the ManagedThreadPool class:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Linq;
namespace ManagedThreadPoolLib
{
/// <summary>
/// Thread pool built on dedicated background threads. Work items are placed
/// on an internal blocking queue; every worker thread loops forever taking
/// the next item from the queue and invoking it.
/// </summary>
/// <remarks>
/// Workers are stopped with <see cref="Thread.Abort()"/>. All access to the
/// worker table is serialized on the queue's TagRootSync lock, the same lock
/// the queue takes before it calls <see cref="SetThreadSleepState"/>.
/// </remarks>
public class ManagedThreadPool
{
    private readonly SynchronyzedQueueForThreadPool<KeyValuePair<WaitCallback, object>> _queueForThreadPool;

    // Shared by all pool instances; used only to build worker thread names.
    private static int _threadIdCounter;

    // Worker thread -> "is sleeping in Dequeue" flag. Guarded by SyncRoot.
    private readonly Dictionary<Thread, bool> _threadsAndStates = new Dictionary<Thread, bool>();

    private bool _isStarted;

    /// <summary>
    /// Creates a pool with no worker threads; call <see cref="Start"/> to create them.
    /// </summary>
    public ManagedThreadPool()
    {
        _queueForThreadPool = new SynchronyzedQueueForThreadPool<KeyValuePair<WaitCallback, object>>(this);
    }

    /// <summary>
    /// Single lock for the worker table. Monitor locks are reentrant, so it
    /// is safe that the queue may already hold it when it calls back into
    /// <see cref="SetThreadSleepState"/>.
    /// </summary>
    private object SyncRoot
    {
        get { return _queueForThreadPool.TagRootSync; }
    }

    /// <summary>
    /// Listener for exceptions thrown by executed tasks.
    /// </summary>
    public event Action<Exception> OnExceptionInThread;

    private void InvokeOnExceptionInThread(Exception ex)
    {
        // Copy to a local so an unsubscribe between the check and the call
        // cannot cause a NullReferenceException.
        Action<Exception> handler = OnExceptionInThread;
        if (handler != null)
        {
            handler(ex);
        }
    }

    /// <summary>
    /// Worker loop: takes the next task from the queue and runs it.
    /// </summary>
    private void ThreadFunc()
    {
        while (true)
        {
            try
            {
                KeyValuePair<WaitCallback, object> pair = _queueForThreadPool.Dequeue();
                pair.Key(pair.Value);
            }
            catch (ThreadAbortException)
            {
                // The pool aborts its own workers; that is not a task failure
                // and must not be reported to OnExceptionInThread.
                throw;
            }
            catch (Exception ex)
            {
                InvokeOnExceptionInThread(ex);
            }
        }
    }

    /// <summary>
    /// Grows or shrinks the pool towards <paramref name="value"/> workers.
    /// Only sleeping workers are removed, so the pool may stay above the
    /// requested size while workers are busy.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative.</exception>
    private void SetThreadCount(int value)
    {
        if (value < 0)
        {
            throw new ArgumentOutOfRangeException("value", "Thread count cannot be negative");
        }

        lock (SyncRoot)
        {
            int current = _threadsAndStates.Count;
            if (current < value)
            {
                AddThread(value - current);
            }
            else if (current > value)
            {
                RemoveThreads(current - value);
            }
        }
    }

    /// <summary>
    /// Aborts and forgets at most <paramref name="maxToRemove"/> workers that
    /// are flagged as sleeping and are in the WaitSleepJoin state.
    /// </summary>
    private void RemoveThreads(int maxToRemove)
    {
        lock (SyncRoot)
        {
            // Materialized first because the dictionary is modified in the loop.
            Thread[] sleeping = _threadsAndStates
                .Where(entry => entry.Value
                    && (entry.Key.ThreadState & ThreadState.WaitSleepJoin) == ThreadState.WaitSleepJoin)
                .Select(entry => entry.Key)
                .Take(maxToRemove)
                .ToArray();

            foreach (Thread thread in sleeping)
            {
                try
                {
                    thread.Abort();
                    _threadsAndStates.Remove(thread);
                }
                catch (ThreadStateException)
                {
                    // The thread could not be aborted in its current state;
                    // keep tracking it so a later call can try again.
                }
            }
        }
    }

    /// <summary>
    /// Creates, registers and starts <paramref name="value"/> background workers.
    /// </summary>
    private void AddThread(int value)
    {
        lock (SyncRoot)
        {
            for (int i = 0; i < value; i++)
            {
                var thread = new Thread(ThreadFunc);

                // Use the value returned by Increment: re-reading the field
                // could observe another caller's increment and duplicate a name.
                int id = Interlocked.Increment(ref _threadIdCounter);
                thread.Name = string.Format("ManagedThreadPool:{0}", id);
                thread.IsBackground = true;

                // Registered before Start so the worker is already known when
                // it reports its sleep state.
                _threadsAndStates.Add(thread, false);
                thread.Start();
            }
        }
    }

    /// <summary>
    /// Gets the number of worker threads currently tracked by the pool, or
    /// sets the desired number of worker threads.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
    public int ThreadCount
    {
        get
        {
            lock (SyncRoot)
            {
                return _threadsAndStates.Count;
            }
        }
        set { SetThreadCount(value); }
    }

    /// <summary>
    /// True between a successful <see cref="Start"/> and the next <see cref="Stop"/>.
    /// </summary>
    public bool IsStarted { get { return _isStarted; } }

    /// <summary>
    /// Starts the thread pool.
    /// </summary>
    /// <param name="threadsCount">Number of worker threads to create.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="threadsCount"/> is negative.</exception>
    /// <exception cref="InvalidOperationException">The pool is already started.</exception>
    public void Start(int threadsCount)
    {
        if (threadsCount < 0)
        {
            throw new ArgumentOutOfRangeException("threadsCount", "Thread count cannot be negative");
        }

        lock (SyncRoot)
        {
            if (_isStarted)
                throw new InvalidOperationException("Already Started");
            _isStarted = true;
            AddThread(threadsCount);
        }
    }

    /// <summary>
    /// Stops the thread pool: aborts every worker, including ones that are
    /// running a task, and forgets them.
    /// </summary>
    /// <exception cref="InvalidOperationException">The pool is not started.</exception>
    public void Stop()
    {
        lock (SyncRoot)
        {
            if (!_isStarted)
                throw new InvalidOperationException("Not yet started");

            foreach (Thread thread in _threadsAndStates.Keys.ToList())
            {
                try
                {
                    thread.Abort();
                }
                catch (ThreadStateException)
                {
                    // Nothing to abort for a thread in this state; continue
                    // with the remaining workers.
                }
            }

            _threadsAndStates.Clear();
            _isStarted = false;
        }
    }

    /// <summary>
    /// Adds a task for asynchronous execution; the callback receives null as its state.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void AddWorkItem(WaitCallback action)
    {
        AddWorkItem(action, null);
    }

    /// <summary>
    /// Adds a task for asynchronous execution.
    /// </summary>
    /// <param name="action">Callback to run on a worker thread.</param>
    /// <param name="state">Value passed to the callback.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void AddWorkItem(WaitCallback action, object state)
    {
        // Reject null here; otherwise it only fails later on a worker thread.
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        _queueForThreadPool.Enqueue(new KeyValuePair<WaitCallback, object>(action, state));
    }

    /// <summary>
    /// Adds a parameterless task for asynchronous execution.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void AddWorkItem(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        WaitCallback waitCallback = state => action();
        _queueForThreadPool.Enqueue(new KeyValuePair<WaitCallback, object>(waitCallback, null));
    }

    /// <summary>
    /// Records whether <paramref name="thread"/> is sleeping in the queue's
    /// Dequeue. Threads the pool does not track are ignored, so a removed
    /// worker or a foreign caller is never (re)registered.
    /// </summary>
    internal void SetThreadSleepState(Thread thread, bool state)
    {
        lock (SyncRoot)
        {
            if (_threadsAndStates.ContainsKey(thread))
            {
                _threadsAndStates[thread] = state;
            }
        }
    }
}
}
And a small test program for the thread pool:
using System;
using System.Threading;
using ManagedThreadPoolLib;
namespace ManagedThreadPoolTest
{
/// <summary>
/// Console demo for ManagedThreadPool: queues a few short tasks, resizes the
/// pool, queues three endless printing tasks, resizes again and stops the
/// pool when the user presses Enter.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        ManagedThreadPool pool = new ManagedThreadPool();
        pool.Start(5);

        // Five short tasks, each printing its own number once.
        for (int i = 1; i < 6; i++)
        {
            // Copy the loop variable so every closure captures its own value.
            int counter = i;
            pool.AddWorkItem(() => Console.WriteLine(counter.ToString()));
        }

        pool.ThreadCount = 1;
        Thread.Sleep(5000);

        // Three endless tasks that print at different random intervals.
        AddEndlessPrinter(pool, "Q", 1000);
        AddEndlessPrinter(pool, "--------------------------", 250);
        AddEndlessPrinter(pool, "W", 500);

        Console.WriteLine("--------------------------------------------");
        Console.WriteLine("--------------------------------------------");
        pool.ThreadCount = 5;

        Console.ReadLine();
        pool.Stop();
    }

    /// <summary>
    /// Queues a task that prints <paramref name="text"/> forever, sleeping a
    /// random time below <paramref name="maxDelayMs"/> milliseconds after
    /// every line.
    /// </summary>
    private static void AddEndlessPrinter(ManagedThreadPool pool, string text, int maxDelayMs)
    {
        pool.AddWorkItem(() =>
        {
            // Seeded inside the task, i.e. at the moment the task starts running.
            Random rnd = new Random((int)DateTime.Now.Ticks);
            while (true)
            {
                Console.WriteLine(text);
                Thread.Sleep(rnd.Next(0, maxDelayMs));
            }
        });
    }
}
}
Advertisement
No trackbacks yet.