版权声明: https://blog.csdn.net/xiongbingcool/article/details/80989894
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 使用线程安全队列(ConcurrentLinkedQueue)实现生产-消费模型
*
* @author xbing
*/
public class TestConcurrentLinkedQueue {
public static void main(String[] args) {
// 生产者线程A:不断的生产单个数据并装入队列中
new Thread(() -> {
Random random = new Random();
while (true) {
int num = random.nextInt();
System.out.println(Thread.currentThread().getName() + ":" + num);
BA.INSTANCE.track(String.valueOf(num));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 生产者线程B:不断的生产批量数据并装入队列中
new Thread(() -> {
List<String> lists = new ArrayList<>();
Random random = new Random();
while (true) {
// 随机生成一批数据
for (int i = 0; i < random.nextInt(9) + 1; i++) {
double num = random.nextDouble();
System.out.println(Thread.currentThread().getName() + ":" + num);
lists.add(String.valueOf(num));
}
BA.INSTANCE.trackAll(lists);
lists.clear();
try {
Thread.sleep(4 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
/**
* 单例模式:启动线程一直消费
* 支持按条数和时间分批限制
*/
public enum BA {
INSTANCE;
// 线程操作安全队列,装载数据
private Queue<String> queue = new ConcurrentLinkedQueue<>();
// 每批发送数
private final int MAX_NUM = 20;
// 每批数量不达标时多长时间发送
private final long MAX_TIME = 3 * 60 * 1000;
BA() {
new Thread(() -> {
int count = 0;
long time = System.currentTimeMillis();
StringBuffer sb = new StringBuffer();
while (true) {
long curr = System.currentTimeMillis();
if (count >= MAX_NUM || (curr - time) >= MAX_TIME) {
// 初始化最大保留数和最长保留时间
time = curr;
count = 0;
String s = sb.toString();
sb.setLength(0);
System.out.println(Thread.currentThread().getName() + ":" + s);
}
String s = queue.poll();
if (s != null) {
sb.append(" | ").append(s);
count++;
}
}
}).start();
}
/**
* 向队列中添加单个数据
*
* @param str
*/
public void track(String str) {
queue.add(str);
}
/**
* 向队列中批量添加数据
*
* @param strs
*/
public void trackAll(List<String> strs) {
queue.addAll(strs);
}
}
}