Worker的意思是工作的人,在Worker Thread模式中,工人线程Worker thread会逐个取回工作并进行处理,当所有工作全部完成后,工人线程会等待新的工作到来。
Worker Thread模式也被成为Background Thread(背景线程)模式,另外,如果从保存多个工人线程的场所这一点看,我们也可以称这种模式为Thread Pool模式。
代码示例:
package com.zl.step26;
/**
* 抽象类InstructionBook,代表着组装产品的说明书,
* 其中经过流水线传送带的产品将通过create()方法进行加工
* firstProcess、secondProcess代表加工每个产品的步骤。
*/
public abstract class InstructionBook {
public final void create(){
this.firstProcess();
this.secondProcess();
}
protected abstract void firstProcess();
protected abstract void secondProcess();
}
package com.zl.step26;
public class Production extends InstructionBook {
// 产品ID
private final int prodId ;
public Production(int prodId){
this.prodId = prodId ;
}
@Override
protected void firstProcess() {
System.out.println("execute the " + prodId + " first process ... ");
}
@Override
protected void secondProcess() {
System.out.println("execute the " + prodId + " second process ... ");
}
}
package com.zl.step26;
public class ProductionChannel {
// 传送带上有多少个代加工的产品
private final static int MAX_PROD = 100 ;
// 用来存放代加工产品的队列
private final Production[] productionsQueue ;
// 队列尾
private int tail ;
// 队列头
private int head ;
// 一共有多少个待加工的产品
private int total ;
// 流水线上有多少个工人
private final Worker[] workers ;
// 创建ProductionChannel,指定队列,和工人
public ProductionChannel(int workerSize){
this.workers = new Worker[workerSize] ;
this.productionsQueue = new Production[MAX_PROD] ;
for (int i = 0 ; i< workerSize ; i++ ) {
workers[i] = new Worker("Worker-"+i,this);
workers[i].start();
}
}
// 接收来自上游的代加工的产品
public void offerProduction(Production production){
synchronized (this){
// 代加工的产品超过最大值时,阻塞线程
while (total >= productionsQueue.length) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
productionsQueue[tail] = production ;
tail = (tail+1)%productionsQueue.length ;
total++;
// 激活线程
this.notifyAll();
}
}
// 工人线程 worker 从队列上获取产品, 进行加工
public Production takeProduction(){
synchronized (this) {
// 当队列中没有产品时,进行等待
while (total <= 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取产品
Production prod = productionsQueue[head] ;
head = (head+1) % productionsQueue.length;
total -- ;
this.notifyAll();
return prod;
}
}
}
package com.zl.step26;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class Worker extends Thread {
private final ProductionChannel channel ;
private final static Random random = new Random(System.currentTimeMillis()) ;
public Worker(String workName , ProductionChannel channel) {
super(workName);
this.channel = channel ;
}
@Override
public void run(){
while (true){
// 从队列中获取产品,并消费
Production production = channel.takeProduction();
System.out.println(getName()+" process the "+production);
production.create();
try {
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
扫描二维码关注公众号,回复:
4931654 查看本文章
package com.zl.step26;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;
/**
* 5个工人生产, 8个工人消费
*/
public class Test {
public static void main(String[] args) {
final ProductionChannel channel = new ProductionChannel(5);
AtomicInteger productionNo = new AtomicInteger() ;
IntStream.range(1,8).forEach(i->{
new Thread(()->{
while (true) {
channel.offerProduction(new Production(productionNo.getAndIncrement()));
try {
TimeUnit.SECONDS.sleep(current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
});
}
}