ManagedThreadPool

In this post I try to implement fully managed Thread Pool. In previous post I implement Synchronized queue. Here I use this class with some modifications.

using System.Collections.Generic;
using System.Threading;

namespace ManagedThreadPoolLib
{
    class SynchronyzedQueueForThreadPool<T>
    {
        private readonly ManagedThreadPool _threadPool;

        /// <summary>
        /// sync for internal queue
        /// </summary>
        readonly object _rootSync = new object();
        /// <summary>
        /// sync for setting tag. used only by thread pool for killing sleeping threads 
        /// </summary>
        readonly object _tagRootSync = new object();
        readonly Queue<T> _queue= new Queue<T>();
        readonly Semaphore _semaphore = new Semaphore(0,int.MaxValue);
        internal object TagRootSync
        {
            get { return _tagRootSync; }
        }

        public SynchronyzedQueueForThreadPool(ManagedThreadPool threadPool)
        {
            _threadPool = threadPool;
        }

        /// <summary>
        /// Enqueue element from queue and release semaphore
        /// </summary>
        /// <param name="item"></param>
        public void Enqueue(T item)
        {
            lock (_rootSync)
            {
                _queue.Enqueue(item);  
            }
            _semaphore.Release();
        }
        /// <summary>
        /// Dequeue element to queue and WaitOne semaphore
        /// </summary>
        /// <param name="item"></param>
        public T Dequeue()
        {
            _threadPool.SetThreadSleepState(Thread.CurrentThread,true);
            _semaphore.WaitOne();
            lock (_tagRootSync) _threadPool.SetThreadSleepState(Thread.CurrentThread, false);
            lock (_rootSync)
            {
                return _queue.Dequeue();
            }
        }
        /// <summary>
        /// clean Queue and invoke all sleep threads. Produce Exception if has sleeping threads in Dequeue operation
        /// </summary>
        public void Clear()
        {
            lock (_rootSync)
            {
                int releaseCount = _queue.Count;
                _queue.Clear();
                _semaphore.Release(releaseCount);
            }
        }
    }
}


