一.并发工具概述
1.传统的多线程并没有提供高级特性,例如:信号量、线程池和执行管理器等,而这些特性恰恰有助于创建强大的并发程序。
2.新的Fork/Join框架针对当前的多核系统,也提供了并行编程的可行性。
3.并发工具包处理于java.util.concurrent包,主要包括同步器、执行顺、并发集合、Fork/Join框架、atomic包、locks包。
4.同步器:为每种特定的同步问题提供了解决方案
5.执行器:用来管理线程的执行
6.并发集合:提供了集合框架中集合的并发版本
7.Fork/Join框架:提供了对并行编程的支持
8.atomic包:提供了不需要锁即可完成并发环境变量使用的原子性操作
9.locks包:使用Lock接口为并发编程提供了同步的另外一种替代方案
二.同步器-Semaphore和CountDownLatch
1.Semaphore同步器
a.经典的信号量,通过计数器控制对共享资源的访问。
b.Semaphore(int count):创建拥有count个许可证的信号量
c.acquire()/acquire(int num):获取1/num个许可证
d.release()/release(int num):释放1/num个许可证
package com.bijian.concurrent.study; import java.util.concurrent.Semaphore; /** * 银行营业部有两个柜台给三个人提供服务 * @author bijian */ public class SemaphoreDemo { public static void main(String[] args) { //最多允许多少个并发线程来进入这个区域 Semaphore semaphore = new Semaphore(2); Person p1 = new Person(semaphore, "P1"); p1.start(); Person p2 = new Person(semaphore, "P2"); p2.start(); Person p3 = new Person(semaphore, "P3"); p3.start(); } } class Person extends Thread { private Semaphore semaphore; public Person(Semaphore semaphore, String name) { setName(name); this.semaphore = semaphore; } public void run() { System.out.println(getName() + " is waiting..."); try { //每个线程过来,首先要获取许可证 semaphore.acquire(); System.out.println(getName() + " is servicing..."); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getName() + " is done!"); //操作结束,要释放许可证 semaphore.release(); } }
运行结果:
P1 is waiting... P3 is waiting... P2 is waiting... P1 is servicing... P3 is servicing... P1 is done! P3 is done! P2 is servicing... P2 is done!
2.CountDownLatch同步器
a.必须发生指定数量的事件后才可以继续运行,如赛跑比赛的倒计时后开始
b.CountDownLatch(int count):必须发生count个数量才可以打开锁存器
c.await():等待锁存器
d.countDown():触发事件
package com.bijian.concurrent.study; import java.util.concurrent.CountDownLatch; /** * 赛跑比赛倒计时 * @author bijian * */ public class CountDownLatchDemo { public static void main(String[] args) { //创建一个需要多少个事件发生才可以指定线程执行的计数器,这里的3表示三个事件发生才可以执行 CountDownLatch countDownLatch = new CountDownLatch(3); new Racer(countDownLatch, "A").start(); new Racer(countDownLatch, "A").start(); new Racer(countDownLatch, "A").start(); for(int i=0;i<3;i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(3 - i); countDownLatch.countDown(); if(i == 2) { System.out.println("Start"); } } } } class Racer extends Thread { private CountDownLatch countDownLatch; public Racer(CountDownLatch countDownLatch, String name) { setName(name); this.countDownLatch = countDownLatch; } public void run() { try { countDownLatch.await(); for(int i=0;i<3;i++) { System.out.println(getName() + " : " + i); } } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果:
3 2 1 Start C : 0 A : 0 B : 0 A : 1 C : 1 A : 2 B : 1 C : 2 B : 2
三.同步器-CylicBarrier、Exchanger和Phaser
1.CylicBarrier同步器
a.适用于只有多个线程都到达预定点时才可以继续执行。
b.CyclicBarrier(int num):等待线程的数量
c.CyclicBarrier(int num, Runnable action):等待线程的数量以及所有线程到达后的操作
d.await():到达临界点后暂停线程
package com.bijian.concurrent.study; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 模拟斗地主 * @author bijian */ public class CyclicBarrierDemo { public static void main(String[] args) { //斗地主需要三个人,所以这里为3 CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() { //主线程一旦通过循环屏障,就可以执行某个动作,如通过Runnable实现的动作 @Override public void run() { System.out.println("Game start"); } }); new Player(cyclicBarrier, "A").start(); new Player(cyclicBarrier, "B").start(); new Player(cyclicBarrier, "C").start(); } } class Player extends Thread { private CyclicBarrier cyclicBarrier; public Player(CyclicBarrier cyclicBarrier, String name) { setName(name); this.cyclicBarrier = cyclicBarrier; } public void run() { System.out.println(getName() + " is waiting other players..."); try { //每个线程在循环屏障处等待 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
运行结果:
C is waiting other players... B is waiting other players... A is waiting other players... Game start
2.Exchanger同步器
a.简化两个线程间数据的交换
b.Exchanger<V>:指定进行交换的数据类型
c.V exchange(V object):等待线程到达,交换数据
package com.bijian.concurrent.study; import java.util.concurrent.Exchanger; public class ExchangerDemo { public static void main(String[] args) { Exchanger<String> ex = new Exchanger<String>(); new A(ex).start(); new B(ex).start(); } } class A extends Thread { private Exchanger<String> ex; public A(Exchanger<String> ex) { this.ex = ex; } public void run() { String str = null; try { str = ex.exchange("Hello"); System.out.println(str); str = ex.exchange("A"); System.out.println(str); str = ex.exchange("B"); System.out.println(str); }catch(InterruptedException e) { e.printStackTrace(); } } } class B extends Thread { private Exchanger<String> ex; public B(Exchanger<String> ex) { this.ex = ex; } public void run() { String str = null; try { str = ex.exchange("Hi!"); System.out.println(str); str = ex.exchange("1"); System.out.println(str); str = ex.exchange("2"); System.out.println(str); }catch(InterruptedException e) { e.printStackTrace(); } } }
运行结果:
Hello Hi! 1 A B 2
3.Phaser同步器
a.工作方式与CyclicBarrier类似,但是可以定义多个阶段
b.Phaser()/Phaser(int num):使用指定0/num个party创建Phaser
c.register():注册party
d.arriveAndAdvance():到达时等待到所有party到达
e.arriveAndDeregister():到达时注销线程自已
package com.bijian.concurrent.study; import java.util.concurrent.Phaser; public class PhaserDemo { public static void main(String[] args) { Phaser phaser = new Phaser(); System.out.println("starting ..."); //在Worker中只是执行、等待 new Worker(phaser, "Fuwuyuan").start(); new Worker(phaser, "Chushi").start(); new Worker(phaser, "Shangcaiyuan").start(); //表示一个有三个订单,对于每一个订单,都需要所有人处理完毕后,才能继续执行 for(int i=1; i<=3; i++) { phaser.arriveAndAwaitAdvance();//自已处理完了,等待其它线程处理完才能继续进行 System.out.println("Order " + i + " finished!"); } //所有订单执行完毕后,解除所有注册的线程 phaser.arriveAndDeregister(); System.out.println("All done!"); } } class Worker extends Thread { private Phaser phaser; public Worker(Phaser phaser, String name) { this.setName(name); this.phaser = phaser; //把当前线程注册到phaser中 phaser.register(); } public void run() { for(int i=1;i<= 3;i++) { System.out.println("current order is :" + i + ":" + getName()); if(i == 3) { //如果三个订单都处理完成,则解除注销 phaser.arriveAndDeregister(); }else { //如果还有其它订单未处理完,则等待其它订单处理完毕 phaser.arriveAndAwaitAdvance(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果:
starting ... current order is :1:Fuwuyuan current order is :1:Shangcaiyuan current order is :1:Chushi Order 1 finished! current order is :2:Shangcaiyuan Order 2 finished! current order is :2:Fuwuyuan current order is :2:Chushi current order is :3:Shangcaiyuan Order 3 finished! All done! current order is :3:Chushi current order is :3:Fuwuyuan
四.执行器
1.执行器
a.用于启动并控制线程的执行
b.核心接口为Executor,包含一个execute(Runnable)用于指定被执行的线程
c.ExecutorService接口用于控制线程执行和管理线程
d.预定义了如下执行器:ThreadPoolExecutor/ScheduledThreadPoolExecutor/ForkJoinPool
2.Callable与Future
a.Callable<V>:表示具有返回值的线程,V:表示返回值类型
b.call():执行任务
c.Future<V>:表示Callable的返回值,V:返回值类型
d.get():获取返回值
实例:
package com.bijian.concurrent.study; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorDemo { public static void main(String[] args) throws Exception { ExecutorService es = Executors.newFixedThreadPool(2); //将Callable提交到线程池中 Future<Integer> r1 = es.submit(new MC(1,100)); Future<Integer> r2 = es.submit(new MC(100,10000)); System.out.println(r1.get() + ":" + r2.get()); es.shutdown(); } } class MC implements Callable<Integer> { private int begin, end; public MC(int begin, int end) { this.begin = begin; this.end = end; } @Override public Integer call() throws Exception { int sum = 0; for(int i=begin;i<end;i++) { sum += i; } return sum; } }
运行结果:
4950:49990050
五.锁与原子操作
1.锁
a.java.util.concurrent.lock包中提供了对锁的支持
b.为使用synchronized控制对资源访问提供了替代机制
c.基本操作模型:访问资源之前申请锁,访问完毕后释放锁
d.lock/tryLock:申请锁
e.unlock:释放锁
f.具体锁类ReentrantLock实现了Lock接口
实例:
package com.bijian.concurrent.study; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockDemo { public static void main(String[] args) { new NT().start(); new NT().start(); new NT().start(); new NT().start(); } } class Data { static int i=0; static Lock lock = new ReentrantLock(); //static synchronized void operate() { static void operate() { //操作之前申诅锁 lock.lock(); i++; System.out.println(i); //操作完毕释放锁 lock.unlock(); } } class NT extends Thread { public void run() { while(true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Data.operate(); } } }
运行结果:
1 2 3 4 5 6 7 8 ...
2.原子操作
a.java.util.concurrent.atom包中提供了对原子操作的支持
b.提供了不需要锁以及其他同步机制就可以进行的一些不可中断操作
c.主要操作为:获取、设置、比较等
实例:
package com.bijian.concurrent.study; import java.util.concurrent.atomic.AtomicInteger; public class AtomDemo { public static void main(String[] args) { new ANT().start(); new ANT().start(); new ANT().start(); new ANT().start(); } } class AData { //用原子操作代替锁的机制 static AtomicInteger ai = new AtomicInteger(0); static void operate() { System.out.println(ai.incrementAndGet()); } } class ANT extends Thread { public void run() { while(true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } AData.operate(); } } }
运行结果:
1 4 3 2 5 8 6 7 ...
六.流编程
1.流的基本知识
a.表示数据移动,移动过程中可能会对数据进行处理
b.不同于IO流,表示流对象
c.操作分为中间操作和终端操作
d.中间操作会产生一个新流
e.终端操作会消费流
2.流的编程模型
a.获取流:stream/parallelSteam(获取串行流/并行流)
b.操作:sort/max/min/...
3.流的基本操作
过滤、排序、缩减、映射、收集、迭代
实例:
package com.bijian.concurrent.study; import java.util.ArrayList; import java.util.List; import java.util.Optional; public class StreamDemo { public static void main(String[] args) { List<String> ls = new ArrayList<>(); ls.add("abc"); ls.add("def"); ls.add("ddd"); ls.add("eee"); ls.add("def"); ls.add("cha"); //max属于终端操作 Optional<String> max = ls.stream().max(String::compareTo); System.out.println("max:" + max.get());//max:eee //forEach属于终端操作,但sorted则是中间操作 ls.stream().sorted().forEach(e -> System.out.println(e)); //不重复的元属的个数 System.out.println(ls.stream().distinct().count());//5 } }
运行结果:
max:eee abc cha ddd def def eee 5
七.Fork/Join框架
1. Fork/Join框架中的主要类
a.ForkJoinTask<V>:描述任务的抽象类
b.ForkJoinPool:管理ForkJoinTask的线程池
c.RecursiveAction:ForkJoinTask子类,描述无返回值的任务
d.RecursiveTask<V>:ForkJoinTask子类,描述有返回值的任务
2.分而治之策略
a.将任务递归划分成更小的子任务,直到子任务足够小,从而能够被连续地处理掉为止
b.优势是处理过程可以使用并行发生,这种情况特别适合基于多核处理器的并行编程
c.根据Java API中定义,分而治之的建议临界点定义在100-1000个操作中的某个位置
3.Fork/Join框架案例
计算1-100000的和
package com.bijian.concurrent.study; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class ForkJoinDemo { public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(); Future<Long> result = forkJoinPool.submit(new NTask(0, 1000001)); System.out.println(result.get()); forkJoinPool.shutdown(); } } class NTask extends RecursiveTask<Long> { static final int THRESHOLD = 1000; private int begin, end; public NTask(int begin, int end) { this.begin = begin; this.end = end; } @Override protected Long compute() { long sum = 0; if((end - begin) <= THRESHOLD) { for(int i=begin;i<end;i++) { sum += i; } }else { int mid = (begin + end) / 2; NTask left = new NTask(begin, mid); left.fork(); NTask right = new NTask(mid + 1, end); right.fork(); Long lr = left.join(); System.out.println(begin + "-" + mid + ":" + lr); Long rr = right.join(); System.out.println(mid + "-" + end + ":" + rr); sum = lr + rr; } return sum; } }
运行结果:
499488998835