redis
进阶(功能与模式)
这个实例为了实现redis
最简单的缓存功能,对主数据库MySql
所有的select
都会刷新已有缓存,如果不存在就会新建缓存,所有的insert
,update
操作都会更新缓存。
redis
缓存服务简单模拟
redis
工具类,使用StringRedisTemplate
和RedisTemplate
的API操作缓存,这里仅封装了最简单的一些方法。
StringRedisTemplate
和RedisTemplate
的区别主要在数据类型和序列化方法。
-
StringRedisTemplate
完全继承于RedisTemplate
类,继承的时候,RedisTemplate
的类型是<String, String> ,因此,private StringRedisTemplate stringRedisTemplate
等同于private RedisTemplate<String,String> redisTemplate
。如果平时存储的都是string类型,那么就直接用StringRedisTemplate
,如果想存取其他类型,比如直接存对象,不想先转为string在存储,就只能用RedisTemplate
。 -
StringRedisTemplate
默认采用的是String 的序列化策略,保存的key和value都是采用的该策略序列化并保存的RedisTemplate
默认采用的是JDK的序列化策略,保存的 key和value都是采用此策略序列化并保存的。
齐全的StringRedisTemplate
和RedisTemplate
的API文档可参考博文:RedisTemplate介绍和StringRedisTemplate的用法
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* redis工具类
*/
@SuppressWarnings("unchecked")
public class RedisUtil {
/**
* StringRedisTemplate默认采用的是String 的序列化策略,保存的key和value都是采用的该策略序列化并保存的
* RedisTemplate默认采用的是JDK的序列化策略,保存的 key和value都是采用此策略序列化并保存的
*/
private static final Logger LOG = LoggerFactory.getLogger(RedisUtil.class);
private static RedisTemplate<String, Object> redisTemplate = ContextUtil.getBean("redisTemplate", RedisTemplate.class);
/**
* 这样子注入会报错,无法注入!!!大坑!用@Resource也存在报错
* 写一个工具类,使用最稳的方法获得ApplicationContext.xml中的bean
*/
// @Autowired
// public void setRedisTemplate(RedisTemplate redisTemplate) {
// RedisUtil.redisTemplate = redisTemplate;
// }
private static StringRedisTemplate stringRedisTemplate = ContextUtil.getBean("stringRedisTemplate", StringRedisTemplate.class);
private static String CACHE_PREFIX;
private static boolean CACHE_CLOSED;
private RedisUtil() {
}
@SuppressWarnings("rawtypes")
private static boolean isEmpty(Object obj) {
if (obj == null) {
return true;
}
if (obj instanceof String) {
String str = obj.toString();
if ("".equals(str.trim())) {
return true;
}
return false;
}
if (obj instanceof List) {
List<Object> list = (List<Object>) obj;
if (list.isEmpty()) {
return true;
}
return false;
}
if (obj instanceof Map) {
Map map = (Map) obj;
if (map.isEmpty()) {
return true;
}
return false;
}
if (obj instanceof Set) {
Set set = (Set) obj;
if (set.isEmpty()) {
return true;
}
return false;
}
if (obj instanceof Object[]) {
Object[] objs = (Object[]) obj;
if (objs.length <= 0) {
return true;
}
return false;
}
return false;
}
/**
* 构建缓存key值
* @param key 缓存key
* @return
*/
private static String buildKey(String key) {
if (CACHE_PREFIX == null || "".equals(CACHE_PREFIX)) {
return key;
}
return CACHE_PREFIX + ":" + key;
}
/**
* 返回缓存的前缀
* @return CACHE_PREFIX_FLAG
*/
public static String getCachePrefix() {
return CACHE_PREFIX;
}
/**
* 设置缓存的前缀
* @param cachePrefix
*/
public static void setCachePrefix(String cachePrefix) {
if (cachePrefix != null && !"".equals(cachePrefix.trim())) {
CACHE_PREFIX = cachePrefix.trim();
}
}
/**
* 关闭缓存
* @return true:成功
* false:失败
*/
public static boolean close() {
LOG.debug(" cache closed ! ");
CACHE_CLOSED = true;
return true;
}
/**
* 打开缓存
* @return true:存在
* false:不存在
*/
public static boolean openCache() {
CACHE_CLOSED = false;
return true;
}
/**
* 检查缓存是否开启
* @return true:已关闭
* false:已开启
*/
public static boolean isClose() {
return CACHE_CLOSED;
}
/**
* 判断key值是否存在
* @param key 缓存的key
* @return true:存在
* false:不存在
*/
public static boolean hasKey(String key) {
LOG.debug(" hasKey key :{}", key);
try {
if (isClose() || isEmpty(key)) {
return false;
}
key = buildKey(key);
return redisTemplate.hasKey(key);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return false;
}
/**
* 匹配符合正则的key
* @param patternKey
* @return key的集合
*/
public static Set<String> keys(String patternKey) {
LOG.debug(" keys key :{}", patternKey);
try {
if (isClose() || isEmpty(patternKey)) {
return Collections.emptySet();
}
return redisTemplate.keys(patternKey);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return Collections.emptySet();
}
/**
* 根据key删除缓存
* @param key
* @return true:成功
* false:失败
*/
public static boolean del(String... key) {
LOG.debug(" delete key :{}", key.toString());
try {
if (isClose() || isEmpty(key)) {
return false;
}
Set<String> keySet = new HashSet<>();
for (String str : key) {
keySet.add(buildKey(str));
}
redisTemplate.delete(keySet);
return true;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return false;
}
/**
* 根据key删除缓存
* @param key
* @return true:成功
* false:失败
*/
public static boolean delPattern(String key) {
LOG.debug(" delete Pattern keys :{}", key);
try {
if (isClose() || isEmpty(key)) {
return false;
}
key = buildKey(key);
redisTemplate.delete(redisTemplate.keys(key));
return true;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return false;
}
/**
* 删除一组key值
* @param keys
* @return true:成功
* false:失败
*/
public static boolean del(Set<String> keys) {
LOG.debug(" delete keys :{}", keys.toString());
try {
if (isClose() || isEmpty(keys)) {
return false;
}
Set<String> keySet = new HashSet<>();
for (String str : keys) {
keySet.add(buildKey(str));
}
redisTemplate.delete(keySet);
return true;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return false;
}
/**
* 设置过期时间
* @param key 缓存key
* @param seconds 过期秒数
* @return true:成功
* false:失败
*/
public static boolean setExp(String key, long seconds) {
LOG.debug(" setExp key :{}, seconds: {}", key, seconds);
try {
if (isClose() || isEmpty(key) || seconds > 0) {
return false;
}
key = buildKey(key);
return redisTemplate.expire(key, seconds, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return false;
}
/**
* 查询过期时间
* @param key 缓存key
* @return 秒数
*/
public static Long getExpire(String key) {
LOG.debug(" getExpire key :{}", key);
try {
if (isClose() || isEmpty(key)) {
return 0L;
}
key = buildKey(key);
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return 0L;
}
/**
* 缓存存入key-value
* @param key 缓存键
* @param value 缓存值
* @return true:成功
* false:失败
*/
public static boolean setString(String key, String value) {
LOG.debug(" setString key :{}, value: {}", key, value);
try {
if (isClose() || isEmpty(key) || isEmpty(value)) {
return false;
}
key = buildKey(key);
stringRedisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return false;
}
/**
* 缓存存入key-value
* @param key 缓存键
* @param value 缓存值
* @param seconds 秒数
* @return true:成功
* false:失败
*/
public static boolean setString(String key, String value, long seconds) {
LOG.debug(" setString key :{}, value: {}, timeout:{}", key, value, seconds);
try {
if (isClose() || isEmpty(key) || isEmpty(value)) {
return false;
}
key = buildKey(key);
stringRedisTemplate.opsForValue().set(key, value, seconds, TimeUnit.SECONDS);
return true;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return false;
}
/**
* 根据key取出String value
* @param key 缓存key值
* @return String 缓存的String
*/
public static String getString(String key) {
LOG.debug(" getString key :{}", key);
try {
if (isClose() || isEmpty(key)) {
return null;
}
key = buildKey(key);
return stringRedisTemplate.opsForValue().get(key);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return null;
}
/**
* 去的缓存中的最大值并+1
* @param key 缓存key值
* @return long 缓存中的最大值+1
*/
public static long incr(String key) {
LOG.debug(" incr key :{}", key);
try {
if (isClose() || isEmpty(key)) {
return 0;
}
key = buildKey(key);
return redisTemplate.opsForValue().increment(key, 1);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return 0;
}
}
ContextUtil
工具类,使用ApplicationContextAware
中的方法,通过beanid
获得配置文件中的bean
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
/**
* Context 工具类
*/
@SuppressWarnings("static-access")
@Component
public class ContextUtil implements ApplicationContextAware {
private static ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.ac = context;
}
/**
* * 根据提供的bean名称得到相应的服务类
* * @param beanId bean的id
* * @return 返回bean的实例对象
*
*/
public static Object getBean(String beanId) {
return ac.getBean(beanId);
}
/**
* * 根据提供的bean名称得到对应于指定类型的服务类
* * @param beanId bean的id
* * @param clazz bean的类类型
* * @return 返回的bean类型,若类型不匹配,将抛出异常
*
*/
public static <T> T getBean(String beanId, Class<T> clazz) {
return ac.getBean(beanId, clazz);
}
}
写一个测试类模拟最简单的缓存查询过程
/**
* Created by Administrator on 2020-1-23.
*/
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@ContextConfiguration({"classpath:applicationContext.xml"})
public class TestDAO {
@Autowired
private UserDAO userDAO;
// 该静态块用于设置key的前缀和打开缓存
static{
RedisUtil.setCachePrefix("userid");
if(!RedisUtil.openCache()){
System.out.println("open error");
}
}
@Test
public void testQuery(){
// 模拟传入请求的用户id
Long userid = 2l;
String name = new String();
try{
// 去缓存中查找key为前缀和userid拼接的value
name = RedisUtil.getString(String.valueOf(userid));
if(name != null){
// 缓存中存在,则答应缓存数据
System.out.println("get name by redis:" + name);
}else {
// 缓存中不存在,使用DAO去数据库中查询,并打印
name = userDAO.getUsernameById(userid);
System.out.println("get name by mysql:" + name);
//将数据库中查询的数据存入缓存中
if(!RedisUtil.setString(String.valueOf(userid),name)){
System.out.println("set error");
}else{
System.out.println("set OK");
}
}
}catch (Exception e){
System.out.println("error" + e.getMessage());
}
System.out.println("Mission accomplished");
}
}
运行测试
第一次
get name by mysql:222
set OK
Mission accomplished
第二次
get name by redis:222
Mission accomplished
redis
服务器
慢查询
和其他数据库的慢查询机制一样,通过记录某些查询语句查询处理时间较慢的信息记录,来定位项目运行过程存在的问题,redis
提供slowlog-log-slower-than
和slowlog-max-len
这两个配置来设置慢查询日志。
slowlog-log-slower-than
用于定义慢查询,使运行速度低于某个值(微秒)的查询处理为慢查询。
slowlog-max-len
用于记录慢查询日志的长度,即当慢查询操作条数高于某个值,最先记录的慢查询操作记录会被删去,从而增加新的慢查询操作条数,即先进先出的队列模式。
持久化
由于redis
的数据存储在内存中,若redis
服务器因为种种原因崩溃,那么内存中的数据就会丢失,客户端访问不到缓存数据,那么主数据库和服务器的压力就会陡升,会造成一系列的连锁反应。
但是,redis
的持久化则是为了解决这个问题。redis
把内存的中的数据并通过异步的方式将数据写入磁盘,在redis
服务器重新启动的时候加载这些数据,从而最大限度的降低缓存丢失带来的影响。
消息队列&发布/订阅模式使用
消息队列:Redis
中提供List类型的字符串链表。和数据结构中的普通链表的原理一样,在插入时,如果该键并不存在,Redis
将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。
消息队列的补充:不停的调用
pop
方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。使用Thread.sleep()
方法无法使生产者生产速度完全等于消费者消费速度。所以这里引入阻塞读取。阻塞命令brpop
和blpop
在List中无消息时,阻塞队列,并且每次也是弹出一个消息。这样的机制,使只有当启动生产者生产消息之后,消费者才会自动消费消息,而且当无消息时消费者会阻塞直到有消息。
发布/订阅模式:redis
提供这样的模式,该模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel
),而发布者可以向指定的频道(channel
)发送消息,所有订阅此频道的订阅者都会收到此消息。这样也实现了生产者和消费者的消息队列。
详细实现可参考:redis实现消息队列&发布/订阅模式使用
redis
集群模式
Redis
主要利用Replication
和Sentinel
这两种模式来保证Redis
的高可用性。
详细实现可参考:Redis 的主从同步,及两种高可用方式和Redis集群详解
主从复制Replication
(主从同步)
Redis
的主从结构可以采用一主多从或者级联结构,即一个master和一个或者多个slave组成的模型。
主服务器可以从向任意数量的从服务器上同步数据,从服务器可以是关联其他从服务器的主服务器。由于redis
的发布/订阅模式,使得从数据库在任何地方同步时,可订阅一个频道并接收主服务器完整的消息发布记录。
Redis
主从复制可以根据是否是全量分为全量同步和增量同步。主从刚刚连接的时候,进行全量同步;全同步结束后,进行增量同步。主机需要从从机上获得复制(备份)过的数据,首先会尝试进行增量同步,如不成功,要求从机进行全量同步。
- 全量同步:
1.从服务器连接主服务器,发送SYNC命令;
2.主服务器接收到SYNC命令后,开始执行BGSAVE命令生成RDB文件并使用缓冲区记录此后执行的所有写命令;
3.主服务器BGSAVE执行完后,向所有从服务器发送快照文件,并在发送期间继续记录被执行的写命令
4.从服务器收到快照文件后丢弃所有旧数据,载入收到的快照;
5.主服务器快照发送完毕后开始向从服务器发送缓冲区中的写命令;
6.从服务器完成对快照的载入,开始接收命令请求,并执行来自主服务器缓冲区的写命令
- 增量同步:
Redis
增量复制是指Slave初始化后开始正常工作时主服务器发生的写操作同步到从服务器,主服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接收并执行收到的写命令。
若主节点由于故障不能提供服务,需要人工将从节点变更为主节点,同时还要通知应用方更新主节点地址等操作。这样的一系列操作显然不利于后期运维。这里引入哨兵(Redis Sentinel
)
哨兵(Sentinel
)
原理: Redis Sentinel
是一个分布式架构,其中包含若干个Sentinel
节点和Redis
服务器节点,每个Sentinel
节点会对服务器节点和其余Sentinel
节点进行监控,当它发现某个节点不可达时,会对该节点做下线标识。如果被标识的是主服务器节点,它还会和其他Sentinel
节点进行“协商”(这里我理解为一种投票机制),当大多数Sentinel
节点都认为主服务器节点不可达时,它们会选举出一个Sentinel
节点来完成自动故障转移的工作,同时会将这个变化通知给Redis
应用方。整个过程完全是自动的,不需要人工来介入。
redis
客户端
数据类型
redis
支持丰富的数据类型,从最基础的string到复杂的常用到的数据结构都有支持
参考上一篇博文:redis学习小结
事务
支持一次性按顺序执行多个命令的能力,并保证其原子性(就是对数据的更改要么全部执行,要么全部不执行)。
Lua
脚本
在事务的基础上,如果我们需要在服务端一次性的执行更复杂的操作(包含一些逻辑判断),则Lua
就可以排上用场了(比如在获取某一个缓存的时候,同时延长其过期时间)。redis
保证Lua
脚本的原子性,一定的场景下,是可以代替redis
提供的事务相关的命令的。
管道
因为redis
的客户端和服务器的连接时基于TCP的, 默认每次连接都时只能执行一个命令。管道则是允许利用一次连接来处理多条命令,从而可以节省一些TCP连接的开销。管道和事务的差异在于管道是为了节省通信的开销,但是并不会保证原子性。
分布式锁
加锁是在redis
中,给Key键设置一个值,为避免死锁,并给定一个过期时间;解锁的过程就是将Key键删除(为了保证锁操作的原子性,我们用LUA
脚本完成加锁和解锁操作)。
redis
分布式锁的知识参考博文:分布式锁之Redis实现