工作中,经常有将文件中的数据导入数据库的表中,或者将数据库表中的记录保存到文件中。为了提高程序的处理速度,可以设置读线程和写线程,这些线程通过消息队列进行数据交互。本例就是使用了LinkedBlockingQueue来模仿生产者线程和消费者线程进行数据生产和消费。
为了方便,这些不同的类被写在了一个类中,实际使用的时候,可以单独拆开,举一反三地使用。
以下是例子:
LinkedBlockingQueueDemo.java
1 import java.util.Date; 2 import java.util.Random; 3 import java.util.concurrent.LinkedBlockingQueue; 4 import java.util.concurrent.TimeUnit; 5 6 public class LinkedBlockingQueueDemo { 7 // 生产者线程数量 8 private final static int providerThreadAmount = 5; 9 10 // 记录每一个生产者线程是否处理完毕的标记 11 private static boolean[] providerDoneFlag = new boolean[providerThreadAmount]; 12 13 // 整个所有的生产者线程全部结束的标记 14 private static boolean done = false; 15 16 // 一个线程安全的队列,用于生产者和消费者异步地信息交互 17 private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>(); 18 19 static class ProviderThread extends Thread { 20 private Thread thread; 21 private String threadName; 22 private int threadNo; 23 24 public ProviderThread(String threadName2, int threadNo) { 25 this.threadName = threadName2; 26 this.threadNo = threadNo; 27 } 28 29 public void start() { 30 if (thread == null) { 31 thread = new Thread(this, threadName); 32 } 33 34 thread.start(); 35 System.out.println( 36 (new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName()); 37 } 38 39 @Override 40 public void run() { 41 int rows = 0; 42 for (int i = 0; i < 100; i++) { 43 String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName()); 44 linkedBlockingQeque.offer(string); 45 rows++; 46 /* 47 * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch 48 * (InterruptedException e) { e.printStackTrace(); } 49 */ 50 } 51 52 // 本线程处理完毕的标记 53 LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true; 54 System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t" 55 + Thread.currentThread().getName()); 56 } 57 } 58 59 static class ConsumerThread implements Runnable { 60 private Thread thread; 61 private String threadName; 62 63 public ConsumerThread(String threadName2) { 64 this.threadName = threadName2; 65 } 66 67 public void start() { 68 if (thread == null) { 69 thread = new Thread(this, threadName); 70 } 71 72 thread.start(); 73 System.out.println( 74 (new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName()); 75 } 76 77 @Override 78 public void run() { 79 int rows = 0; 80 // 生产者线程没有结束,或者消息队列中有元素的时候,去队列中取数据 81 while (LinkedBlockingQueueDemo.getDone() == false || linkedBlockingQeque.isEmpty() == false) { 82 try { 83 String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS); 84 if (string == null) { 85 continue; 86 } 87 88 rows++; 89 90 System.out 91 .println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is " 92 + string + "\t" + Thread.currentThread().getName()); 93 /* 94 * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch 95 * (InterruptedException e) { e.printStackTrace(); } 96 */ 97 98 } catch (InterruptedException e) { 99 e.printStackTrace(); 100 } 101 102 } 103 System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t" 104 + Thread.currentThread().getName()); 105 } 106 } 107 108 public static synchronized void setDone(boolean flag) { 109 LinkedBlockingQueueDemo.done = flag; 110 } 111 112 public static synchronized boolean getDone() { 113 return LinkedBlockingQueueDemo.done; 114 } 115 116 public static void main(String[] args) { 117 System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName()); 118 System.out.println( 119 (new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode()); 120 121 // 启动若干生产者线程 122 for (int i = 0; i < providerThreadAmount; i++) { 123 String threadName = String.format("%s-%d", "ProviderThread", i); 124 ProviderThread providerThread = new ProviderThread(threadName, i); 125 providerThread.start(); 126 } 127 128 // 启动若干个消费者线程 129 for (int i = 0; i < 10; i++) { 130 String threadName = String.format("%s-%d", "ConsumerThread", i); 131 ConsumerThread consumerThread = new ConsumerThread(threadName); 132 consumerThread.start(); 133 } 134 135 // 循环检测生产者线程是否处理完毕 136 do { 137 for (boolean b : providerDoneFlag) { 138 if (b == false) { 139 /* 140 * try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) + 141 * " "+"sleep 3 seconds. linkedBlockingQeque.size() is "+linkedBlockingQeque. 142 * size() + "\t" + Thread.currentThread().getName()); } catch 143 * (InterruptedException e) { e.printStackTrace(); } 144 */ 145 146 // 只要有一个生产者线程没有结束,则整个生产者线程检测认为没有结束 147 break; 148 } 149 150 LinkedBlockingQueueDemo.setDone(true); 151 } 152 153 // 生产者线程全部结束的时候,跳出检测 154 if (LinkedBlockingQueueDemo.getDone() == true) { 155 break; 156 } 157 } while (true); 158 159 System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName()); 160 } 161 }
结果略。