很长时间没有写过文章了,因现在从事的JAVA方向的系统开发,所以独自写了一个基础的任务执行框架,目的是根据所需应用场景,定制系统实现任务。此处有借助公司的框架思想实现的。废话不多说了,直接进入正题。
由于此框架的实现目的是为了扩展强度高,对每一个功能都能实现其任务的定制。所以设计思想如下:
-
程序运行开启两个线程
第一个线程是每隔10s查询数据库里是否有待执行的任务,如果有加入到任务队列中,第二个线程是任务执行线程,这个线程直接创建一个线程池,用以执行任务。
线程1:/** * 任务查询线程 * @author libing * */ public class TaskThread implements Runnable { private static Logger logger = LoggerFactory.getLogger(TaskThread.class); @Override public void run() { while(true) { //添加任务到任务队列 TaskDBUtil.addTaskQueue(); try { logger.debug("准备休眠10秒后取数据库任务"); //休眠10秒 Thread.sleep(10000); } catch (InterruptedException e) { logger.error("休眠失败",e); } } } }
线程2:
/** * 具体任务的执行线程 * @author libing * */ public class BusinessThread implements Runnable { private static Logger logger = LoggerFactory.getLogger(BusinessThread.class); ThreadPoolExecutor executor = new ThreadPoolExecutor(GlobalData.taskMaxNum, GlobalData.taskMaxNum*2, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(GlobalData.taskMaxNum)); @Override public void run() { while(true) { try { //判断任务队列中等待执行的任务数量不可大于线程池中 if(executor.getQueue().size()<GlobalData.taskMaxNum){ TaskBean task = TaskQueue.getTask(); if(task!=null) { //利用反射创建这个任务 //得到这个任务的taskGc Map<String, String> taskGc = TaskGcGlobal.getTaskGc(task.getExecId()); executor.execute((Runnable) Class.forName(taskGc.get("controller")).getConstructor(TaskBean.class).newInstance(task)); logger.debug("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+ executor.getQueue().size()+",已执行完别的任务数目:"+executor.getCompletedTaskCount()); } }else { logger.error("任务队列已满,休眠3秒"); //休眠3秒再去取任务 Thread.sleep(3000); } } catch (Exception e) { logger.error("任务执行异常,停止任务!",e); executor.shutdown(); } } } }
当任务队列中有待执行的任务时,执行线程就取出任务加入到线程池中,通过任务的key通过反射实现对应的任务对象进行动作。
- 第二步,当任务被开启运行后,就创建对应的执行对象。此处我用爬取妹子图的一个任务为例。爬这个图是通过页面一页一页的抓取的,如果要爬取全站的资源,这个任务如果用单线程来跑要消耗的时间非常长,所以这里考虑依然开启一个线程池进行工作。实现代码基本如下:
/**
* 爬图任务实现控制器
* @author libing
*
*/
public class CrawlerController implements Runnable {
private static Logger logger = LoggerFactory.getLogger(CrawlerController.class);
ThreadPoolExecutor executor = new ThreadPoolExecutor(GlobalData.taskThreadMaxNum, GlobalData.taskThreadMaxNum*2, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(GlobalData.taskThreadMaxNum));
private TaskBean task;
private CrawlerDBUtil dbUtil=new CrawlerDBUtil();
public CrawlerController(TaskBean task) {
this.task = task;
}
@Override
public void run() {
//查询爬图配置表
List<CrawlerConfigBean> configList = dbUtil.allCrawlerConfigList();
for(int i=0;i<configList.size();) {
if(executor.getQueue().size()<GlobalData.taskThreadMaxNum){
CrawlerWorker worker=new CrawlerWorker(configList.get(i),dbUtil);
executor.execute(worker);
i++;
}
}
while(true) {
if(executor.getCompletedTaskCount()==configList.size()) {
logger.debug("爬图任务执行完毕!");
//修改当前执行配置的列表当前页数
dbUtil.updateCrawlerConfig(configList);
//修改当前任务为完成run_flag=0
TaskDBUtil.completeTask(this.task);
break;
}
}
}
}
/**
* 爬图具体实现worker(每个页面一个worker)
* @author libing
*
*/
public class CrawlerWorker implements Runnable {
private static Logger logger = LoggerFactory.getLogger(CrawlerWorker.class);
private CrawlerConfigBean crawlerConfig;
private CrawlerDBUtil dbUtil;
public CrawlerWorker(CrawlerConfigBean crawlerConfig, CrawlerDBUtil dbUtil) {
this.crawlerConfig = crawlerConfig;
this.dbUtil = dbUtil;
}
@Override
public void run() {
// 循环当次采集的总页数
for (int i = 1; i <= crawlerConfig.getDayPage(); i++) {
// 当前查看的地址是当次的此页数+采集过的页数
// http://www.mzitu.com/zipai/ comment-page-1/#comments
List<String> imgUrlList = CrawlerUtil.findImgUrlListByUrl(
crawlerConfig.getUrl().replace("@", (i + crawlerConfig.getCurrentPage()) + ""));
dbUtil.saveCrawlerImgResult(imgUrlList);
for (String url : imgUrlList) {
// 下载图片到本地
try {
CrawlerUtil.download(url, GlobalData.crawlerImgPath + crawlerConfig.getIdent() + "/"
+ (crawlerConfig.getCurrentPage() + i));
} catch (Exception e) {
logger.error("当前图片下载失败:"+e);
}
}
}
}
}
至此,这个框架基本实现就是如此,如果有其它的任务定制,依然可以仿照这个爬图任务实现即可。欢迎各位大神指点.