package com.yy.fastcustom.actualupdate; import java.util.concurrent.BlockingQueue; /** * Created by zzq on 2019/5/16. */ public abstract class ActualDataModelOperationHandler<Data> { private String readWriteFlag; public final String getReadWriteFlag() { return readWriteFlag; } public final void setReadWriteFlag(String readWriteFlag) { this.readWriteFlag = readWriteFlag; } private long readWaitMillis; public final long getReadWaitMillis() { return readWaitMillis; } public final void setReadWaitMillis(long readWaitMillis) { this.readWaitMillis = readWaitMillis; } /** * 更新缓存信息 * * @return 成功返回true */ public abstract boolean removeCache(); /** * 更新数据信息 * * @return 成功返回true */ public abstract boolean updateOriginalDataAndCache(); /** * 综合读取数据(先从缓存读取,读不到在读数据库) * * @return */ public abstract Data getOriginalData(); public abstract Data getCacheData(); public abstract void setCacheData(Data data); public final void updateData() throws Exception { BlockingQueue<ActualDataModelOperationHandler> actualDataModels = ActualDataServiceManager.getUpdateQueue(this.getReadWriteFlag()); actualDataModels.put(this); } public final Data readData() { String flag = this.getReadWriteFlag(); if (ActualDataServiceManager.contains(flag)) { long startTime = System.currentTimeMillis();//当前毫秒数 for (; true; ) { if (ActualDataServiceManager.noContains(flag) || (System.currentTimeMillis() - startTime > this.getReadWaitMillis())) //如果自旋时间大于设定阈值 break; Data retData = this.getCacheData(); if (retData != null) return retData; } } Data retData = this.getCacheData(); if (retData != null) return retData; retData = this.getOriginalData(); if (retData != null) this.setCacheData(retData); return retData; } }
package com.yy.fastcustom.actualupdate; import java.util.*; import java.util.concurrent.*; /** * Created by zzq on 2019/5/16. */ public class ActualDataServiceManager { private static volatile List<DataUpdateQueueWrapper> queueList = null; private static Map<String, Object> updateOperationTag = null; private static ExecutorService executor = null; public static void initQueueNum(int size) { if (queueList == null) synchronized (ActualDataServiceManager.class) { if (queueList == null) initBasicObject(size); } } private static void initBasicObject(int size) { int sizeExtend = size * 1000;//hash范围放大1000倍 queueList = new ArrayList<>(sizeExtend); updateOperationTag = new ConcurrentHashMap<>(); executor = new ThreadPoolExecutor( size, size, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); DataUpdateQueueWrapper ary[] = new DataUpdateQueueWrapper[size]; for (int i = 0; i < size; i++) { DataUpdateQueueWrapper dataUpdateQueueWrapper = new DataUpdateQueueWrapper(); ary[i] = dataUpdateQueueWrapper; executor.submit(dataUpdateQueueWrapper); } for (int i = 0; i < sizeExtend; i++) {//生成虚拟hash节点 queueList.add(ary[i % size]); } } /** * 根据标记获取队列 * * @param flag * @return */ public static BlockingQueue<ActualDataModelOperationHandler> getUpdateQueue(String flag) throws Exception { if (queueList == null) { throw new Exception("在调用该方法前,调用initQueueNum初始化更新队列个数"); } int hash; int index = flag == null ? 0 : (hash = flag.hashCode()) ^ (hash >>> 16); return queueList.get((queueList.size() - 1) & index).getQueue(); } public static void addFlag(String flag) { updateOperationTag.put(flag, "flagValue"); } public static void removeFlag(String flag) { updateOperationTag.remove(flag); } public static boolean contains(String flag) { return updateOperationTag.containsKey(flag); } public static boolean noContains(String flag) { return !contains(flag); } }
package com.yy.fastcustom.actualupdate; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; /** * Created by zzq on 2019/5/16. */ public class DataUpdateQueueWrapper implements Callable { private volatile BlockingQueue<ActualDataModelOperationHandler> updateQueue = null; { updateQueue = new LinkedBlockingQueue<>(); } public BlockingQueue<ActualDataModelOperationHandler> getQueue() { return updateQueue; } @Override public Object call() throws Exception { for (; true; ) { ActualDataModelOperationHandler actualDataModel = updateQueue.take(); String flag = actualDataModel.getReadWriteFlag(); ActualDataServiceManager.addFlag(flag);//这里由单独的工作线程loop队列 try { boolean b1 = actualDataModel.removeCache(); if (b1)//如果缓存删除失败则,不执行更新数据库并刷新到缓存;缓存删除成功即使updateDbAndCache执行失败也能保证旧数据的一致性 actualDataModel.updateOriginalDataAndCache(); } finally { ActualDataServiceManager.removeFlag(flag); } } } }
package com.yy.fastcustom.actualupdate; /** * Created by zzq on 2019/5/27/027. */ public class TestDataModel extends ActualDataModelOperationHandler<String> { public static void main(String[] args) { // ceshi_hash(); String key = "1"; ActualDataServiceManager.initQueueNum(3); final TestDataModel actualDataModel = new TestDataModel(); actualDataModel.setReadWriteFlag(key); actualDataModel.setReadWaitMillis(20 * 1000); try { actualDataModel.updateData(); } catch (Exception e) { e.printStackTrace(); } new Thread(() -> { String ret = null; ret = actualDataModel.readData(); System.out.println("=====end out====" + ret); }).start(); } private static void ceshi_hash() { int p1 = 356349; int p2 = 453546; int p3 = 678345; int p4 = 768678; int p5 = 967897; int r1 = p1 & 2; int r2 = p2 & 2; int r3 = p3 & 2; int r4 = p4 & 2; int r5 = p5 & 2; int r1_ = p1 & 2999; int r2_ = p2 & 2999; int r3_ = p3 & 2999; int r4_ = p4 & 2999; int r5_ = p5 & 2999; System.out.println("===="); } volatile String uu = null; @Override public boolean removeCache() { for (int i = 0; i < 5; i++) { System.out.println("====removeCache==缓存=="); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return true; } @Override public boolean updateOriginalDataAndCache() { for (int i = 0; i < 5; i++) { System.out.println("====updateOriginalDataAndCache==数据库=="); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } if (i == 1) { uu = "获取到了缓存数据"; } } return false; } @Override public void setCacheData(String s) { uu = s; System.out.println("====读取数据库成功,将其放入缓存=="); } @Override public String getOriginalData() { return "没有找到缓存,从数据库加载!"; } @Override public String getCacheData() { System.out.println("====读取缓存但没读取到=="); return null; } }