ConcurrentHashMap的优雅使用
相对于HashMap,ConcurrentHashMap的单个读写操作具有原子性(线程安全),但是ConcurrentHashMap的聚合方法(例如:size、isEmpty 和 containsValue…)在并发场景下,可能只是一个参考值,不可用于流程的控制。例如在一个线程putAll的时候,另一个线程获取size,此时拿到的size可能只反映了部分已写入的数据,并不准确。
要想避免上述问题,可以通过synchronized加锁,让“读取size”和“putAll”这一组复合操作作为一个整体执行,同一时刻只允许一个线程进入。
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
/**
 * Demo showing that {@link ConcurrentHashMap#size()} must not drive control flow while other
 * threads are writing to the map.
 *
 * <p>Each demo starts with a map that is 100 entries short of {@code ITEM_COUNT} and lets
 * several pool threads "top it up" by reading the current size and inserting the difference.
 * {@link #errorTest()} does this without coordination; {@link #successTest()} wraps the
 * read-then-insert step in a {@code synchronized} block.
 */
@Slf4j
public class ConcurrentHashMapMain {

    /** Parallelism of the {@link ForkJoinPool} created for each demo. */
    private static final int THREAD_COUNT = 10;

    /** Number of entries the map is supposed to contain after being topped up. */
    private static final int ITEM_COUNT = 1000;

    /** Number of top-up steps submitted to the pool in each demo. */
    private static final int TASK_COUNT = 12;

    /**
     * Builds a {@link ConcurrentHashMap} with {@code count} entries.
     *
     * <p>Keys are random UUID strings and values are the numbers {@code 1..count}. A
     * {@code count} of zero or less produces an empty map, because the underlying range is
     * empty in that case.
     *
     * @param count number of entries to generate
     * @return a new map holding the generated entries
     */
    private static ConcurrentHashMap<String, Long> getConcurrentHashMapData(int count) {
        return LongStream.range(1, count + 1)
                // Primitive streams have to be boxed before a Collector can be applied.
                .boxed()
                // Collect the numbers 1..count into a ConcurrentHashMap; on a key collision
                // the first value is kept.
                .collect(Collectors.toConcurrentMap(
                        i -> UUID.randomUUID().toString(),
                        Function.identity(),
                        (o1, o2) -> o1,
                        ConcurrentHashMap::new));
    }

    /**
     * Runs the unsynchronized demo followed by the synchronized one.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for a pool
     */
    public static void main(String[] args) throws InterruptedException {
        errorTest();
        successTest();
    }

    /**
     * Tops the map up without any locking.
     *
     * <p>Several threads can read the same size before any of them has inserted, so each of
     * them adds its own batch and the final size can end up well above {@code ITEM_COUNT}.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to finish
     */
    public static void errorTest() throws InterruptedException {
        // Start with 900 entries, i.e. 100 short of ITEM_COUNT.
        ConcurrentHashMap<String, Long> concurrentHashMap =
                getConcurrentHashMapData(ITEM_COUNT - 100);
        log.info("concurrentHashMap.size{}", concurrentHashMap.size());

        runTopUpTasks(() -> topUp(concurrentHashMap));

        log.info("after add, concurrentHashMap.size{}", concurrentHashMap.size());
    }

    /**
     * Tops the map up while holding the map's monitor.
     *
     * <p>Reading the size and inserting the missing entries happen as one unit, so only the
     * first thread to enter sees a gap; later threads see a gap of zero and add nothing.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to finish
     */
    public static void successTest() throws InterruptedException {
        ConcurrentHashMap<String, Long> concurrentHashMap =
                getConcurrentHashMapData(ITEM_COUNT - 100);
        log.info("concurrentHashMap.size{}", concurrentHashMap.size());

        runTopUpTasks(() -> {
            // Lock on the map so that only one thread at a time performs the compound
            // "read size, then putAll" step. This only works because every writer in this
            // demo takes the same lock.
            synchronized (concurrentHashMap) {
                topUp(concurrentHashMap);
            }
        });

        log.info("after add, concurrentHashMap.size{}", concurrentHashMap.size());
    }

    /**
     * Reads the current size once, logs it, and inserts as many new entries as are missing
     * to reach {@code ITEM_COUNT}.
     *
     * <p>The same size snapshot is used for both the log line and the gap computation, so the
     * log shows the value the thread actually acted on.
     */
    private static void topUp(ConcurrentHashMap<String, Long> map) {
        int currentSize = map.size();
        log.info("concurrentHashMap.size{}", currentSize);
        map.putAll(getConcurrentHashMapData(ITEM_COUNT - currentSize));
    }

    /**
     * Runs {@code step} {@code TASK_COUNT} times via a parallel stream started from inside a
     * dedicated {@link ForkJoinPool}, then waits for the pool to terminate.
     *
     * <p>A warning is logged if the pool has not terminated within the timeout, so that a
     * later size report is not mistaken for the result of a completed run.
     */
    private static void runTopUpTasks(Runnable step) throws InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        // The parallel stream is started from a task of this pool so that its work is
        // executed by this pool's workers.
        forkJoinPool.execute(
                () -> IntStream.rangeClosed(1, TASK_COUNT).parallel().forEach(i -> step.run()));
        forkJoinPool.shutdown();
        if (!forkJoinPool.awaitTermination(10, TimeUnit.MINUTES)) {
            log.warn("top-up tasks did not finish within 10 minutes");
        }
    }
}
运行结果为:
09:33:44.235 [main] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-9] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-11] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-6] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-2] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-15] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-4] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-1] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-13] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-8] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.244 [ForkJoinPool-1-worker-10] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.249 [ForkJoinPool-1-worker-11] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1264
09:33:44.249 [ForkJoinPool-1-worker-1] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1272
09:33:44.250 [main] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - after add, concurrentHashMap.size1900
09:33:44.254 [main] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.255 [ForkJoinPool-2-worker-9] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size900
09:33:44.256 [ForkJoinPool-2-worker-10] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.256 [ForkJoinPool-2-worker-10] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.256 [ForkJoinPool-2-worker-8] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.256 [ForkJoinPool-2-worker-1] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.257 [ForkJoinPool-2-worker-13] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.257 [ForkJoinPool-2-worker-2] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.257 [ForkJoinPool-2-worker-11] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.257 [ForkJoinPool-2-worker-15] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.257 [ForkJoinPool-2-worker-4] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.257 [ForkJoinPool-2-worker-6] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.257 [ForkJoinPool-2-worker-9] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - concurrentHashMap.size1000
09:33:44.258 [main] INFO com.arno.train.ConcurrentHash.ConcurrentHashMapMain - after add, concurrentHashMap.size1000