前言:最近在处理实际业务时遇到一个问题,订单表中有50-100万数据需要生成订单流水。应用部署在了四台服务器上,如何在竞争到锁的服务器上处理订单生成订单流水的速度更快,考虑使用一个线程池去负责读取数据,一个线程池负责去插入数据,在此之前先编写一个Demo类来测试方案是否可行。
一、首先回顾一下创建线程的三种方式
1、Thread类
class testTask extends Thread{
@Override
public void run() {
}
}
2、Runnable接口
class WriteTask implements Runnable{
private Student student;
public WriteTask(Student student){
this.student = student;
}
@Override
public void run() {
}
}
3、Callable接口
class ReadTask implements Callable<Student> {
@Override
public Student call() throws Exception {
return null;
}
}
二、从三种线程的创建方式来看,如果需要线程运行完所需要运行的代码之后有返回结果,那么就需要使用到第三种方式Callable,我现在需要模拟一个线程池去读取数据库中的数据库,另外一个线程池去数据库中插入数据。那么重要的是第一个线程池读到数据返回读到的数据之后,另外一个线程池如何能获取到了,直接上代码。
1、简单起见我就不连接数据库,直接编写一个数据提供类模拟读取数据库数据操作。直接输出来模拟向数据库中插入数据。
/**
* 生产数据
* @return 实体对象集合
*/
public static Student productionData(){
Student student = new Student();
UUID uuid = UUID.randomUUID();
String sname = "张三";
String sage = "21";
//String[] split = uuid.toString().split("-");
// StringUtils.join(split);
//添加数据
student.setSid(uuid.toString());
student.setSname(sname);
student.setSage(sage);
return student;
}
2、定义一个读取数据的Callable子类,定义一个写入数据的Runnable的子类
读取数据逻辑:
/**
* 线程需要运行的读取task
*/
static class ReadTask implements Callable<Student> {
@Override
public Student call() throws Exception {
System.out.println(Thread.currentThread().getName()+"正在获取数据----->");
Student student = ReadLockAndWriteLock.productionData();
System.out.println(Thread.currentThread().getName()+"获取数据已完成----->数据为:"+ student);
return student;
}
}
写入数据逻辑:
/**
* 线程需要运行的写入task
*/
static class WriteTask implements Runnable{
//在执行写入时需要出入写入的数据
private Student student;
public WriteTask(Student student){
this.student = student;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在插入数据"+"----->"+"插入的数据为:"+student);
System.out.println(Thread.currentThread().getName()+"插入数据已完成----->");
}
}
3、编写测试代码
public static void main(String[] args) {
//定义读和写两个线程池
ExecutorService readThreadPool = Executors.newFixedThreadPool(5);
ExecutorService writeThreadPool = Executors.newFixedThreadPool(5);
//创建线程池结果管理器
CompletionService<Student> completionService = new ExecutorCompletionService<Student>(readThreadPool);
//循环定义是个任务
for (int i = 0; i < 10; i++){
completionService.submit(new ReadTask());
new Thread().start();
}
//当执行读取的线程池任务执行完毕之后,关闭线程池
readThreadPool.shutdown();
//循环十次类获取线程池管理类返回的数据
try {
for (int i = 0; i < 10; i++){
Student student = completionService.take().get();
writeThreadPool.execute(new WriteTask(student));
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
//在执行写入任务的线程池执行完毕之后,关闭线程池
writeThreadPool.shutdown();
}
}
接收另外一个线程池的返回结果,主要是依赖CompletionService(线程池管理器),如果获取返回结果详细见代码,代码中都有添加注释。