什么是线程安全
当多个线程访问某一个类(对象或方法)时,这个类始终都能表现出正确的行为,那么这个类(对象或方法)就是线程安全的。
并发编程三要素
要想并发程序正确地执行,必须要保证原子性、可见性以及有序性。只要有一个没有被保证,就有可能会导致程序运行不正确。
(1)原子性
一个或多个操作要么全部执行成功要么全部执行失败.
(2)可见性
当多个线程访问同一个变量时,如果其中一个线程对其作了修改,其他线程能立即看到修改后的值.
(3)有序性
程序执行的顺序按照代码的先后顺序执行. (CPU对代码进行优化排序会影响到多线程并发执行的正确性)
synchronized
(1)同步代码块
- 使用synchronized关键字加上一个锁对象来定义一段代码, 这就叫同步代码块
- 多个同步代码块必须使用相同的锁对象, 他们才是同步的
public class StringLock { public void method() { //每次调用该方法都会new一个字符串对象 synchronized (new String("sss")) { try { while(true){ System.out.println("当前线程 : " + Thread.currentThread().getName() + "开始"); Thread.sleep(1000); System.out.println("当前线程 : " + Thread.currentThread().getName() + "结束"); } } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { final StringLock stringLock = new StringLock(); Thread t1 = new Thread(new Runnable() { @Override public void run() { stringLock.method(); } },"t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { stringLock.method(); } },"t2"); t1.start(); t2.start(); } } 当前线程 : t1开始 当前线程 : t2开始 当前线程 : t1结束 当前线程 : t2结束 当前线程 : t1开始 ...
public void method() { //常量池的缓存功能,每次都是同一个字符串 synchronized ("sss") { //this, StringLock.class, 具有相同的结果 try { while(true){ System.out.println("当前线程 : " + Thread.currentThread().getName() + "开始"); Thread.sleep(1000); System.out.println("当前线程 : " + Thread.currentThread().getName() + "结束"); } } catch (InterruptedException e) { e.printStackTrace(); } } } 当前线程 : t2开始 当前线程 : t2结束 当前线程 : t2开始 当前线程 : t2结束 当前线程 : t2开始 ...
(2)同步方法
- 使用synchronized关键字修饰一个方法, 该方法中所有的代码都是同步的
- 关键字synchronized取得的锁是对象锁,如果new了两个对象,那么就是两把不同的锁,它们互不影响.
- 有一种情况则是相同的锁,即在静态方法上加synchronized关键字
public class MultiThread { /** static */ private int num = 0; /** static 静态方法只能访问静态变量 */ public synchronized void printNum(String tag){ try { if(tag.equals("a")){ num = 100; System.out.println("tag a, set num over!"); Thread.sleep(1000); } else { num = 200; System.out.println("tag b, set num over!"); } System.out.println("tag " + tag + ", num = " + num); } catch (InterruptedException e) { e.printStackTrace(); } } //注意观察run方法输出顺序 public static void main(String[] args) { //俩个不同的对象 final MultiThread m1 = new MultiThread(); final MultiThread m2 = new MultiThread(); Thread t1 = new Thread(new Runnable() { @Override public void run() { m1.printNum("a"); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { m2.printNum("b"); } }); t1.start(); t2.start(); } } 加staitc前的结果: tag a, set num over! tag b, set num over! tag b, num = 200 tag a, num = 100 加staitc后的结果: tag a, set num over! tag a, num = 100 tag b, set num over! tag b, num = 200
(3)线程间的通信
- wait() 当前线程等待, 释放锁.
- notify() 随机唤醒一个线程(同一把锁), 当前线程不释放锁.
public class Test { Object lock = new Object(); public void method1(){ synchronized (lock) { try { System.out.println("线程"+ Thread.currentThread().getName()+ "等待"); lock.wait(); //当前线程等待,释放锁 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程"+ Thread.currentThread().getName()+ "获取到了锁"); } } public void method2(){ synchronized (lock) { lock.notify(); //唤醒一个线程,不释放锁 System.out.println("线程"+Thread.currentThread().getName()+"调用了object.notify()"); } System.out.println("线程"+Thread.currentThread().getName()+"释放了锁"); } public static void main(String[] args) { final Test t = new Test(); new Thread(new Runnable() { @Override public void run() { t.method1(); } } , "t1").start(); new Thread(new Runnable() { @Override public void run() { t.method2(); } } , "t2").start(); } } 线程t1等待 线程t2调用了object.notify() 线程t2释放了锁 线程t1获取到了锁
ReentrantLock
(1)同步
- 使用ReentrantLock 类的lock()和unlock()方法进行同步
public class UseReentrantLock { private Lock lock = new ReentrantLock(); public void method1(){ try { lock.lock(); System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method1.."); Thread.sleep(1000); System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method1.."); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void method2(){ try { lock.lock(); System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method2.."); Thread.sleep(2000); System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method2.."); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) { final UseReentrantLock ur = new UseReentrantLock(); Thread t1 = new Thread(new Runnable() { @Override public void run() { ur.method1(); ur.method2(); } }, "t1"); t1.start(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } 当前线程:t1进入method1.. 当前线程:t1退出method1.. 当前线程:t1进入method2.. 当前线程:t1退出method2..
(2)通信
- 使用ReentrantLock 类的newCondition()方法可以获取Condition 对象
- 需要等待的时候使用Condition 的await()方法, 唤醒的时候用signal()方法
public class UseCondition { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void method1(){ try { lock.lock(); System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态.."); Thread.sleep(3000); System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁.."); condition.await(); // Object wait System.out.println("当前线程:" + Thread.currentThread().getName() +"继续执行..."); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void method2(){ try { lock.lock(); System.out.println("当前线程:" + Thread.currentThread().getName() + "进入.."); Thread.sleep(3000); System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒.."); condition.signal(); //Object notify } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) { final UseCondition uc = new UseCondition(); Thread t1 = new Thread(new Runnable() { @Override public void run() { uc.method1(); } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { uc.method2(); } }, "t2"); t1.start(); t2.start(); } } 当前线程:t1进入等待状态.. 当前线程:t1释放锁.. 当前线程:t2进入.. 当前线程:t2发出唤醒.. 当前线程:t1继续执行...
ReentrantReadWriteLock
读写锁ReentrantReadWriteLock其核心就是实现读写分离的锁.在高并发访问下,尤其是读多写少的情况下,性能要远高于重入锁(ReentrantLock).
synchronized/ReentrantLock在同一时间内只能有一个线程访问被锁定的代码,而读写锁则不同,其本质是分为两个锁,即读锁和写锁.在读锁下,多个线程可以并发的进行访问,但是在写锁的时候,只能一个一个的顺序访问.
口诀: 读读共享, 读写互斥, 写写互斥.
public class UseReentrantReadWriteLock {
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private ReadLock readLock = rwLock.readLock();
private WriteLock writeLock = rwLock.writeLock();
public void read(){
try {
readLock.lock();
System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
Thread.sleep(3000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
public void write(){
try {
writeLock.lock();
System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
Thread.sleep(3000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
} catch (Exception e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
final UseReentrantReadWriteLock urrw = new UseReentrantReadWriteLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
urrw.read();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
urrw.read();
}
}, "t2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
urrw.write();
}
}, "t3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
urrw.write();
}
}, "t4");
// t1.start(); // R
// t2.start(); // R
// t1.start(); // R
// t3.start(); // W
t3.start(); // W
t4.start(); // W
}
}
当前线程:t4进入...
当前线程:t4退出...
当前线程:t3进入...
当前线程:t3退出...
volatile
(1)Java内存模型
Java内存模型规定所有的变量都是存在主存当中,每个线程都有自己的工作内存。线程对变量的所有操作都必须在工作内存中进行,而不能直接对主存进行操作。并且每个线程都不能访问其他线程的工作内存。
(2)volatile的作用
volatile关键字的主要作用是使变量在多个线程间可见. 不保证原子性, 一定程度上保证有序性.
(3)可见性
- 使用volatile关键字会强制将修改的值立即写入主存
- 其它线程工作内存中的缓存变量无效, 只能再次从主存中读取.
//一般用作状态标记量 public class RunThread extends Thread{ private volatile boolean isRunning = true; private void setRunning(boolean isRunning){ this.isRunning = isRunning; } public void run(){ System.out.println("进入run方法.."); while(isRunning == true){ //.. } System.out.println("线程停止"); } public static void main(String[] args) throws InterruptedException { RunThread rt = new RunThread(); rt.start(); Thread.sleep(1000); rt.setRunning(false); System.out.println("isRunning的值已经被设置了false"); } } 不加volatile关键字,线程不会停止。
(4)有序性
- volatile变量不能进行优化排序
//线程1: context = loadContext(); //语句1 inited = true; //语句2 //线程2: while(!inited ){ //inited默认值为false sleep() } doSomething(context);
上面代码中,由于语句1和语句2没有数据依赖性,因此可能会被重排序。假如发生了重排序,在线程1执行过程中执行了语句2,而此是线程2会以为context初始化工作已经完成,那么就会跳出while循环,去执行doSomething(context)方法,而此时context并没有被初始化,就会导致程序出错。
这里如果用volatile关键字对inited变量进行修饰,就不会出现这种问题了,因为当执行到语句2时,必定能保证context已经初始化完毕(volatile变量不能进行优化排序)。
ThreadLocal
ThreadLocal为变量在每个线程中都创建了一个副本, 那么每个线程可以访问自己内部的副本变量. 是一种以空间换时间的手段.
public class ConnThreadLocal {
public static ThreadLocal<String> th = new ThreadLocal<String>();
public void setTh(String value){
th.set(value);
}
public void getTh(){
System.out.println(Thread.currentThread().getName() + ":" + this.th.get());
}
public static void main(String[] args) throws InterruptedException {
final ConnThreadLocal ct = new ConnThreadLocal();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
ct.setTh("张三");
ct.getTh();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
ct.getTh();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2");
t1.start();
t2.start();
}
}
t1:张三
t2:null
并发类容器
同步类容器(Hashtable/Vector/Collections.synchronizedXxx())通过加关键字synchronized来保证线程安全, 但这种方法降低了并发性,当多个线程竞争容器时,吞吐量严重降低.
Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包.
ConcurrentHashMap
ConcurrentHashMap用来代替Hashtable和同步的Map(Collections.synchronizedMap()).
ConcurrentHashMap在内部分为很多segment, 每个Segment其实就是一个类似Hashtable的结构(Hashtable是根据散列值分段存储的, 在同步的时候锁住了所有的段), 在进行写操作时只需对元素所在的Segment进行加锁即可, 不会影响到其他的Segment, 这样一样, 在最理想的情况下, ConcurrentHashMap可以最高同时支持Segment数量大小的写操作.
(1)Segment
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile int count; //Segment中元素的数量
transient int modCount; //对table的大小造成影响的操作的数量(比如put或者remove操作)
transient int threshold; //阈值,Segment里面元素的数量超过这个值就会对Segment进行扩容
transient volatile HashEntry<K,V>[] table; //链表数组,数组中的每一个元素代表了一个链表的头部
final float loadFactor; //负载因子,用于确定threshold
}
(2)HashEntry
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
(3)实例
ConcurrentMap<String, String> cm = new ConcurrentHashMap<>();
cm.put("1", "a");
cm.put("2", "b");
cm.put("3", "c");
cm.putIfAbsent("4", "d"); //如果key不存在则put
for (Entry<String, String> entry : cm.entrySet()) {
System.out.println(entry);
}
1=a
2=b
3=c
4=d
CopyOnWrite容器
JDK里的CopyOnWrite容器有两种:CopyOnWriteArrayList 和 CopyOnWriteArraySet.
CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后往新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
(1)注意
- CopyOnWrite的写操作是需要加锁的,否则多线程写的时候会Copy出N个副本出来。
/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */ public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
- 读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的CopyOnWrite容器。
/** * {@inheritDoc} * * @throws IndexOutOfBoundsException {@inheritDoc} */ public E get(int index) { return get(getArray(), index); }
(2)应用场景
- CopyOnWrite并发容器用于读多写少的并发场景, 比如白名单,黑名单,商品类目的访问和更新场景
(3)缺点
- 内存占用问题
- 如每晚使用CopyOnWrite机制更新大对象,会造成了每晚15秒的Full GC,应用响应时间也随之变长
- 数据一致性问题
- CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
(4)实例
CopyOnWriteArrayList<String> cl = new CopyOnWriteArrayList<>(); cl.add("a"); cl.add("b"); cl.add("c"); cl.addIfAbsent("c"); for (String str : cl) { System.out.println(str); } a b c
CopyOnWriteArraySet<String> cs = new CopyOnWriteArraySet<>(); cs.add("a"); cs.add("b"); cs.add("c"); cs.add("c"); cs.add("b"); for (String str : cs) { System.out.println(str); } a b c
ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它是一个基于链接节点的无界线程安全队列(非阻塞,无界).
add(E e) : 在该队列的尾部插入指定的元素
remove(Object o) : 从该队列中删除指定元素(如果存在)
offer(E e) : 在该队列的尾部插入指定的元素
peek() : 获取此队列的头,如果此队列为空,则返回 null
poll() : 获取并删除此队列的头部,如果此队列为空,则返回 null
BlockingQueue
(1)主要方法
add(E e) : 将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满,则会抛出异常;
remove(Object o) : 从该队列中删除指定元素(如果存在)
offer(E e) :将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
peek() :获取此队列的头,如果此队列为空,则返回 null
poll() :获取并删除此队列的头部,如果此队列为空,则返回 null
put() : 向队尾存入元素,如果队列满,则等待;
take() : 从队首获取并删除元素,如果队列为空,则等待
(2)ArrayBlockingQueue
基于数组实现的一个有界阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小.
- ArrayBlockingQueue类中的几个成员变量
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** The queued items */ private final E[] items; /** items index for next take, poll or remove */ private int takeIndex; /** items index for next put, offer, or add. */ private int putIndex; /** Number of items in the queue */ private int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; }
- put()方法
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
- take()方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
(3)LinkedBlockingQueue
基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
(4)SynchronousQueue
一种没有缓冲的队列,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然.
(5)PriorityBlockingQueue
基于优先级的无界阻塞队列.注意并不每加一个元素就排一次序,而是在第一次取数据时排序.
public class Task implements Comparable<Task>{
private int id ;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public int compareTo(Task task) {
//根据id从小到大排序
return this.id - task.id;
}
public String toString(){
return this.id + ":" + this.name;
}
}
public static void main(String[] args) throws Exception{
PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
Task t1 = new Task();
t1.setId(3);
t1.setName("id为3");
Task t2 = new Task();
t2.setId(4);
t2.setName("id为4");
Task t3 = new Task();
t3.setId(1);
t3.setName("id为1");
q.add(t1); //3
q.add(t2); //4
q.add(t3); //1
// 1 3 4
System.out.println("容器:" + q);
System.out.println(q.take().getId());
System.out.println("容器:" + q);
System.out.println(q.take().getId());
System.out.println(q.take().getId());
}
容器:[1:id为1, 4:id为4, 3:id为3]
1
容器:[3:id为3, 4:id为4]
3
4
(6)DelayQueue
DelayQueue是一种延时的无界阻塞队列. DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。
public class DelayTask implements Delayed{
public String name;
public Long delayTime;
public TimeUnit delayTimeUnit;
public DelayTask(String name, long delayTime, TimeUnit delayTimeUnit) {
this.name = name;
this.delayTime = delayTime;
this.delayTimeUnit = delayTimeUnit;
}
/**
* 用来判断是否到了截止时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 从小到大排序
*/
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
}else if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
}
return 0;
}
}
public static void main(String[] args) {
DelayQueue<DelayTask> queue = new DelayQueue<>();
queue.add(new DelayTask("1", 3000L + System.currentTimeMillis(), TimeUnit.MILLISECONDS));
queue.add(new DelayTask("2", 1000L + System.currentTimeMillis(), TimeUnit.MILLISECONDS));
queue.add(new DelayTask("3", 2000L + System.currentTimeMillis(), TimeUnit.MILLISECONDS));
while(!queue.isEmpty()) {
try {
DelayTask task = queue.take();
System.out.println(task.name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
并发编程三大设计模式
Future模式
客户端发出call请求, 这个请求处理需要很长一段时间. 服务端在进行数据处理时先返回一个抽象结果, 客户端收到结果后可以进行其他业务处理,而不用等待. 有点类似于我们发送Ajax请求的时候,页面是异步的进行后台处理,用户无须一直等待请求的结果,可以继续浏览或操作其他内容.
public interface Data {
String getRequest();
}
public class RealData implements Data{
private String result ;
public RealData (String queryStr){
System.out.println("根据" + queryStr + "进行查询,这是一个很耗时的操作..");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("操作完毕,获取结果");
result = "查询结果";
}
@Override
public String getRequest() {
return result;
}
}
public class FutureData implements Data{
private RealData realData ;
private boolean isReady = false;
public synchronized void setRealData(RealData realData) {
//如果已经装载完毕了,就直接返回
if(isReady){
return;
}
//如果没装载,进行装载真实对象
this.realData = realData;
isReady = true;
//进行通知
notify();
}
@Override
public synchronized String getRequest() {
//如果没装载好 程序就一直处于阻塞状态
while(!isReady){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//装载好直接获取数据即可
return this.realData.getRequest();
}
}
public class FutureClient {
public Data request(final String queryStr){
//1 我想要一个代理对象(Data接口的实现类)先返回给发送请求的客户端,告诉他请求已经接收到,可以做其他的事情
final FutureData futureData = new FutureData();
//2 启动一个新的线程,去加载真实的数据,传递给这个代理对象
new Thread(new Runnable() {
@Override
public void run() {
//3 这个新的线程可以去慢慢的加载真实对象,然后传递给代理对象
RealData realData = new RealData(queryStr);
futureData.setRealData(realData);
}
}).start();
return futureData;
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
FutureClient fc = new FutureClient();
Data data = fc.request("请求参数");
System.out.println("请求发送成功!");
System.out.println("做其他的事情...");
String result = data.getRequest();
System.out.println(result);
}
}
请求发送成功!
做其他的事情...
根据请求参数进行查询,这是一个很耗时的操作..
操作完毕,获取结果
查询结果
Master-Wordker模式
Master-Worker模式是常用的并行模式.它的核心思想是系统由两类进程协作工作:Master进程和Worker进程.Master负责接收和分配任务,Worker负责处理子任务.当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳和总结.其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量.
public class Task {
private int id ;
private String name;
private int price;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
}
public class Worker implements Runnable {
//盛放任务的容器
private ConcurrentLinkedQueue<Task> workQueue;
//盛放每一个worker执行任务的结果集合
private ConcurrentHashMap<String, Object> resultMap;
public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
}
@Override
public void run() {
while(true){
Task input = this.workQueue.poll();
if(input == null) break;
//真正的去做业务处理
Object output = MyWorker.handle(input);
this.resultMap.put(Integer.toString(input.getId()), output);
}
}
public static Object handle(Task input) {
return null;
}
}
public class MyWorker extends Worker {
public static Object handle(Task input) {
Object output = null;
try {
//表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库...
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}
}
public class Master {
//1 应该有一个承装任务的集合
private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
//2 使用HashMap去承装所有的worker对象
private HashMap<String, Thread> workers = new HashMap<String, Thread>();
//3 使用一个容器承装每一个worker并非执行任务的结果集
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
//4 构造方法
public Master(Worker worker, int workerCount){
// 每一个worker对象都需要有Master的引用 workQueue用于任务的领取,resultMap用于任务的提交
worker.setWorkerQueue(this.workQueue);
worker.setResultMap(this.resultMap);
for(int i = 0 ; i < workerCount; i++){
//key表示每一个worker的名字, value表示线程执行对象
workers.put("子节点" + Integer.toString(i), new Thread(worker));
}
}
//5 提交方法
public void submit(Task task){
this.workQueue.add(task);
}
//6 需要有一个执行的方法(启动应用程序 让所有的worker工作)
public void execute(){
for(Map.Entry<String, Thread> me : workers.entrySet()){
me.getValue().start();
}
}
//8 判断线程是否执行完毕
public boolean isComplete() {
for(Map.Entry<String, Thread> me : workers.entrySet()){
if(me.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}
//9 返回结果集数据
public int getResult() {
int ret = 0;
for(Map.Entry<String, Object> me : resultMap.entrySet()){
//汇总的逻辑..
ret += (Integer)me.getValue();
}
return ret;
}
}
public class Main {
public static void main(String[] args) {
System.out.println("我的机器可用Processor数量:" + Runtime.getRuntime().availableProcessors());
Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());
Random r = new Random();
for(int i = 1; i<= 100; i++){
Task t = new Task();
t.setId(i);
t.setName("任务"+i);
t.setPrice(r.nextInt(1000));
master.submit(t);
}
master.execute();
long start = System.currentTimeMillis();
while(true){
if(master.isComplete()){
long end = System.currentTimeMillis() - start;
int ret = master.getResult();
System.out.println("最终结果:" + ret + ", 执行耗时:" + end);
break;
}
}
}
}
最终结果:50063, 执行时间:2502
生产者-消费者模式
在生产者-消费者模式中,通常有两类线程,即若干个生产者的线程和若干个消费者的线程.生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信.
public final class Data {
private String id;
private String name;
public Data(String id, String name){
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString(){
return "{id: " + id + ", name: " + name + "}";
}
}
public class Consumer implements Runnable{
private BlockingQueue<Data> queue;
public Consumer(BlockingQueue queue){
this.queue = queue;
}
//随机对象
private static Random r = new Random();
@Override
public void run() {
while(true){
try {
//获取数据
Data data = this.queue.take();
//进行数据处理。休眠0 - 1000毫秒模拟耗时
Thread.sleep(r.nextInt(1000));
System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Provider implements Runnable{
//共享缓存区
private BlockingQueue<Data> queue;
//多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
private volatile boolean isRunning = true;
//id生成器
private static AtomicInteger count = new AtomicInteger();
//随机对象
private static Random r = new Random();
public Provider(BlockingQueue queue){
this.queue = queue;
}
@Override
public void run() {
while(isRunning){
try {
//随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
Thread.sleep(r.nextInt(1000));
//获取的数据进行累计...
int id = count.incrementAndGet();
//比如通过一个getData方法获取了
Data data = new Data(Integer.toString(id), "数据" + id);
System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){ //2s内还有没添加到queue
System.out.println("提交缓冲区数据失败....");
//do something... 比如重新提交
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop(){
this.isRunning = false;
}
}
public class Main {
public static void main(String[] args) throws Exception {
//内存缓冲区
BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
//生产者
Provider p1 = new Provider(queue);
Provider p2 = new Provider(queue);
Provider p3 = new Provider(queue);
//消费者
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
//创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(p1);
cachePool.execute(p2);
cachePool.execute(p3);
cachePool.execute(c1);
cachePool.execute(c2);
cachePool.execute(c3);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// cachePool.shutdown();
// cachePool.shutdownNow();
}
}
当前线程:pool-1-thread-2, 获取了数据,id为:1, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-3, 获取了数据,id为:2, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-3, 获取了数据,id为:3, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-1, 获取了数据,id为:4, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-6, 消费成功,消费数据为id: 3
当前消费线程:pool-1-thread-5, 消费成功,消费数据为id: 2
当前线程:pool-1-thread-2, 获取了数据,id为:5, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-2, 获取了数据,id为:6, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-2, 获取了数据,id为:7, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-4, 消费成功,消费数据为id: 1
当前线程:pool-1-thread-1, 获取了数据,id为:8, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-2, 获取了数据,id为:9, 进行装载到公共缓冲区中...
当前线程:pool-1-thread-3, 获取了数据,id为:10, 进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-6, 消费成功,消费数据为id: 4
线程池
一般情况下,当我们需要使用线程时就去创建一个线程,这样实现起来很简单,但如果并发的线程数量很多,而且每个线程都是执行一个时间很短的任务就结束了,这样频繁的创建销毁线程就会严重降低系统的效率. 线程池可以让线程执行完一个任务后,并不被销毁,继续执行其他任务.
Executors
Executors扮演着线程工厂的角色, 我们通过Executors可以创建特定功能的线程池.
(1)newFixedThreadPool(int nThreads)
- 创建一个固定数量的线程池,该线程池的线程数始终不变. 当一个任务提交时,若有线程空闲,则立即执行,若没有,则会缓存在一个无界队列中等待有空闲的线程去执行.
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
(2)newSingleThreadExecutor()
- 创建只有一个线程的线程池,若线程空闲则执行,否则缓存在一个无界队列中.
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
(3)newCachedThreadPool()
创建一个可根据实际情况调整线程个数的线程池,不限制最大线程数量.优先使用空闲的线程执行任务,空闲的线程不够就创建新线程,且每一个空闲线程会在60s后被自动回收.
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
(4)newScheduledThreadPool(int corePoolSize)
- 创建一个线程池,可以调度命令在给定的延迟之后运行,或定期执行。 corePoolSize 是要保留在池中的线程数,即使它们处于空闲状态
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
(5)线程池的常用方法
execute() : 向线程池提交一个任务,交由线程池去执行
submit() : 与execute()相比,有返回结果
shutdown() : 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow() : 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
ThreadPoolExecutor
若Executors工厂类中的线程池无法满足我们的需求,我们可以通过ThreadPoolExecutor类自己去定义线程池.
(1)ThreadPoolExecutor构造参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
- corePoolSize : 核心池的大小,指要保留在池中的线程数,即使它们处于空闲状态.
- maximumPoolSize : 线程池最大线程数,指在线程池中最多能创建多少个线程
- keepAliveTime : 表示空闲线程多久后会被回收
- unit : 参数keepAliveTime的时间单位, 有7种取值
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
- workQueue : 一个阻塞队列,用来存储等待执行的任务.常用的阻塞队列:
LinkedBlockingQueue; SynchronousQueue;
- threadFactory : 线程工厂,主要用来创建线程
- handler:表示当拒绝处理任务时的策略. 有以下四种取值, 也可以通过实现RejectedExecutionHandler接口,自定义拒绝策略.
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
(2)线程池的执行过程
当提交一个任务时,如果线程池中的线程数小于corePoolSize,则优先创建新线程,否则将任务加入缓冲队列. 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程. 若线程数大于maximumPoolSize,则执行拒绝策略.
对于无界的任务队列来说,线程池中只有corePoolSize个线程,新任务不断被加入到缓冲队列中,直到系统资源耗尽.
(3)实例
//自定义线程
public class MyTask implements Runnable {
private int taskId;
private String taskName;
public MyTask(int taskId, String taskName){
this.taskId = taskId;
this.taskName = taskName;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
try {
System.out.println("run taskId =" + this.taskId);
Thread.sleep(5*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String toString(){
return Integer.toString(this.taskId);
}
}
public class MyRejected implements RejectedExecutionHandler{
public MyRejected(){
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义处理..");
System.out.println("当前被拒绝任务为:" + r.toString());
}
}
public class UseThreadPoolExecutor1 {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, //coreSize
2, //MaxSize
60, //60
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3)
, new MyRejected()
);
MyTask mt1 = new MyTask(1, "任务1");
MyTask mt2 = new MyTask(2, "任务2");
MyTask mt3 = new MyTask(3, "任务3");
MyTask mt4 = new MyTask(4, "任务4");
MyTask mt5 = new MyTask(5, "任务5");
MyTask mt6 = new MyTask(6, "任务6");
pool.execute(mt1);
pool.execute(mt2);
pool.execute(mt3);
pool.execute(mt4);
pool.execute(mt5);
pool.execute(mt6);
pool.shutdown();
}
}
run taskId =1
run taskId =5
自定义处理..
当前被拒绝任务为:6
run taskId =2
run taskId =3
run taskId =4
并发工具类
AtomicInteger
AtomicInteger用于实现并发情况下的计数器。
public class AtomicDemo {
private static AtomicInteger count = new AtomicInteger(0);
//多个addAndGet在一个方法内是非原子性的,需要加synchronized进行修饰,保证4个addAndGet整体原子性
public synchronized int multiAdd(){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
count.addAndGet(1);
count.addAndGet(2);
count.addAndGet(3);
count.addAndGet(4); //+10
return count.get();
}
public static void main(String[] args) {
final AtomicDemo demo = new AtomicDemo();
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(demo.multiAdd());
}
}).start();
}
}
}
CountDownLatch
允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止.
在初始化CountDownLatch时给定一个计数, 该计数为需要优先执行的线程数. countDown()方法每调用一次计数减一,直到计数为零时所有await()方法阻塞的线程全部释放.
public class UseCountDownLatch {
public static void main(String[] args) {
final CountDownLatch countDown = new CountDownLatch(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("进入线程t1" + "等待其他线程处理完成...");
countDown.await();
System.out.println("t1线程继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t2线程进行初始化操作...");
Thread.sleep(3000);
System.out.println("t2线程初始化完毕,通知t1线程继续...");
countDown.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t3线程进行初始化操作...");
Thread.sleep(4000);
System.out.println("t3线程初始化完毕,通知t1线程继续...");
countDown.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
t3.start();
}
}
进入线程t1等待其他线程处理完成...
t3线程进行初始化操作...
t2线程进行初始化操作...
t2线程初始化完毕,通知t1线程继续...
t3线程初始化完毕,通知t1线程继续...
t1线程继续执行...
CyclicBarrier
允许一组线程全部等待,直到所有线程都准备好之后再执行.
在初始化CyclicBarrier时给定一个计数,await()方法每调用一次计数减一,直到计数为零时线程再才能执行.
public class UseCyclicBarrier {
static class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(name + " 准备OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "zhangsan")));
executor.submit(new Thread(new Runner(barrier, "lisi")));
executor.submit(new Thread(new Runner(barrier, "wangwu")));
executor.shutdown();
}
}
lisi 准备OK.
zhangsan 准备OK.
wangwu 准备OK.
wangwu Go!!
lisi Go!!
zhangsan Go!!
Semaphore
用于控制系统的并发量.
在初始化Semaphore时给定一个计数,该计数为系统最大的并发线程数. acquire()方法获取许可,release()方法释放许可.
public class UseSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire();
System.out.println("Accessing: " + NO);
//模拟实际业务逻辑
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release();
} catch (InterruptedException e) {
}
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("等待执行的线程数" + semp.getQueueLength());
exec.shutdown();
}
}
Accessing: 2
Accessing: 3
Accessing: 0
Accessing: 1
Accessing: 4
等待执行的线程数15
...
Future
JDK对Future模式的实现.Future模式非常适合在处理耗时很长的业务逻辑时使用,可以有效的减少系统的响应时间,提高系统的吞吐量.
public class UseFuture implements Callable<String>{
private String para;
public UseFuture(String para){
this.para = para;
}
/**
* 这里是真实的业务逻辑,其执行可能很慢
*/
@Override
public String call() throws Exception {
//模拟执行耗时
Thread.sleep(5000);
String result = this.para + "处理完成";
return result;
}
//主控制函数
public static void main(String[] args) throws Exception {
String queryStr = "query";
//传入需要真正进行业务逻辑处理的类
FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(future);
executor.submit(future2);
System.out.println("请求完毕");
try {
System.out.println("处理实际的业务逻辑...");
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
//调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
System.out.println("数据:" + future.get());
System.out.println("数据:" + future2.get());
executor.shutdown();
}
}
请求完毕
处理实际的业务逻辑...
数据:query处理完成
数据:query处理完成