集团发查询请求A给省内,A包含2个子请求A1、A2。由于集团和省内模型不一致,省内的查询接口是以省内模型为最小粒度。
A1映射为省内的B1和B2,A2映射为省内的B3和B4,所以此问题转化为省内需要查询B1、B2、B3、B4,但是需要保证的是:
1、B1和B2的查询结果要转化为A1,返回给集团,所以需要界限B1-4的结果响应边界。
2、B1-4的执行需要并行,以保证效率。
解决方式如下(利用java.util.concurrent.*):
1、建立线程池,管理线程。
public class DThreadPool extends ThreadPoolExecutor{ public void execute(Runnable command) { super.execute(command); } } public class DThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { return new DThread(r); } } public class DThread extends Thread { public DThread(Runnable r) { super(r); this.setName("Soo_DThread"); } }2、每个线程处理一个省内查询请求,如B1、B2
public class DTask implements Runnable { private Model model; // beans public DTask(Model model){ this.model = model; // beans init } public void run() { // 处理业务场景 } }3、额外记录每个集团请求的流水号,以及该流水的查询进度和查询结果
public class DThreadPoolExecutor { public static final DThreadPoolExecutor instance = new DThreadPoolExecutor(); public static DThreadPool executor; private static Map<long> counter = new HashMap<long> (); //流水号-计数 private static Map<long>> results = new HashMap<long>> (); //流水号-结果 private static Map<long> otherArgs = new HashMap<long>();//其他参数 private static Map beans = new HashMap (); private static Map<long> errCount = new HashMap<long> ();// 线程内报错计数 private DThreadPoolExecutor() { } public static DThreadPoolExecutor getInstance() { return instance; } public void start(int corePoolSize,int maximumPoolSize,int keepAliveTime,int poolBufferSize){ executor = new DThreadPool(corePoolSize,maximumPoolSize,keepAliveTime,poolBufferSize); } synchronized public static void ascCounter(Long batch){ Integer count = counter.get(batch); if(count == null) count = 0; count++; counter.put(batch, count); } //其他参数的getter/setter同步方法 }4、实际业务调用类中,需要做的工作有:构建业务模型类,传递给线程;循环等待线程处理结果计数,待流水查询结束后将 结果拿出来,转化拼装(如:等待B1、B2都处理完成才将结果拿出来,实际中省内查询是乱序的)。
public class XXService{ synchronized private String solve(args){ // 构建任务 Model model = new Model(); //model.setXXX DTask task = new DTask(model); exec.submitTask(task); DThreadPoolExecutor.ascCounter(流水号); //计数归零则拿出结果 while(true){ Integer count = DThreadPoolExecutor.getCounter(batch); SooException errCount = DThreadPoolExecutor.getErr(batch); if(EmptyUtil.isNotEmpty(errCount)){ // 如果该批次所有线程中有一个线程报错,则后续线程没有继续执行的必要 sb.append(errCount.getMessage()); remove(batch); throw errCount; } if(count == 0){ List<string> results = DThreadPoolExecutor.getResult(batch); if(EmptyUtil.isNotEmpty(results)){ for(String s : results){ sb.append(s); } } remove(batch); break; } } } }