(一)
new Thread(new SolrDataHandleThread()).start();
new Thread(new ElasticSearchDataHandler()).start();
new Thread(new RedisDataHandler()).start();
private class SolrDataHandleThread implements Runnable {
public void run() {
log.info("in SolrDataHandleThread run()--->begin");
log.info("pageSize is--->" + pageSize);
//调saf接口 根据商家id查询商家管理系统oracle数据 如商家的所属运营人员及部门的数据
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getAllVenderAuthResultData();
if (resultDataAllList != null && resultDataAllList.size() > 0) {
int totalCount = resultDataAllList.size();
int totalPages = totalCount % pageSize > 0 ? (totalCount / pageSize + 1) : totalCount / pageSize;
List<VenderAuthResultData> resultDataTempList = new ArrayList<VenderAuthResultData>();
for (int page = 1; page <= totalPages; page++) {
int startIndex = (page - 1) * pageSize;
int endIndex = startIndex + pageSize;
if (totalCount > endIndex) {
resultDataTempList = resultDataAllList.subList(startIndex, endIndex);
} else {
resultDataTempList = resultDataAllList.subList(startIndex, totalCount);
}
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
if (resultDataTempList != null && resultDataTempList.size() > 0) {
for (VenderAuthResultData vdl : resultDataTempList) {
VenderInfoDTO venderInfoDTO = penaltyBlackWhiteClient.getVenderInfoDtoByVenderId(vdl.getVenderId());
if (venderInfoDTO != null) {
//构建刷solr数据
SolrInputDocument doc = buildSolrInputDocument(vdl, venderInfoDTO);
docs.add(doc);
}
}
}
try {
solrServer92.add(docs);
solrServer92.commit();
} catch (Exception e) {
log.error("批量同步所有mysql数据到92solr服务器-刷新全部solr数据遇到错误", e);
}
try {
solrServer96.add(docs);
solrServer96.commit();
} catch (Exception e) {
log.error("批量同步所有mysql数据到96solr服务器-刷新全部solr数据遇到错误", e);
}
log.info("deal with data end, page number is--->" + page);
}
}
log.info("in SolrDataHandleThread run()--->end");
}
}
private class ElasticSearchDataHandler implements Runnable {
public void run() {
log.info("Elastic search thread begin to process data !");
boolean switchEs = ConfigCenterUtil.getSwitchConfig(SellerAuthStrategy.ES_SWITCH);
log.info("The switch of writing to ElasticSearch is opened :" + switchEs);
if(switchEs) {
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getAllVenderAuthResultData();
if (resultDataAllList != null && resultDataAllList.size() > 0) {
log.info("Size of data is " + resultDataAllList.size());
for (VenderAuthResultData vdl : resultDataAllList) {
VenderInfoDTO venderInfoDTO = penaltyBlackWhiteClient.getVenderInfoDtoByVenderId(vdl.getVenderId());
if (venderInfoDTO != null) {
SellerAuthDocument sellerAuthDocument = buildSellerAuthDocument(vdl, venderInfoDTO);
indexElasticSearchDoc(sellerAuthDocument);
}
}
}
}
log.info("Elastic search thread process data successfully !");
}
}
private class RedisDataHandler implements Runnable {
public void run() {
log.info("begin to synchronize the redis data in thread : RedisDataHandler, with page size: " + pageSize);
Calendar calendar = getLegalCalendar();
String[] monthArray = getMonths(calendar);
for(String m : monthArray) {
log.info("Begin to process sync data to redis server.with month : " + m);
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getCurrentYearMonthVenderAuthResultDataByOpTime(m);
if(null != resultDataAllList && resultDataAllList.size() > 0) {
syncRedisDataProcess(resultDataAllList);
}else{
log.info("The synchronous data is empty. the month is " + m);
}
log.info("Finish to process sync data to redis server.with month : " + m);
}
log.info("finish to synchronize the redis data in thread : RedisDataHandler, with page size: " + pageSize);
}
}
===============================================================================
(二)
注入线程池:
private ThreadPoolUtil threadPoolUtil;
public void setThreadPoolUtil(ThreadPoolUtil threadPoolUtil) {
this.threadPoolUtil = threadPoolUtil;
}
bean配置:
<!-- 线程池util -->
<bean id="threadPoolUtil" class="com.jd.util.ThreadPoolUtil"/>
调用:
private void writeCustomsOpenStatusToRedis(Long customsId, int openStatus) {
try {
final int openStatusTmp = openStatus;
final Long customsIdTmp = customsId;
threadPoolUtil.getCachedThreadPool().execute(new Runnable() {
public void run() {
if(customsIdTmp == null) {
return;
}
if (openStatusTmp == 1) {
redisUtils.set(RedisConstants.CUSTOMS_OPEN_STATUS_SURFIX + customsIdTmp, String.valueOf(openStatusTmp));
} else {
redisUtils.del(RedisConstants.CUSTOMS_OPEN_STATUS_SURFIX + customsIdTmp);
}
}
});
}catch (Exception e) {
log.error("构建海关启用停用redis数据失败", e);
}
}
private void writeCustomsVenderToRedis(Long customsId, Long venderId, int customsVenderStatus) {
try {
final Long venderIdTemp = venderId;
final Long customsIdTmp = customsId;
final int customsVenderStatusTmp = customsVenderStatus;
threadPoolUtil.getCachedThreadPool().execute(new Runnable() {
public void run() {
if (customsVenderStatusTmp == 1) {
redisUtils.hset(RedisConstants.CUSTOMS_VENDER_ID_SURFIX + venderIdTemp, String.valueOf(customsIdTmp), "1");
} else {
redisUtils.hdel(RedisConstants.CUSTOMS_VENDER_ID_SURFIX + venderIdTemp, String.valueOf(customsIdTmp));
}
}
});
}catch (Exception e) {
log.error("构建海关店铺添加或移除redis数据失败", e);
}
}
线程池类ThreadPoolUtil.java:
package com.jd.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 创建获取线程池的util
* User: shaodong
* Date: 13-1-5
* Time: 下午12:46
* To change this template use File | Settings | File Templates.
*/
public class ThreadPoolUtil {
/**
* 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,
* 这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。
# 如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
*/
private final ExecutorService CACHED_THREAD_POOL = Executors.newCachedThreadPool();
/**
* 创建一个固定大小的线程池,最大50个,超过50个的时候,会阻塞等待
*/
private final ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(50);
/**
* 获得固定大小的线程池
* @return
*/
public ExecutorService getCachedThreadPool() {
return CACHED_THREAD_POOL;
}
// public static void main(String args[]){
// new ThreadPoolUtil().m();
// }
//
// void m() {
// //创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
// ExecutorService pool = Executors.newFixedThreadPool(50);
// //创建实现了runnable接口的对象
// for(int i=1; i<=100; i++) {
// pool.execute(new MyThread(i));
// }
// pool.shutdown();
// }
//
// class MyThread extends Thread{
// int i = 0;
// public MyThread(int _i){
// System.out.println("create thread:"+_i);
// i = _i;
// }
//
// @Override
// public void run(){
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
// }
// System.out.println(Thread.currentThread().getName()+" is running... thread:"+i);
// }
// }
}