在生产-消费模式中:通常由两类线程,即若干个生产者的线程和若干个消费者的线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。
主要构成:
provider:生产者线程负责生产数据
consumer:消费者线程负责消费生产的数据
示例:
provider类:生产者
/**
* 生产者
* @author Administrator
*
*/
public class Provider implements Runnable{
//共享缓存区
private BlockingQueue<Data> queue;
//标识线程是否运行的变量,volatile外部主线程可见性
private volatile boolean isRunning=true;
//原子类 id生成器
private static AtomicInteger count=new AtomicInteger(0);
//随机对象
private Random r=new Random();
public Provider(BlockingQueue<Data> queue) {
this.queue=queue;
}@Override
public void run() {
while(isRunning) {
try {
//随机休眠0-1000毫秒 表示获取数据(产生数据的耗时)
Thread.sleep(r.nextInt(1000));
//获取id数据累计
int id=count.incrementAndGet();
Data data=new Data();
data.setId(id);
data.setName("数据"+id);
System.out.println(Thread.currentThread().getName()+"生产数据:"+data+",进行装载到缓冲区");
if(!this.queue.offer(data,2,TimeUnit.SECONDS)) {
System.out.println("提交缓冲区数据失败。。。。");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop() {
this.isRunning=false;
}
}
consumer类: 消费者
/**
* 消费者
* @author Administrator
*
*/
public class Consumer implements Runnable{private BlockingQueue<Data> queue;
private static Random r=new Random();
public Consumer(BlockingQueue<Data> queue) {
this.queue=queue;
}
@Override
public void run() {
while (true) {
try {
//获取数据
Data data=this.queue.take();
//随机休眠0-1000毫秒 表示数据处理(消费耗时)
Thread.sleep(r.nextInt(1000));
System.out.println(Thread.currentThread().getName()+"消费:"+data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}}
Data类:数据封装类
public class Data {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Data [id=" + id + ", name=" + name + "]";
}
}
Main:测试类
public class Main {
public static void main(String[] args) {
//缓冲区容器
BlockingQueue<Data> queue=new LinkedBlockingQueue<>(10);
//生产者
Provider p1=new Provider(queue);
Provider p2=new Provider(queue);
Provider p3=new Provider(queue);
//消费者
Consumer c1=new Consumer(queue);
Consumer c2=new Consumer(queue);
Consumer c3=new Consumer(queue);
//创建线程池运行,这是一个缓存线程池,可以创建无穷大的线程,没有任务的时候不创建线程,空闲线程存活时间为60s(默认)
ExecutorService es=Executors.newCachedThreadPool();
es.execute(p1);
es.execute(p2);
es.execute(p3);
es.execute(c1);
es.execute(c2);
es.execute(c3);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}