import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { protected ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue(); protected Map<String, Thread> workers = new HashMap<String, Thread>(); protected ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap(); public Master(Worker worker, int workCounts) { worker.setResultMap(this.resultMap); worker.setWorkerQueue(this.workerQueue); for (int i = 0; i < workCounts; i++) { workers.put("任务" + i, new Thread(worker)); } } //判断所有的子任务是否结束 protected boolean isComplete() { for (Map.Entry<String, Thread> set : workers.entrySet()) { if (set.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } //总共有多少个未完成工作 protected int isNotCompletedCount() { int count = 0; for (Map.Entry<String, Thread> set : workers.entrySet()) { if (set.getValue().getState() != Thread.State.TERMINATED) { count++; } } return count; } public void submit(Task task) { workerQueue.add(task); } public void execute() { for (Map.Entry<String, Thread> set : workers.entrySet()) { set.getValue().start(); } } public Integer getResult() { Integer result = 0; for (Map.Entry result_ : resultMap.entrySet()) { result += Integer.valueOf(result_.getValue().toString()); } return result; } } |