基于锁分段机制的 ConcurrentHashMap 实现内幕

ConcurrentHashMap 是线程安全的 HashMap。此前,HashTable 一直被认为是线程安全的 HashMap,ConcurrentHashMap 相对于 HashTable 采用了 锁分段机制 ,即将原本对整个对象加锁的实现进行粒度细化。这也是源于 HashMap 基本的存储特性,因为许多读写请求都是被哈希到了互相独立的区域,这种情况下即使并发读写也不会相互影响,更不会有线程安全问题,而对于操作加全局锁的实现方式显然是浪费了这一天然的并发优势,我们需要加锁的位置是真正存在竞争的地方,而分段锁很好的利用了这一特点。

一. 存储结构设计

在实现上,ConcurrentHashMap 内置了两个静态内部类 HashEntry 和 Segment,前者用来封装 key/value,后者则用来充当锁的角色。一个 Segment(分段)上挂载着一张包含多行记录的表(table),负责范围内各个结点读写的线程安全,每行记录都是由一个以 HashEntry 为结点的链表构成。

image

上面这张图具体描绘了 ConcurrentHashMap 的存储结构设计,默认情况下一个 ConcurrentHashMap 包含 16 个分段(Segment),每个分段包含一张表(HashEntry 类型数组 table),而表中的每行记录都是一个包含了若干个 HashEntry 类型结点的链表,HashEntry 可以看作是整个存储结构中最小的存储单元。在这样的存储结构基础上,我们可以初步臆测 ConcurrentHashMap 存储、读取数据的实现过程,概括来说就是先基于 key 的哈希值定位到具体的分段,然后映射到对应的表,最后就是对表中链表的常规遍历操作。而在这个过程中,锁的粒度控制在一个分段范围内,也就是说只有哈希到同一个分段内的多个读写操作才会存在竞争,而分段与分段之间的读写操作则互不影响、并发执行。段内竞争的线程安全则以具体的分段对象作为锁来执行同步策略,所以在默认理想情况下,一个 ConcurrentHashMap 具备 16 个线程的并发读写能力,这相对于对整个存储对象加全局锁的 HashTable 来说,效率提升了许多。

我们先来分别看一下 HashEntry 和 Segment 这两个基础组件的内部结构实现,首先来看一下 HashEntry:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static final class HashEntry<K,V> {
final int hash; // 哈希值,key 的 hash 值,不允许修改,所以用 final 修饰,同时也保证了可见性
final K key; // 键,final 修饰保证不允许修改与可见性
volatile V value; // 值,volatile 修饰保证值的可见性,value 的值允许被更新
volatile HashEntry<K,V> next; // 链表 next 指针

HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}

// 省略部分实现
}

HashEntry 是对 key/value 的封装类,可以将一个 HashEntry 对象看做是整个存储结构中的最小单位,它是一个结点,包含了必要的 hash、key、value 元素,因为采用链地址法来解决冲突,所以还包含了链表的 next 指针。四个属性中 hash 和 key 是 final 修饰的,而 value 和 next 则是由 volatile 来修饰,这样的设计最根本的都是希望保证在并发访问过程中结点的线程可见性,又因为后两者是可变的对象,所以采用 volatile 进行修饰。

再来看一下 Segment 的实现,Segment 实现了 ReentrantLock 类,所以可以将其视为一个针对 HashMap 作了适配的可重入锁,每个 Segment 对象的责任就是用来保证其守护的表内各个结点访问的线程安全:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static final class Segment<K,V> extends ReentrantLock implements Serializable {

private static final long serialVersionUID = 2249069246763182397L;

static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

/** 分段内挂载的表,表中包含了若干条记录(对应数组大小),每条记录都是一个由一系列 HashEntry 结点构成的链表 */
transient volatile HashEntry<K,V>[] table;

/** 计数当前 Segment 范围内 HashEntry 结点的数目(各个槽位上链表长度之和) */
transient int count;

/** table 被修改的次数,可以用来表示一段时间内分段内是否有操作行为(不包含更新 key 对应的 value 行为) */
transient int modCount;

/**
* 再散列阈值,当 table 范围内包含的 HashEntry 结点的数量超过该阈值时,将触发再散列
* threshold = capacity * loadFactor
*/
transient int threshold;

/** 装载因子 */
final float loadFactor;

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}

