前言
在上一篇《java中的线程安全》中,总结了什么是线程安全,并在文章末尾提到如何保证线程安全的几种常用手段,并且重点说明了为了保证线程安全一般都会付出一定代价。
比如使用线程安全的容器ConcurrentHashMap,本质上还是会有分段锁;使用volatile保证可见性,同样会有性能消耗:线程中无法使用缓存数据每次都必须从主存中获取;使用原子工具包中的AtomicInteger,本质上还是通过CAS和volatile,同样存在性能消耗;加锁方式就不说了,性能消耗是最大的。
上面提到的性能消耗,其实是在不需要保证线程安全的情况下对比的。也就是说在多线程环境下,如果不需要访问同一份数据,也就不用做线程安全处理,这时的性能是最好的。
有没有办法让多线程访问非线程安全的数据,并且不需要加锁就能实现线程安全呢?看似很矛盾,但在特定环境下是可以做到的。下面就开始讲讲这种特殊的情况。
并发计数器案例
现在要做一个并发计数器,用来统计系统中4个重要方法的调用次数,在做code tracking工具时经常会见到的场景,本示例是跟踪4个方法的调用量,其实可以是任意固定个数。也许你会觉得很简单,使用ConcurrentHashMap+AtomicInteger就可以实现,简易实现过程如下:
/** * Created by gantianxing on 2017/12/24. */ public class ThreadSafe3 { //非线程安全容器 public static Map<String,AtomicInteger> datas3 = new ConcurrentHashMap<>(4); static { //4个计数器 初始值都是0 datas3.put("business1",new AtomicInteger(0)); datas3.put("business2",new AtomicInteger(0)); datas3.put("business3",new AtomicInteger(0)); datas3.put("business4",new AtomicInteger(0)); } public static void main(String[] args) throws Exception{ System.out.println("开始时间"+System.currentTimeMillis()); //创建固定4个线程的线程池 ExecutorService threadPool = Executors.newFixedThreadPool(4); //分别对4个计数器+1,并行执行10000次 Counter counter1 = new Counter("business1"); for (int i=0;i<10000;i++) { threadPool.execute(counter1); } Counter counter2 = new Counter("business2"); for (int i=0;i<10000;i++){ threadPool.execute(counter2); } Counter counter3 = new Counter("business3"); for (int i=0;i<10000;i++){ threadPool.execute(counter3); } Counter counter4 = new Counter("business4"); for (int i=0;i<10000;i++){ threadPool.execute(counter4); } //打印4个计数器统计结果 Thread.sleep(10000);//睡眠10秒保证所有线程执行完成 System.out.println(datas3.get("business1").get()); System.out.println(datas3.get("business2").get()); System.out.println(datas3.get("business3").get()); System.out.println(datas3.get("business4").get()); } } //多线程操作线程安全容器 ThreadSafe3.datas3 class Counter implements Runnable{ private String businessId; public Counter(String businessId) { this.businessId = businessId; } @Override public void run() { AtomicInteger oldNum = ThreadSafe3.datas3.get(businessId); oldNum.incrementAndGet();//计数器+1 if(oldNum.get()==10000){ System.out.println(businessId+"结束时间:"+System.currentTimeMillis()); } } }
实现过程大致为:
首先 初始化4个计数器,为了保证线程安全计数器使用AtomicInteger,并且使用一个ConcurrentHashMap进行存储这4个计数器,并初始化为0;
然后 主线程通过newFixedThreadPool创建线程个数为4的线程池,分别对4个计数器并行执行10000次加1操作,模拟4个业务方法分别被调用10000次。
最后 打印4个计数器的最终结果,如果结果都为10000,说明运行结果正确。
这里使用了线程安全的容器ConcurrentHashMap、以及并发计数器AtomicInteger确保了线程安全,执行结果4个计时器都是10000。
使用非线程安全的容器
文章开头已经提到使用ConcurrentHashMap和AtomicInteger确保线程安全,始终会有性能消耗。如果改为非线程安全容器HashMap,并且直接使用int做计数器,就没有性能消耗,但此时也无法保证线程安全了,下面来看非线程安全的实现:
/** * Created by gantianxing on 2017/12/24. */ public class ThreadSafe2 { //非线程安全容器 public static Map<String,Integer> datas2 = new HashMap<>(4); static { //4个计数器 初始值都是0 datas2.put("business1",0); datas2.put("business2",0); datas2.put("business3",0); datas2.put("business4",0); } public static void main(String[] args) throws Exception{ //创建固定4个线程的线程池 ExecutorService threadPool = Executors.newFixedThreadPool(4); //分别对4个计数器+1,并行执行10000次 Counter counter1 = new Counter("business1"); for (int i=0;i<10000;i++) { threadPool.execute(counter1); } Counter counter2 = new Counter("business2"); for (int i=0;i<10000;i++){ threadPool.execute(counter2); } Counter counter3 = new Counter("business3"); for (int i=0;i<10000;i++){ threadPool.execute(counter3); } Counter counter4 = new Counter("business4"); for (int i=0;i<10000;i++){ threadPool.execute(counter4); } //打印4个计数器统计结果 Thread.sleep(10000);//睡眠10秒保证所有线程执行完成 System.out.println(datas2.get("business1")); System.out.println(datas2.get("business2")); System.out.println(datas2.get("business3")); System.out.println(datas2.get("business4")); } } //多线程操作线程不安全容器 ThreadSafe2.datas2 class Counter implements Runnable{ private String businessId; public Counter(String businessId) { this.businessId = businessId; } @Override public void run() { int oldNum = ThreadSafe2.datas2.get(businessId); ThreadSafe2.datas2.put(businessId,oldNum+1);//计数器+1 } }
跟第一版相比,只是把ConcurrentHashMap改为了HashMap,AtomicInteger改为了int。多次执行main方法,每次都会得到不一样的结果,也就出现了线程安全问题(感兴趣的朋友可以直接复制代码运行测试)。这肯定不是期望的结果。
有朋友会说既然这时错误的写法,为什么要写出来呢?别急这只是为了引出第三种写法。
改进版:使用非线程安全的容器
我们知道引起线程安全问题的根本原因就是,多个线程操作了同一份数据。现在有4个计时器,只要我们能保证每个计数器都是由一个固定的线程处理,也就没有线程安全问题了。基于这个原理,第三版实现代码如下:
/** * Created by gantianxing on 2017/12/24. */ public class ThreadSafe { //容量已知 且固定,在这种场景下,可以考虑替换为非线程安全容器 // Map<String,String> datas = new ConcurrentHashMap<>(4); public static Map<String,Integer> datas = new HashMap<>(4); static { //4个计数器 初始值都是0 datas.put("business1",0); datas.put("business2",0); datas.put("business3",0); datas.put("business4",0); } public static void main(String[] args) throws Exception{ //为4个不同的类型分别定义独立的单线程化线程池 ExecutorService business1 = Executors.newSingleThreadExecutor(); ExecutorService business2 = Executors.newSingleThreadExecutor(); ExecutorService business3 = Executors.newSingleThreadExecutor(); ExecutorService business4 = Executors.newSingleThreadExecutor(); System.out.println("开始时间"+System.currentTimeMillis()); //分别对4个计数器+1,并行执行10000次 Counter counter1 = new Counter("business1"); for (int i=0;i<10000;i++){ business1.execute(counter1); } Counter counter2 = new Counter("business2"); for (int i=0;i<10000;i++){ business2.execute(counter2); } Counter counter3 = new Counter("business3"); for (int i=0;i<10000;i++){ business3.execute(counter3); } Counter counter4 = new Counter("business4"); for (int i=0;i<10000;i++){ business4.execute(counter4); } //打印4个计数器统计结果 Thread.sleep(10000);//睡眠10秒保证所有线程执行完成 System.out.println(datas.get("business1")); System.out.println(datas.get("business2")); System.out.println(datas.get("business3")); System.out.println(datas.get("business4")); } } //多线程操作线程不安全容器 ThreadSafe.datas class Counter implements Runnable{ private String businessId; public Counter(String businessId) { this.businessId = businessId; } @Override public void run() { int oldNum = ThreadSafe.datas.get(businessId); int newNum = oldNum+1; ThreadSafe.datas.put(businessId,newNum);//计数器+1 if(newNum==10000){ System.out.println(businessId+"结束时间:"+System.currentTimeMillis()); } } }
实现过程 跟第二版只有一点区别,这里使用newSingleThreadExecutor分别创建了4个线程池,每个线程池里只有1个线程,每个线程池只处理一个计数器。
多次运行这段程序,4个计数器每次的打印的结果都是10000。这就是我们想要的结果。下面来看下执行数据流程图:
可以看到每个计数器都是由一个单独线程处理,无需使用volatile保证可见性;并且每个计数器在高并发下进入各自不同的队列进行排队(newSingleThreadExecutor是无界队列,容易内存溢出,真实场景中可以考虑使用有界队列),保证了各个计数器内部操作是串行执行,同时多个计数器之间是并行执行。
第三种实现相对于第一种实现的性能会好一些,本地测试第一种方式需要70多ms,第三种需要60多ms。可能你会觉得差异不大,在真实的线上环境中需要并发执行的线程会更多。本身计算器只是辅助工具,为了不影响正常业务,能省一点算一点。
总结
在解决线程安全问题时,能用volatile就不要用加锁;能用线程安全容器,也不要用加锁;当然,能不用线程安全容器处理的,就不要用线程安全容器。但如果你不是很确定的情况下,那还是直接加锁吧(但别以为加锁很简单),毕竟不出bug才是首要任务。