分析.Net里线程同步机制

           我们知道并行编程模型两种:一种是基于消息式的,第二种是基于共享内存式的。 前段时间项目中遇到了第二种 使用多线程开发并行程序共享资源的问题 ,今天以实际案例出发对.net里的共享内存式的线程同步机制做个总结,由于某些类库的应用属于基础,所以本次不对基本使用做出讲解,基本使用 MSDN是最好的教程。

   一、volatile关键字

    基本介绍: 封装了 Thread.VolatileWrite() 和  Thread.VolatileRead()的实现 ,主要作用是强制刷新高速缓存。

    使用场景: 适用于在多核多CPU的机器上 解决变量在内存和高速缓存同步不及时的问题。

    案例:参考下文   二、原子操作的 案例 或者 System.Collections.Concurrent命名空间下的 ConcurrentQueue ,ConcurrentDictionary  等并发集合的实现方式。       

  二、原子操作(Interlock)

   基本介绍: 原子操作是 实现Spinlock,Monitor,ReadWriterLock锁的基础,其实现原理是在计算机总线上标志一个信号来表示资源已经被占用 如果其他指令进行修改则等待本次操作完成后才能进行,因为原子操作是在硬件上实现的 所以速度非常快,大约在50个时钟周期。其实原子操作也可以看做一种锁。

   使用场景:性能要求较高的场合,需要对字段进行快速的同步或者对变量进行原子形式的跟新操作(例如:int b=0;  b=b+1  实际分解为多条汇编指令,在多线程情况下 多条汇编指令并行的执行可能导致错误的结果,所以要保证执行 b=b+1 生成的汇编指令是一个原子形式执行 ),例如实现一个并行队列,异步队列等。

   案例:一个基于事件触发机制队列的实现

