package concurrent._ThreadPool.logService; import net.jcip.annotations.GuardedBy; import org.omg.PortableInterceptor.SYSTEM_EXCEPTION; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.*; //一个日志的服务,多个生产者,一个消费者 public class Demo { public static void main(String[] args) throws Exception { MyLogService myLogService = new MyLogService(8,5); myLogService.start(); Thread.sleep(5000); System.out.println("请求停止线程池"); myLogService.stop(); } } class MyLogService{ //线程池的数量 private int serviceCount; //阻塞队列的数量 private int blockingqueueCount; //日志服务线程池 public static ExecutorService executorService; //日志服务需要的阻塞队列 private BlockingQueue<String> bq = null; //用户取消而设置的标志位 //GuardedBy表示要取得括号里面的这个锁才能访问这个属性 @GuardedBy("this") private volatile boolean iscancel; //用于记录,put与take次数的差,如果取消标志位设置为真,次数差到0说明,put的操作都已经被消费了,这时候说明没有put操作被阻塞 // 可以将cum线程终止,所有线程都成功停下来了 @GuardedBy("this") private int reservations; //消费者; private FutureTask<Integer> cum = null; //线程池默认数量 private final static int DEFAULT_SERVICE_COUNT = 8; private final static int DEFAULT_QUEUE_COUNT = 4; public MyLogService(int serviceCount , int blockingqueueCount){ this.serviceCount = serviceCount; this.blockingqueueCount = serviceCount; this.iscancel = false; this.reservations = 0; executorService = Executors.newFixedThreadPool(serviceCount); bq = new ArrayBlockingQueue<String>(blockingqueueCount); } public MyLogService(){ this(DEFAULT_SERVICE_COUNT,DEFAULT_QUEUE_COUNT); } //线程池开始工作 public void start(){ this.cum = new FutureTask<Integer>(this.CumLogfactory()); //启动cum executorService.submit(cum); List<FutureTask<String>> list = new ArrayList<FutureTask<String>>(); for(int i = 0 ;i < this.serviceCount ; i ++){ FutureTask<String> futureTask = new FutureTask<String>(this.ProLogFactory()); list.add(futureTask); } for(FutureTask<String> f:list){ executorService.submit(f); } System.out.println("线程池启动工作"); } //线程池停止工作 public void stop() throws ExecutionException, InterruptedException { synchronized (this){ iscancel = true; } Integer result = cum.get(); if(result == 1){ System.out.println("所有线程已经停止"); executorService.shutdown(); } } private ProLog ProLogFactory(){ return new ProLog(); } private CumLog CumLogfactory(){ return new CumLog(); } private class ProLog implements Callable<String>{ private final String name ; //默认名字 private final static String DEFAULT_NAME = "proLog"; public ProLog(){ this(DEFAULT_NAME); } public ProLog(String name){ this.name = name; } @Override public String call() throws Exception { String thradname = Thread.currentThread().getName(); proLog(thradname); return null; } private void proLog(String name) throws InterruptedException { String msg ; while(true){ synchronized (MyLogService.this){ if(iscancel){ System.out.println(name +"发现请求停止,抛出异常"); throw new IllegalStateException(); } ++reservations; } msg = createLog(); //将日志写入阻塞队列中 bq.put(msg); System.out.println(name + "写入到队列中" + msg + "__队列剩余空位_" + bq.remainingCapacity()); } } private String createLog() throws InterruptedException { //创建一个日志 long time = System.currentTimeMillis(); String msg = Long.toString(time); //模拟等待一段时间 Thread.sleep(new Random().nextInt(5000)); return msg; } } private class CumLog implements Callable<Integer>{ private final String name ; //默认名字 private static final String DEFAULT_NAME = "cumLog"; public CumLog(){ this(DEFAULT_NAME); } public CumLog(String name){ this.name = name; } @Override public Integer call() throws Exception { System.out.println("启动一个日志cum"); Integer msg = cumLog(); return msg; } private Integer cumLog() throws InterruptedException, IOException { String msg = null; Integer isdone = 0; //获取日志 while(true){ Thread.sleep(300); synchronized (MyLogService.this){ if(iscancel && reservations==0){ isdone = 1; break; } } msg = bq.take(); synchronized (MyLogService.this){ --reservations; } System.out.println("消费了一个日志"+msg + "__队列剩余空位_" + bq.remainingCapacity()); } return isdone; } } }
结果:
启动一个日志cum 线程池启动工作 pool-1-thread-7写入到队列中1543640100705__队列剩余空位_4 消费了一个日志1543640100705__队列剩余空位_5 pool-1-thread-5写入到队列中1543640100705__队列剩余空位_4 消费了一个日志1543640100705__队列剩余空位_5 pool-1-thread-5写入到队列中1543640103668__队列剩余空位_4 消费了一个日志1543640103668__队列剩余空位_5 pool-1-thread-8写入到队列中1543640100705__队列剩余空位_4 pool-1-thread-6写入到队列中1543640100705__队列剩余空位_3 pool-1-thread-2写入到队列中1543640100705__队列剩余空位_2 消费了一个日志1543640100705__队列剩余空位_3 消费了一个日志1543640100705__队列剩余空位_4 pool-1-thread-4写入到队列中1543640100705__队列剩余空位_3 pool-1-thread-6写入到队列中1543640105030__队列剩余空位_2 pool-1-thread-3写入到队列中1543640100705__队列剩余空位_1 消费了一个日志1543640100705__队列剩余空位_2 请求停止线程池 pool-1-thread-7写入到队列中1543640100867__队列剩余空位_1 pool-1-thread-7发现请求停止,抛出异常 pool-1-thread-7发现请求停止,抛出异常 pool-1-thread-3写入到队列中1543640105582__队列剩余空位_0 pool-1-thread-3发现请求停止,抛出异常 消费了一个日志1543640100705__队列剩余空位_1 消费了一个日志1543640105030__队列剩余空位_2 消费了一个日志1543640100705__队列剩余空位_3 pool-1-thread-8写入到队列中1543640104835__队列剩余空位_2 pool-1-thread-8发现请求停止,抛出异常 消费了一个日志1543640100867__队列剩余空位_3 pool-1-thread-2写入到队列中1543640105042__队列剩余空位_2 pool-1-thread-2发现请求停止,抛出异常 消费了一个日志1543640105582__队列剩余空位_3 消费了一个日志1543640104835__队列剩余空位_4 消费了一个日志1543640105042__队列剩余空位_5 pool-1-thread-6写入到队列中1543640105573__队列剩余空位_4 消费了一个日志1543640105573__队列剩余空位_5 pool-1-thread-6发现请求停止,抛出异常 消费了一个日志1543640104777__队列剩余空位_5 pool-1-thread-5写入到队列中1543640104777__队列剩余空位_5 pool-1-thread-5发现请求停止,抛出异常 pool-1-thread-4写入到队列中1543640105562__队列剩余空位_4 pool-1-thread-4发现请求停止,抛出异常 消费了一个日志1543640105562__队列剩余空位_5 所有线程已经停止