大家在研发过程中,难免会遇到使用子线程或者异步线程的场景。它的实现方式有多种,下面就梳理一下。
1、Java原生写法
继承Thread,或者实现runnable接口;
不过因为这种方式没有统一的管理方式,到项目后期会看到到处都在创建线程,难以维护。
2、使用JDK自带的线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyExecutorUtils {
// 定义静态常量,当然可以设置很多属性,这里简单创建了一下
public static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
}
public class MyPopRunnableImpl implements Runnable{
public String arg1;
public String arg2;
public MyPopRunnableImpl() {}
public MyPopRunnableImpl(String arg1, String arg2) {
this.arg1= arg1;
this.arg2= arg2;
}
@Override public void run() {
MsgServer.AddPop(arg1, arg2);
}
}
MyExecutorUtils.fixedThreadPool.execute(new MyPopRunnableImpl(arg1,arg2));
3、使用spring Boot异步线程框架
/**
* 事件发布器
*/
public static ApplicationEventPublisher publisher;
/**
* 静态变量通过set方法注入
* @param publisher\t发布器
* @author chengmeng
*/
@Autowired
public void setSdkService(ApplicationEventPublisher publisher){
SqlExecutor.publisher = publisher;
}
//调用
publisher.publishEvent(new AsynWriteToDbEvent(instanceNo));
//配置类
@Configuration
public class TaskExecutorConfig {
@Bean("bpmTaskExecutor")
public TaskExecutor bpmTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("BpmThread-");
executor.initialize();
return executor;
}
}
//监听器
@Component
public class AsynWriteToDbListener {
@Async("bpmTaskExecutor")
@EventListener(AsynWriteToDbEvent.class)
public void listener(AsynWriteToDbEvent event) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Log.error("线程睡眠1秒钟异常", e);
}
}
4、使用future工具
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class ExecutorUtils {
/**
* 线程池单例
*/
public static volatile ExecutorService M_TASK =
ExecutorManageUtils.newFixedThreadPool(300, "LIEMS_M_TASK");
/**
* 缓存加载线程池
*/
public static volatile ExecutorService CACHE_TASK =
ExecutorManageUtils.newFixedThreadPool(100, "LIEMS_CACHE_TASK");
/**
* 获取当前运行中的所有线程池
* @return Map<String, ExecutorService> 线程池集合
*/
public static Map<String, ExecutorService> getAll() {
final Map<String, ExecutorService> executors = new HashMap<>();
final Field[] fields = ExecutorUtils.class.getDeclaredFields();
for (Field field : fields) {
final int modifiers = field.getModifiers();
Object value = null;
try {
value = field.get(null);
} catch (IllegalAccessException e) {
Log.error("获取线程字段值失败", e);
}
if (Modifier.isPublic(modifiers) && Modifier.isStatic(modifiers)
&& value instanceof ExecutorService) {
executors.put(field.getName(), (ExecutorService) value);
}
}
return executors;
}
/**
* 关闭所有线程池
*/
public static void shutdownAll() {
final Map<String, ExecutorService> all = getAll();
all.values().forEach(ExecutorManageUtils::shutdown);
}
}
/**
* 执行缓存加载任务,并且把future增加到list
* @param service 线程池
* @param task task任务
* @param futures 总任务list
*/
public static void executeTask(ExecutorService service, Runnable task, List<Future<?>> futures) {
final Future<?> future = service.submit(task);
futures.add(future);
}
final List<Future<?>> fus = new ArrayList<>();
// 加载数据库物理表的缓存
executeTask(CACHE_TASK, () -> BeanUtils.getBeanWithNonNull(CommDatabaseTableCached.class).load(), fus);
// 阻塞等待所有加载缓存线程结束
for (Future<?> fu : fus) {
fu.get();
}