// 省略 put、rehash、scanAndLockForPut、scanAndLock、remove、replace 方法的实现
}

Segment 可以理解为分段,默认 ConcurrentHashMap 包含 16 个分段,对应的属性已经在代码中注释,这里再进一步讲解一下 table 属性,因为它是存储结构的一份子。table 属性是一个 HashEntry 类型的数组,每个数组元素都是一个 HashEntry 链表。HashEntry 的名字取的个人觉得既好也不好,说其好是因为每个链表的起始节点也是 HashEntry 类型,可以形象的称其为“入口”,但是具体到链表中的每个结点应该用 Node 来表示更加形象,不过真正理解了也就不会因为名字而疑惑。

接下来我们继续探究 ConcurrentHashMap 基于 HashEntry 和 Segment 的实现,实际上是对前面的存储结构图示的具体描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;

/** 默认分段的数量 */
static final int DEFAULT_INITIAL_CAPACITY = 16;

/** 默认的装载因子 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/** 默认的并发度 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/** 分段 table 内最大的行记录数 */
static final int MAXIMUM_CAPACITY = 1 << 30;

/** 分段 table 内行记录数最小值,必须是二次幂 */
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

/** 分段的最大数目 */
static final int MAX_SEGMENTS = 1 << 16; // 保守值

/** 加锁之前的重试次数 */
static final int RETRIES_BEFORE_LOCK = 2;

/** 掩码值,用来计算分段数组的下标,以定位具体的分段 */
final int segmentMask;

/** 偏移量 */
final int segmentShift;

/** 分段数组 */
final Segment<K,V>[] segments;

transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;

// 构造函数
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();

if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;

// 寻找最佳匹配参数,大于给定参数 concurrencyLevel 的最小二次幂
// 必须是二次幂是用来保证能够通过按位与来定位具体的分段
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift; // 默认为 4
ssize <<= 1; // 默认为 16
}
this.segmentShift = 32 - sshift; // 偏移量,默认为 28
this.segmentMask = ssize - 1; // 掩码值,默认为 15

if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;

int c = initialCapacity / ssize; // 16 / 16 = 1
if (c * ssize < initialCapacity) ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY; // cap 表示每个分段中 table 的最大长度
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}

// 省略剩余方法实现
}

具体的属性含义参考代码注释,我们来看一下构造函数的内部逻辑。构造函数允许我们指定 initialCapacity、loadFactor,以及 concurrencyLevel 三个参数值,其中 loadFactor 译为装载因子,在数据结构课程中讲解哈希冲突时已经对相关概念解释的比较清楚,这里不再多做说明,默认为 0.75,一般不建议修改。

我们来看一下 initialCapacity 和 concurrencyLevel 两个参数,这两个参数一起用来控制构造 map 的初始化大小,其中 concurrencyLevel 用来控制分段的数目,构造函数会计算得到大于该值的最小二次幂作为 map 的分段数目,因为分段数目的多少对应着并发度的多少,所以参数命名为 concurrencyLevel 也不难理解。之所以需要用大于该值的最小二次幂作为分段大小,主要是为了配合哈希函数的定义, 方便通过逻辑与运算来定位具体的分段 ,相对于数值运算来说这样效率更高。再来看一下 initialCapacity,该参数用来控制整个 map 集合包含结点的的初始值,最终还是用来控制分段内表的记录行数。

二. 基本方法实现

