多线程跑批处理大数据
具体思路:先把大量数据(约100万条)按每页1000条分页查询,再把每页数据按 mid 分组,将各组轮流分配到10个线程中并行处理;同一 mid 的数据始终落在同一线程内顺序处理,以避免并发问题。
/**
 * Runs the batch job: repeatedly queries a page of records via {@code pMapper.selectP}, spreads the
 * page over at most {@code threadNum} worker threads (grouped by {@code mid}), waits for the page to
 * finish, and stops on an empty page, a short page, or a failed/interrupted wait.
 *
 * @throws BusinessException declared for callers; not thrown directly by the code visible here
 */
public void handle() throws BusinessException {
    int pageSize = 1000;
    // The business logic itself involves concurrency, so to be safe keep the thread count at most 10.
    int threadNum = 10;
    Page<PDto> page = new Page<PDto>(pageSize);
    int total = 0;
    // Query page by page; each page is fully processed before the next one is fetched.
    for (;;) {
        // NOTE(review): the same Page object is passed every iteration and its page number is never
        // advanced here. The loop only makes progress if selectP (or the processing itself) moves the
        // cursor or removes processed rows from the result set — confirm against the mapper.
        pMapper.selectP(page, new HashMap<String, Object>());
        List<PDto> pPageList = page.getResult();
        if (pPageList == null || pPageList.isEmpty()) {
            LOG.info("未查询到数据,跑批结束#");
            break;
        }
        total += pPageList.size();
        List<List<PDto>> threadWorkLists = distributeByMid(pPageList, threadNum);
        if (!processPage(threadWorkLists, threadNum)) {
            // Tasks of this page may still be running (timeout) or we were asked to stop (interrupt);
            // starting another page now could process the same mid on two threads at once.
            break;
        }
        // Fewer rows than a full page means there is nothing left to fetch.
        if (pPageList.size() < pageSize) {
            LOG.info("結束......");
            break;
        }
    }
    LOG.info("总数:{}笔", total);
}

/**
 * Groups one page of records by {@code mid} and deals the groups round-robin into
 * {@code threadNum} work lists, so every record sharing a mid ends up in the same list
 * (and therefore on the same thread, processed sequentially).
 *
 * @param pageList  the records of the current page
 * @param threadNum number of work lists to produce (must be positive)
 * @return exactly {@code threadNum} lists, some possibly empty
 */
private List<List<PDto>> distributeByMid(List<PDto> pageList, int threadNum) {
    Map<Long, List<PDto>> byMid = new HashMap<Long, List<PDto>>();
    for (PDto pDto : pageList) {
        List<PDto> group = byMid.get(pDto.getMid());
        if (group == null) {
            group = new ArrayList<PDto>();
            byMid.put(pDto.getMid(), group);
        }
        group.add(pDto);
    }
    List<List<PDto>> workLists = new ArrayList<List<PDto>>(threadNum);
    for (int i = 0; i < threadNum; i++) {
        workLists.add(new ArrayList<PDto>());
    }
    int slot = 0;
    for (List<PDto> group : byMid.values()) {
        workLists.get(slot).addAll(group);
        slot = (slot + 1) % threadNum;
    }
    return workLists;
}

/**
 * Runs one page's work lists on a fresh fixed-size pool and blocks until every task has finished,
 * waiting at most one hour. The pool's termination acts as a per-page barrier.
 *
 * @param threadWorkLists one list of records per worker; empty lists are skipped
 * @param threadNum       pool size
 * @return {@code true} if all tasks finished; {@code false} if the wait timed out or the calling
 *         thread was interrupted (the interrupt flag is restored), in which case the caller should stop
 */
private boolean processPage(List<List<PDto>> threadWorkLists, int threadNum) {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);
    try {
        for (List<PDto> threadWorkList : threadWorkLists) {
            if (!threadWorkList.isEmpty()) {
                executor.submit(bPayThread(threadWorkList));
            }
        }
    } finally {
        // Always shut the pool down, even if creating or submitting a task throws; otherwise its
        // non-daemon worker threads would be leaked. Already-submitted tasks still run to completion.
        executor.shutdown();
    }
    try {
        if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
            LOG.error("等待线程处理超时,跑批终止#");
            return false;
        }
        return true;
    } catch (InterruptedException e) {
        LOG.error("当前线程被中断#", e);
        // Preserve the interrupt so code further up the stack can see it.
        Thread.currentThread().interrupt();
        return false;
    }
}
数据业务逻辑处理
/**
 * Builds the task a single worker thread runs. The task processes each record of {@code pList} in
 * order via {@code muService.pRdTx}, then logs the thread id, the record count and the elapsed seconds.
 * A failure on one record is logged and does not stop the remaining records.
 *
 * @param pList records assigned to this worker (handle() puts all records of a given mid in one list)
 * @return the task to submit to an executor
 */
private Runnable bPayThread(final List<PDto> pList) {
    return new Runnable() {
        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            // Process each record on its own so one failure does not abort the rest.
            for (PDto pDto : pList) {
                try {
                    // Main business logic.
                    muService.pRdTx(pDto);
                } catch (Exception e) {
                    // Previously swallowed silently; record which item failed and why.
                    // TODO(review): the original intent was also to mark the failed record; no marking
                    // API is visible here, so that step still needs to be implemented.
                    LOG.error("数据处理失败,mid:{}#", pDto.getMid(), e);
                }
            }
            long endTime = System.currentTimeMillis();
            float seconds = (endTime - startTime) / 1000F;
            LOG.info("线程ID:{},还款笔数:{},执行时间:{}秒", Thread.currentThread().getId(), pList.size(), seconds);
        }
    };
}