背景:实际开发中,经常会遇到当前线程必须等待所有子线程执行完毕后才能继续的场景。这里写了一个简单的例子,利用 BlockingQueue 的阻塞特性来实现。后续可以优化为按当前线程 ID 分别进行控制。
使用方法:
public static void main(String[] args) { TaskManager manager = TaskManager.getInstance(4); //新建一批线程任务执行 for(int i = 0; i < 100; i++){ manager.doTask(new Runnable(){ public void run() { System.out.println("------------> " + System.currentTimeMillis()); try { Thread.sleep(100); } catch (InterruptedException e) { } } }); } //等待所有子线程执行完毕 manager.waitAllThreadFinsh(); System.out.println("Finish"); }
package task;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Singleton that runs tasks on a fixed-size thread pool and lets the caller
 * block until every submitted task has finished.
 *
 * <p>A {@link TaskSupport} tracks the tasks that are queued or running: a slot
 * is reserved before a task is handed to the pool and released when the task
 * ends, so {@link #doTask(Runnable)} blocks while all slots are taken.
 */
public final class TaskManager {

    private static TaskManager instance;

    private final ThreadPoolExecutor pool;
    private final TaskSupport taskSupport;

    /**
     * Creates the manager.
     *
     * @param poolSize number of worker threads and of in-flight task slots;
     *                 zero or a negative value selects the number of available processors
     */
    private TaskManager(int poolSize) {
        // Runtime.availableProcessors() works on every platform, unlike the
        // Windows-only NUMBER_OF_PROCESSORS environment variable.
        int size = poolSize > 0 ? poolSize : Runtime.getRuntime().availableProcessors();

        taskSupport = new TaskSupport(size);
        pool = new ThreadPoolExecutor(size, size, 1000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Let idle core threads expire after the keep-alive time; otherwise the
        // pool's non-daemon threads keep the JVM alive after all work is done.
        pool.allowCoreThreadTimeOut(true);
    }

    /**
     * External entry point: runs {@code task} on the pool.
     *
     * <p>Blocks while all slots are occupied. If the calling thread is
     * interrupted while waiting for a slot, the task is not submitted.
     *
     * @param task the work to execute
     */
    public void doTask(final Runnable task) {
        if (!taskSupport.waitNotFull()) {
            // No slot was reserved, so there is nothing to submit or release.
            return;
        }

        boolean submitted = false;
        try {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } finally {
                        // Release the slot even when the task throws.
                        taskSupport.removeQueue();
                    }
                }
            });
            submitted = true;
        } finally {
            if (!submitted) {
                // execute() threw: the wrapper will never run, so give the slot
                // back here or it would stay occupied forever.
                taskSupport.removeQueue();
            }
        }
    }

    /**
     * Blocks the calling thread until no submitted task is queued or running.
     */
    public void waitAllThreadFinsh() {
        taskSupport.waitAllThreadFinsh();
    }

    /**
     * Returns the shared instance, creating it on the first call.
     *
     * @param cpuCount pool size used when the instance is first created (zero or
     *                 negative selects the number of available processors);
     *                 ignored on later calls because the instance already exists
     * @return the singleton manager
     */
    public static synchronized TaskManager getInstance(int cpuCount) {
        if (instance == null) {
            instance = new TaskManager(cpuCount);
        }
        return instance;
    }
}
package task;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Bounded slot counter built on a {@link BlockingQueue}: each queued element
 * stands for one task in flight, and the queue capacity caps how many may be
 * in flight at once.
 */
class TaskSupport {

    /** Placeholder element; only the number of queued elements matters. */
    private static final Object TOKEN = new Object();

    final BlockingQueue<Object> supportQueue;

    /**
     * @param poolSize maximum number of slots that can be held at the same time
     */
    public TaskSupport(int poolSize) {
        supportQueue = new ArrayBlockingQueue<Object>(poolSize);
    }

    /**
     * Reserves one slot, blocking until the queue has free space.
     *
     * @return {@code true} once a slot has been reserved; {@code false} if the
     *         calling thread was interrupted, in which case no slot is held and
     *         the thread's interrupt status is set again
     */
    public boolean waitNotFull() {
        try {
            supportQueue.put(TOKEN);
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still see the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Blocks the calling thread until no slot is held, polling every 10 ms.
     *
     * <p>An interruption does not cut the wait short; it is remembered and the
     * interrupt status is re-asserted just before returning.
     */
    public void waitAllThreadFinsh() {
        boolean interrupted = false;
        try {
            while (!supportQueue.isEmpty()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Re-interrupting here would make every following sleep()
                    // throw at once and spin; defer it until the wait is over.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Releases one slot; does nothing if none is held.
     */
    public void removeQueue() {
        supportQueue.poll();
    }
}