/// <summary>
    /// 表示一个实时处理队列
    /// </summary>
    public class ProcessQueue<T>
    {
         #region [成员]

        private ConcurrentQueue<IEnumerable<T>> queue;

        private Action<IEnumerable<T>> PublishHandler;

        //指定处理的线程数
        private int core = Environment.ProcessorCount;

        //正在运行的线程数
        private int runingCore = 0;

        public event Action<Exception> OnException;

        //队列是否正在处理数据
        private int isProcessing=0; 

        //队列是否可用
        private bool enabled = true;

        #endregion

        #region 构造函数

        public ProcessQueue(Action<IEnumerable<T>> handler)
        {

            queue = new ConcurrentQueue<IEnumerable<T>>();
        
            PublishHandler = handler;
            this.OnException += ProcessException.OnProcessException;
        }

        #endregion

        #region [方法]

        /// <summary>
        /// 入队
        /// </summary>
        /// <param name="items">数据集合</param>
        public void Enqueue(IEnumerable<T> items)
        {
            if (items != null)
            {
                queue.Enqueue(items);
            }

            //判断是否队列有线程正在处理 
            if (enabled && Interlocked.CompareExchange(ref isProcessing, 1, 0) == 0)
            {
                if (!queue.IsEmpty)
                {
                    ThreadPool.QueueUserWorkItem(ProcessItemLoop);
                }
                else
                {
                    Interlocked.Exchange(ref isProcessing, 0);
                }
            }
        }

        /// <summary>
        /// 开启队列数据处理
        /// </summary>
        public void Start()
        {
            Thread process_Thread = new Thread(PorcessItem);
            process_Thread.IsBackground = true;
            process_Thread.Start();
        }

        /// <summary>
        /// 循环处理数据项
        /// </summary>
        /// <param name="state"></param>
        private void ProcessItemLoop(object state)
        {
            //表示一个线程递归 当处理完当前数据时 则开起线程处理队列中下一条数据 递归终止条件是队列为空时
            //但是可能会出现 队列有数据但是没有线程去处理的情况 所有一个监视线程监视队列中的数据是否为空,如果为空
            //并且没有线程去处理则开启递归线程

            if (!enabled && queue.IsEmpty)
            {
                Interlocked.Exchange(ref isProcessing, 0);
                return;
            }

            //处理的线程数 是否小于当前CPU核数
            if (Thread.VolatileRead(ref runingCore) <= core * 2*)
            {
                IEnumerable<T> publishFrame;
                //出队以后交给线程池处理
                if (queue.TryDequeue(out publishFrame))
                {
                    Interlocked.Increment(ref runingCore);
                    try
                    {
                        PublishHandler(publishFrame);
                        
                        if (enabled && !queue.IsEmpty)
                        {    
                            ThreadPool.QueueUserWorkItem(ProcessItemLoop);
                        }
                        else
                        {
                            Interlocked.Exchange(ref isProcessing, 0);
                        }
                        
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex);
                    }

                    finally
                    {
                        Interlocked.Decrement(ref runingCore);
                    }
                }
            }

        }

        /// <summary>
        ///定时处理帧 线程调用函数  
        ///主要是监视入队的时候线程 没有来的及处理的情况
        /// </summary>
        private void PorcessItem(object state)
        {
            int sleepCount=0;
            int sleepTime = 1000;
            while (enabled)
            {
                //如果队列为空则根据循环的次数确定睡眠的时间
                if (queue.IsEmpty)
                {
                    if (sleepCount == 0)
                    {
                        sleepTime = 1000;
                    }
                    else if (sleepCount == 3)
                    {
                        sleepTime = 1000 * 3;
                    }
                    else if (sleepCount == 5)
                    {
                        sleepTime = 1000 * 5;
                    }
                    else if (sleepCount == 8)
                    {
                        sleepTime = 1000 * 8;
                    }
                    else if (sleepCount == 10)
                    {
                        sleepTime = 1000 * 10;
                    }
                    else
                    {
                        sleepTime = 1000 * 50;
                    }
                    sleepCount++;
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    //判断是否队列有线程正在处理 
                    if (enabled && Interlocked.CompareExchange(ref isProcessing, 1, 0) == 0)
                    {
                        if (!queue.IsEmpty)
                        {
                            ThreadPool.QueueUserWorkItem(ProcessItemLoop);
                        }
                        else
                        {
                            Interlocked.Exchange(ref isProcessing, 0);
                        }
                        sleepCount = 0;
                        sleepTime = 1000;
                    }
                }
            }
        }

        /// <summary>
        /// 停止队列
        /// </summary>
        public void Stop()
        {
            this.enabled = false;

        }

        /// <summary>
        /// 触发异常处理事件
        /// </summary>
        /// <param name="ex">异常</param>
        private void OnProcessException(Exception ex)
        {
            var tempException = OnException;
            Interlocked.CompareExchange(ref tempException, null, null);

            if (tempException != null)
            {
                OnException(ex);
            }
        }

        #endregion

    }

 

  三、自旋锁(Spinlock)

    基本介绍:  在原子操作基础上实现的锁,用户态的锁,缺点是线程一直不释放CPU时间片。操作系统进行一次线程用户态到内核态的切换大约需要500个时钟周期,可以根据这个进行参考我们的线程是进行用户等待还是转到内核的等待.。

    使用场景:线程等待资源时间较短的情况下使用。

    案例: 和最常用的Monitor 使用方法一样  这里就不举例了,在实际场景中应该优先选择使用Monitor,除非是线程等待资源的时间特别的短

 

  四、监视器(Monitor)

         基本介绍:  原子操作基础上实现的锁,开始处于用户态,自旋一段时间进入内核态的等待释放CPU时间片,缺点使用不当容易造成死锁    c#实现的关键字是Lock。         

         使用场景:  所有需要加锁的场景都可以使用。

         案例: 案例太多了,这里就不列出了。

 五、读写锁(ReadWriterLock)

  原理分析:   原子操作基础上实现的锁,

  使用场景:适用于写的次数少,读的频率高的情况。

  案例:一个线程安全的缓存实现(.net 4.0 可以使用基础类库中的  ConcurrentDictionary<K,V>)  注意:老版本ReaderWriterLock已经被淘汰,新版的是ReaderWriterLockSlim

class CacheManager<K, V>
    {
        #region [成员]

        private ReaderWriterLockSlim readerWriterLockSlim;

        private Dictionary<K, V> containter;

        #endregion

        #region [构造函数]

        public CacheManager()
        {
            this.readerWriterLockSlim = new ReaderWriterLockSlim();
            this.containter = new Dictionary<K, V>();
        }

        #endregion

        #region [方法]

        public void Add(K key, V value)
        {
            readerWriterLockSlim.EnterWriteLock();

            try
            {
                containter.Add(key, value);
            }

            finally
            {
                readerWriterLockSlim.ExitWriteLock();
            }
        }

        public V Get(K key)
        {

            bool result = false;
            V value;

            do
            {
                readerWriterLockSlim.EnterReadLock();

                try
                {
                    result = containter.TryGetValue(key, out value);
                }

                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }

            } while (!result);

            return value;
        }

        #endregion
    }

  .net中还有其他的线程同步机制:ManualResetEventSlim ,AutoResetEvent ,SemaphoreSlim 这里就逐个进行不介绍 具体在《CLR Via C# 》中解释的非常详细,但在具体的实际开发中我还没有使用到。

最好的线程同步机制是没有同步,这取决于良好的设计,当然有些情况下无法避免使用锁。 在性能要求不高的场合基本的lock就能满足要求,但性能要求比较苛刻的情就需求更具实际场景进行选择哪种线程同步机制。

分析.Net里线程同步机制,古老的榕树,5-wow.com

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。