package com.robustel.test.thread; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; public class Driver { static Integer betch = 5; static Integer threadNum = 5; public static CountDownLatch latch = new CountDownLatch(threadNum); private static List<String> operaSet = Collections.synchronizedList(new ArrayList<String>(betch)); static CountDownLatch startSignal = new CountDownLatch(1); static CountDownLatch doneSignal = new CountDownLatch(5); private static List<Thread> pools = new ArrayList<Thread>(1024); private static Integer num = 100; public static void main(String[] args) throws InterruptedException { // 依次创建并启动5个worker线程 overTimeProcess(); System.out.println("Driver is doing something..."); System.out.println("Driver is Finished, start all workers ..."); startSignal.countDown(); // Driver执行完毕,发出开始信号,使所有的worker线程开始执行 doneSignal.await(); // 等待所有的worker线程执行结束 System.out.println("Finished."); System.out.println(operaSet.size()); } public static void overTimeProcess() { // 依次创建并启动5个worker线程 if(pools.size()<=0){ for (int i = 0; i < 5; ++i) { Thread t = new Thread(new Worker(startSignal, doneSignal)); pools.add(t); t.start(); } } //初始化数据 for(int i=0;i<100;i++){ operaSet.add("data:"+"-"+i); } } public static void doWork(String name){ while(operaSet.size() >0){ String md = operaSet.remove(0); sendModuleData(md,name); //数据处理完毕 需要新增 //addTasks(); } } private static void sendModuleData(String md,String name){ System.out.println(name +":op :"+ md); } //新增处理数据 public static void addTasks(){ if(operaSet.size()<=0){ for(int i=num;i<num+10;i++){ operaSet.add("data:"+"-"+i); } num +=10; } } static class Worker implements Runnable{ private final CountDownLatch startSignal; private final CountDownLatch doneSignal; @SuppressWarnings("unused") public Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); // 等待Driver线程执行完毕,获得开始信号 doWork(Thread.currentThread().getName()); doneSignal.countDown(); // 当前worker执行完毕,释放一个完成信号 } catch (InterruptedException e) { e.printStackTrace(); } } } }
java 多线程消费
猜你喜欢
转载自annan211.iteye.com/blog/2406655
今日推荐
周排行