简介
生产者和消费者共同操纵一个数据缓冲区,其中生成者向缓冲区中写入数据,消费者从缓冲区中读取数据。
本程序实现如下,生产者和消费者实现Runnable
接口,不断写入或读取数据。多个生产者和消费者线程通过Executor
进行管理。
对数据缓冲区的操作中,使用写入位置指针和读取位置指针分别标明写入或读取位置。
测试程序:
// 声明唯一的数据缓存区
DataCache cache = new DataCache();
// 生产者线程池
ExecutorService exec1 = Executors.newCachedThreadPool();
// 消费者线程池
ExecutorService exec2 = Executors.newCachedThreadPool();
for(int i=0; i<5; i++) {
exec1.execute(new Producer(i, cache));
}
for(int i=0; i<5; i++) {
exec2.execute(new Customer(i, cache));
}
exec1.shutdown();
exec2.shutdown();
数据缓冲区:
class DataCache {
public int[] datas = new int[10];
public int inPointer = 0; // 写入位置指针
public int outPointer = 0; // 读取位置指针
public int order = 0; // 测试使用,用于生成有序的数据
// 翻转标记,当写指针先走到缓冲区的末尾时,回到为缓冲区的初始位置,
// 复用已被读过的空间,此时打开翻转标记。当读指针走到缓冲区的末尾时,
// 也回到缓冲区的初始位置,关闭翻转标记。
public boolean reverse = false;
}
生产者:
class Producer implements Runnable {
private final int id;
private DataCache cache;
public Producer(int id, DataCache cache) {
this.id = id;
this.cache = cache;
}
/** 执行方法:循环执行写操作 */
@Override
public void run() {
while(true) {
try {
this.write();
} catch (InterruptedException e) {
System.out.println("[@Writer#" + id + "] is interrupted...");
}
}
}
/** 写操作,找到合适的写的位置,并写入数据 */
public void write() throws InterruptedException {
synchronized (this.cache) {
if(this.cache.inPointer < this.cache.datas.length) {
if(this.cache.reverse) {
if(this.cache.inPointer < this.cache.outPointer)
writeData();
else
this.cache.wait();
} else {
writeData();
}
} else {
this.cache.reverse = true;
this.cache.inPointer = 0;
if(this.cache.outPointer > 0) {
writeData();
} else {
this.cache.wait();
}
}
}
}
/** 写入数据 */
public void writeData() {
int pos = this.cache.inPointer++;
int data = this.cache.order++;
this.cache.datas[pos] = data;
synchronized (this.cache) {
this.cache.notifyAll();
}
System.out.println("[Writer#" + id + "] write in [ " + pos + ", " + data + " ]");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("[Writer#" + id + "] is interrupted...");
}
}
}
消费者:
class Customer implements Runnable {
private final int id;
private DataCache cache;
public Customer(int id, DataCache cache) {
this.id = id;
this.cache = cache;
}
/** 执行方法:循环执行读操作 */
@Override
public void run() {
while(true) {
try {
this.read();
} catch (InterruptedException e) {
System.out.println("[@Reader#" + id + "] is interrupted...");
}
}
}
/** 读操作,找到合适的读的位置,并读出数据 */
public void read() throws InterruptedException {
synchronized (this.cache) {
if(this.cache.outPointer < this.cache.datas.length) {
if(this.cache.reverse) {
readData();
} else {
if(this.cache.outPointer < this.cache.inPointer)
readData();
else
this.cache.wait();
}
} else {
this.cache.reverse = false;
this.cache.outPointer = 0;
if(this.cache.inPointer > 0)
readData();
else
this.cache.wait();
}
}
}
/** 读出数据 */
public void readData() {
int pos = this.cache.outPointer++;
int data = this.cache.datas[pos];
System.out.println("[Reader#" + id + "] read in [ " + pos + ", " + data + " ]");
synchronized (this.cache) {
this.cache.notifyAll();
}
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
System.out.println("[Reader#" + id + "] is interrupted...");
}
}
}
运行结果
[Writer #0 ] write in [ 0, 0 ]
[Reader #4 ] read in [ 0, 0 ]
[Writer #4 ] write in [ 1, 1 ]
[Writer #4 ] write in [ 2, 2 ]
[Writer #3 ] write in [ 3, 3 ]
[Writer #2 ] write in [ 4, 4 ]
[Writer #2 ] write in [ 5, 5 ]
[Writer #2 ] write in [ 6, 6 ]
[Writer #2 ] write in [ 7, 7 ]
[Writer #2 ] write in [ 8, 8 ]
[Writer #2 ] write in [ 9, 9 ]
[Writer #1 ] write in [ 0, 10 ]
[Reader #0 ] read in [ 1, 1 ]
[Reader #0 ] read in [ 2, 2 ]
[Reader #0 ] read in [ 3, 3 ]
[Reader #1 ] read in [ 4, 4 ]
[Reader #1 ] read in [ 5, 5 ]
[Reader #1 ] read in [ 6, 6 ]
[Reader #2 ] read in [ 7, 7 ]
[Reader #2 ] read in [ 8, 8 ]
[Reader #2 ] read in [ 9, 9 ]
[Writer #0 ] write in [ 1, 11 ]
[Reader #2 ] read in [ 0, 10 ]
[Reader #4 ] read in [ 1, 11 ]
[Writer #4 ] write in [ 2, 12 ]
[Writer #4 ] write in [ 3, 13 ]
[Writer #4 ] write in [ 4, 14 ]
[Writer #4 ] write in [ 5, 15 ]
[Writer #4 ] write in [ 6, 16 ]
[Writer #4 ] write in [ 7, 17 ]
... CTRL+C ...