这里我们想研究的是jdk1.8中ConcurrentHashMap的addCount(long x, int check)
方法。如下所示在put方法的最后会触发addCount(long x, int check)
方法进行元素个数的统计。
我们再回顾一下另一个参数binCount :
- 在操作链表的分支
if (fh >= 0)
中 用于统计put前链表长度 - 在
if (f instanceof TreeBin)
分支中看到, binCount=2 , 该值被直接赋值常量 2
触发addCount的场景
- 在
putVal(K key, V value, boolean onlyIfAbsent)方法中最后会触发addCount(1L, binCount);
- 在
replaceNode
方法中会触发addCount(-1L, -1)
- 在
clear()
方法中触发addCount(delta, -1);
; - 在
compute
或者computeIfAbsent
或者computeIfPresent
方法中触发addCount((long)delta, binCount)
【1】addCount
添加到计数,若table太小且尚未调整大小,则触发transfer。如果当前正在扩容,则尝试帮助进行扩容调整。在transfer后重新检查占用情况,以查看是否需要另一次调整,因为resizings 是滞后的添加。
Rechecks occupancy after a transfer to see if another resize is already needed because resizings are lagging additions.
// x 表示需要 add 的数
// check < 0 ,不需要检查resize, check <= 1 only check if uncontended
private final void addCount(long x, int check) {CounterCell[] as; long b, s;// counterCells 默认为null,如果as为null且没有成功更新BASECOUNT就进入if// 如果as不为null,直接进入ifif ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;//记录是否存在竞争,true表示不存在竞争boolean uncontended = true;// m = as.length - 1//a = as[ThreadLocalRandom.getProbe() & m]// 如果a 不为null,那么更新a.value = a.value+xif (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||// 这里还会将CAS结果赋予uncontended !(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}// check <= 1 直接返回if (check <= 1)return;// 求和 baseCount+ΣCounterCell.values = sumCount();}// 这下面咱前面系列已经见过很多次了,这里就不再赘述了if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {// 这几个条件也是有bug的if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);// 需要注意的是,在扩容后,这里又触发了一次 sumCounts = sumCount();}}
}
从上面代码可以看到,统计的最终还是依赖于fullAddCount(x, uncontended)
和sumCount()
。
① 出现的几个变量&常量
//基本计数器值,主要在没有争用时使用,但在表初始化竞争期间也用作后备。通过CAS更新。
private transient volatile long baseCount;/*** Spinlock (locked via CAS) used when resizing and/or creating CounterCells.*///旋转锁(locked via CAS),当扩容或者创建CounterCells时使用
private transient volatile int cellsBusy;/*** Table of counter cells. When non-null, size is a power of 2.*/// 存放CounterCell的数组,不为null时,其是2的N次幂
private transient volatile CounterCell[] counterCells;// 对应变量baseCount
private static final long BASECOUNT=U.objectFieldOffset(k.getDeclaredField("baseCount"));
// 对应变量cellsBusy
private static final long CELLSBUSY=U.objectFieldOffset(k.getDeclaredField("cellsBusy"));
// 对应变量 CounterCell.value
private static final long CELLVALUE=U.objectFieldOffset(ck.getDeclaredField("value"));//数组的最大长度 tab.leght
private static final int MAXIMUM_CAPACITY = 1 << 30;// 扩容戳移动位数
private static int RESIZE_STAMP_BITS = 16;/*** The maximum number of threads that can help resize.* Must fit in 32 - RESIZE_STAMP_BITS bits.*/// 最大扩容线程数 65535
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;/*** The bit shift for recording size stamp in sizeCtl.*/// 扩容戳移位数 = 16
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
CounterCell是什么呢?用于分配计数的填充单元格。改编自LongAdder和Striped64。有关解释,请参阅其内部文档。所以,我们还得研究下LongAdder和Striped64。
/*** A padded cell for distributing counts. Adapted from LongAdder* and Striped64. See their internal docs for explanation.*/
@sun.misc.Contended static final class CounterCell {volatile long value;CounterCell(long x) { value = x; }
}
② 触发fullAddCount的分支
第一个if
if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
进入if 方法体的场景:
- counterCells不为null
- counterCells为null,但是不能CAS更新BASECOUNT=BASECOUNT+x
第二个if
if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))
进入if 方法体的场景(as=counterCells):
- ① as 为null;
- ② as 不为null,但是
(m = as.length - 1) < 0
,这里其实先给 m 进行了赋值,然后判断。如果判断为真,那么说明counterCells是一个空数组。 - ③ ①②都不满足,
a = as[ThreadLocalRandom.getProbe() & m]) == null
。这里获取了一个CounterCell 赋予了a。 - ④ 不能更新CELLVALUE 为 a.value+x。
③ 统计所有CounterCell的value和
也就是baseCount+ΣCounterCell.value
。
final long sumCount() {CounterCell[] as = counterCells; CounterCell a;// 将sum更新为baseCountlong sum = baseCount;if (as != null) {// 遍历每一个CounterCell 获取value进行累加for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}// 返回sumreturn sum;
}
【2】LongAdder
LongAdder 继承自Striped64,也是java.util.concurrent.atomic
包下的一个原子类。想要搞明白fullAddCount,必须搞懂LongAdder。
一个或多个变量均持有 long sum(初始零)。当线程并发更新(比如add方法)时,变量集可能会动态增长以减少竞争。方法sum(或等价的longValue)返回持有sum的变量的合计值。
当多个线程更新用于收集统计信息而不是细粒度同步控制的公共和时,此类通常优于AtomicLong。在低更新竞争下,这两个类具有相似的特性。但在高竞争情况下,此类的预期吞吐量显著更高,但代价是更高的空间消耗。
LongAdder可以和ConcurrentHashMap
一起使用以维持一个 可伸缩的 frequency map(a form of histogram or multiset)。例如,要将一个计数添加到一个ConcurrentHashMap<String,LongAdder> freqs
,如果尚未初始化,那么可以使用 freqs.computeIfAbsent(k -> new LongAdder()).increment();
。
这个类继承自Number,但是没有定义方法诸如equals、hashCode和compareTo。因为实例常常会发生改变,所以作为集合的键不是那么有用。
如下是其add方法,可以看到ConcurrentHashMap的addCount方法是参考了这个add方法。
public void add(long x) {Cell[] as; long b, v; int m; Cell a;if ((as = cells) != null || !casBase(b = base, b + x)) {boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}
}
如下所示是其 sum 方法,与ConcurrentHashMap中sumCount()
方法可以说简直一致。
public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;
}
【3】fullAddCount
// See LongAdder version for explanationprivate final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;if ((as = counterCells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCounterCell r = new CounterCell(x); // Optimistic createif (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { // Recheck under lockCounterCell[] rs; int m, j;if ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehashelse if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break;else if (counterCells != as || n >= NCPU)collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {if (counterCells == as) {// Expand table unless staleCounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];counterCells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = ThreadLocalRandom.advanceProbe(h);}else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try { // Initialize tableif (counterCells == as) {CounterCell[] rs = new CounterCell[2];rs[h & 1] = new CounterCell(x);counterCells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base}}