原创转载请注明出处:http://agilestyle.iteye.com/blog/2343193
生产者与消费者模式
使用wait/notify实现生产者与消费者模式
package org.fool.test; import java.util.Queue; import java.util.Random; import java.util.concurrent.ConcurrentLinkedDeque; public class WaitNotifyTest { public static void main(String[] args) { Queue<Integer> queue = new ConcurrentLinkedDeque<>(); Thread p = new Thread(new Producer(queue)); Thread c1 = new Thread(new Consumer(queue)); Thread c2 = new Thread(new Consumer(queue)); p.setName("Producer"); c1.setName("Consumer-1"); c2.setName("Consumer-2"); p.start(); c1.start(); c2.start(); } public static class Producer implements Runnable { private Queue<Integer> queue; public Producer(Queue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true) { synchronized (queue) { try { while (queue.size() == 10) { queue.wait(); } Thread.sleep(1000); int n = new Random().nextInt(10000); queue.add(n); System.out.println(Thread.currentThread().getName() + " produce: " + n); queue.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static class Consumer implements Runnable { private Queue<Integer> queue; public Consumer(Queue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true) { try { synchronized (queue) { while (queue.isEmpty()) { queue.wait(); } Thread.sleep(1000); int n = queue.remove(); System.out.println(Thread.currentThread().getName() + " consume: " + n); queue.notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } } } }
Console Output
使用BlockingQueue实现生产者与消费者模式
package org.fool.java.concurrent.producerconsumer; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100); Producer producer = new Producer(queue); Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Thread p = new Thread(producer); p.setName("Producer"); Thread c1 = new Thread(consumer1); c1.setName("Consumer1"); Thread c2 = new Thread(consumer2); c2.setName("Consumer2"); p.start(); c1.start(); c2.start(); } public static class Producer implements Runnable { private final BlockingQueue<Integer> queue; public Producer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { while (true) { Thread.sleep(1000); queue.put(produce()); } } catch (InterruptedException e) { e.printStackTrace(); } } private int produce() { int n = new Random().nextInt(10000); System.out.println(Thread.currentThread().getName() + " produce: " + n); return n; } } public static class Consumer implements Runnable { private final BlockingQueue<Integer> queue; public Consumer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { while (true) { Thread.sleep(2000); consume(queue.take()); } } catch (InterruptedException e) { e.printStackTrace(); } } private void consume(Integer n) { System.out.println(Thread.currentThread().getName() + " consume: " + n); } } }
Console Output
使用Semaphore实现生产者与消费者模式,并且限制生产者与消费者的数量
Service.java
package org.fool.java.concurrent.semaphore.producerconsumer; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Service { private volatile Semaphore setSemaphore = new Semaphore(10); private volatile Semaphore getSemaphore = new Semaphore(30); private volatile ReentrantLock lock = new ReentrantLock(); private volatile Condition setCondition = lock.newCondition(); private volatile Condition getCondition = lock.newCondition(); private Object[] producePosition = new Object[4]; private boolean isEmpty() { boolean isEmpty = true; for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] != null) { isEmpty = false; break; } } return isEmpty; } private boolean isFull() { boolean isFull = true; for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] == null) { isFull = false; break; } } return isFull; } public void set() { try { setSemaphore.acquire(); lock.lock(); while (isFull()) { setCondition.await(); } for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] == null) { producePosition[i] = " Data "; System.out.println(Thread.currentThread().getName() + " produced " + producePosition[i]); break; } } getCondition.signalAll(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { setSemaphore.release(); } } public void get() { try { getSemaphore.acquire(); lock.lock(); while (isEmpty()) { getCondition.await(); } for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] != null) { System.out.println(Thread.currentThread().getName() + " consumed " + producePosition[i]); } producePosition[i] = null; break; } setCondition.signalAll(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { getSemaphore.release(); } } }
Note:
这里设置了10个Producer,30个Consumer
ProducerThread.java
package org.fool.java.concurrent.semaphore.producerconsumer; public class ProducerThread implements Runnable { private Service service; public ProducerThread(Service service) { this.service = service; } @Override public void run() { service.set(); } }
ConsumerThread.java
package org.fool.java.concurrent.semaphore.producerconsumer; public class ConsumerThread implements Runnable { private Service service; public ConsumerThread(Service service) { this.service = service; } @Override public void run() { service.get(); } }
Test.java
package org.fool.java.concurrent.semaphore.producerconsumer; import java.util.Random; public class Test { public static void main(String[] args) { Service service = new Service(); for (int i = 0; i < 50; i++) { new Thread(new ProducerThread(service)).start(); new Thread(new ProducerThread(service)).start(); new Thread(new ProducerThread(service)).start(); new Thread(new ConsumerThread(service)).start(); new Thread(new ConsumerThread(service)).start(); new Thread(new ConsumerThread(service)).start(); } } }
Run
"C:\Program Files\Java\jdk1.8.0_77\bin\java" -Didea.launcher.port=7535 "-Didea.launcher.bin.path=F:\Program Files\JetBrains\IntelliJ IDEA\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_77\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_77\jre\lib\rt.jar;E:\workspace-idea\HelloJava\out\production\HelloJava;F:\Program Files\JetBrains\IntelliJ IDEA\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain org.fool.java.concurrent.semaphore.producerconsumer.Test Thread-0 produced Data Thread-1 produced Data Thread-2 produced Data Thread-3 consumed Data Thread-6 produced Data Thread-7 produced Data Thread-9 consumed Data Thread-8 produced Data Thread-10 consumed Data Thread-12 produced Data Thread-15 consumed Data Thread-13 produced Data Thread-16 consumed Data Thread-14 produced Data Thread-17 consumed Data Thread-18 produced Data Thread-21 consumed Data Thread-19 produced Data Thread-22 consumed Data Thread-20 produced Data Thread-23 consumed Data Thread-24 produced Data Thread-27 consumed Data Thread-25 produced Data Thread-28 consumed Data Thread-26 produced Data Thread-29 consumed Data Thread-30 produced Data Thread-33 consumed Data Thread-31 produced Data Thread-34 consumed Data Thread-32 produced Data Thread-35 consumed Data Thread-36 produced Data Thread-39 consumed Data Thread-37 produced Data Thread-40 consumed Data Thread-38 produced Data Thread-41 consumed Data Thread-42 produced Data Thread-45 consumed Data Thread-43 produced Data Thread-46 consumed Data Thread-44 produced Data Thread-47 consumed Data Thread-48 produced Data Thread-51 consumed Data Thread-49 produced Data Thread-52 consumed Data Thread-55 produced Data Thread-53 consumed Data Thread-50 produced Data Thread-57 consumed Data Thread-54 produced Data Thread-58 consumed Data Thread-56 produced Data Thread-59 consumed Data Thread-60 produced Data Thread-63 consumed Data Thread-61 produced Data Thread-64 consumed Data Thread-62 produced Data Thread-65 consumed Data Thread-66 produced Data Thread-69 consumed Data Thread-67 produced Data Thread-70 consumed Data Thread-68 produced Data Thread-71 consumed Data Thread-72 produced Data Thread-75 consumed Data Thread-73 produced Data Thread-76 consumed Data Thread-74 produced Data Thread-77 consumed Data Thread-78 produced Data Thread-81 consumed Data Thread-79 produced Data Thread-82 consumed Data Thread-80 produced Data Thread-83 consumed Data Thread-84 produced Data Thread-87 consumed Data Thread-85 produced Data Thread-88 consumed Data Thread-86 produced Data Thread-89 consumed Data Thread-90 produced Data Thread-93 consumed Data Thread-91 produced Data Thread-95 consumed Data Thread-92 produced Data Thread-99 consumed Data Thread-96 produced Data Thread-100 consumed Data Thread-97 produced Data Thread-101 consumed Data Thread-98 produced Data Thread-105 consumed Data Thread-102 produced Data Thread-106 consumed Data Thread-104 produced Data Thread-107 consumed Data Thread-103 produced Data Thread-111 consumed Data Thread-108 produced Data Thread-112 consumed Data Thread-109 produced Data Thread-113 consumed Data Thread-110 produced Data Thread-117 consumed Data Thread-114 produced Data Thread-118 consumed Data Thread-115 produced Data Thread-119 consumed Data Thread-116 produced Data Thread-123 consumed Data Thread-120 produced Data Thread-124 consumed Data Thread-121 produced Data Thread-125 consumed Data Thread-122 produced Data Thread-129 consumed Data Thread-126 produced Data Thread-131 consumed Data Thread-127 produced Data Thread-135 consumed Data Thread-132 produced Data Thread-137 consumed Data Thread-133 produced Data Thread-136 consumed Data Thread-128 produced Data Thread-141 consumed Data Thread-138 produced Data Thread-143 consumed Data Thread-134 produced Data Thread-147 consumed Data Thread-140 produced Data Thread-149 consumed Data Thread-146 produced Data Thread-148 consumed Data Thread-139 produced Data Thread-153 consumed Data Thread-144 produced Data Thread-154 consumed Data Thread-150 produced Data Thread-155 consumed Data Thread-151 produced Data Thread-159 consumed Data Thread-156 produced Data Thread-160 consumed Data Thread-157 produced Data Thread-161 consumed Data Thread-158 produced Data Thread-165 consumed Data Thread-152 produced Data Thread-166 consumed Data Thread-162 produced Data Thread-167 consumed Data Thread-163 produced Data Thread-171 consumed Data Thread-168 produced Data Thread-172 consumed Data Thread-169 produced Data Thread-173 consumed Data Thread-170 produced Data Thread-177 consumed Data Thread-164 produced Data Thread-178 consumed Data Thread-174 produced Data Thread-179 consumed Data Thread-175 produced Data Thread-183 consumed Data Thread-180 produced Data Thread-184 consumed Data Thread-181 produced Data Thread-185 consumed Data Thread-182 produced Data Thread-189 consumed Data Thread-176 produced Data Thread-190 consumed Data Thread-186 produced Data Thread-191 consumed Data Thread-187 produced Data Thread-195 consumed Data Thread-192 produced Data Thread-196 consumed Data Thread-193 produced Data Thread-197 consumed Data Thread-194 produced Data Thread-201 consumed Data Thread-188 produced Data Thread-203 consumed Data Thread-200 produced Data Thread-202 consumed Data Thread-145 produced Data Thread-207 consumed Data Thread-198 produced Data Thread-208 consumed Data Thread-206 produced Data Thread-209 consumed Data Thread-204 produced Data Thread-213 consumed Data Thread-205 produced Data Thread-215 consumed Data Thread-212 produced Data Thread-214 consumed Data Thread-199 produced Data Thread-219 consumed Data Thread-211 produced Data Thread-220 consumed Data Thread-218 produced Data Thread-221 consumed Data Thread-210 produced Data Thread-225 consumed Data Thread-222 produced Data Thread-226 consumed Data Thread-223 produced Data Thread-227 consumed Data Thread-224 produced Data Thread-231 consumed Data Thread-217 produced Data Thread-232 consumed Data Thread-228 produced Data Thread-233 consumed Data Thread-229 produced Data Thread-237 consumed Data Thread-234 produced Data Thread-238 consumed Data Thread-235 produced Data Thread-239 consumed Data Thread-236 produced Data Thread-243 consumed Data Thread-230 produced Data Thread-244 consumed Data Thread-240 produced Data Thread-245 consumed Data Thread-241 produced Data Thread-249 consumed Data Thread-246 produced Data Thread-250 consumed Data Thread-247 produced Data Thread-251 consumed Data Thread-248 produced Data Thread-255 consumed Data Thread-242 produced Data Thread-257 consumed Data Thread-258 produced Data Thread-261 consumed Data Thread-253 produced Data Thread-263 consumed Data Thread-254 produced Data Thread-267 consumed Data Thread-216 produced Data Thread-268 consumed Data Thread-264 produced Data Thread-269 consumed Data Thread-260 produced Data Thread-273 consumed Data Thread-270 produced Data Thread-275 consumed Data Thread-272 produced Data Thread-274 consumed Data Thread-265 produced Data Thread-279 consumed Data Thread-277 produced Data Thread-281 consumed Data Thread-266 produced Data Thread-285 consumed Data Thread-252 produced Data Thread-286 consumed Data Thread-282 produced Data Thread-287 consumed Data Thread-288 produced Data Thread-292 consumed Data Thread-289 produced Data Thread-293 consumed Data Thread-290 produced Data Thread-297 consumed Data Thread-276 produced Data Thread-298 consumed Data Thread-294 produced Data Thread-299 consumed Data Thread-295 produced Data
Reference
Java并发编程核心方法与框架