看到这里需先注意,消费量应大于生产量避免OOM
package com.desigin.producerConsumer;
import java.util.concurrent.BlockingQueue;public class Consumer implements Runnable {
private BlockingQueue<Task> tasks;
public Consumer(BlockingQueue<Task> tasks) {
super();
this.tasks = tasks;
}
public void run() {
while (true) {
Task task = null;
synchronized (this) {
if (null != tasks.peek()) {
task = tasks.poll();
System.out.println("ok i am working..." + tasks.size());
}
}
if (null != task)
task.introduceSelf();
}
}
}
package com.desigin.producerConsumer;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue<Task> tasks;
public Producer(BlockingQueue<Task> tasks) {
super();
this.tasks = tasks;
}
public void run() {
while (true) {
long nanoTime = System.nanoTime();
if (nanoTime % 1000000000 == 0) {
System.out.println("so happy i can put now...");
Task task = new Task(nanoTime, String.valueOf(nanoTime));
try {
put(task);
} catch (InterruptedException e) {
System.out.println("队列已满,请稍后再试");
}
}
}
}
public void put(Task task) throws InterruptedException {
tasks.put(task);
}
}
package com.desigin.producerConsumer;
import lombok.Data;
@Data
public class Task {
private Long id;
private String name;
public Task() {
}
public void introduceSelf() {
System.out.println(toString());
}
@Override
public String toString() {
return "Task [id=" + id + ", name=" + name + "]";
}
public Task(Long id, String name) {
super();
this.id = id;
this.name = name;
}
}
package com.desigin.producerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class runApp {
public static void main(String[] args) {
BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
Thread customer = new Thread(new Consumer(tasks));
Thread producer = new Thread(new Producer(tasks));
producer.start();
customer.start();
}
}