【Flume】从flume的监控度量数据XXXCounter来看JAVA并发编程中的CAS操作

图示

技术分享

如上图所示红框部分,本人在做稳定性测试的时候,当flume运行几天后,我发现这个counter值逐渐变大,到一定值后,又变小了,有一个循环的过程,故而对此产生研究的欲望,下面来看看:

if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }
incrementBatchEmptyCount方法点进去看看

 public long incrementBatchEmptyCount() {
    return increment(COUNTER_BATCH_EMPTY);
  }
protected long increment(String counter) {
    return counterMap.get(counter).incrementAndGet();
  }
public final long incrementAndGet() {
        for (;;) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }
以上这种写法乍一看很奇怪,少了一个return的分支,其实它的意思是,当if条件为false的时候,什么都不做,不会return,只有if为true的时候,才会return

public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }
到这里就不能再进去了,再往下就是JDK  native的方法实现了,有需要的可以看下这个链接
这里就是CAS操作了【compare and swap/set】意思是指在set之前先比较该值有没有变化,只有在没变的情况下才对其赋值。

例如你对一个变量进行++操作的时候,你要先判断这个变量本身在做++操作前有没有发生变化,这种情况就会在多线程环境下发生,有锁的机制可以解决此问题,但是不用锁,用CAS也可以解决!!

上面代码的调用者是Unsafe,提供一个针对于volatile变量的操作,其原理是直接调用的CPU的CAS动作。

在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁

锁机制存在以下问题:

(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题

(2)一个线程持有锁会导致其它所有需要此锁的线程挂起

(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险

volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。

【附】volatile的非原子性

首先说一下,什么时候情况下全局变量需要有volatile修饰呢?比如,我们大部分的全局变量都是用锁保护的,那么是否还需要volatile呢?答案是否定的,即当全局变量由锁保护的时候,该全局变量是不需要volatile的。原因有二:一是锁保证了临界区的串行;二是锁的实现中有内存屏障,保证临界区访问全局变量为最新值。那么,就可以总结出,当没有锁保护的全局变量是需要使用volatile修饰的,如以下这种情况:
1. 全局变量int exit_flag = 0;
2. 线程1的主循环的退出条件是检查exit_flag是否为1,为1,则退出主循环;
3. 线程2在某些情况下,修改exit_flag为1。
另外如果是异步情况,即没有线程2,而有一个信号处理函数,在收到指定信号的情况下将exit_flag赋值为1。
这时,exit_flag都需要使用volatile来修饰。不然对于线程1的代码,如果编译器发现在线程1的代码中没有任何地方修改exit_flag,有可能会将exit_flag放入寄存器缓存。这样,在每次的条件检查的时候,都是从寄存器中读取,而非exit_flag对应的内存。这样将导致每次读取的值都为0,从而导致thread1无法退出。而使用volatile修饰exit_flag,会避免编译器做这种优化,强制每次读取都是从内存中读取,这样就可以保证了在exit_flag置为1时,thread1可以读取到最新值而退出。

volatile只提供了保证访问该变量时,每次都是从内存中读取最新值,并不会使用寄存器缓存该值——每次都会从内存中读取
而对该变量的修改,volatile并不提供原子性的保证。那么编译器究竟是直接修改内存的值,还是使用寄存器修改都符合volatile的定义。所以,一句话,volatile并不提供原子性的保证。


独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。

即:

独占锁——synchronized

乐观锁——CAS【compare and swap/set】

CAS有三个操作:当前内存中的值V,预期值A,更新后的值B,只有当A==V的时候,才会更新为B,否则nothing

以AtomicLong为例

 public final long incrementAndGet() {
        for (;;) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }
compareAndSet方法调用native方法去实现CAS操作,current是在该方法之前获取到的一个当前值,通过get拿到的,是一个volatile定义的变量,也就是从内存中读到的值,那么在做update之前当前值与刚才从内存中读到的值再次比对一下,相等则update到next值,否则,什么都不做。
protected long increment(String counter) {
    return counterMap.get(counter).incrementAndGet();
  }
看下上面这个flume里的代码,counterMap.get(counter)我们可以理解为内存值【当前值】,在incrementAndGet方法内部的current=get()为预期值,next即为更新值

以上部分内容来自于网络,未注明作者,还请海涵!


望不吝指教!!



郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。