java编程过程中对于同步转异步处理、高并发处理,常常会用到阻塞队列,利用其阻塞的特性实现消费者与生产者的解耦。在Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效转移数据的问题。
阻塞队列BlockingQueue是什么?首先他是队列,其次他有阻塞的特性;队列有先进先出、后进先出两种模式,前者一般作为顺序队列使用,后者用于压栈出栈操作。
通过java.util包下面的Queue队列接口可以看到,Queue继承了Collection <E>, Iterable <E>,所以队列是一个集合、支持迭代。Queue提供了以下几种方法
操作 | 方法 | 描述 | 特点 |
增加 | add(E e) | 将e元素插入队列,如果违反容量限制,将抛出IllegalStateException | 会抛出异常 |
删除 | remove | 检索并删除队列头,如果队列为空,抛出异常NoSuchElementException |
会抛出异常 |
增加 | offer(E e) | 与add相同,优于add操作,如果容量不足,不会抛出异常 | 不会抛出异常 |
获取 | poll | 检索并删除队列头,如果队列为空,不会抛出异常 | 不会抛出异常 |
查询 | element | 查询队列头,不删除元素,如果队列为空将抛出异常 | 会抛出异常 |
查询 | peek | 查询队列投,不删除原色,但如果队列为空,则返回null | 不会抛出异常 |
上面这是queue队列接口的定义的基础接口描述,基于Queue接口继承定义了BlockingQueue <E>阻塞队列、BlockingDeque<E>阻塞双端队列以及其他接口。
Blocking阻塞的含义就是一直等待的含义,如果队列满了就一直等待有空间了再放入,如果队列空了就等待有数据的时候再获取,不会因为未能成功操作而立刻返回异常。
Queue就是普通队列、Deque是双端队列,Queue只能支持FIFO先进先出原则,Deque除了能够支持FIFO先进先出还支持LIFO后进先出操作。由于Deque的后进先出的特性,他的特性优于Stack类,更适合作为栈操作。
多线程环境中,通过队列可以很容易实现数据共享,比如“生产者”和“消费者”模型,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但是生产者和消费者的处理速度,肯定是不完全匹配的。因此我们需要引入阻塞的概念:如果生产过剩,那就暂停一下等到消费者消费。以往在解决该问题时采用LOCK锁的方式,但这提高了程序开发中的复杂度,在Concurrent包中的BlockingQueue实现了这种能力。
对比Queue接口定义的方法来看BlockingQueue提供的基础方法,BlockingQueue通过put和take实现了消息阻塞式存入和阻塞式获取。
BlockingQueue接口的实现类包括ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue等。ArrayBlockingQueue是基于数组的阻塞队列必须给定队列深度、LinkedBlockingQueue是基于链表的阻塞队列,不需要指定队列深度(默认容量将是Integer.MAX_VALUE
),为了防止OOM一般也会指定队列深度。
阻塞队列经常用于消费者生产者模型下,以便通过多线程方式提高系统并发量。在多线程编程中另一个重要的点是采用线程池,线程池的使用能够避免高并发下反复创建线程,从而提高了CPU的开销。简单的来说线程池的引入,将通过线程的复用,减少线程创建。
线程池最为典型的创建方法是:
ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程 ExecutorService threadPool = Executors.newFixedThreadPool(5);//固定数量 ExecutorService threadPool = Executors.newCachedThread();//可伸缩
但实际上对于单个线程与固定数量线程在底层的实现是完全一样的,只不过数量不同而已,对于可伸缩线程池在最大线程数量不进行限制,将会造成潜在的栈溢出风险,所以不推荐采用以上三种方式。
一般在企业级应用下的线程池使用,都是进行自行扩展,不使用Executors创建。翻看Executors的底层代码,不难看出以上几种线程池创建都是通过ThreadPoolExecutor实现的。
//创建固定线程数的线程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, //固定初始线程数是nThreads
nThreads,//最大线程数也是nThreads 0L, //0秒钟等待
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());//毫无底线的队列深度(巨大隐患)
}
//创建单个线程数的线程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, //固定初始线程数是1
1,//最大线程数是1 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//毫无底线的队列深度(巨大隐患)
}
从上面两个示例中可以看到线程池的创建其实都是通过ThreadPoolExecutor创建的,下面详细了解一下每个参数
ThradPoolExecutors( int corePoolSize, //核心池线程数大小 int maximumPoolSize, //最大的线程池数大小 long keepAliveTime, //超时等待时间 TimeUnit unit, //时间单位 BlockingQueue<Runnable> workQueue, //阻塞队列(LinkedBlockDeque<>(3))双端队列性能高 ThreadFactory threadFactory, //线程工厂Executors.defaultThreadFactory();默认工厂 RejectedExecutionHandler handler//拒绝策略 AbortPolicy )
核心线程池数量就是默认启动加载时创建的线程数。
最大线程池大小是弹性的,是要大于等于核心池线程数大小的,这个数量一般情况下是在并发大于核心线程数,且无法排队到任务队列中后,才进行自动扩展。
超时等待时间与时间单位结合后,表示在超过这段时间周期后,线程池数量中的线程数会降低,以减少CPU对线程轮询的开销。
阻塞队列是用于缓冲线程任务,一般的流程时线程进来后会先判断核心线程是否都满,如果没有的话可能会尝试开辟新的线程进行处理,当然也会尝试将任务缓存在阻塞队列中等待空闲线程,所以阻塞队列是需要有数量限制的,否则会出现内存溢出的风险。
拒绝策略是说在当前线程池无法处理后,可以使用的决绝执行反馈方式。
拒绝策略包括
new ThreadPoolExecutor.AbortPolicy()\\抛出异常 new ThreadPoolExecutor.DiscardPolicy()\\不抛出异常 new ThreadPoolExecutor.DiscardOldestPolicy()\\尝试获取任务,继续执行 new ThreadPoolExecutor.CallerRunsPolicy()\\尝试运行,由原线程负责执行
通过扩展拒绝策略RejectedExecutionHandler,可以在rejectedExecution对粘连住的线程进行统计和重执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //r是等待被执行的任务,可以进行统计
//e是线程池,可以通过e.execute(r)执行任务
}
线程工厂threadFactory虽然一般采用Executors.defaultThreadFactory(),但是在多线程池配置的的运行时环境,很难区分线程池对应的模块或者组件,所以需要重新一遍线程工厂,从而方便的对线程组进行命名
public class DefaultThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final ThreadGroup group; private final String namePrefix; private final String groupName; public DefaultThreadFactory(String groupName) { this(groupName, "pool-" + groupName + "-thread-"); } public DefaultThreadFactory(String groupName, String namePrefix) { this.groupName = groupName; group = new ThreadGroup(this.groupName); this.namePrefix = namePrefix; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } return t; } }
在创建线程池时,采用自定义的线程工厂,能够容易的扩展线程名称
例如
new DefaultThreadFactory("worryrock")
通过jstack查看线程执行情况时就能够清晰的看到命名为
pool-worryrock-thread-*命名的线程,是由自定义的线程池创建的。