线程间除了同步互斥使用共享代码块或者共享资源外,有时需要线程间的通信来控制共享信息。生产者/消费者模型就是很好的例子;提供一个资源池,生产者向里面生产资源、消费者负责消费;资源池满,生产者线程等待,消费者消费资源后调用特定方法唤醒生产者线程;反之,资源池为空,消费者线程等待,生产者向资源池生产资源后,调用特定方法唤醒消费者线程。
这里实现使用java中 显示锁 Lock锁定共享变量,生产者write阶段,首先获取资源对象的对象锁,此时消费者无法read(使用资源)资源;同样,消费者消费资源时,获取资源对象锁,此时生产者无法write资源。注意:必须是同一个资源对象!!!
定义了Lock对象后,就可以调用其方法newCondition,获得条件对象,该用于线程间的通信,Condition对象提供了三个方法:
1)await():使当前线程等待,直到背唤醒
2)sigal():唤醒等待线程
3)sigalAll():唤醒所有等待线程
package shixian;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConsumerProducer {
private static Buffer buffer = new Buffer();
public static void main(String[] args) {
//Create a thread pool with two threads
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
}
//A task for adding an int to the buffer
private static class ProducerTask implements Runnable{
@Override
public void run() {
try {
int i = 1;
while(true) {
buffer.write(i++);
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static class ConsumerTask implements Runnable{
@Override
public void run() {
int i = 1;
while(true) {
try {
System.out.println("\t\tConsumer reads " + buffer.read());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//An inner class for buffer
private static class Buffer{
private static final int CAPACITY = 1; //buffer size
private LinkedList<Integer> queue = new LinkedList<>();
//Create a new lock
private static Lock lock = new ReentrantLock();
//Create two conditions
private static Condition notEmpty = lock.newCondition(); //非空
private static Condition notFull = lock.newCondition(); //非满
public void write(int value) {
lock.lock(); //Acquire the lock
try {
while(queue.size() == CAPACITY) {
System.out.println("Wait for notFull condition");
notFull.await();
}
queue.offer(value);
System.out.println("Producer writes " + value);
notEmpty.signal(); //Signal notEmpty condition
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock(); //Release the lock
}
}
public int read() {
int value = 0;
lock.lock();
try {
while(queue.isEmpty()) {
System.out.println("\t\tWait for notEmpty condition");
notEmpty.await();
}
value = queue.remove();
notFull.signal(); //Signal notFull condition
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
return value;
}
}
}
}
/**
Producer writes 1
Consumer reads 1
Wait for notEmpty condition
Producer writes 2
Consumer reads 2
Producer writes 3
Consumer reads 3
Producer writes 4
Consumer reads 4
Producer writes 5
Consumer reads 5
Producer writes 6
Consumer reads 6
* */