“生产者消费者程序”是并发编程的基础,也同时是一道热点面试题。除了使用传统的Lock、Synchronized等加锁方式实现以外,还可以使用一些三方工具库实现。今天介绍的是如何使用google guava库中的Monitor实现“生产者消费者”问题。google guava是一个非常实用的并发类库,非常建议每一位开发者使用。
以下,就是使用google guava中的Monitor实现生产者消费者的具体代码。
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class ProducerAndConsumer {
//生产者、消费者共同维护的缓冲区
private LinkedList<Integer> buffer = new LinkedList<>();
//设置缓冲区的大小为10
private static final int MAX_BUFFER_SIZE = 10;
//生产或消费数据的编号
private static AtomicInteger count = new AtomicInteger(0);
private Monitor monitor = new Monitor();
//生产者向缓冲区中生产数据
public void produce(int value) {
try {
//enterWhen相当于synchronized的加锁操作;当参数enterWhen()的参数为true时,monitor就会给共享资源加锁并阻塞其他线程
monitor.enterWhen(monitor.newGuard(() -> buffer.size() < MAX_BUFFER_SIZE));
buffer.addLast(value);
} catch (InterruptedException e) {
System.out.println("生产:" + value+",缓冲区剩余大小:"+buffer.size());
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
//monitor.leave()相当于结束synchronized的加锁操作(即释放锁)
monitor.leave();
}
}
//消费者从缓冲区中消费数据
public int consume() {
try {
monitor.enterWhen(monitor.newGuard(() -> !buffer.isEmpty()));
int value = buffer.removeFirst();
System.out.println("消费:" + value+",缓冲区剩余大小:"+buffer.size());
return value;
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
monitor.leave();
}
}
//测试程序
public static void main(String[] args) {
ProducerAndConsumer demo = new ProducerAndConsumer();
//创建有6个线程的线程池
ExecutorService pool = Executors.newFixedThreadPool(6);//定义线程数
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(pool);
//向线程池中提交3个生产数据的线程任务
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
while (true) {
int result = count.getAndIncrement();
demo.produce(result);
}
});
}
//向线程池中提交3个消费数据的线程任务
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
while (true) {
Integer result = demo.consume();
}
});
}
try {
Thread.sleep(2000);//模拟其他业务
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}
}
运行截图如图所示。
需要说明的是,本例代码中的pool.shutdown();实际是没有任何意义的。看一下shutdown()的javadoc说明,如下。
*/
public interface ExecutorService extends Executor {
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*/
void shutdown();
...
}
javadoc说的很明白:“这个方法会允许之前已提交的线程任务继续执行,但会拒绝新的线程任务”。也就是说,pool.shutdown();并不会立刻关闭线程池。然而本例中的线程任务是有while(true)修饰,因此永远不会结束,也就永远不会被shutdown()关闭。
获取本例完整代码及guava的jar包:在本公众号后台回复“guava”。
- 完 -
推荐阅读