All changes here:
public T Dequeue()
{
_threadPool.SetThreadSleepState(Thread.CurrentThread,true);
_semaphore.WaitOne();
lock (_tagRootSync) _threadPool.SetThreadSleepState(Thread.CurrentThread, false);
….
For removing threads from pool we should know, that our thread sleep on our semaphore  or we can drop execution task with special synchronization.
And here ManagedThreadPool Class:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Linq;

namespace ManagedThreadPoolLib
{
    public class ManagedThreadPool
    {
        private readonly SynchronyzedQueueForThreadPool<KeyValuePair<WaitCallback, object>> _queueForThreadPool ;

        public ManagedThreadPool()
        {
            _queueForThreadPool = new SynchronyzedQueueForThreadPool<KeyValuePair<WaitCallback, object>>(this);
        }

        private bool _isStarted;
        private static int _threadIdCounter;
        private int _threadCount;
        private Dictionary<Thread, bool> _threadsAndStates = new Dictionary<Thread, bool>();
        /// <summary>
        /// Lister for exceptions in executed tasks
        /// </summary>
        public event Action<Exception> OnExceptionInThread;
        private void InvokeOnExceptionInThread(Exception ex)
        {
            Action<Exception> handler = OnExceptionInThread;
            if (handler != null) handler(ex);
        }
        /// <summary>
        /// Run task from queue
        /// </summary>
        private void ThreadFunc()
        {
            while (true)
            {
                try
                {
                    var pair=_queueForThreadPool.Dequeue();
                    pair.Key(pair.Value);
                }
                catch (Exception ex)
                {
                    InvokeOnExceptionInThread(ex);
                }
            }
        }

        private void SetThreadCount(int value)
        {
            lock (_threadsAndStates)
            {
                if (_threadsAndStates.Count < value)
                {
                    AddThread(value - _threadsAndStates.Count);
                }
                else
                {
                    RemoveThreads();
                }
            }
        }

        private void RemoveThreads()
        {
            lock (_queueForThreadPool.TagRootSync)
            {
                Func<Thread, bool> predicate = thread => (thread.ThreadState & ThreadState.WaitSleepJoin)
                                                           == ThreadState.WaitSleepJoin 
                                                           &&_threadsAndStates[thread];
                foreach (var thread in _threadsAndStates.Keys.Where(predicate).ToArray())
                {
                    try
                    {
                        thread.Abort();
                        _threadsAndStates.Remove(thread);
                    }
                    catch
                    {
                    }
                }
            }
        }

        private void AddThread(int value)
        {
            for (int i = 0; i < value; i++)
            {
                var thread = new Thread(ThreadFunc);
                Interlocked.Increment(ref _threadIdCounter);
                thread.Name = string.Format("ManagedThreadPool:{0}", _threadIdCounter);
                thread.IsBackground = true;
                _threadsAndStates.Add(thread,false);
                thread.Start();
            }
        }

        /// <summary>
        /// set count of threads in thread pool
        /// </summary>
        public int ThreadCount
        {
            get { return _threadCount; }
            set { SetThreadCount(value); }
        }
        public bool IsStarted { get { return _isStarted; } }
        /// <summary>
        /// Start ThreadPool
        /// </summary>
        /// <param name="threadsCount"></param>
        public void Start(int threadsCount)
        {
            if (_isStarted)
                throw new InvalidOperationException("Already Started");
            _isStarted = true;
            AddThread(threadsCount);
        }

        /// <summary>
        /// Stop Thread pool executing
        /// </summary>
        public void Stop()
        {
            if (!_isStarted)
                throw new InvalidOperationException("Not yet started");
            _isStarted = false;

            _threadsAndStates.Keys.ToList().ForEach(thread =>
                                       {
                                           try
                                           {
                                               thread.Abort();
                                           }
                                           catch
                                           { }
                                       }
                                    );
            _threadsAndStates.Clear();
        }

        /// <summary>
        /// Add Task For Async Executing
        /// </summary>
        public void AddWorkItem(WaitCallback action)
        {
            _queueForThreadPool.Enqueue(new KeyValuePair<WaitCallback, object>(action, null));
        }

        /// <summary>
        /// Add Task For Async Executing
        /// </summary>
        public void AddWorkItem(WaitCallback action,object state)
        {
            _queueForThreadPool.Enqueue(new KeyValuePair<WaitCallback, object>(action, state));
        }
        public void AddWorkItem(Action action)
        {
            WaitCallback waitCallback = (state) => action();
            _queueForThreadPool.Enqueue(new KeyValuePair<WaitCallback, object>(waitCallback, null));
        }
        internal void SetThreadSleepState(Thread thread, bool state)
        {
            _threadsAndStates[thread] = state;
        }
    }
}


And Small test code for thread pool
using System;
using System.Threading;
using ManagedThreadPoolLib;

namespace ManagedThreadPoolTest
{
    class Program
    {
        static void Main(string[] args)
        {
            ManagedThreadPool pool= new ManagedThreadPool();
            pool.Start(5);
            Random r = new Random();
            for (int i = 1; i < 6; i++)
            {
                int counter = i;
                pool.AddWorkItem(() => {
                    while (true)
                    {
                        Console.WriteLine(counter.ToString());
                        return;
//                        Thread.Sleep(counter*1000);
                    }
                    
                                        }
                );    
            }
            pool.ThreadCount = 1;
            Thread.Sleep(5000);

            pool.AddWorkItem(() =>
            {
                Random rnd = new Random((int)DateTime.Now.Ticks);
                while (true)
                {
                    Console.WriteLine("Q");
                    Thread.Sleep(rnd.Next(0,1000));
                }

            }
                );
            pool.AddWorkItem(() =>
            {
                Random rnd = new Random((int) DateTime.Now.Ticks);
                while (true)
                {
                    Console.WriteLine("--------------------------");
                    Thread.Sleep(rnd.Next(0,250));
                }

            }
                );
            pool.AddWorkItem(() =>
            {
                Random rnd = new Random((int)DateTime.Now.Ticks);
                while (true)
                {
                    Console.WriteLine("W");
                    Thread.Sleep(rnd.Next(0,500));
                }

            }
                );
            Console.WriteLine("--------------------------------------------");
            Console.WriteLine("--------------------------------------------");
            pool.ThreadCount = 5;
            Console.ReadLine();
            pool.Stop();
        }
    }
}

Advertisement
  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.