import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class MyQueue { // 创建一个锁,用于数据同步 private Object lock = new Object(); // 创建一个List,用于管理数据 private static LinkedList<String> list = new LinkedList<>(); // 创建一个原子对象,负责自定义队列计数 private static AtomicInteger index = new AtomicInteger(0); // 队列最小存储个数 private final int minSize = 0; // 队列最大存储个数 private final int maxSize; // 构造函数 public MyQueue(int size) { this.maxSize = size; } // 向队列中添加数据 public void add(String context) throws InterruptedException { synchronized (lock) { if (MyQueue.index.get() == this.maxSize) { System.out.println("队列已满,请等待..."); lock.wait(); } System.out.println("准备插入数据..."); // 向队列中加入元素 MyQueue.list.add(context); // 队列计数器+1 MyQueue.index.incrementAndGet(); // 通知另一个线程,可以开始提取数据(唤醒) lock.notify(); System.out.println("队列插入数据:[" + context + "]成功!"); } } // 从队列中移除第一次插入的数据 public void take() throws InterruptedException { synchronized (lock) { if (this.minSize == MyQueue.index.get()) { System.out.println("队列中没有任何数据..."); lock.wait(); } // 移除第一个进入队列的数据 MyQueue.list.removeFirst(); // 队列计数器-1 MyQueue.index.decrementAndGet(); // 通知另一个线程,可以开始插入数据(唤醒) lock.notify(); System.out.println("从队列中移除一项数据"); } } public static void main(String[] args) throws InterruptedException { MyQueue myQueue = new MyQueue(2); myQueue.add("a"); myQueue.add("b"); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { myQueue.add("d"); myQueue.take(); myQueue.add("c"); myQueue.add("d"); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { myQueue.take(); TimeUnit.SECONDS.sleep(1); myQueue.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); t1.start(); t2.start(); } }
执行结果: 准备插入数据... 队列插入数据:[a]成功! 准备插入数据... 队列插入数据:[b]成功! 队列已满,请等待... 从队列中移除一项数据 准备插入数据... 队列插入数据:[d]成功! 从队列中移除一项数据 准备插入数据... 队列插入数据:[c]成功! 队列已满,请等待... 从队列中移除一项数据 准备插入数据... 队列插入数据:[d]成功!