【基本组件】线程同步

         本节研究线程同步的基本组件,互斥量、条件变量、倒计时组件;


自旋锁

(1)Linux内核中最常见的锁是自旋锁(中断处理程序中,因为它不能睡眠);自旋锁最多只能被一个可执行的线程持有;如果另一个可执行线程试图获得一个已经被持有的自旋锁,那么该线程将会一直进行忙循环-旋转-等待锁重新可用;如果自旋锁未被锁住,请求锁的执行线程便可以立即得到它,继续执行;自旋锁可以防止多于一个的执行线程进入临界区;

(2)在上述的旋转过程中,会浪费CPU时间,所以自旋锁不应该被长时间持有;自旋锁的初衷就是:在短期间内进行轻量级加锁,而另一个方式是让请求线程睡眠,直到锁可用时再唤醒它,这样CPU就不会浪费,去执行其他的代码,但是这样会有上下文切换,被阻塞的线程要换出和换入,上下文的切换代码和消耗可能更大,所以持有自旋锁的时间应该最好小于两次上下文切换的时间

(3)不要在持有自旋锁的执行代码期间,执行一些其他睡眠的函数,因为会让其他的线程等待自旋锁的时间变长;

(4)拥有自旋锁的执行线程也是可被调度的,因此自旋锁在用户层并不常用;原因是用户空间的线程A(假设已经持有自旋锁),当时间片到时或被高优先级的线程B抢占后,若线程B也需要持有线程A已持有的自旋锁,那么线程B在等待自旋锁(由线程A释放)上的时间将会非常的长;


互斥量

基本知识

(1)互斥量是保护临界区的另一种方法,当执行线程在临界区的执行时间很长时,那么就最好使用互斥量了,否则会造成其他的线程将会在临界区外忙等,浪费CPU时间;此时其他线程发现临界区已经被互斥量锁住,那么它们将会阻塞;当互斥量被释放时,有多个线程在阻塞,多个线程均会被唤醒,但是只有一个线程可以获得该锁,其他的线程将会继续阻塞;

(2)当执行线程需要在临界区睡眠时,那么就最好使用互斥量,如果采用自旋锁,那么其他的线程将会在临界区外忙等,浪费CPU时间;

(3)Posix的互斥量支持递归加锁和非递归加锁,对于非递归加锁可能会造成死锁,试想如果一个已经持有某互斥量的线程继续想要持有该锁,由于不支持递归,因此程序将会死锁,进而我们可能需要修改程序的逻辑;而递归加锁,虽然可以让程序继续执行,但是会使得临界区的数据被破坏,造成程序也有可能会崩溃,如代码片段1使用互斥量A保护vector<int>的push_back操作,而代码段2使用互斥量A保护fun1(遍历该vector),但是fun1又会调用代码片段1,那么vector遍历的过程中vector的迭代器可能失效(push_back的缘故)


互斥量的实现

(C++封装Linux的pthread系统调用)

class Mutex final
{
public:
  Mutex(const Mutex&) = delete;
  Mutex& operator=(const Mutex&) = delete;

  explicit Mutex(bool processShared = false) :
    _processShared(processShared)
  {
    pthread_mutexattr_t attr;
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);

    if (processShared) {
        int shared;
        pthread_mutexattr_getpshared(&attr, &shared);
        assert(shared ==  PTHREAD_PROCESS_PRIVATE);
        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    }
   
    int err = pthread_mutex_init(&_mutex, &attr);
    (void) err;
  }

  ~Mutex()
  {
    int err = pthread_mutex_destroy(&_mutex);
    (void) err;
  }

  int lock()
  {
    return pthread_mutex_lock(&_mutex);
  }

  int unlock()
  {
    return pthread_mutex_unlock(&_mutex);
  }

  pthread_mutex_t* getMutexPtr()
  {
    return &_mutex;
  }

private:
  pthread_mutex_t _mutex;
  bool _processShared;
};
说明几点:

(1)_processShared参数为是否支持跨进程的互斥量,默认为单进程的false;互斥量的属性为PTHREAD_MUTEX_NORMAL,即不允许递归加锁;

(2)pthread_mutex_t* getMutexPtr()是为了条件变量而实现的,下文介绍;

(3)利用C++中构造函数和析构函数来初始化和销毁一个互斥量;


互斥量的使用

class MutexLockGuard final
{
public:
  MutexLockGuard(const MutexLockGuard&) = delete;
  MutexLockGuard& operator=(const MutexLockGuard&) = delete;

  explicit MutexLockGuard(Mutex& mutex) :
      _mutex(mutex)
  {
    _mutex.lock();
  }

  ~MutexLockGuard()
  {
    _mutex.unlock();
  }

private:
  Mutex& _mutex;
};

