@Slf4j
public class PoisonPill {
/**
* 毒丸模式:生产者通过某种方式告知消费者,消息已经发送完毕,消费端将停止接收任何消息。
*/
@Test
public void all() throws InterruptedException {
final ArrayBlockingQueue<Item> queue = new ArrayBlockingQueue<>(10);
final Consumer<Item> consumer = new Consumer<>(queue);
final Producer<Item> producer = new Producer<>(queue);
CompletableFuture.runAsync(() -> {
try {
consumer.start();
} catch (final InterruptedException e) {
log.error("failed", e);
}
});
producer.produce(new User("hello"));
producer.produce(new User("world"));
producer.produce(Producer.POISON_PILL);
TimeUnit.SECONDS.sleep(1);
assertFalse(consumer.isRun());
}
}
class User implements Item {
public User(String name) {
super();
}
}
@Data
@Slf4j
class Consumer<T extends Item> {
private final BlockingQueue<T> queue;
private volatile boolean run = true;
public void start() throws InterruptedException {
while (run) {
final T take = queue.take();
if (Producer.POISON_PILL == take) {
stop();
}
log.info("dowork {}", take);
}
}
private void stop() {
run = false;
}
}
@Data
class Producer<T extends Item> {
private final BlockingQueue<T> queue;
public static final Item POISON_PILL = new Item() {
};
public void produce(T item) throws InterruptedException {
queue.put(item);
}
}
interface Item {
}
毒丸模式【其他模式】
猜你喜欢
转载自www.cnblogs.com/zhuxudong/p/10171219.html
今日推荐
周排行