线程池在项目中的使用
一个项目中可能多次使用到线程池,比如发邮件的时候需要使用线程池,执行消息入库的时候可能需要线程池,我们可以通过数据库配置来实现线程池使用
1.数据库表中中配置线程池的核心参数
主要包括以下参数:
线程池名:excutor_name
核心线程数:core_pool_size
最大线程数:max_pool_size
任务队列大小:max_queue_size
队列的类型:queue_type (LIMITED :有界任务队列,UNLIMITED:无界任务队列)
2.提供线程仓库类和线程工厂类
线程仓库类的作用:初始化一个Map,key为线程池名,name为线程池,提供put线程池,remove线程池,get线程池的方法
线程工厂类的作用:根据数据库初始化所有线程池
仓库类具体实现:
package cn.pool.executor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class ExecutorRepository {
private Map<String, ExecutorService> executors;
private ExecutorRepository() {
executors = new HashMap<String, ExecutorService>();
}
//private static final ExecutorRepository repository = new ExecutorRepository();
/**
* 延迟初始化单例模式
*/
private static class SingletonHandler{
final static ExecutorRepository repository = new ExecutorRepository();
}
public static ExecutorRepository getInstance(){
return SingletonHandler.repository;
}
/**
* put线程池的方法(同步方法)
*/
public synchronized void put(String executorName,ExecutorService exec){
executors.put(executorName, exec);
}
/**
* remove线程池的方法(同步方法)
*/
public synchronized void remove(String executorName){
executors.remove(executorName);
}
/**
* 查找线程池的方法(普通方法)
*/
public ExecutorService get(String executorName){
return executors.get(executorName);
}
}
线程工厂的实现:
@Component
public class ExecutorFactory {
@Autowired
private BaseDao baseDao;
/**
* 类加载的时候初始化所有的线程池
*/
@PostConstruct
public void initExecutors(){
//从数据库中查找所有的线程池
List<ExecutorDTO> executorsList = baseDao.queryList("query.all.executors",new ExecutorDTO());
if(executorsList == null ){
return;
}
//创建所有线程池并放入Map中待用
ExecutorRepository repository = ExecutorRepository.getInstance();
for (ExecutorDTO executor : executorsList) {
repository.put(executor.getExecutorName(), createThreadPoolExecutor(executor));
}
}
/**
* 根据queueType创建线程池
*/
private ExecutorService createThreadPoolExecutor(ExecutorDTO executor) {
ExecutorService exec = null;
if("LIMITED".equals(executor.getQueueType())){ //有界的任务队列
exec= new ThreadPoolExecutor(executor.getCorePoolSize(),
executor.getMaxPoolSize(),
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue(executor.getMaxQueueSize()));
}else{
exec = new ThreadPoolExecutor(executor.getCorePoolSize(),
executor.getMaxPoolSize(),
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
return exec;
}
/**
* 获取对应的线程池
*/
public static ExecutorService getExecutorByName(String executorName){
return ExecutorRepository.getInstance().get(executorName);
}
}
3.提供2种类型的任务
一种为实现Runnable接口的任务,另外一种是实现Callable接口的任务
实现Runnable接口
/**
* 实现Runnable接口的任务接口
* 任务只要实现这个接口即可,即是实现了Runnable接口的任务
*/
public interface IThreadService {
void run(Object o);
}
package cn.pool.executor;
import java.io.Serializable;
/**
* 实现Runnnable接口的任务包装DTO
*/
public class RunnableTask implements Runnable,Serializable{
private static final long serialVersionUID = -993258256608603676L;
private String executorName;
private IThreadService threadService;
private Object param;
@Override
public void run() {
try {
threadService.run(param);
} catch (Exception e) {
// TODO: handle exception
}
}
public String getExecutorName() {
return executorName;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public IThreadService getThreadService() {
return threadService;
}
public void setThreadService(IThreadService threadService) {
this.threadService = threadService;
}
public Object getParam() {
return param;
}
public void setParam(Object param) {
this.param = param;
}
@Override
public String toString() {
return "RunnableTask [executorName=" + executorName +"]";
}
}
实现Callable接口的任务
/**
* 实现Callable接口的任务接口
* 任务只要实现这个接口即可,即是实现了Callable接口的任务
*/
public interface ICallableService {
Object call(Object o);
}
package cn.pool.executor;
import java.util.concurrent.Callable;
public class CallableTask implements Callable<Object> {
private String executorName;
private ICallableService callableService;
private Object param;
@Override
public Object call() throws Exception {
try {
return callableService.call(param);
} catch (Exception e) {
// TODO: 记录日志等
}
return null;
}
public String getExecutorName() {
return executorName;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public ICallableService getCallableService() {
return callableService;
}
public void setCallableService(ICallableService callableService) {
this.callableService = callableService;
}
public Object getParam() {
return param;
}
public void setParam(Object param) {
this.param = param;
}
}
4.提供公共的线程池工具类供使用
package cn.pool.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池工具类
*/
public class ExecutorUtils {
/**
* 执行runnable接口的任务
*/
public static void addThreadToPool(String executorName,IThreadService threadService,Object param){
RunnableTask task =new RunnableTask();
task.setExecutorName(executorName);
task.setThreadService(threadService);
task.setParam(param);
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//获取对应的线程池执行任务
if(exec == null){
throw new RuntimeException("指定的线程池不存在");
}
exec.execute(task);
}
/**
* 执行callable接口的任务
*/
public static Future addCallableTaskToPool(String executorName,ICallableService callableService ,Object param){
CallableTask task = new CallableTask();
task.setExecutorName(executorName);
task.setCallableService(callableService);
task.setParam(param);
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//获取对应的线程池执行任务
if(exec == null){
throw new RuntimeException("指定的线程池不存在");
}
return exec.submit(task);
}
/**
* 2中情况下进行饱和策略(1.线程池关闭的过程中提交任务,2.有界任务队列已经满了,线程数已经达到最大线程数),已经有了4种饱和策略实现
* {@link ThreadPoolExecutor.AbortPolicy}
* {@link ThreadPoolExecutor.CallerRunsPolicy}
* {@link ThreadPoolExecutor.DiscardOldestPolicy}
* {@link ThreadPoolExecutor.DiscardPolicy}
* 默认的是AbortPolicy,直接抛出{@link RejectedExecutionException},以防止提交任务线程终止
* 捕获异常处理,或者实现自己的饱和策略
*
*/
public static void setRejectedExecutionHandler(String executorName,RejectedExecutionHandler handler){
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//获取对应的线程池执行任务
if(exec == null){
throw new RuntimeException("指定的线程池不存在");
}
if(exec instanceof ThreadPoolExecutor){
((ThreadPoolExecutor) exec).setRejectedExecutionHandler(handler);
}
}
/**
* 获取任务队列的大小
*/
public static int getTaskQueueSize(String executorName){
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//获取对应的线程池执行任务
if(exec == null){
throw new RuntimeException("指定的线程池不存在");
}
return ((ThreadPoolExecutor) exec).getQueue().size();
}
/**
* 关闭线程池的方法,此方法等待当前正在执行的任务和队列中等待的任务全部执行完毕后关闭
*/
public static void shutDownExecutor(String executorName){
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//获取对应的线程池执行任务
if(exec == null){
throw new RuntimeException("指定的线程池不存在");
}
exec.shutdown();
}
public static int getAliveThread(String executorName){
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//获取对应的线程池执行任务
if(exec == null){
throw new RuntimeException("指定的线程池不存在");
}
return ((ThreadPoolExecutor) exec).getActiveCount();
}
}
5.线程池的使用:
a.数据库新增需要的线程池(线程池名字和参数)
b.需要使用线程池的service实现IThreadService或者ICallableSercvice,重写run或者call方法
c.通过utils工具类调用即可
public class ExecutorTest {
@Resource(name="cn.paic.mail.MailService"); //MailService实现了IThreadService
private IThreadService mailService;
public void sendMail(){
String param = "1";
ExecutorUtils.addThreadToPool("sendMailExecutorPool", mailService, param);
//执行其他逻辑
}
}