缓存、数据库双写一致性保障方案
1.业务背景
-
实时性要求较高的解决方案
缓存数据库双写一致性通常是用于数据实时性要求较高的场景,比如说商品库存服务。
解决思路:
-
如果不是在读写并发高的场景下,一般采用CacheAsidePattern即可解决。即先删除缓存,再写数据库。
-
读写并发高的场景下。
在读写并发高的场景下,读取和写入的操作是并发的。比如说现在数据库中的库存为100,缓存中的库存也为100。有一个写请求过来,要求修改库存为99。正常情况下是先删除了缓存,然后修改数据库中的数据为99。读请求过来的时候发现缓存中的数据为0,就会去数据库中查询得到99。然后修改缓存中的数据也为99。但是如果写请求的时候还没来得及将数据库中数据修改为99,这时度请求就过来,发现缓存中的数据为空,就去数据库中读取数据为100,然后又重新将缓存中的数据更新为100,这时写请求将数据库中的数据修改完毕,变为99。这就导致了数据不一致的产生
解决:
将读写请求串行化。将读写请求都放到队列中操作,保证串行执行。然后再每个队列上挂一个线程去执行队列中的请求操作
-
-
实时性要求不高的解决方案(先了解)
对于实时性要求不高的数据,可以采用异步更新数据的方式。比如说商品详情页,它的数据要求不是实时性很高,但是要求大流量,特别是热点数据的读并发较高,这时候就必须有一个缓存数据生产服务。比方说有一个更新商品的服务去更新了数据库中的详情页面数据,不需要实时反应到页面上。这时候,可以将这个修改数据的请求放到消息队列中,缓存数据生产服务监听着这个消息服务,一旦接收到消息,就需要去更新自己缓存中的数据。
2.思路整理
如果保证读请求和写请求是针对同一个商品?我们需要做一个HASH路由,保证同一个商品的请求进入的是同一个内存队列。
每个队列都对应一个工作线程,工作线程拿到对应的请求,执行对应的操作。
3.方案落地
- 1.线程池+内存队列初始化
- 2.两种请求对象的封装
- 3.请求异步执行Service封装
- 4.请求处理的工程线程封装
- 5.两种Controller接口封装
- 6.读请求去重优化
- 7.空数据请求的过滤
3.1 线程池+内存队列初始化
web容器初始化的时候,就需要初始化线程池和内存队列。我们可以自定义一个监听器,然后注册这个监听器。
-
新建listener
-
注册listener
-
测试
容器初始化的流程已经做好了,现在需要实现具体的怎么去初始化线程池和内存队列。
-
新建线程池和内存队列的包装类ThreadPoolAndQueueWrapper
这个包装类用于放在监听器中,调用它的init()方法,就可以执行线程池的初始化和队列的初始化。线程开始提交请求工作。
package com.roncoo.eshop.inventory.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.roncoo.eshop.inventory.request.Request; import com.roncoo.eshop.inventory.request.RequestQueue; /** * 初始化容器的时候需要初始化线程池和内存队列 * @author Administrator * */ public class ThreadPoolAndQueueWrapper { private ExecutorService threadPool = Executors.newFixedThreadPool(10); public ThreadPoolAndQueueWrapper() { RequestQueue requestQueue = RequestQueue.getInstance(); //初始化的时候就将内存队列集合填满 for (int i = 0; i < 10; i++) { ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(100); requestQueue.addQueue(queue); //线程池用于提交 请求处理的工作线程 threadPool.submit(new RequestProcessThread(queue)); } } /** * 初始化工作线程池和内存队列的方法,上来就执行 */ public static void init() { //保证初始化的时候只能初始化一次线程池和内存队列 //采用静态内部类的方式保证线程绝对安全 Singleton.getInstance(); } private static class Singleton{ private static ThreadPoolAndQueueWrapper instance; static { instance = new ThreadPoolAndQueueWrapper(); } public static ThreadPoolAndQueueWrapper getInstance() { return instance; } } } 复制代码
-
请求队列的封装RequestQueue,内部持有请求队列的集合,提供添加队列的方法
package com.roncoo.eshop.inventory.request; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; /** * 请求内存队列封装 * @author Administrator * */ public class RequestQueue { /** * 内存队列,是一个集合。因为涉及并发,所以使用ArrayBlockingQueeue,队列中存放的是请求(读请求和写请求) */ private List<ArrayBlockingQueue<Request>> queues = new ArrayList<ArrayBlockingQueue<Request>>(); public static RequestQueue getInstance() { return Singleton.getInstance(); } /** * 添加一个内存队列 * @param queue */ public void addQueue(ArrayBlockingQueue<Request> queue) { this.queues.add(queue); } /** * 内部静态类的方式保证绝对的线程安全 * @author Administrator * */ private static class Singleton { private static RequestQueue instance; static { instance = new RequestQueue(); } public static RequestQueue getInstance() { return instance; } } } 复制代码
-
需要提交到线程池中的工作线程,用于处理Request请求。并且持有自己的内存队列
package com.roncoo.eshop.inventory.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import com.roncoo.eshop.inventory.request.Request; /** * 执行请求的工作线程 * @author Administrator * */ public class RequestProcessThread implements Callable<Boolean>{ /** * 自己监控的内存队列 */ private ArrayBlockingQueue<Request> queue; public RequestProcessThread(ArrayBlockingQueue<Request> queue) { this.queue = queue; } /** * 具体的工作流程 */ @Override public Boolean call() throws Exception { while(true) { break; } return true; } } 复制代码
-
请求的封装Request,是一个接口,以后读请求和写请求需要实现这个接口,进行自己的操作逻辑
/** * 请求接口,读请求和写请求要实现这个接口 * @author Administrator * */ public interface Request { } 复制代码
项目结构
3.2 两种请求对象的封装
-
新建实体类ProductInventory
public class ProductInventory { /** * 商品id */ private Integer productId; /** * 库存数量 */ private Long inventoryCnt; public ProductInventory() { } public ProductInventory(Integer productId, Long inventoryCnt) { this.productId = productId; this.inventoryCnt = inventoryCnt; } public Integer getProductId() { return productId; } public void setProductId(Integer productId) { this.productId = productId; } public Long getInventoryCnt() { return inventoryCnt; } public void setInventoryCnt(Long inventoryCnt) { this.inventoryCnt = inventoryCnt; } } 复制代码
-
Request接口中添加业务方法
public interface Request { void process(); } 复制代码
-
库存写请求 ProductInventroyWriteRequest
package com.roncoo.eshop.inventory.request; import com.roncoo.eshop.inventory.model.ProductInventory; import com.roncoo.eshop.inventory.service.IProductInventoryService; /** * 库存写请求 * 写请求执行逻辑:Cache Aside Pattern * 1.先删除缓存 * 2.再更新数据库 * @author Administrator * */ public class ProductInventoryWriteRequest implements Request{ private ProductInventory productInventory; private IProductInventoryService productInventoryService; public ProductInventoryWriteRequest(ProductInventory productInventory, IProductInventoryService productInventoryService) { this.productInventory = productInventory; this.productInventoryService = productInventoryService; } public void process() { //1.删除缓存 productInventoryService.removeCache(productInventory); //2.更新数据库 productInventoryService.updateDb(productInventory); } } 复制代码
-
库存读请求 ProductInventroyReadRequest
package com.roncoo.eshop.inventory.request; import org.springframework.beans.factory.annotation.Autowired; import com.roncoo.eshop.inventory.model.ProductInventory; import com.roncoo.eshop.inventory.service.IProductInventoryService; /** * 商品库存读请求 * 1.查询数据库 * 2.设置缓存 * @author Administrator * */ public class ProductInventoryReadRequest implements Request{ /** * 商品Id */ private Integer productId; @Autowired private IProductInventoryService productInventoryService; public ProductInventoryReadRequest(Integer productId, IProductInventoryService productInventoryService) { this.productId = productId; this.productInventoryService = productInventoryService; } @Override public void process() { //1.从数据库中查询最新商品库存 ProductInventory productInventory = productInventoryService.findProductInventoryByProductId(productId); //2.将商品库存设置到redis缓存中 productInventoryService.setProductInventoryToCache(productInventory); } } 复制代码
项目结构:
3.3 请求异步执行的service封装
这一步的操作主要是将过来的请求根据商品id路由到对应的内存队列中。接受的参数是请求.
个人理解叫service这个名称不太好,换个名称不如叫做接口路由代理
-
service接口
RequestAsyncServiceImpl
package com.roncoo.eshop.inventory.service.impl; import java.util.concurrent.ArrayBlockingQueue; import org.springframework.stereotype.Service; import com.roncoo.eshop.inventory.request.Request; import com.roncoo.eshop.inventory.request.RequestQueue; import com.roncoo.eshop.inventory.service.RequestAsyncService; /** * 处理请求的异步service * 1.将请求路由到不同的内存队列 * 2.将请求放入到内存队列中 * @author Administrator * */ @Service("requestAsyncService") public class RequestAsyncServiceImpl implements RequestAsyncService{ @Override public void process(Request request) { try { //做请求的路由,根据每个请求的商品id,路由到对应的内存队列中 ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId()); queue.put(request); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId){ RequestQueue requestQueue = RequestQueue.getInstance(); //先获取productId的hash值 String key = String.valueOf(productId); int h; int hash = (key == null) ?0 : (h = key.hashCode()) ^ (h >>> 16); // 对hash取模,将hash值路由到指定的内存队列 // 比如内存队列大小8 // 用内存队列的数量对hash值取模之后, 结果一定是在0-7之间 // 任何一个商品id都会被固定路由到同样的一个内存队列中去 int index = (requestQueue.queueSize() - 1) & hash; return requestQueue.getQueue(index); } } 复制代码
3.4 处理具体请求的工作线程的代码修改
public class RequestProcessThread implements Callable<Boolean>{
/**
* 自己监控的内存队列
*/
private ArrayBlockingQueue<Request> queue;
public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
this.queue = queue;
}
/**
* 具体的工作流程
*/
@Override
public Boolean call() throws Exception {
while(true) {
//从自己监控的内存队列中拿出请求
Request request = queue.take();
//执行操作
request.process();
break;
}
return true;
}
}
复制代码
工程结构:
3.5 contorller层的封装
主要是读请求,要考虑在200ms之内不断循环,从缓存中获取数据。如果200ms内没有,再去数据库查询
package com.roncoo.eshop.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.roncoo.eshop.inventory.Response.Response;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.request.ProductInventoryWriteRequest;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
import com.roncoo.eshop.inventory.service.RequestAsyncService;
/**
* 商品库存controller
* @author Administrator
*
*/
@Controller
public class ProductInventoryController {
@Autowired
private RequestAsyncService requestAsyncService;
@Autowired
private IProductInventoryService productInventoryService;
@RequestMapping("/updateProductInventory")
@ResponseBody
public Response updateProductInventory(ProductInventory productInventory) {
Response response = null;
try {
Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
requestAsyncService.process(request);
response = new Response(Response.SUCCESS);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return response;
}
@RequestMapping("/getProductInventory")
@ResponseBody
public ProductInventory getProductInventory(Integer productId) {
ProductInventory productInventory = null;
try {
Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
requestAsyncService.process(request);
//把读请求交给service异步处理以后,需要等待一会
//等待前面库存更新的操作,同时缓存舒心的操作
//如果等待的时间超过了200ms,那么就自己去数据库中查询
long startTime = System.currentTimeMillis();
long endTime = 0L;
long waitTime = 0L;
while(true) {
if(waitTime > 200) {
break;
}
//尝试从缓存中获取数据
productInventory = productInventoryService.getProductInventoryCache(productId);
//如果有数据,就返回数据
if(productInventory != null) {
return productInventory;
}
else {
Thread.sleep(20);
endTime = System.currentTimeMillis();
waitTime = endTime - startTime;
}
}
//如果在规定的时间内(一般是200ms)没有,那么就尝试自己自己去数据库获取
productInventory = productInventoryService.findProductInventoryByProductId(productId);
if(productInventory != null) {
return productInventory;
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new ProductInventory(productId, -1L);
}
}
复制代码