Android多线程编程之详解阻塞队列和线程池
阻塞队列简介
阻塞队列常用于生产者和消费者场景,生产者往往是往队列里添加元素的线程,消费者
是从队列里拿元素的线程吗,阻塞队列就是生产者存放元素的容器,是消费者拿元素的容器
- 常见阻塞场景
- 当前队列中没有数据的情况下,消费端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
- 当队列种数据填充满的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中
有空的位置,线程被自动唤醒
2.BlockingQueue
放入数据:
- offer(anobject):表示如果可以将anobject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false(本方法不阻塞当前执行方法的线程)
- offer(e o,long timeout,TimeUnit unit):可以设定等待的时间,如果在指定的时间还不能往队列中
加入blockQuene,则返回失败 - put(anobject):将anobject加到BlockingQueue里,如果BlockQueue没有控件,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续
获取数据:
- poll(long timeout,TimeUnit unit):从BlockQueue中取出一个队首的对象,如果在指定时间内队列一旦有时间可以取,则立即返回队列中的数据,否则直到时间超过还没有数据可取,返回失败
- take():取走BlockingQueue排在位首的对象,若BlockingQueue为空,则阻塞进入等待状态,直到
BlockingQueue有新的数据加入 - drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数)
通过该方法,可以提升获取数据的效率,无需分多次分批加锁或释放锁。
Java中的阻塞队列
1.ArrayBlockingQueue:由数组结构组成的有界阻塞队列
他是用数组实现的有界阻塞队列,并按照先进先出的原则对元素进行排序,默认情况下
不保证线程公平的访问队列,公平的访问队列就是指阻塞的所有生产者线程或消费者线程
当队列不可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者先生产,先阻塞
的消费者线程可以先从队列里获取元素,通常情况下为了保证公平性会降低吞吐量。
2.LinkedBlockingQueue:由链表结构组成的有限阻塞队列
他是基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出的原则对
元素进行排序,其内部也会维持着一个数据缓冲队列,当生产者往队列中放入一个数据时,
队列会从生产者手中获取数据并缓存在队列内部,而生产者立即返回,只有当队列缓冲区达到
缓存容量的最大值时(可以指定该值),才会阻塞生产者线程,直到消费者从队列中消费
掉一份数据,生产者线程会被唤醒,反之,对于消费者这段的处理也基于同样的原理,
而LinkedBlockingQueue之所以能够高效的处理处理并发数据,还因为其对于生产者端和消费者端
分别采用独立的锁来控制数据同步。
以上两个常用的阻塞队列,还有五种不再详细介绍。
下面分析ArrayBlockingQueue的源代码:
private static final long serialVersionUID = -817911632652898426L;
final Object[] items;//阻塞队列维护的一个object类型的数组
int takeIndex;//队首元素
int putIndex;//队尾元素
int count;//队列中的元素
final ReentrantLock lock;//重入锁
private final Condition notEmpty;//条件对象判断数组不是满的
private final Condition notFull;//条件对象判断数组不是空的
transient Itrs itrs;
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
/**
* Returns item at index i.
*/
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
//取元素
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;//锁
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();//阻塞线程,等待notFull.signalAll()唤醒
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();//阻塞线程,等待notEmpty.await()唤醒
return dequeue();
} finally {
lock.unlock();
}
}
使用阻塞队列就无需考虑同步和线程间通信的问题。
public class VolatikeDemo {
private int queueSize=10;
private ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<>(queueSize);
public static void main(String args[]){
VolatikeDemo demo=new VolatikeDemo();
Consumer consumer=demo.new Consumer();
Producer producer=demo.new Producer();
consumer.start();
producer.start();
}
class Consumer extends Thread{
@Override
public void run() {
while(true){
try {
int res=queue.take();
System.out.println(res);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
while(true){
try {
this.sleep(200);
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
线程池
ThreadPoolExecutor
可以通过创建ThreadPoolExecutor来创建一个线程池,ThreadPoolExecutor类一共有四个构造方法
其中拥有最多参数的构造方法:
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(?,?,?,?,?);
- corePoolSize:核心线程数,默认情况下线程池是空的,只有任务提交是才会线程,如果
当前运行的线程数少于corePoolSize,则创建新线程来处理任务,如果等于或者多余
corepoolsize则不会创建新线程,如果调用prestartAllcoreThread()方法:线程池会提前
创建并启动所有的核心线程来等待任务。 - maximumPoolSize:线程池允许创建的最大线程数,如果任务队列满了,并且线程数小于
maximumPoolSize,则线程仍会创建新线程来处理任务。 - keepAliveTime:非核心线程闲置的超时时间,超过这个时间则回收,如果任务很多,并且每个
任务的执行时间的时间很短,则可以调大keepAliveTime来提高线程的利用率。 - TimeUnit:keepAliveTime参数的时间单位,可选的单位有天,小时,分钟,秒,毫秒等
- workQueue:任务队列,如果当前线程数大与corePoolSize则将任务添加到此任务队列中,
该任务队列是BlockingQuenu类型的,也就是阻塞队列 - ThreadFactory:线程工厂,可以线程工厂给每个创建出来的线程池设置名字,一般情况下无需要
设置此参数 - RejectedExecutionHandler:饱和策略,这是当任务队列和线程池都满了时所采取的应对策略
默认是无法处理新任务(AbordPolicy)并抛出RejectedExecutionException异常,此外还有三种策略,分别如下:
1.CallersRunsPolicy:用调查者所在的线程来处理任务,此策略提供简单的
反馈控制机制,能够减缓新任务的提交速度(简言之,降低提交速度)
2.DiscardPolicy:不能执行的任务,并将该任务删除
3.DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。
线程池的处理流程和原理
1.提交任务后,线程池先判断线程数是否达到了核心线程数(corepoolSize)
如果还没有达到核心线程数,则创建核心线程处理任务,否则执行下一步。
2.线程池判断任务队列是否满了,如果没满,将任务加入任务队列,否则执行
下一步。
3.线程池判断线程数是否达到最大线程数,如果未达到,则创建非核心线程处理任务,
否则就执行饱和策略,默认会抛出RejectedExecutionExeception异常。
通过线程池的执行示意图我们可以看出,如果我们执行ThreadPoolExecutor的execute方法,
会遇到各种情况
1.如果线程池中的线程数没有达到核心线程数,则创建核心线程执行任务。
2.如果线程池中的线程数大于或等于核心线程数,则加入任务队列,线程池中的空闲线程会不断的
从任务队列中取任务执行。
3.如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理任务。
4.如果线程数超过了最大线程数,则执行饱和策略。
4种常用的线程池
FixedThreadPool:他是可重用固定线程数的线程池,他的主要特点如下:
只有核心线程,没有非核心线程,并且数量是固定的,keepAliveTime设置
为0意味着多余的线程会被立即终止,因此不会产生多余的线程,采用了
无界阻塞队列LinkedBlockingQueue。
当执行execute方法时:如果当前运行的线程数未达到核心线程数,则创建一个新线程来处理任务,如果运行线程数等于核心线程数,则将任务添加到阻塞队列中,FixThread就是拥有固定数量核心线程的线程池,并且这些核心线程不会被回收,当线程池中有空闲线程就会去任务队列取任务
CacheThreadPool
CacheThreadPool:他是一个根据需要创建线程的线程池,他的主要特点如下:
CacheThreadPool的corePoolSize为0,maximumPoolSize设置为最大值,他没有
核心线程,非核心线程是无界的,keepAliveTime设置为60s,并使用了阻塞队列
SynchronousQueue,他是一个不存储元素的阻塞队列,每个插入操作必须要
等待另外一个线程的移除操作,同样一个移除操作也需要等待插入操作。
当执行execute方法时:首先会执行synchroniusQueue的offer方法来提交任务,并且
查询线程池中是否有空闲的线程执行SynchronousQueue的poll方法来移除任务,如果有
则配对成功,将任务交给这个线程去处理,如果没有则配对失败,创建新的线程去执行任务
当线程池中的线程空闲时,他会执行SynchronusQueue的poll方法,等待synchronoudQueue提交
的新任务,如果60s没有新任务提交到synchronousQueue,则这个线程就会终止。cacheThreadPool适合大量需要立即处理并且耗时少的任务。
SingleThreadExecutor:他是使用单个工作线程的线程池,corePoolSize和maximumPoolSize都为
1,意味着SingleThreadExecutor只有一个核心线程,其他核心参数都和FixThreadPool一样,SingleThreadExecutor执行execute方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创建一个新线程来处理任务,如果当前有运行的线程,则将任务添加到阻塞队列中,因此SingleThreadExecutor能确保所有的任务都在一个线程中按照顺序逐一执行