今天看了下《并发编程实战》,觉得日志服务的生产消费者demo挺有趣的,故自己实现了下;
Bad Way
以下为第一种,较差的实现方式
package hpsyche.log;
import org.junit.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author Hpsyche
*/
public class BadLogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private final static int CAPACITY = 3;
public BadLogWriter() {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread();
}
public void start() {
if (!logger.isAlive()) {
logger.start();
}
}
public void log(String msg) throws InterruptedException {
System.out.println(msg);
queue.put(msg.split(":")[1]);
}
public void shutdown() {
logger.interrupt();
}
private class LoggerThread extends Thread {
@Override
public void run() {
try {
while (true) {
System.out.println("取出日志:" + queue.take());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Test
public void test() {
BadLogWriter log = new BadLogWriter();
log.start();
int i = 1;
try {
while (true) {
Thread.currentThread().sleep(2000);
log.log("队列加入日志:" + (i++));
if (i == 3) {
log.shutdown();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
如出现意外情况,导致日志线程中断(例题中在i==3时,关闭LoggerThread)此时存在两个问题:
-
因为生产者并不是一个专门的线程,难以同时取消此生产者和消费者,如以上案例会出现生产者一直生产,直到阻塞队列LinkedBlockingQueue满,无法解除阻塞状态;
console输出如下
队列加入日志:1
取出日志:1
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
队列加入日志:2
取出日志:2
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
at hpsyche.log.BadLogWriter$LoggerThread.run(BadLogWriter.java:43)
队列加入日志:3
队列加入日志:4
- 那些正在等待写入的日志直接丢失;
NOT BAD WAY
鉴于以上情况,我们考虑设置一个状态码,来标识“请求是否已关闭”,
package hpsyche.log;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.Test;
/**
* @author Hpsyche
*/
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private final static int CAPACITY = 3;
private boolean isShutdown;
public LogWriter() {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread();
}
public void start() {
if (!logger.isAlive()) {
logger.start();
}
}
public void log(String msg) throws InterruptedException {
if (!isShutdown) {
System.out.println(msg);
queue.put(msg.split(":")[1]);
} else {
// Thread.currentThread().sleep(6000);
throw new IllegalStateException("日志已关闭");
}
}
public void setShutdown(boolean isShutdown) {
this.isShutdown = isShutdown;
}
private class LoggerThread extends Thread {
@Override
public void run() {
try {
while (true) {
// Thread.currentThread().sleep(2000);
System.out.println("取出日志:" + queue.take());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Test
public void test() {
LogWriter log = new LogWriter();
log.start();
int i = 1;
while (true) {
try {
//把日志放入队列
log.log("队列加入日志:" + (i++));
if (i == 3) {
log.setShutdown(true);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在i==3时,设置关闭状态码,通过if判断isShutdown来实现对生产者线程的控制,避免了第一种情况中的生产者一直生产的问题;
但是,此时无法解决第二个问题,即会丢失未消费的阻塞队列中的数据,在LoggerThread中加入sleep来测试,如下:
private class LoggerThread extends Thread {
@Override
public void run() {
try {
while (true) {
Thread.currentThread().sleep(2000);
System.out.println("取出日志:" + queue.take());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
此时错误信息如下:
队列加入日志:1
队列加入日志:2
java.lang.IllegalStateException: 日志已关闭
...........
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
此时阻塞队列中的日志直接丢失,可以通过一些技巧来降低这种情况的概率,如在生产者线程结束前,等待一段时间,但是治标不治本,还是有可能导致线程发生故障。
GOOD WAY
在书中提到了一种方式:
由于阻塞队列take能响应中断,故可以通过一个计数器来“保持”日志的输出与存储,生产者需要给计数器递增计数,而消费者线程需要通过计数器来确保消费全部被消息,当输出线程中断时,让通过interrupt让take也中断,此时让计时器去判断是否还需要循环存储日志。
(注意:在计数器的递增、递减和判断过程,都需要加入同步来防止线程问题。)
具体实现如下;
package hpsyche.log;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.Test;
/**
* 日志类添加可靠的取消操作
*
* @author xiaof
*/
public class GoodLogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private boolean isShutdown;
//如果线程停止提交任务,线程不能停,要把剩余的任务提交结束
private int reservations;
private final static int CAPACITY = 500;
public GoodLogWriter() {
//队列长度
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.loggerThread = new LoggerThread();
}
public void start() {
//判断这个线程是否已经启动
if (!loggerThread.isAlive()) {
loggerThread.start();
}
}
public void log(String msg) throws InterruptedException {
//System.out.println("放入2:"+msg);
//放入日志队列并阻塞队列
synchronized (this) {
if (isShutdown)
//一旦shutdown,存储线程直接断开,并不会加入阻塞队列中
throw new IllegalStateException("日志开关没有打开");
++reservations;
}
System.out.println("放入:"+msg);
queue.put(msg);
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
//准备中断存储线程(Thread.currentThread().sleep(100000);会抛出异常,
//同时由于此时还阻塞队列还存在元素,继续循环一次,确保日志不会丢失
loggerThread.interrupt();
}
private class LoggerThread extends Thread {
public void run() {
try {
int i=0;
while (true) {
i++;
try {
//对日志类上锁
synchronized (GoodLogWriter.this) {
//除了判断中断外,还需要reservations == 0
if (isShutdown && reservations == 0) {
System.out.println(666);
break;//停止线程
}
}
System.out.println("第"+i+"次准备存储");
if(i==2){
Thread.currentThread().sleep(100000);
}
//取出日志信息
String msg = queue.take();
System.out.println("存储:"+msg);
//提交成功一条,对阻塞的数据计数减少一条
synchronized (GoodLogWriter.this) {
--reservations;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
System.out.println("日志结束..........");
}
}
}
@Test
public void test() {
GoodLogWriter log = new GoodLogWriter();
log.start();
int i = 1;
while (true) {
try {
Thread.currentThread().sleep(2000);
//把日志放入队列
log.log("日志:" + i++);
Thread.currentThread().sleep(2000);
if (i == 4) {
log.stop();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
代码中已给出详细的注释,只要关注的是reservations
的变化,console输出结果如下:
第1次准备存储
放入:日志:1
存储:日志:1
第2次准备存储
放入:日志:2
放入:日志:3
第3次准备存储java.lang.InterruptedException: sleep interrupted
存储:日志:2
at java.lang.Thread.sleep(Native Method)
第4次准备存储
at hpsyche.log.GoodLogWriter$LoggerThread.run(GoodLogWriter.java:73)
存储:日志:3
666
日志结束..........
java.lang.IllegalStateException: 日志开关没有打开
at hpsyche.log.GoodLogWriter.log(GoodLogWriter.java:40)
at hpsyche.log.GoodLogWriter.test(GoodLogWriter.java:101)
在存储第三次后存储线程被阻塞,此时调用loggerThread.interrupt();
使得线程抛出InterruptedException
,但由于reservations
并不为0,线程会继续循环,直至所有数据,所以我们可以看到:放入了三次日志,也成功存储了三次日志。
总结
其实这个demo并不难理解,感觉其中的思想挺有趣的,貌似阻塞队列其中的put、take操作已经足够,不需要我们再提供计数器去保持什么状态了,但在本文三次操作中,可以看到通过计数与阻塞队列结合,可以实现一定程度的可靠的线程取消操作。