首页 > 编程学习 > ConcurrentHashMap源码分析(二)

ConcurrentHashMap源码分析(二)

发布时间:2022/5/14 19:03:59

目标

  1. 为什么ConcurrentHashMap1.8需要去除Segments分段锁
  2. 为什么ConcurrentHashMap1.8使用synchronized而不使用lock锁
  3. ConcurrentHashMap1.8如何基于Node节点实现锁机制
  4. ConcurrentHashMap多线程size++效率最高
  5. 多个线程同时执行CAS效率真的非常高吗
  6. ConcurrentHashMap1.8与ConcurrentHashMap1.7区别

前面我们看过了ConcurrentHashMap1.7的源码,现在看看ConcurrentHashMap1.8的源码。
ConcurrentHashMap1.8采用的数据结构是数组+链表+红黑树。
没错,你没看错,我也没写错,和HashMap的数据结构一模一样,那他是怎么保证线程安全的呢?使用CAS和synchronized锁来保证安全,使用synchronized锁住需要进行put的那个链表。
接下来看看源码吧。
先看他的无参构造。
在这里插入图片描述
可以看到这个无参构造是没有任何处理的,这也间接的说明了ConcurrentHashMap的初始化时懒加载。
看完这部分,我们看看put()方法。
在这里插入图片描述
和HashMap挺像的。都调用着putVal()方法,进入看看。

final V putVal(K key, V value, boolean onlyIfAbsent) {
     if (key == null || value == null) throw new NullPointerException();
     int hash = spread(key.hashCode());
     int binCount = 0;
     for (Node<K,V>[] tab = table;;) {
         Node<K,V> f; int n, i, fh;
         if (tab == null || (n = tab.length) == 0)
             tab = initTable();
         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
             if (casTabAt(tab, i, null,
                          new Node<K,V>(hash, key, value, null)))
                 break;                   // no lock when adding to empty bin
         }
         else if ((fh = f.hash) == MOVED)
             tab = helpTransfer(tab, f);
         else {
             V oldVal = null;
             synchronized (f) {
                 if (tabAt(tab, i) == f) {
                     if (fh >= 0) {
                         binCount = 1;
                         for (Node<K,V> e = f;; ++binCount) {
                             K ek;
                             if (e.hash == hash &&
                                 ((ek = e.key) == key ||
                                  (ek != null && key.equals(ek)))) {
                                 oldVal = e.val;
                                 if (!onlyIfAbsent)
                                     e.val = value;
                                 break;
                             }
                             Node<K,V> pred = e;
                             if ((e = e.next) == null) {
                                 pred.next = new Node<K,V>(hash, key,
                                                           value, null);
                                 break;
                             }
                         }
                     }
                     else if (f instanceof TreeBin) {
                         Node<K,V> p;
                         binCount = 2;
                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                             oldVal = p.val;
                             if (!onlyIfAbsent)
                                 p.val = value;
                         }
                     }
                 }
             }
             if (binCount != 0) {
                 if (binCount >= TREEIFY_THRESHOLD)
                     treeifyBin(tab, i);
                 if (oldVal != null)
                     return oldVal;
                 break;
             }
         }
     }
     addCount(1L, binCount);
     return null;
 }

其实我们要是阅读过了HashMap那一篇文章,这一部分应该就都看懂了。

  1. if (key == null || value == null)
    ConcurrentHashMap不支持key和value为空
  2. int hash = spread(key.hashCode());
    计算hash值
  3. int binCount = 0;
    记录链表的长度,达到转为红黑树的要求就进行转换
  4. for (Node<K,V>[] tab = table;;)
    进行死循环,说好听点就是自旋,自旋我们在1.7也详细的说过了。
  5. if (tab == null || (n = tab.length) == 0)
    如果全局table是空数据,那么就进行初始化。后面再来详细的说说怎么初始化
  6. (f = tabAt(tab, i = (n - 1) & hash)) == null
    计算index值,并且拿到该下标的数组,若为空,那就创建新的链表
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
  1. casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))
    创建新的链表
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
 }
  1. (fh = f.hash) == MOVED
    若为-1,代表着有线程正在进行扩容,那么就帮其进行扩容
  2. synchronized (f)
    对数组的链表进行加锁,为什么之前没有使用 synchronized 锁,因为在jdk1.5的时候,synchronized 没有做任何的优化,在jdk1.6的时候,添加了自旋的功能,和之前使用tryLock()实现自旋一个效果。
  3. tabAt(tab, i) == f
    判断之前拿到的 f 是否和现在的拿到的链表一样。若不同则再次自旋,拿到新的链表f
  4. e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))
    若key一模一样,则直接替换原来的value
  5. (e = e.next) == null
    添加新数据到链表的尾部
  6. if (f instanceof TreeBin)
    若链表已经变成了红黑树,则将新数据添加到树种
  7. binCount >= TREEIFY_THRESHOLD
    若达到变成树的要求,则直接变成树。
  8. addCount(1L, binCount);对binCount进行自增,因为需要记录链表的的长度,但是ConcurrentHashMap是官方推荐在多线程使用的,所以使用了一个方法进行自增。

