线程安全的集合
Java核心技术卷(10th)读书笔记
如果多线程要并发地修改一个非线程安全的数据结构, 例如散列表, 那么很容易会破坏这个数据结构。 可以通过提供锁来保护共享数据结构, 但是选择线程安全的实现作为替代可能更容易。 下面将讨论 Java 类库提供的另外一些线程安全的集合。
高效的映射、集和队列
java.util.concurrent
包提供了映射、 有序集和队列的高效实现:ConcurrentHashMap、 ConcurrentSkipListMap > ConcurrentSkipListSet 和 ConcurrentLinkedQueue
。
这些集合使用复杂的算法,通过允许并发地访问数据结构的不同部分来使竞争极小化。 与大多数集合不同,size 方法不必在常量时间内操作。确定这样的集合当前的大小通常 需要遍历。
集合返回弱一致性( weakly consistent) 的迭代器。这意味着迭代器不一定能反映出它 们被构造之后的所有的修改,但是,它们不会将同一个值返回两次,也不会拋出 Concurrent ModificationException
异常。
在 Java SE 8中, 还可以使用 ConcurrentHashMap
。
map.putlfAbsent(word, new LongAdder());
map.get(word).increment();
// 上述两句话可以合并为下面这一句
map.putlfAbsent(word, new LongAdderO).increment():
对并发散列映射的批操作
Java SE 8 提供了一些可以更方便地完成原子更新的方法。调用 compute 方法时可以提供 一个键和一个计算新值的函数。这个函数接收键和相关联的值(如果没有值,则为 null), 它 会计算新值。例如,可以如下更新一个整数计数器的映射:
map.compute(word , (k, v) -> v = null ? 1: v + 1);
另外还有 computelfPresent
和 computelf bsent
方法,它们分别只在已经有原值的情况下计 算新值,或者只有没有原值的情况下计算新值。可以如下更新一个 LongAdder
计数器映射:
map.computelfAbsent(word , k -> new LongAdder()).increment()
;
这与之前看到的 putlfAbsent
调用几乎是一样的,不过 LongAdder
构造器只在确实需要 一个新的计数器时才会调用。 首次增加一个键时通常需要做些特殊的处理。利用 merge
方法可以非常方便地做到这一 点。这个方法有一个参数表示键不存在时使用的初始值。否则, 就会调用你提供的函数来结 合原值与初始值。(与 compute 不同,这个函数不处理键。)
map.merge(word, 1L, (existi ngValue, newValue) -> existingValue + newValue);
Java SE 8 为并发散列映射提供了批操作,即使有其他线程在处理映射,这些操作也能安 全地执行。 批操作会遍历映射,处理遍历过程中找到的元素。无须冻结当前映射的快照。 除非你恰好知道批操作运行时映射不会被修改, 否则就要把结果看作是映射状态的一个近似。
- 搜索(search) 为每个键或值提供一个函数,直到函数生成一个非 null 的结果。然后搜 索终止,返回这个函数的结果。
- 归约(reduce) 组合所有键或值, 这里要使用所提供的一个累加函数。
- forEach 为所有键或值提供一个函数。
每个操作都有 4 个版本:
- operationKeys: 处理键。
- operatioriValues: 处理值。
- operation: 处理键和值。
- operatioriEntries: 处理 Map.Entry 对象。
对于上述各个操作, 需要指定一个参数化阈值。如果映射包含的 元素多于这个阈值, 就会并行完成批操作。如果希望批操作在一个线程中运行,可以使用阈 值 Long.MAX_VALUE
。如果希望用尽可能多的线程运行批操作,可以使用阈值 1
。
-
首先来看 search方法。
例如, 假设我们希望找出第一个出现次数超过 1000 次的单词。需要搜索键和值:
String result = map.search(threshold, (k, v) -> v > 1000 ? k : null);
result 会设置为第一个匹配的单词,如果搜索函数对所有输人都返回 null, 则 返 回 null。
-
forEach方法有两种形式。第一个只为各个映射条目提供一个消费者函数, 例如:
map.forEach(threshold, (k, v) -> System.out.println(k + " -> " + v));
第二种形式还有一个转换器函数, 这个函数要先提供, 其结果会传递到消费者:
map.forEach(threshold, (k, v)-> k + " -> " + v ,// 转换器 System.out::println); // 消费者
转换器可以用作为一个过滤器。只要转换器返回 null , 这个值就会被悄无声息地跳过。
例如,下面只打印有大值的条目:
map.forEach(threshold, (k, v) -> v > 1000 ? k + "-> " + v : null , // 过滤 System.out::println); // 空值不会被传递到此
-
reduce
操作用一个累加函数组合其输入。例如,可以如下计算所有值的总和:Long sum = map.reduceValues(threshold, Long::sum);
与 forEach 类似,也可以提供一个转换器函数。可以如下计算最长的键的长度:Integer maxlength = map.reduceKeys(threshold, String::length, // 转换器 Integer::max); // 累加器
转换器可以作为一个过滤器,通过返回
null
来排除不想要的输入。 在这里,我们要统计多少个条目的值 > 1000:Long count = map.reduceValues(threshold, v -> v > 1000? 1L : null , Long::sum);
如果映射为空, 或者所有条目都被过滤掉, reduce 操作会返回 null。如果只有一 个元素, 则返回其转换结果, 不会应用累加器。
并发集视图
假设你想要的是一个大的线程安全的集而不是映射。并没有一个 ConcurrentHashSet
类, 而且你肯定不想自己创建这样一个类。当然,可以使用 ConcurrentHashMap
(包含“ 假” 值), 不过这会得到一个映射而不是集, 而且不能应用 Set 接口的操作。 静态 newKeySet
方法会生成一个 Set
, 这实际上是
ConcurrentHashMap words = ConcurrentHashMap.newKeySet();
当然, 如果原来有一个映射,keySet
方法可以生成这个映射的键集。这个集是可变的。 如果删除这个集的元素,这个键(以及相应的值)会从映射中删除。不过,不能向键集增加 元素,因为没有相应的值可以增加。Java SE 8 为 ConcurrentHashMap
增加了第二个 keySet
方 法,包含一个默认值,可以在为集增加元素时使用:
Set words = map.keySet(1L); words.add("java”)
;
如果 "Java” 在 words 中不存在, 现在它会有一个值 1
。
写数组的拷贝
CopyOnWriteArrayList
和 CopyOnWriteArraySet
是线程安全的集合,其中所有的修改线程对底层数组进行复制。如果在集合上进行迭代的线程数超过修改线程数, 这样的安排是很有用的。当构建一个迭代器的时候,它包含一个对当前数组的引用。如果数组后来被修改 了,迭代器仍然引用旧数组, 但是,集合的数组已经被替换了。因而,旧的迭代器拥有一致 的(可能过时的)视图,访问它无须任何同步开销。
并行数组算法
在 Java SE 8中, Arrays 类提供了大量并行化操作。静态 Arrays.parallelSort
方法可以对 一个基本类型值或对象的数组排序。例如,
String contents = new String(Fi1es.readAllBytes( Paths.get("alice.txt")), StandardCharsets.UTF_8); // 读取文件内容到字符串contents中
String[] words = contents.split("[\\P{L}]+"); // 按照给定字符分割
Arrays,parallelSort(words): // 排序
对对象排序时,可以提供一个 Comparator。
Arrays,parallelSort(words, Comparator.comparing(String::length));
对于所有方法都可以提供一个范围的边界,如:
values.parallelSort(values,length / 2, values,length); // 处理values中超过中间数的值
parallelSetAll
方法会用由一个函数计算得到的值填充一个数组。这个函数接收元素索引, 然后计算相应位置上的值。 Arrays.parallelSetAll(values,i-> i % 10); // Fills values with 0 12 3 4 5 6 7 8 9 0 12 . . .
显然,并行化对这个操作很有好处。这个操作对于所有基本类型数组和对象数组都有相 应的版本。
最后还有一个 parallelPrefix
方法,它会用对应一个给定结合操作的前缀的累加结果替换 各个数组元素。这是什么意思? 这里给出一个例子。考虑数组 [1,2, 3, 4, . . .] 和 x 操作。执 行Arrays.parallelPrefix(values, (x, y) -> x * y)
之后,数组将包含: [1, 1x 2, 1x 2 x 3, l x 2 x 3 x 4, . . .]
较早的线程安全集合
任何集合类都可以通过使用同步包装器(synchronization wrapper) 变成线程安全的:
List synchArrayList = Collections.synchronizedList(new ArrayList());
Map synchHashMap = Col1ections.synchronizedMap(new HashMap());
结果集合的方法使用锁加以保护,提供了线程安全访问。
应该确保没有任何线程通过原始的非同步方法访问数据结构。最便利的方法是确保不保 存任何指向原始对象的引用, 简单地构造一个集合并立即传递给包装器,像我们的例子中所 做的那样。 如果在另一个线程可能进行修改时要对集合进行迭代,仍然需要使用“ 客户端” 锁定:
synchronized (synchHashMap) {
Iterator iter = synchHashMap.keySet().iterator(); while (iter.hasNext()) . . }
如果使用foreach
循环必须使用同样的代码, 因为循环使用了迭代器。注意:如果在迭代过程中,别的线程修改集合,迭代器会失效,抛出 ConcurrentModificationException
异 常。同步仍然是需要的, 因此并发的修改可以被可靠地检测出来。 最好使用java.Util.COnciirrent
包中定义的集合, 不使用同步包装器中的。特别是, 假如它们访问的是不同的桶, 由于 ConcurrentHashMap
已经精心地实现了,多线程可以访问它而且不会彼此阻塞。有一个例外是经常被修改的数组列表。在那种情况下,同步的ArrayList
可 以胜过CopyOnWriteArrayList()
。