探究了 ConcurrentHashMap 的存储结构设计,我们继续来看一下常用操作的实现细节。对于 Map 集合来说,常用的方法有添加(put)、获取(get)、删除(remove)键值,以及对这些基本方法的包装方法,此外还有获取集合大小(size)等操作,下面我们针对 ConcurrentHashMap 中典型方法逐个说明。

2.1 添加或更新键值对:put(K key, V value)

首先来看一下 put 方法,我们都知道 put 操作会修改集合中的数据,也就是写操作,而写操作若希望保证线程安全和可见,则需要一定的同步策略。ConcurrentHashMap 的写操作相对于 HashTable 要高效很多,这主要归功于锁分段机制,具体的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();

// 计算键对应的哈希值
int hash = hash(key);

/*
* 基于键的哈希值找到对应的 Segment 对象
* 将 hash 向右移动 segmentShift 偏移位,并与 segmentMask 执行 与 操作
*/
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);

// 调用 Segment 的 put 方法,执行 put 操作
return s.put(key, hash, value, false);
}

上述过程的逻辑比较明确,可以概括成如下三步:

  1. 调用 hash 方法计算 key 的哈希值
  2. 基于 key 的哈希值定位具体的分段 Segment
  3. 调用 Segment 对象的 put 方法执行 put 逻辑

从整个操作过程我们可以很直观的理解锁分段的实现细节,Segment 就是这里的锁对象,put 操作会首先定位当前需要操作的具体 Segment 对象,然后调用该对象的 put 方法执行写入或更新操作,而具体的锁策略就位于 Segment 的 put 方法内,这个我们后面再深入探究,先来看一下定位具体分段的过程,首先看一下 hash 算法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private int hash(Object k) {
int h = hashSeed; // 调用 randomHashSeed(this) 方法计算得到

if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}

h ^= k.hashCode();

// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
1
2
3
4
5
6
7
8
9
// 计算随机的 hash 种子,减少哈希碰撞
private transient final int hashSeed = randomHashSeed(this);

private static int randomHashSeed(ConcurrentHashMap instance) {
if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
return sun.misc.Hashing.randomHashSeed(instance);
}
return 0;
}

之所以不直接使用 jdk 提供的 hashCode() 方法,而是在此基础上进行改写(Wang/Jenkins Hash),主要是考虑到原生哈希算法冲突的可能性较大,而哈希均衡是 ConcurrentHashMap 良好性能的基础,下面的例子证明了改写之后的算法在哈希均衡性上优于原生算法:

我做了一个测试,不通过再哈希而直接执行哈希计算。

System.out.println(Integer.parseInt(“0001111”, 2) & 15);
System.out.println(Integer.parseInt(“0011111”, 2) & 15);
System.out.println(Integer.parseInt(“0111111”, 2) & 15);
System.out.println(Integer.parseInt(“1111111”, 2) & 15);

计算后输出的哈希值全是15,通过这个例子可以发现如果不进行再哈希,哈希冲突会非常严重,因为只要低位一样,无论高位是什么数,其哈希值总是一样。我们再把上面的二进制数据进行再哈希后结果如下,为了方便阅读,不足32位的高位补了0,每隔四位用竖线分割下。

0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010

可以发现每一位的数据都散列开了,通过这种再哈希能让数字的每一位都能参加到哈希运算当中,从而减少哈希冲突。

例子引用自 聊聊并发 — 深入分析ConcurrentHashMap

然后方法会基于计算得到的哈希值定位具体的分段:

1
2
3
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 基于下标获取对应的分段对象,如果不存在则基于 CAS 策略创建
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0];
int cap = proto.table.length; // 分段内 table 的长度
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

默认在构造 ConcurrentHashMap 对象时,segmentShift 值为 28,segmentMask 值为 15,而 hash 值为 32 为,所以这里的计算逻辑是对 hash 右移 28 位,再与 segmentMask 执行与操作,目的是让高 4 位也参与到哈希运算中,最终计算得到对应 segments 数组下标。然后基于下标获取对应的分段,如果分段不存在的话,则会基于 CAS 策略创建并记录相应的分段对象。

