问题
当客户端向服务层同时提交多个任务(Task)时,为了充分发挥服务器的处理计算能力,普通单线程已经不能满足我们的需求了,需要采用多线程或多进程的技术来并发处理task,服务器在处理并发任务时需要满足一下几个要求:
- 1.任务提交后,希望服务器尽可能快的处理并返回结果
- 2.多个任务同时处理时并不影响性能
- 3.当某个任务执行过程中出错了,允许重试,并且不会阻碍其它task的处理
分析
- 1.多个任务同时提交,并且希望服务器尽可能快的处理并返回结果,采用多线程技术来实现,针对每个任务,如果都分配一个线程去处理的话,那么当同时提交成千上万的任务的话,很容易导致服务器系统资源耗尽,服务器宕机。在分配任务时,创建和销毁线程很浪费资源,通过线程池技术来提高线程的执行效率,减去任务执行过程中的线程创建和销毁的时间,大大提高任务执行效率。通过线程池技术,使得系统初始化时存放一定量的线程,客户端提交task给服务器后,将各个task分配给现有的空闲线程去处理,但是当提交很多task的话,很明显,线程池里面的线程数量也会不足,那如何解决呢?在客户端提交task到服务器后,现将task提交到任务队列中(TaskQueue),系统执行task时,从任务队列中提取task再分配给线程池中的空闲线程去执行任务。
- 2.当我们发布任务后,任务交给了服务层取处理,在处理的过程中难免会遇到一些特殊的问题,导致任务执行失败,为了保证任务可以有效的执行并实现我们需要的任务,还需要为任务处理提供一个重试机制,这样当系统执行任务失败的时候,就可以启动重试机制来再次处理提交的任务,增加任务执行的成功率。
设计
- Client
Client 模拟的是多个客户端,submit()方法用于提交task,提交的这个task类型为AsyncTask,该类为专用的任务类,下文会讲述。 - AsyncExecutorService
首先这个类的目的是创建出一个线程池 executor来,用于资源分配,需要初始化一些参数,比如线程数量,最大线程数量,线程存活时长,重试次数以及重试等待时间等,这其中还有一个隐藏的参数--> 任务队列,分配资源时从任务队列中随机选择任务,进行分配,当有任务执行完成后,存在空闲线程时,该executor又会进行资源再分配,从任务队列中选择相应的任务进行执行。
该类还提供了执行任务的接口,从客户端传递过来的任务,在该类进行接收,并执行内部逻辑。 - AsyncTask
AsyncTask是一个Runnable类,整正执行任务的地方,执行的任务通过 抽象方法 doWork()对外提供,这样就可以实现不同的任务操作。在该类中主要实现了重试机制的逻辑,当任务执行失败时进行相应次数的重试执行,保证任务执行成功。
Client代码
package com.ngsky.async;
/**
* @Description TODO
* @Author daxiong
* @Date 7/7/2018 10:48 AM
**/
public class AsyncTaskTest {
public static void main(String[] args){
AsyncExecutorService executorService = new AsyncExecutorService(5, 10, 30, TimeUnit.SECONDS, 2, 5);
long beginTime = System.currentTimeMillis();
for(int i = 0; i < 1000;i++){
MockTask mockTask = new MockTask();
mockTask.setName("task" + i);
mockTask.setRetryTimes(3);
mockTask.setRetryWaitTime(2000);
executorService.execute(mockTask);
}
long endTime = System.currentTimeMillis();
System.out.println("The task spend on " + ((endTime - beginTime) / 60L) + " minutes");
}
}
// every task spend 20s, (1000 * 2) / 60 = 33 minutes
class MockTask extends AsyncTask{
public void doWork() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
AsyncTask.java类
package com.ngsky.async;
/**
* @Description TODO
* @Author daxiong
* @Date 7/7/2018 10:14 AM
**/
public abstract class AsyncTask implements Runnable {
private String name; // task name
private boolean successDone; // whether the task completed successful
private int retryTimes; // if the task was't finished successful, system allow retry to do the task for retrytimes
private int retryWaitTime; // system cant't retry to do work immediately, but execute after retryWaitTime
public void run() {
System.out.println("(task) " + getName() + " beginning execute....");
long beginTime = System.currentTimeMillis();
int currentRetryTimes = 1;
try {
doWork();
System.out.println("(task) " + getName() + " completed successful!");
this.setSuccessDone(true);
} catch (Exception e) {
System.out.println("(task) " + " execute filed..., message " + e);
this.setSuccessDone(false);
}
if (getRetryTimes() <= 0) return;
while (!isSuccessDone() && currentRetryTimes <= getRetryTimes()) {
System.out.println("(task) " + "Executing retry " + currentRetryTimes + "th!" );
if (getRetryWaitTime() > 0) {
try {
Thread.sleep(getRetryWaitTime());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
doWork();
System.out.println("(task) " + getName() + " completed successful!");
this.setSuccessDone(true);
} catch (Exception e) {
System.out.println("(task) " + getName() + " was failed, unknown reason! Please try again!");
this.setSuccessDone(false);
currentRetryTimes++;
}
}
long endTime = System.currentTimeMillis();
System.out.println("(task) " + " spend on " + (endTime - beginTime) + "ms and result is " + (this.isSuccessDone() ? "successful!" : "failed,Please check your task!"));
}
public abstract void doWork() throws Exception;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isSuccessDone() {
return successDone;
}
public void setSuccessDone(boolean successDone) {
this.successDone = successDone;
}
public int getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
public int getRetryWaitTime() {
return retryWaitTime;
}
public void setRetryWaitTime(int retryWaitTime) {
this.retryWaitTime = retryWaitTime;
}
}
AsyncExecutorService.java
package com.ngsky.async;
import java.util.concurrent.*;
/**
* @Description async service interface
* @Author daxiong
* @Date 7/7/2018 9:16 AM
**/
public class AsyncExecutorService {
private int corePoolSize;
private int maximumPoolSize;
private long keepLiveTime;
private TimeUnit timeUnit;
private int retryTimes;
private int retryWaitTime;
private LinkedBlockingQueue<Runnable> blockingQueue;
private ThreadPoolExecutor executor;
public AsyncExecutorService(int corePoolSize, int maximumPoolSize, long keepLiveTimes,
TimeUnit timeUnit, int retryTimes, int retryWaitTimes){
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepLiveTime = keepLiveTimes;
this.timeUnit = timeUnit;
this.retryTimes = retryTimes;
this.retryWaitTime = retryWaitTimes;
init();
}
private void init(){
System.out.println("Async executor initializing...");
blockingQueue = new LinkedBlockingQueue<Runnable>();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepLiveTime, timeUnit, blockingQueue);
}
// init task information
private AsyncTask initTask(AsyncTask task){
System.out.println("(task) " + task.getName() + " initializing...");
if(retryTimes > 0) task.setRetryTimes(retryTimes);
if(retryWaitTime > 0) task.setRetryWaitTime(retryWaitTime);
return task;
}
public void execute(AsyncTask task){
task = initTask(task);
executor.execute(task);
}
public <T> Future<T> submit(Callable<T> job){
return executor.submit(job);
}
public void shutdown(){
if(executor != null)
executor.shutdown();
}
public int getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
public void setMaximumPoolSize(int maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}
public long getKeepLiveTime() {
return keepLiveTime;
}
public void setKeepLiveTime(long keepLiveTime) {
this.keepLiveTime = keepLiveTime;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
public int getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
public int getRetryWaitTime() {
return retryWaitTime;
}
public void setRetryWaitTime(int retryWaitTime) {
this.retryWaitTime = retryWaitTime;
}
public LinkedBlockingQueue<Runnable> getBlockingQueue() {
return blockingQueue;
}
public void setBlockingQueue(LinkedBlockingQueue<Runnable> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public ThreadPoolExecutor getExecutor() {
return executor;
}
public void setExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}
}
总结
异步任务调度相关的理论和实现还有许多地方没有涉及到,这里做一个抛砖引玉,比如任务执行过程中的拦截操作,线程池初始化时大小如何选择等等。
任务调度针对的是高并发任务请求,要求保证系统性能,以最快的速度处理任务,提高处理效率和成功率。
选择的技术:多线程,线程池,并发处理,任务调度,资源分配,重试机制。