JUC版生产者消费者模型测试

package t1;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Shared resource for a producer/consumer demo built on a {@link BlockingQueue}.
 *
 * <p>One thread calls {@link #prod()} to insert increasing integers (as strings) and
 * another calls {@link #consum()} to take them out. Both loops run while the
 * {@code volatile} run flag is {@code true}; {@link #stop()} clears it.
 *
 * <p>Both loops also honour thread interruption: when a blocking call throws
 * {@link InterruptedException}, the interrupt status is restored and the loop exits
 * instead of silently continuing.
 */
class MySource {
    /** Run flag read by both loops; volatile so a write from {@link #stop()} is visible to them. */
    private volatile boolean flag = true;
    /** Source of the sequence numbers that are produced. */
    private final AtomicInteger atomicInteger = new AtomicInteger();
    /** Queue shared between the producing and the consuming thread. */
    private final BlockingQueue<String> blockingQueue;

    /**
     * Creates the resource around the given queue and prints the queue's concrete class name.
     *
     * @param blockingQueue queue used to hand elements from producer to consumer
     * @throws NullPointerException if {@code blockingQueue} is {@code null}
     */
    public MySource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println("传入的BlockingQueue为:" + blockingQueue.getClass().getName());
    }

    /**
     * Producer loop: while the run flag is set, offers the next sequence number to the queue
     * (waiting up to 2 seconds for space), reports success or failure, then sleeps 1 second.
     *
     * <p>If the calling thread is interrupted, its interrupt status is restored and the
     * loop ends.
     */
    public void prod() {
        final String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ":生产者启动");
        while (flag) {
            String result = String.valueOf(atomicInteger.incrementAndGet());
            try {
                boolean inserted = blockingQueue.offer(result, 2L, TimeUnit.SECONDS);
                if (inserted) {
                    System.out.println(threadName + ":成功插入" + result);
                } else {
                    System.out.println(threadName + ":插入失败" + result);
                }
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; set it again so callers
                // can still observe the interruption, and stop producing.
                Thread.currentThread().interrupt();
                System.out.println(threadName + ":被中断,停止生产");
                break;
            }
        }
        System.out.println(threadName + ":生产结束");
    }

    /**
     * Consumer loop: while the run flag is set, polls the queue (waiting up to 2 seconds).
     * A {@code null} or empty result ends consumption; otherwise the element is reported.
     *
     * <p>If the calling thread is interrupted, its interrupt status is restored and the
     * method returns.
     */
    public void consum() {
        final String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ":消费者启动");
        while (flag) {
            try {
                String result = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (result == null || result.isEmpty()) {
                    System.out.println(threadName + ":超过2秒没有取到,消费结束");
                    System.out.println();
                    System.out.println();
                    return;
                }
                System.out.println(threadName + ":消费成功" + result);
            } catch (InterruptedException e) {
                // Same policy as prod(): keep the interrupt visible and stop.
                Thread.currentThread().interrupt();
                System.out.println(threadName + ":被中断,消费结束");
                return;
            }
        }
    }

    /** Clears the run flag so that both loops exit at their next condition check. */
    public void stop() {
        this.flag = false;
    }

}

/**
 * Demo driver: runs one producer thread and one consumer thread against a shared
 * {@code MySource} backed by an {@link ArrayBlockingQueue} of capacity 3, lets them
 * work for 5 seconds, then asks the resource to stop.
 */
public class TestBlockingQueue {

    /**
     * Starts the "prod" and "consum" threads, sleeps 5 seconds on the main thread,
     * and finally calls {@code stop()} on the shared resource.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(3);
        MySource sharedSource = new MySource(boundedQueue);

        Thread producer = new Thread(sharedSource::prod, "prod");
        producer.start();

        Thread consumer = new Thread(sharedSource::consum, "consum");
        consumer.start();

        // Let both workers run for a while before signalling them to stop.
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println("5秒钟后结束");
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }

        sharedSource.stop();
    }
}

输出结果(生产者和消费者两个线程并发打印,相邻两行的先后顺序不一定与实际入队/出队顺序一致,每次运行的结果也可能略有不同):

传入的BlockingQueue为:java.util.concurrent.ArrayBlockingQueue
prod:生产者启动
consum:消费者启动
prod:成功插入1
consum:消费成功1
consum:消费成功2
prod:成功插入2
consum:消费成功3
prod:成功插入3
prod:成功插入4
consum:消费成功4
prod:成功插入5
consum:消费成功5
5秒钟后结束
prod:生产结束
consum:超过2秒没有取到,消费结束

猜你喜欢

转载自www.cnblogs.com/dengw125792/p/12629928.html