引用
在取 sequence 的 DAO 里用了一个 mock 实现,然后把 queue 的容量设置为 3,但是每次取可以取到 4 个。
这样第一次不会把第 4 个元素放进去,只放进去 3 个。
// ---- KeyDao.java ----
package com.ssc.dbcttool.seqgenerator;

import java.util.List;

/**
 * Source of sequence values for a named sequence.
 */
public interface KeyDao {

    /**
     * Returns sequence values for the given sequence name.
     *
     * @param seqName   name of the sequence
     * @param capbility presumably the number of values wanted (SeqGeneratorImpl passes the
     *                  free-slot count of its queue here) -- TODO confirm with real implementations
     * @return the sequence values
     */
    List<Long> getSequences(String seqName, Long capbility);
}

// ---- MockKeyDao.java ----
package com.ssc.dbcttool.seqgenerator;

import java.util.ArrayList;
import java.util.List;

/**
 * Canned {@link KeyDao} for manual testing: both arguments are ignored and every call
 * yields the same four values.
 */
public class MockKeyDao implements KeyDao {

    /**
     * Builds a fresh mutable list holding 1000, 1001, 1010 and 1100, in that order.
     *
     * @param seqName   ignored
     * @param capbility ignored; four values are returned no matter what is passed
     * @return a new list with the four fixed values
     */
    @Override
    public List<Long> getSequences(String seqName, Long capbility) {
        final long[] cannedValues = { 1000L, 1001L, 1010L, 1100L };
        List<Long> result = new ArrayList<Long>();
        for (int i = 0; i < cannedValues.length; i++) {
            result.add(cannedValues[i]);
        }
        return result;
    }
}
package com.ssc.dbcttool.seqgenerator;

/**
 * Manual driver: wires a {@link SeqGeneratorImpl} to a {@link MockKeyDao} and prints
 * nine keys requested for the sequence name "aaa".
 */
public class KeyGeneratorClient {

    /**
     * Requests nine keys one after another and prints each as {@code key = <value>}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SeqGeneratorImpl generator = new SeqGeneratorImpl();
        generator.setGenerateKeysDao(new MockKeyDao());

        final int requests = 9;
        for (int i = 0; i < requests; i++) {
            System.out.println("key = " + generator.getSeqKey("aaa"));
        }
    }
}
package com.ssc.dbcttool.seqgenerator;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.statestr.gcth.core.concurrency.DefaultThreadFactory;

/**
 * Hands out sequence keys from per-name bounded queues that are refilled in the background.
 *
 * <p>The first request for a name creates a queue of {@link #capacity} slots and schedules a
 * {@link FetchKeyTask} that tops the queue up from the configured {@link KeyDao}.
 * {@link #getSeqKey(String)} blocks until a key is available.
 */
public class SeqGeneratorImpl {

    /** Fraction of {@link #capacity}; a refill happens only when more than this share of slots is free. */
    private static final double REFILL_RATIO = 0.5;

    /** Number of slots in every per-name key queue. */
    final int capacity = 3;

    /** Key queues, one per sequence name, created lazily by {@link #getQueue(String)}. */
    ConcurrentMap<String, BlockingQueue<Long>> keyQueues = new ConcurrentHashMap<String, BlockingQueue<Long>>();

    /** Runs the periodic refill tasks. */
    ScheduledExecutorService scheduler;

    /**
     * Creates the generator with a two-thread scheduler.
     *
     * <p>NOTE(review): {@code DefaultThreadFactory} is a project class; {@code daemon()} and
     * {@code eh(...)} presumably produce daemon threads with the given uncaught-exception
     * handler -- confirm against that class.
     */
    public SeqGeneratorImpl() {
        scheduler = new ScheduledThreadPoolExecutor(2, new DefaultThreadFactory().daemon().eh(
                new UncaughtExceptionHandler() {
                    public void uncaughtException(Thread t, Throwable e) {
                        // Include the throwable so the failure cause is not lost.
                        System.out.println("Thread " + t.getName()
                                + " for Key Generator throw a UncaughtException. " + e);
                    }
                }));
    }