说明几点:

(1)MutexLockGuard中持有该_mutex;利用C++中构造函数和析构函数来申请和释放一个互斥量;


条件变量

基本知识

(1)当在临界区中,需要等待某个条件成立时,我们应该使用条件变量,在如下代码片段1中,如果_count 大于0时,我们需要等待该条件,即需要_cond.wait();该_cond.wait()过程是将会把调用线程放到等待条件的线程列表上,然后对该互斥量解锁;此时在互斥量解锁期间,又有新的线程进入该临界区,条件尚未发生,_cond.wait()会继续这一过程;

(2)在代码片段2中,首先会进行条件检查(已经被同一个互斥量锁主,睡眠的线程不可能错过),如果_count==0  _cond.wakeAll()将会唤醒线程,记住需要在条件变化后再唤醒线程;

(3)首先_cond.wait()需要在_mutex已经上锁的情况下才能调用,因为_cond.wait()涉及到解锁的过程;

(4)需要使用while (_count > 0),而不是 if (_count > 0),原因为当线程从_cond.wait()唤醒时,此时互斥量会继续被锁住(此时多个线程对互斥量争用的问题),很有可能此时的条件会被其他线程修改,造成_count > 0的条件不成立,因此需要继续判断的;

(5)多次执行_cond.wakeAll()发送信号时,如果没有任何线程阻塞在该等待条件列表上,那么这个信号会丢失,但是不影响程序;

代码片段1:

    MutexLockGuard lock(_mutex);
    while (_count > 0)    //impotant
      {
        _cond.wait();
      }

代码片段2:

    {
      MutexLockGuard lock(_mutex);
      --_count;
    }
    
    if (_count == 0)
      {
        _cond.wakeAll();
      }

条件变量的实现

class Condition final
{
public:
  Condition(const Condition&) = delete;
  Condition& operator=(const Condition&) = delete;

  Condition(Mutex& mutex) :
      _mutex(mutex)
  {
    int err = pthread_cond_init(&_cond, NULL);
    (void) err;
  }

  ~Condition()
  {
    int err = pthread_cond_destroy(&_cond);
    (void) err;
  }

  int wait()
  {
    return pthread_cond_wait(&_cond, _mutex.getMutexPtr());;
  }

  int waitForSeconds(size_t seconds)
  {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);

    ts.tv_sec += seconds;
    return pthread_cond_timedwait(&_cond, _mutex.getMutexPtr(), &ts);
  }

  int wake()
  {
    return pthread_cond_signal(&_cond);
  }

  int wakeAll()
  {
    return pthread_cond_broadcast(&_cond);
  }

private:
  Mutex& _mutex;
  pthread_cond_t _cond;
};

说明几点:

(1)wake为唤醒至少一个线程;而wakeAll为唤醒所有的线程;waitForSeconds(size_t seconds)为等待seconds秒后,条件还未出现,那么线程将会重新获得互斥量(此时多个线程对互斥量争用的问题);

(2)wait()的实现需要使用_mutex.getMutexPtr()中pthread_mutex_t类型的_mutex;


倒计时

基本用途

(1)主线程发起多个子线程,等待多个子线程完成一定的初始化(或任务)以后,主线程才继续执行;

(2)主线程发起多个子线程,等待主线程完成一定的初始化(或任务)以后,多个子线程才会继续执行;


倒计时的实现

class CountDownLatch final
{
public:
  CountDownLatch(const CountDownLatch&) = delete;
  CountDownLatch& operator=(const CountDownLatch&) = delete;

  explicit CountDownLatch(size_t counter) :
      _count(counter),
      _mutex(),
      _cond(_mutex)
  {

  }

  void wait()
  {
    MutexLockGuard lock(_mutex);
    while (_count > 0)    //impotant
      {
        _cond.wait();
      }
  }

  void countDown()
  {
    {
      MutexLockGuard lock(_mutex);
      --_count;
    }
    
    if (_count == 0)
      {
        _cond.wakeAll();
      }
  }

  size_t count() const
  {
    MutexLockGuard lock(_mutex);
    return _count;
  }

private:
  size_t _count;
  mutable Mutex _mutex;
  Condition _cond;
};
说明几点:

(1)在用途1中,主线程wait(),子线程进行countDown();在用途2中,主线程countDown(),子线程wait();

(2)size_t count() const中虽然只是取计数,但是MutexLockGuard设计到对_mutex的lock和unlock,因此_mutex是需要加mutable关键字的 ;

(3)在本情况下,if (_count == 0)  {_cond.wakeAll();}应该放在临界区外部;否则等待的线程睡眠醒来,临界区仍有可能被持有线程拥有;


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。