最近在公司做大批量的数据交换用到了阻塞队列(mysql->mongodb,约600w左右的数据,期间包含了其他业务逻辑,不纯是数据交换),效率蛮不错。现在写个queue使用例子,供其他人参考。如有不对之处,欢迎指导...小弟第一次发技术贴
import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * * @Title: MyQueue.java * @Copyright: Copyright (c) 2005 * @Description: <br> <br> * @Company:* * @Created on Jun 15, 2012 10:19:01 AM * @author <a href="mailTo:*">tzb</a> */ public class MyQueue extends ArrayBlockingQueue<Object>{ private static final long serialVersionUID = 1L; /** * 构造一个队列 * @param size */ public MyQueue(int size){ super(size); } /** * 定义一个Puter */ class Puter implements Callable<Object>{ private MyQueue queue; public Puter(MyQueue queue){ this.queue=queue; } public Object call() { try { for (int i = 0; i < 5; i++) { queue.put(i); System.out.println(Thread.currentThread().getName()+" put data :"+i); Thread.sleep(1 * 2000); } //结束时将标志放入队列,防止队列中没有数据后消费者一直阻塞等待,从而告知消费者可以退出 queue.put(endDataFlag); } catch (InterruptedException e) { e.printStackTrace(); } return "put data over..."; } } /** * 定义一个结束标示 */ private static Object endDataFlag=new Object(); /** * * 定义一个Taker */ class Taker implements Callable<Object>{ private MyQueue queue; public Taker(MyQueue queue){ this.queue=queue; } public Object call() { boolean flag=true; while (flag) { try { Object data=queue.take(); //每次获取数据后判断获取的数据是否是最后一个结束标示,如果是并且队列已经为空,则退出消费 if (data == endDataFlag && queue.isEmpty()) { //此处重新将结束标示放入队列,防止其他消费者上次任务消费完毕后,下次去队列中取不到数据而导致阻塞 queue.put(data); flag=false; }else{ System.out.println(Thread.currentThread().getName()+" take data :"+data); } } catch (InterruptedException e) { e.printStackTrace(); } } return "take data over..."; } } public static void main(String[] args) throws Exception { MyQueue que=new MyQueue(5); ExecutorService exs=Executors.newCachedThreadPool(); List<Callable<Object>> tasks=new ArrayList<Callable<Object>>(); tasks.add(que.new Puter(que)); tasks.add(que.new Taker(que)); List<Future<Object>> listResult=exs.invokeAll(tasks);//提交执行所有task for(Future<Object> f:listResult){ System.out.println(Thread.currentThread().getName()+":"+f.get());//获取结果 } exs.shutdown(); System.out.println("task over..."); } }