接着会调用获取到的 Segment 对象的 put 方法执行 put 操作,记录 key/value 值,其中包含了分段锁机制的具体实现细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// java.util.concurrent.ConcurrentHashMap.Segment#put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 加锁,如果 tryLock 成功则继续,否则循环尝试加锁
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table; // 当前 Segment 的 table
int index = (tab.length - 1) & hash; // 计算对应的 table 下标
HashEntry<K,V> first = entryAt(tab, index); // 获取对应的 entry
for (HashEntry<K,V> e = first;;) {
if (e != null) { // 对应 entry 上存在链表,且当前还在链表结点上
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
// 找到相同的 key,则更新对应的 value
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next; // 遍历链表上的下一个结点
} else { // 对应 entry 上没有链表,或已经达到链尾
if (node != null)
node.setNext(first); // 在链表上前置插入新的结点
else
node = new HashEntry<K,V>(hash, key, value, first); // 创建新的结点
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 再散列,table 长度扩充一倍
else
setEntryAt(tab, index, node); // 更新 entry
++modCount; // 只有当前新插入结点时才计数,如果仅仅是更新已有 key 的 value 则不计数
count = c;
oldValue = null; // 因为是新的结点,所以 oldValue = null
break;
}
}
} finally {
unlock(); // 释放锁
}
return oldValue;
}

如代码注释所示,整个过程还是比较直观的,在代码一开始会尝试锁定当前分段对象,锁定之后就是常规的链表操作,因为整个操作位于临界区中,所以无需再考虑线程安全问题。而具体的实现正如我们一开始简单臆测的一样,首先通过与操作定位 key 在表中的记录行,然后对行上的链表执行遍历,如果对应的 key 已经存在则更新对应的 value,否则在链表最前端插入新的结点。因为 HashEntry 中 value 属性采用 volatile 修饰,所以能够保证更新操作能够立即对 get 线程可见,而之所以要采取前置插入是考虑到一个概率问题,HashMap 的开发者认为 后面插入的 Entry 被检索的概率更大,从而提升整体 get 性能 ,并且读操作是不需要加锁的,个人觉得前置插入还可以保证在插入的过程中如果存在读线程在遍历当前链表,则插入操作不会对整个遍历过程产生影响。

2.2 获取指定键值:get(Object key)

再来看一下 get 操作,相对于 put 操作而言,get 要简单许多,这是因为读的过程是不需要加锁的,HashEntry 的设计能够保证节点变更的线程可见性。而结合对于存储结构设计的理解,即使不看源码,我们也能够大致知道整个过程的实现细节,这里不再展开说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key); // 计算 key 的哈希值
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
// 获取对应的 entry,并遍历链表上的结点
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
// 找到对应的 key,返回对应的 value
return e.value;
}
}
// key 不存在,返回 null
return null;
}

2.3 删除指定键值:remove(Object key)

remove 操作类似于 put 操作,都是需要对存储结构进行修改的操作,put 是插入或更新结点,而 remove 则是删除结点,所以都需要进行加锁操作。整个过程同样也分为三个步骤:

  1. 计算 key 对应的哈希值
  2. 基于 key 的哈希值定位具体的分段 Segment
  3. 调用 Segment 对象的 remove 方法执行删除逻辑
1
2
3
4
5
6
7
8
public V remove(Object key) {
// 1. 计算 key 对应的哈希值
int hash = hash(key);
// 2. 根据 key 的哈希值找到对应的 Segment 对象
Segment<K,V> s = segmentForHash(hash);
// 3. 调用 Segment 对象的 remove 方法执行删除
return s == null ? null : s.remove(key, hash, null);
}

