毒丸模式【其他模式】

@Slf4j
public class PoisonPill {

    /**
     * Poison pill pattern: the producer signals, through a special item, that no more
     * messages will be sent, and the consumer stops receiving once it sees that item.
     *
     * <p>The consumer runs on an asynchronous task while this thread produces two users
     * followed by the pill. The test then waits (bounded) for the consumer task to end
     * and checks that the consumer no longer reports itself as running.
     *
     * @throws Exception if producing is interrupted, or the consumer task fails or does
     *                   not finish within the timeout
     */
    @Test
    public void all() throws Exception {
        final ArrayBlockingQueue<Item> queue = new ArrayBlockingQueue<>(10);
        final Consumer<Item> consumer = new Consumer<>(queue);
        final Producer<Item> producer = new Producer<>(queue);

        final CompletableFuture<Void> consuming = CompletableFuture.runAsync(() -> {
            try {
                consumer.start();
            } catch (final InterruptedException e) {
                // Restore the interrupt flag so the executing thread still sees the interruption.
                Thread.currentThread().interrupt();
                log.error("failed", e);
            }
        });

        producer.produce(new User("hello"));
        producer.produce(new User("world"));
        producer.produce(Producer.POISON_PILL);
        // Wait for the consumer task itself instead of sleeping a fixed time:
        // returns as soon as the consumer stops, and fails with a timeout if it never does.
        consuming.get(1, TimeUnit.SECONDS);
        assertFalse(consumer.isRun());
    }
}

/**
 * Sample {@link Item} that carries a name.
 */
class User implements Item {
    /** Name supplied at construction time; reported by {@link #toString()}. */
    private final String name;

    /**
     * Creates a user with the given name.
     *
     * @param name the user's name (kept as given, may be {@code null})
     */
    public User(String name) {
        this.name = name;
    }

    /**
     * Includes the name so that logged items can be told apart.
     *
     * @return a string of the form {@code User(name=<name>)}
     */
    @Override
    public String toString() {
        return "User(name=" + name + ")";
    }
}

/**
 * Takes items from a blocking queue and processes them until the poison pill
 * ({@code Producer.POISON_PILL}) arrives.
 *
 * @param <T> the item type carried by the queue
 */
@Data
@Slf4j
class Consumer<T extends Item> {
    /** Queue the items are taken from. */
    private final BlockingQueue<T> queue;
    /** Loop flag; volatile so that a change is visible to threads other than the one running {@link #start()}. */
    private volatile boolean run = true;

    /**
     * Blocks on the queue and logs every item taken from it. When the poison pill is
     * taken, the consumer marks itself as stopped and returns without processing the pill.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the queue
     */
    public void start() throws InterruptedException {
        while (run) {
            final T take = queue.take();
            if (Producer.POISON_PILL == take) {
                // The pill is a shutdown signal, not a work item: stop before the work step.
                stop();
                return;
            }
            log.info("dowork {}", take);
        }
    }

    /** Clears the run flag so the consuming loop does not continue. */
    private void stop() {
        run = false;
    }
}

/**
 * Puts items onto a blocking queue and defines the sentinel item used to signal
 * that nothing more will be sent.
 *
 * @param <T> the item type carried by the queue
 */
@Data
class Producer<T extends Item> {
    /** Sentinel item; it is compared by identity, so this exact instance must be enqueued. */
    public static final Item POISON_PILL = new Item() {
    };

    /** Queue the items are put onto. */
    private final BlockingQueue<T> queue;

    /**
     * Enqueues one item, waiting if the queue has no free capacity.
     *
     * @param item the item to enqueue
     * @throws InterruptedException if the thread is interrupted while waiting for space
     */
    public void produce(T item) throws InterruptedException {
        this.queue.put(item);
    }
}

/**
 * Marker type for everything that can be placed on the producer/consumer queue.
 * Declares no members.
 */
interface Item {}

猜你喜欢

转载自www.cnblogs.com/zhuxudong/p/10171219.html