    /**
     * Takes the next key for the given sequence name, blocking until one is available.
     *
     * @param keyName sequence name; the queue and its refill task are created on first use
     * @return the next key, or {@code null} if the calling thread was interrupted while waiting
     *         (the interrupt flag is restored in that case)
     */
    public Long getSeqKey(String keyName) {
        BlockingQueue<Long> keyQueue = keyQueues.get(keyName);
        if (keyQueue == null) {
            keyQueue = getQueue(keyName);
        }
        try {
            return keyQueue.take();
        } catch (InterruptedException e) {
            // Only an interrupt should set the interrupt flag; other failures are not masked as one.
            Thread.currentThread().interrupt();
            System.out.println("Failed to get sequence for unknown reason, the operation is interrupted.");
            return null;
        }
    }

    /**
     * Returns the queue for the given name, creating it and scheduling its refill task if absent.
     * Synchronized so that only one queue and one task are created per name.
     *
     * @param keyName sequence name
     * @return the existing or newly created queue
     */
    synchronized BlockingQueue<Long> getQueue(String keyName) {
        BlockingQueue<Long> keyQueue = keyQueues.get(keyName);
        if (keyQueue == null) {
            keyQueue = new LinkedBlockingQueue<Long>(capacity);
            keyQueues.putIfAbsent(keyName, keyQueue);
            triggerTheTask(keyName, keyQueue);
        }
        return keyQueue;
    }

    /**
     * Schedules a refill task for the queue: first run immediately, then 3000 ms after each
     * run completes.
     *
     * @param name     sequence name handed to the DAO
     * @param keyQueue queue to refill
     */
    void triggerTheTask(String name, BlockingQueue<Long> keyQueue) {
        scheduler.scheduleWithFixedDelay(new FetchKeyTask(name, keyQueue), 0, 3000, TimeUnit.MILLISECONDS);
    }

    /**
     * Periodic task that refills one key queue when more than half of its slots are free.
     */
    protected class FetchKeyTask implements Runnable {

        private final String keyName;
        private final BlockingQueue<Long> keyQueue;

        /**
         * Creates a task with a {@code null} sequence name.
         *
         * @param keyQueue queue to refill
         */
        public FetchKeyTask(BlockingQueue<Long> keyQueue) {
            this(null, keyQueue);
        }

        /**
         * @param keyName  sequence name passed to {@link SeqGeneratorImpl#getKeys(String, long)}
         * @param keyQueue queue to refill
         */
        public FetchKeyTask(String keyName, BlockingQueue<Long> keyQueue) {
            this.keyName = keyName;
            this.keyQueue = keyQueue;
        }

        /**
         * Fetches as many keys as there are free slots and enqueues all keys returned.
         *
         * <p>{@code put} blocks while the queue is full, so if the DAO returns more keys than
         * were requested this task waits for consumers to make room instead of dropping keys.
         * Every exception is caught here because an exception escaping a periodic task
         * suppresses its subsequent executions.
         */
        @Override
        public void run() {
            int remainingCapacity = keyQueue.remainingCapacity();
            if (capacity * (1 - REFILL_RATIO) < remainingCapacity) {
                try {
                    List<Long> keys = getKeys(keyName, remainingCapacity);
                    for (Long key : keys) {
                        keyQueue.put(key);
                    }
                    // Report only on success, and report what was actually retrieved.
                    System.out.println("retrieve keys from database, the keys' amount is " + keys.size());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println("key fetch task is interrupted.");
                } catch (Exception e) {
                    System.out.println("key fetch task happens exception:" + e);
                }
            }
        }
    }

    /**
     * Fetches keys from the configured DAO.
     *
     * @param name     sequence name
     * @param capacity value forwarded to the DAO; callers in this class pass the number of free
     *                 queue slots
     * @return whatever the DAO returns
     */
    protected List<Long> getKeys(String name, long capacity) {
        return generateKeysDao.getSequences(name, capacity);
    }

    /** DAO used to obtain keys; must be set before the first refill runs. */
    protected KeyDao generateKeysDao;

    /**
     * @param generateKeysDao DAO used by the refill tasks
     */
    public void setGenerateKeysDao(KeyDao generateKeysDao) {
        this.generateKeysDao = generateKeysDao;
    }
}