我们先来看看第5点,初始化table数组
tab = initTable();

private final Node<K,V>[] initTable() {
     Node<K,V>[] tab; int sc;
     while ((tab = table) == null || tab.length == 0) {
         if ((sc = sizeCtl) < 0)
             Thread.yield(); // lost initialization race; just spin
         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
             try {
                 if ((tab = table) == null || tab.length == 0) {
                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                     @SuppressWarnings("unchecked")
                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                     table = tab = nt;
                     sc = n - (n >>> 2);
                 }
             } finally {
                 sizeCtl = sc;
             }
             break;
         }
     }
     return tab;
 }
  1. (tab = table) == null || tab.length == 0
    再次判断table数组是否为空,并且将它作为自旋的判断条件。
  2. (sc = sizeCtl) < 0
    如果sizeCtl小于0,代表目前有线程正在进行数组的初始化,那么就释放此线程
  3. U.compareAndSwapInt(this, SIZECTL, sc, -1)
    从主内存中修改SIZECTL值,变相的告诉其他线程“我”正在初始化数组
  4. (tab = table) == null || tab.length == 0
    再次判断数组是否为空,若还是为空,那么就进行初始化了
  5. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    如果sc大于0,则直接利用默认的数组大小16进行初始化
  6. sc = n - (n >>> 2)
    这一段看似复杂,但其实还是很熟悉的,假如n为16,那么(n >>> 2)为4,那么sc的结果就是12,16*0.75=12,0.75加载因子 ,熟悉吧

在数组初始化这一段代码之中,存在一个bug,假如有两个线程,线程1、2均进入到else if部分。线程1完成了else if的数据更改,突然断电,线程1直接被结束了,那么线程2进行数据更改失败,又回到判空的while条件,发现table仍为空,但是sc为-1,所以线程被释放,修改数据sc又失败,那么线程2就会一直这样死循环下去,达不到break;

看完数组初始化,再来看看addCount(1L, binCount);方法

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
  1. (as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
    判断 counterCells 是否为空。如果为空,就通过 cas 操作尝试修改 baseCount 变量,对这个变量进行原子累加操作 (如果在没有竞争的情况下,仍然采用 baseCount 来记录元素个数)
    2.如果 cas 失败说明存在竞争,这个时候不能再采用 baseCount 来累加,而是通过CounterCell 来记录
    这样就保证了线程的安全性,假如不这样使用,那么假如10多个线程同时进行CAS,只有一个线程成功,那么其他线程则会阻塞,导致效率及其慢
  2. boolean uncontended = true;
    线程是否冲突标识,默认没有冲突
  3. ThreadLocalRandom.getProbe() & m
    Random 在线程并发的时候会有性能问题以及可能会产生相同的随机数
    ThreadLocalRandom.getProbe 可以解决这个问题,并且性能要比 Random 高
  4. s = sumCount();
    对counterCells数组进行求和
  5. as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
    1.计数表counterCells为空则直接调用 fullAddCount
    2.从计数表中随机取出一个数组的位置为空,直接调用 fullAddCount
    3.通过 CAS 修改 CounterCell 随机位置的值,如果修改失败说明出现并发情况,调用fullAndCount
  6. if (check <= 1)
    链表长度小于等于 1,不需要考虑扩容

为什么在全局变量中使用volatile?
加上volatile及时读取最新主内存数据,保证线程的可见性。

1.7和1.8的区别?
在put()方法中,1.7是需要计算两次index值,而1.8只需要计算一次,并且查询速度比1.7快。1.8取消了Lock锁,而使用synchronized。并且去掉了Segment分段锁

Copyright © 2010-2022 ngui.cc 版权所有 |关于我们| 联系方式| 豫B2-20100000