Segment 对象作为具体的锁对象,所以加锁的细节同样位于 Segment 的 remove 方法中(如下),而具体的实现细节比较直观,做代码注释所示,不再过多撰述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final V remove(Object key, int hash, Object value) {
// 尝试加锁
if (!tryLock()) scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index); // 找到 key 对应的 entry
HashEntry<K,V> pred = null;
// 遍历 entry 上的链表寻找目标 key
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
// 找到了目标 key
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
V v = e.value;
// 如果没有制定期望值,或者当前结点的 value 等于期望值,则执行删除逻辑
if (value == null || value == v || value.equals(v)) {
if (pred == null)
// 没有前置结点,则直接将目标结点的 next 结点作为起始节点
setEntryAt(tab, index, next);
else
// 存在前置结点,则修改前置结点的 next 指向目标结点的 next
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock(); // 释放锁
}
return oldValue;
}

2.4 获取集合的大小:size()

最后我们来一起看一下 size 方法,该方法用于返回当前 map 集合所包含的结点总数目。前面讲解 Segment 类设计的时候,介绍过该类包含一个 count 属性,用于记录分段内结点的总数,并且在具体插入或删除分段内结点时会更新该属性值,那么是不是说这里的 size 方法只需要对所有分段的 count 累加求和返回就行了呢?如果是这样的话那么这里势必要对所有分段加锁,以保证计算的过程中分段内的结构不被更改,否则获取到的 count 极有可能是过期的数据,但是这样的操作性能上将是不太乐观的。

方法 size 的具体实现并没有一上来就加锁,而是先尝试两遍不加锁的累加求和方式,如果两次中所有结点的总数并没有发生变化,则认为计算得到的 size 是可信的,直接返回而不需要加锁,否则再加锁求和也不迟,基于概率统计,这样的设计能够在很大程序上提升整个 ConcurrentHashMap 的性能。具体实现见下面代码注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // 标识当前元素总量是否超过 int 类型上限
long sum; // 各个分段内修改数量的总数
long last = 0L; // 记录上一次各个分段内修改数量的总数
int retries = -1; // 重试次数
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) { // 达到重试最大次数,逐个锁定分段
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L; // 记录当前各个分段的修改总量
size = 0; // 记录所有分段的元素总量
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount; // 累加分段内修改的次数
int c = seg.count; // 累加分段内元素的总量
if (c < 0 || (size += c) < 0)
overflow = true; // 标识溢出
}
}
if (sum == last)
break; // 两次检查修改量都没有变化,说明计算的长度 size 可信
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock(); // 释放锁
}
}
// 如果溢出则返回 int 最大值,否则返回真实 size
return overflow ? Integer.MAX_VALUE : size;
}

以上主要列举了典型的几个操作的具体实现细节,实际上只要理解了整个 ConcurrentHashMap 的存储结构设计,具体方法的实现源码阅读起来将会简单许多,甚至有时候会觉得就是应该这样编码实现。不得不说锁分段技术是一种很巧妙的设计,简单的技术原理,简单的源码实现,却能够极大的提升并发的性能。

但最后还是要泼一盆凉水,在写作本文时,jdk 1.8 已经发布,1.8 版本的 ConcurrentHashMap 彻底摒弃了锁分段技术,转而采用 CAS 机制,并引入了红黑树以解决链表过长而导致的性能低下问题,所有的改进都是为了性能的提升,不过这也增加了代码的实现复杂度,1.7 版本的 ConcurrentHashMap 源码仅 1600 多行,而 1.8 版本则增加到 6000 多行,如此复杂的实现不得不让人再次敬佩 Doug Lea 大师,以后抽时间笔者再来用一篇《基于 CAS 机制的 ConcurrentHashMap 实现内幕》的文章,从源码层面探究 1.8 版本 ConcurrentHashMap 的实现细节。

参考

  1. jdk 1.7 源码
  2. java 并发编程实战
  3. 探索 ConcurrentHashMap 高并发性的实现机制
  4. 聊聊并发 — 深入分析 ConcurrentHashMap