文章目录
I、Redis 使用说明文档
Redis 是完全开源免费的,是一个高性能的key-value数据库,目前市面上主流的数据库
Redis、Memcache、Tair(淘宝自研发)
Redis的官网:https://redis.io/
注意:Redis官方是没有windows版本的
一、redis 应用场景及数据类型
1.1、应用场景
1、短信验证码Code 缓存
2、数据库数据缓存
3、Token令牌的生成
4、网页计数器
5、分布式锁
等等
1.2、redis 数据类型
1、string (最常用m可存放任意数据,json,二进制数据等)
2、Hash (key value 数据)
3、List (无序集合)
4、Set (有序集合)
5、sorted Set (有序集合+分数值排序)
二、redis 安装 (linux)
2.1、下载
redis-4.0.11 百度网盘链接:https://pan.baidu.com/s/1P-nao7kHSQ12hsYoDW-jdQ 提取码:p4uh
redis-5.0.12 百度网盘链接:https://pan.baidu.com/s/11cXmRRNDKBxJpfPi5_I5cw 提取码:5llq
2.2、 安装
1、redis 包放到linux 后执行命令安装
tar -zxvf redis-5.0.12.tar.gz # 解压(redis目录执行)
yum install gcc # 安装 gcc 环境(redis/C++)
rpm -qa|grep gcc # 验证gcc是否安装成功
make # 编译( 进解压后的目录 )
make install # 初始化环境 ( 解压后的目录 )
编译redis需要 gcc 环境,所以先用yum安装 gcc
2.3、远程连接配置
修改 redis.conf
1、 protected-mode yes 改为 protected-mode no # 在没有密码的情况下,关闭保护模式)
2、 bind 127.0.0.1 注释掉 #, 取消绑定本地地址, 允许远程连接
3、 daemonize no 改为 daemonize yes # 是否为进程守护,关闭ssh窗口后即是否在后台继续运行)
4、 # requirepass foobared 改为 requirepass 123456 # 连接账号:auth 连接密码:123456)
ps: 如果是云服务器放行 6379 端口,如果是虚拟机关闭防火墙( systemctl stop firewalld | systemctl disable firewalld)
2.4、启动 + 查询redis进程
1、 ./src/redis-server # 正常启动
2、 ./src/redis-server redis.conf # 后台启动
3、 ps aux|grep redis # 查询redis进程
4、 ./redis-cli # 使用命令连接redis
ps:集群会使用到多个redis,可以添加redis2.conf || redis3.conf 同时启动多个redis( 修改 port 端口号)
2.5、redis 关闭
# 正常停止,加save是更新数据到本地(硬盘),防止数据丢失
./src/redis-cli -p 6379 shutdown save
# 强制停止
ps aux|grep redis
kill -9 pid
三、redis 安装 (win)
3.1、下载
redis-5.0.5-windows 版本下载链接:https://pan.baidu.com/s/1VVftOb6TcqX7fREOse0jlg 提取码:h7a1
3.2、所有
解压缩后双击脚本: startup.bat
启动
四、springboot 整合 redis
4.1、添加 maven 依赖
<!-- redis-jedis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- commons-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
4.2、添加 yml 配置
server:
port: 9001
spring:
redis:
host: 47.108.201.177
port: 6379
password: 123456
database: 0
timeout: 1000s # 数据库连接超时时间,2.0 中该参数的类型为Duration,这里在配置的时候需要指明单位
# 连接池配置,2.0中直接使用jedis或者lettuce配置连接池
jedis:
pool:
# 最大空闲连接数
max-idle: 500
# 最小空闲连接数
min-idle: 50
# 等待可用连接的最大时间,负数为不限制
max-wait: -1
# 最大活跃连接数,负数为不限制
max-active: -1
4.3、添加 RedisConfig 配置类
1、配置缓存数据格式
2、开启注解缓存数据功能 @EnableCaching 注解开启
/***
*
* 处理 redis缓存key 乱码, 以及保存对象数据设置为 json存储(默认二进制)
* <P>
* 继承CachingConfigurerSupport, 为了自定义生成KEY的策略。可以不继承。
* </P>
* @author wangsong
* @date 2021/3/2 0002 17:38
* @return
* @version 1.0.0
*/
@Configuration
@EnableCaching // 启用缓存,这个注解很重要;可以使用
public class RedisConfig extends CachingConfigurerSupport {
//@Value("${spring.cache.redis.time-to-live}")
private Duration timeToLive = Duration.ZERO;
@Bean(name = "redisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
template.setValueSerializer(jackson2JsonRedisSerializer);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(stringRedisSerializer);
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题)
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(timeToLive)
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
4.4、添加 RedisUtil 数据操作工具类
package com.ws.test.redisdemo.util;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("all")
@Component
public class RedisUtil {
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
* @return
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
*
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
public void delete(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
//============================String=============================
/**
* 普通缓存获取
*
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增 或 递减
*
* @param key 键
* @param delta 要增加或减少几(+正数 -负数)
* @return
*/
public long increment(String key, long delta) {
return redisTemplate.opsForValue().increment(key, delta);
}
//================================Map=================================
/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return 值
*/
public Object hGet(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hEntries(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hPutAll(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hPutAll(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hPut(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hPut(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hDelete(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加或减少几(+ 正数 -负数)
* @return
*/
public double hIncrement(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
//============================set=============================
/**
* 根据key获取Set中的所有值
*
* @param key 键
* @return
*/
public Set<Object> sMembers(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sIsMember(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sAdd(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sAdd(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
* @return
*/
public long sSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long sRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
//===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
* @return
*/
public List<Object> lRange(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
* @return
*/
public long lSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
* @return
*/
public Object lIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lRightPush(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lRightPush(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lRightPushAll(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lRightPushAll(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lSet(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
五、redis 保存对象(二进制/json)
5.1、保存数据到redis 代码
@RestController
@RequestMapping("/test1")
public class Test1Controller {
@Autowired
private RedisUtil redisUtil;
/**
* 保存实体对象和获取对象
* @return
*/
@GetMapping("/a1")
public User binaryStorage() {
User user = new User();
user.setUsername("兮家小二");
user.setAge(24);
// 保存对象
redisUtil.set("user", user);
// 获取对象
User newUser = (User) redisUtil.get("user");
return newUser;
}
}
User 类, 我项目中已添加了 mybatis-plus 和 lombok (可自行集成)
@TableName("user")
@Data
public class User implements Serializable {
@TableId(type = IdType.AUTO)
private Integer id;
private String username;
private Integer age;
}
5.2、保存为json
4.3 步骤已配置默认配置为 json 数据格式保存
执行 5.1 步骤的接口,使用RedisClient 查看数据如下
5.2、保存为二进制
1、删除 4.3 步骤中 RedisConfig 中的使用配置,保留RedisConfig 类
2、注意: 所有二进制保存实体类必须序列化: User中添加 implements Serializable
@Configuration
@EnableCaching // 启用缓存,这个注解很重要;可以使用
public class RedisConfig extends CachingConfigurerSupport {
}
执行 5.1 步骤的接口,使用RedisClient 查看数据如下
出现无法查看的数据, 工具的问题, 有些工具可以查看key,内容是二进制的
但缓存是可以正常使用的,如下图:
六、redis 数据缓存与数据同步
6.1、数据缓存
@RestController
@RequestMapping("/test2")
public class Test2Controller {
@Autowired
private UserMapper userMapper;
/**
* 查询数据并添加指定的参数到缓存中
* <P>
* 使用说明: id查询添加缓存,id编辑+id删除时清除缓存, 达到清理缓存的作用,另外定期清理所有缓存,以此保证缓存中没有脏数据
* @Cacheable 注解说明
* 1、value参数: 缓存key前缀
* 2、key: 缓存key动态参数,以使用EL 表达式动态拼接动态参数, 不指定默认使用拼接方法的所有参数
* 如: 请求id=1, 则缓存 key 为 user::1
* 3、condition 参数为结果为true使用缓存, 为 false 不缓存, 可使用EL 表达式, 如 #id > 1 的结果 (可以为空,默认缓存)
* </P>
* @return
*/
@GetMapping("/findId")
@Cacheable(value = {
"user"}, key = "#id", condition = "true")
public User findId(String id) {
return userMapper.selectById(id);
}
/**
* 添加时添加缓存
* <P>
* @CachePut 说明
* 1、会先执行业务方法
* 2、返回值为缓存数据 value
* 3、缓存key = #result.id 表示 user::id参数 为缓存key
* </P>
*
* @return
*/
@GetMapping("/insert")
@CachePut(value = {
"user"}, key = "#result.id", condition = "true")
public User insert(String name) {
User user = new User();
user.setUsername(name);
user.setAge(22);
int insert = userMapper.insert(user);
return user;
}
/**
* 编辑数据 并清除指定参数的缓存, 如果方法抛出异常则不会清除缓存
* <P>
* @CacheEvict 注解说明
* 1、allEntries 参数: 会清除user 的下的所有缓存, false 只清除指定id 的缓存
* 2、其他同查询参数
* </P>
* @return
*/
@GetMapping("/upd")
@CacheEvict(value = {
"user"}, key = "#id", condition = "true") //, allEntries = true
public Boolean upd(Integer id, String name) {
User user = new User();
user.setId(id);
user.setUsername(name);
int i = userMapper.updateById(user);
return i == 1;
}
}
6.2、数据同步
出现数据不一致场景
1、当直接修改数据库时,将查询查询数据不一致问题
2、当查询到数据后,数据库立即被 upd操作修改,在放入之前查询到的数据到 redis 缓存
处理方法
1:直接清除Redis的缓存,重新读取数据库即可
2:使用mq异步订阅mysql binlog实现增量同步
3:使用alibaba的canal
七、redis 持久化(RDB、AOF)
redis 是持久化的缓存框架, 会自动保存数据到硬盘, 计算redis 宕机了,重启后数据依然存在
7.1、RDB (定时同步):
RDB 为全量同步,直接缓存整个redis数据,每次任务执行时判断增加了那些key, 在把新增加的key保存到本地的 rdb 文件中
eedis 默认已开启了RDB存储
redis.conf 配置定时更新配置
搜索 save 找到
save 900 1 # 在900秒(15分钟)之后,如果至少有1个key发生变化,则dump内存快照。
save 300 10 # 在300秒(5分钟)之后,如果至少有10个key发生变化,则dump内存快照。
save 60 10000 # 在60秒(1分钟)之后,如果至少有10000个key发生变化,则dump内存快照
7.2、AOF (增量同步):
Aof是以执行命令的形式实现同步
配置:
appendfsync always # 每次有数据修改发生时都会写入AOF文件,能够保证数据不丢失,但是效率非常低。
appendfsync everysec # 每秒钟同步一次,可能会丢失1s内的数据,但是效率非常高。
appendfsync no # 从不同步。高效但是数据不会被持久化。
直接修改 redis.conf 中 appendonly yes
建议最好还是使用everysec 既能够保证数据的同步、效率也还可以。
八、redis 事务
Multi 开启事务
EXEC 提交事务
Watch 可以监听一个或者多个key,在提交事务之前是否有发生了变化 如果发生边了变化就不会提交事务,没有发生变化才可以提交事务
九、redis key过期监听
可以做延时任务,如: 订单自动超时
9.1、修改redis 配置 redis.conf
notify-keyspace-events "" 修改为 notify-keyspace-events Ex
9.2、boot 项目配置监听
package cn.ws.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
9.3、boot 项目监听所有失效 key 回调方法
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* Redis-key失效监听事件,所有key 失效都会走此方法
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取失效的key
String expiredKey = message.toString();
System.out.println(expiredKey);
}
}
9.4、key 过期打印示例
十、redis 分布式锁
分布式锁:
保证只有一个 jvm 执行,多个jvm安全问题, 如:集群时定时任务在同个jvm 同时执行相同的任务
分布式锁实现思路:
1、获取锁(创建key 获取锁) 2、释放(执行完业务删除key ) 3、超时 (一定时间内key没删除自动删除 5s)
redission
10.1、分布式锁实现方案
1、基于数据库
2、基于zk (临时节点+事件通知)–> 临时节点不能重复,谁创建成功谁就获得锁
3、基于redis (setnx 方式)–> redis key不能重复,谁先创建谁就获得锁
10.2、Jedis 实现核心代码
public class RedisLock {
private static final int setnxSuccss = 1;
/**
* 获取锁
*
* @param lockKey 定义锁的key
* @param notLockTimeOut 没有获取锁的超时时间
* @param lockTimeOut 使用锁的超时时间
* @return
*/
public String getLock(String lockKey, int notLockTimeOut, int lockTimeOut) {
// 获取Redis连接
Jedis jedis = RedisUtil.getJedis();
// 定义没有获取锁的超时时间
Long endTimeOut = System.currentTimeMillis() + notLockTimeOut;
while (System.currentTimeMillis() < endTimeOut) {
String lockValue = UUID.randomUUID().toString();
// 如果在多线程情况下谁能够setnx 成功返回0 谁就获取到锁
if (jedis.setnx(lockKey, lockValue) == setnxSuccss) {
jedis.expire(lockKey, lockTimeOut / 1000);
return lockValue;
}
// 否则情况下 在超时时间内继续循环
}
try {
if (jedis != null) {
jedis.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 释放锁 其实就是将该key删除
*
* @return
*/
public Boolean unLock(String lockKey, String lockValue) {
Jedis jedis = RedisUtil.getJedis();
// 确定是对应的锁 ,才删除
if (lockValue.equals(jedis.get(lockKey))) {
return jedis.del(lockKey) > 0 ? true : false;
}
return false;
}
}
10.3、redisTemplate 实现核心代码
@Component
public class RedisLock {
@Resource
private RedisTemplate redisTemplate;
/**
* 获取一个redis分布锁, 100% 只有一个线程能获取
* <P>
* connection.setNX : 设置一个永久对象,value值为锁的过期时间
* oldTime < nowTime : 获取原锁时间判断原锁是否已过期
* connection.getSet : 新线程获得锁并设置过期时间 且 再次获取原锁的过期时间
* oldValue==null ... : 判断key 的原锁是否存在 和在此 原锁是否已过期 (可能同时有多个线程进入,需要再次判断)
* </P>
*
* @param lockKey 锁住的key
* @param lockExpireMils 锁住的时长。如果超时未解锁,视为加锁线程死亡,其他线程可夺取锁
* @return
*/
@SuppressWarnings("all")
public boolean lock(String lockKey, long lockExpireMils) {
return (Boolean) redisTemplate.execute((RedisCallback) connection -> {
long nowTime = System.currentTimeMillis();
Boolean acquire = connection.setNX(lockKey.getBytes(), String.valueOf(nowTime + lockExpireMils + 1).getBytes());
if (acquire) {
return Boolean.TRUE;
} else {
byte[] value = connection.get(lockKey.getBytes());
if (Objects.nonNull(value) && value.length > 0) {
long oldTime = Long.parseLong(new String(value));
if (oldTime < nowTime) {
byte[] oldValue = connection.getSet(lockKey.getBytes(), String.valueOf(nowTime + lockExpireMils + 1).getBytes());
return oldValue == null ? false : Long.parseLong(new String(oldValue)) < nowTime;
}
}
}
return Boolean.FALSE;
});
}
/**
* 释放锁 其实就是将该key删除
*
* @return
*/
@SuppressWarnings("all")
public Boolean unLock(String lockKey) {
return redisTemplate.delete(lockKey);
}
}
使用锁
@Autowired
private RedisLock redisLock;
// 获取锁,并设置锁获取时间,防止死锁, 可 while (true) 获取锁,一点时间内没有获取到直接抛出异常
boolean lock = redisLock.lock("orderNo", 10000);
// 释放锁 (执行完业务后,手动释放,让下一个线程可以获取锁)
boolean unLock = redisLock.unLock("orderNo");
十一、redis 发布和订阅 (MQ)
11.1、发布者
/**
* 发布者配置
*/
@Configuration
public class PubConfig {
/**
* 订阅发布的主题
* @return
*/
@Bean
ChannelTopic topic() {
return new ChannelTopic( "pubsub:queue" );
}
}
/**
* 发布者
*/
@Component
public class MsgPublisher {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private ChannelTopic topic;
public void sendMsg(String msg) {
redisTemplate.convertAndSend(topic.getTopic(), "Message: " + msg);
}
}
11.2、订阅者
/**
* 订阅者配置
*/
@Configuration
public class SubConfig {
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new MsgListener());
}
@Bean()
@Primary // 如果同时配置了key过期监听,请添加@Primary注解
RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(messageListener(), new ChannelTopic("pubsub:queue"));
return container;
}
}
/**
* 订阅者
*/
@Component
public class MsgListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println( "接收成功: " + message.toString() );
}
}
11.3、发送消息测试
发布者发布消息
/**
* 消息发布订阅
* @author ws
* @mail [email protected]
* @date 2020/2/21 0021 16:03
*/
@RestController
@RequestMapping("/test4")
public class Test4Controller {
@Autowired
private MsgPublisher msgPublisher;
@RequestMapping("/send")
public String sendMsg(@RequestParam("msg") String msg){
msgPublisher.sendMsg(msg);
return "发送成功!";
}
}
订阅者打印内容
十二、redis 集群(主从复制,哨兵,穿透,击穿,雪崩)
12.1、主从复制
1.Redis从节点 向 主节点建立socket 长连接
2.Redis采用全量或者增量的形式将数据同步给从节点
从Redis2.8版本以后 过程采用增量和全量同步
全量复制:一般用于在初次的复制场景(从节点与主节点一次建立)(RDB)
增量复制:网络出现问题,从节点再次连接主节点时,主节点补发缺少的数据,主节点有新的set操作,每次数据增量同步 (AOF)
相关配置 Redis.conf
在从redis 的配置 slaveof 指向主 redis 地址和密码
slaveof 192.168.212.160 6379
masterauth 123456
查询主从复制配置命令
> info replication
多从节点配置原理
如果有多个从节点,使用树结构方式来进行配置, 主要最上级的主节点可以进行写操作,从节点不能进行写操作
注意:主从复制不可能保证强一致性
主从复制存在那些缺陷
如果主节点存在了问题,整个Redis环境是不可以实现写的操作,需要人工更改配置变为主操作
如何解决该问题:使用哨兵机制可以帮助解决Redis集群主从选举策略。
12.2、哨兵
Redis的哨兵机制就是解决我们以上主从复制存在缺陷(选举问题),解决问题保证我们的Redis高可用,实现自动化故障发现与故障转移。
哨兵机制是如果获取所有节点的
哨兵机制只需要配置监听我们的主节点就可以获取当前整个Redis集群的环境列表,采用 info replication 命令形式。从上级往下级去找到所有节点
哨兵为什么要集群
哨兵不建议是单机的,最好每个Redis节点都需要配置哨兵监听, 有几个redis就有几个哨兵, 主要目的用于选举
多个哨兵都执行同一个主的master节点,订阅到相同都通道,通过订阅通知,随后多个哨兵相互建立长连接。
哨兵如何发现故障
如果单个哨兵会向主的master 节点发送ping的命令,没有响应,哨兵会认为主观不可用状态, 会发送给其他都哨兵在次确认该master节点是否不可用,当前确认的哨兵节点数>=quorum(可配置),会实现重新选举。(类似zk的过半机制)
配置哨兵
redis下 sentinel.conf 文件复制到 redis下的bin目录
修改 sentinel.conf 配置文件
# 修改为后台启用
daemonize yes
# 指定redis主节点 ( 2表示需要确认master节点是否ping成功的哨兵, 如果2台哨兵没有拼通, 重新选举主节点)
sentinel monitor mymaster 192.168.212.160 6379 2
# 哨兵配置需要所有redis密码一致, 或不要密码
sentinel auth-pass mymaster 123456
启动哨兵
./redis-sentinel ./sentinel.conf
注意:
1、哨兵重新选举后 sentinel.conf 中的主节点配置将自动改变
2、哨兵重新选举后,主节点将发送变化,主从复制配置将失效,数据无法进行同步, 需要修改redis.cong 配置文件的主从,重启redis来调整,或使用命令(自行查询)
12.3、缓存穿透
缓存穿透:
在正常的逻辑下,使用id查询,会缓存到redis, 如数据库不存在的id( 相当于黑客使用模拟id发起请求, 高并发下 ) 那么每次都会去数据库查询,这就是缓存穿透,将对数据库造成压力
解决方案:
1、使用api 限流,防御 ddos (建议方案)
2、缓存空值到redis,并设置有效期(随机id 无法避免)
3、布隆过滤器 (自行查阅资料)
12.4、缓存击穿
缓存击穿:
热点key: 在高并发的情况,当一个缓存key过期时,因为访问该key请求较大,多个请求同时发现缓存过期,因此对多个请求同时数据库查询、同时向Redis写入缓存数据,这样会导致数据库的压力非常大;
解决方案:
1.使用分布式锁
2.使用本地锁
3.软过过期 , 设置热点数据永不过期或者异步延长过期时间;
12.5、服务雪崩
服务雪崩:
缓存雪崩指缓存服务器重启或者大量的缓存集中在某个时间段失效,突然给数据库产生了巨大的压力,甚至击垮数据库的情况。
解决思路:
1、对不用的数据使用不同的失效时间,加上随机数
2、使用集群分摊压力
十三、Redis Cluster (集群方式升级)
Redis 哨兵集群模式:
每个节点都保存全量同步数据,冗余的数据比较多;
Redis Cluster 集群模式:
采用分片集群模式,数据使用哈希槽均摊到每个redis节点中存放,可以减少冗余数据,缺点就是构建该集群模式成本非常高,redis3.0 后支持
集群模式 RedisCluster,原理采用hash槽的概念,预先分配16384个卡槽,并且将该卡槽分配给具体服务的节点;通过key进行crc16(key)%16384 获取余数,余数就是对应的卡槽的位置,一个卡槽可以存放多个不同的key,从而将读或者写转发到该卡槽的服务的节点。 最大的有点:动态扩容、缩容。
13.1、Redis Cluster 集群搭建 (redis 5.0 + 配置方式):
搭建伪集群
# 创建配置文件
mkdir rediscluster
cd rediscluster/
mkdir redis7000
mkdir redis7001
mkdir redis7002
mkdir redis7003
mkdir redis7004
mkdir redis7005
# 每个配置文件内容
daemonize yes # 后台启动
protected-mode no ; # 允许外部访问
port 7005 # 修改端口号,从7000到7005
cluster-enabled yes # 开启cluster,去掉注释
cluster-config-file 7000nodes.conf # 自动生成
cluster-node-timeout 15000 # 节点通信时间
logfile /usr/rediscluster/redis7005/redis.log
# 启动redis
/usr/redis/bin/redis-server /usr/rediscluster/redis7000/redis.conf
/usr/redis/bin/redis-server /usr/rediscluster/redis7001/redis.conf
/usr/redis/bin/redis-server /usr/rediscluster/redis7002/redis.conf
/usr/redis/bin/redis-server /usr/rediscluster/redis7003/redis.conf
/usr/redis/bin/redis-server /usr/rediscluster/redis7004/redis.conf
/usr/redis/bin/redis-server /usr/rediscluster/redis7005/redis.conf
# 分配卡槽 (cluster-replicas 1 代表一主一从, 6台就是3主3从, 如果设置为2 就是2主2从)
/usr/redis/bin/redis-cli --cluster create 192.168.212.163:7000 192.168.212.163:7001 192.168.212.163:7002 192.168.212.163:7003 192.168.212.163:7004 192.168.212.163:7005 --cluster-replicas 1
# 连接redis 注意需要添加-c, 否则无法在任意节点添加数据
/usr/redis/bin/redis-cli -h 192.168.212.163 -p 7000 –c
# 查看集群(redis-cli 连接后)
cluster nodes
13.2、Redis Cluster 快速扩容:
# 搭建redis7006 + redis7007服务
省略...
# 新增主节点
/usr/redis/bin/redis-cli --cluster add-node 192.168.212.163:7006 192.168.212.163:7000
# 新增从节点 (5d94171eb34ed4396bf5b9db8efaab4d96d0cf10 为 cluster nodes查询到的7006 的id)
/usr/redis/bin/redis-cli --cluster add-node 192.168.212.163:7007 192.168.212.163:7000 --cluster-salve --cluster-master-id 5d94171eb34ed4396bf5b9db8efaab4d96d0cf10
# 分配卡槽 (连接任意一台节点自动识别分配),分配后自动分配卡槽以及带卡槽的数据给新的节点
/usr/redis/bin/redis-cli --cluster reshard 192.168.212.163:7000
- 1、设置分配大小(建议为 16384/卡槽数)
- 2、设置分配给哪个卡槽, 输入cluster nodes查询到的 卡槽id
- 3、设置卡槽方式, all(平均分配-建议) done(手动指定)
# 查看集群(redis-cli 连接后)
cluster nodes
13.3、Redis Cluster 缩容
cluster-from = 缩容节点id
cluster-to = 缩容后卡槽移动到哪个节点的 id (可配置多个)
注意:下方是一行命令
/usr/redis/bin/redis-cli --cluster reshard 192.168.212.163:7000
--cluster-from 5d94171eb34ed4396bf5b9db8efaab4d96d0cf10
--cluster-to 511058958a3b80dd600e060c2500050c6c5a02ab
--cluster-slots
13.4、Redis Cluster 自动选举
Redis Cluster 的子节点中的 主节点和从节点 会自动进行选举, 主节点挂了从节点自动升级为主节点 ,原主节点重启后会变会从节点
13.5、jedis Cluster 集群连接
hostAndPortsSet 添加任意一个节点即可, 会自动寻找其他节点并重定向
public class Cluster {
private static JedisCluster jedis;
static {
// 添加集群的服务节点Set集合
Set<HostAndPort> hostAndPortsSet = new HashSet<HostAndPort>();
// 添加节点
hostAndPortsSet.add(new HostAndPort("192.168.56.180", 7777));
hostAndPortsSet.add(new HostAndPort("192.168.56.180", 8888));
hostAndPortsSet.add(new HostAndPort("192.168.56.181", 7777));
hostAndPortsSet.add(new HostAndPort("192.168.56.181", 8888));
hostAndPortsSet.add(new HostAndPort("192.168.56.182", 7777));
hostAndPortsSet.add(new HostAndPort("192.168.56.182", 8888));
// Jedis连接池配置
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 最大空闲连接数, 默认8个
jedisPoolConfig.setMaxIdle(100);
// 最大连接数, 默认8个
jedisPoolConfig.setMaxTotal(500);
//最小空闲连接数, 默认0
jedisPoolConfig.setMinIdle(0);
// 获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
jedisPoolConfig.setMaxWaitMillis(2000); // 设置2秒
//对拿到的connection进行validateObject校验
jedisPoolConfig.setTestOnBorrow(true);
jedis = new JedisCluster(hostAndPortsSet, jedisPoolConfig);
}
}
II、Redis实战
一、生成全局唯一订单号(自增 increment)
1.1、生分布式唯一编号
代码放在 RedisUtil 工具类中 , RedisUtil 参考: I -> 四 -> 4.4 步骤
/**
* 获取唯一编号(20位),目前只适用于个 redis
* <P>
* 分布式架构获取唯一编号(基于redis)--> 订单号,交易号,退款号等等,如果redis 集群,需设置自增歩长,请使用命令设置或所有其他方法规避
* </P>
*
* @param key key前缀-- 实际key等于前缀+每秒时间戳(同一秒delta 自增,下一秒根据时间戳生成新的key,delta重新计算)
* @param delta 默认初始自增值
* *** 2019-10-20 12:00:01 000001 --> 20191020120001000001,20191020120001000002,20191020120001000003......
* *** 2019-10-20 12:00:02 000001 --> 20191020120002000001,20191020120002000002,20191020120002000003......
* @return java.lang.String
* @author ws
* @mail [email protected]
* @date 2020/2/20 0020 15:59
*/
public String getNo(String key, Long delta) {
try {
// delta为空默认值1
if (null == delta) {
delta = 1L;
}
// 生成14位的时间戳(每秒使用新的时间戳当key)
String timeStamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
// 获得redis-key
String newKey = key + ":" + timeStamp;
// 获取自增值(时间戳+自定义key)
Long increment = redisTemplate.opsForValue().increment(newKey, delta);
// 设置时间戳生成的key的有效期为2秒,删除已无用的key
redisTemplate.expire(newKey, 2, TimeUnit.SECONDS);
// 获取订单号,时间戳 + 唯一自增Id( 6位数,不过前方补0)
return timeStamp + String.format("%06d", increment);
} catch (Exception e) {
// redis 宕机时采用时间戳加随机数
String timeStamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
Random random = new Random();
//14位时间戳到 + 6位随机数
for (int i = 0; i < 6; i++) {
timeStamp += random.nextInt(10) + "";
}
return timeStamp;
}
}
1.2、测试接口
@RestController
@RequestMapping("/test11")
public class Test11Controller {
@Autowired
private RedisUtil redisUtil;
/**
* 生成分布式唯一订单号(测试api)
* @return
*/
@GetMapping("/getNo")
public String getNo() {
// 获取订单号
String orderOn = redisUtil.getNo("order", 1L);
// 设置订单有效期30秒
redisUtil.set("order" + orderOn, 0, 30);
return orderOn;
}
}
二、控制多个jvm 定时任务执行(锁)
使用aop 机制 和 自定义注解方式实现
2.1、添加注解
/**
* @author ws
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TaskLock {
/**
* redis 锁的 key
*/
String lockKed() default "";
/**
* key在redis里存在的时间。默认10秒=
* 锁的有效期设置请小于定时任务时间, 如定时任务10秒执行一次,那么 expireTime < 10
* 锁的有效期在服务器没有时间差的情况下建议 > 1秒 (可使用默认10, 定时任务每次执行 >10 可使用默认)
* 如两个jvm中时间存在一定细微差异,可能导致定时任务执行的时间有一点细微的不一致情况发生,该时间尽量 > 服务器时间差 + N秒
*/
long expireTime() default 10;
}
2.2、编辑aop 类 (自动获取锁)
-
redisLock 请参考 10.3 步骤 redisTemplate 实现核心代码
-
利用注解+ aop 机制自动抢锁,获取到锁的线程才执行业务方法
@Aspect
@Slf4j
@Component
public class TaskLockAspect {
private static final String LOCK_VALUE = "taskLock-";
@Autowired
private RedisLock redisLock;
@Around("execution(* *.*(..)) && @annotation(com.ws.test.redisdemo.taskScheduling.annotation.TaskLock)")
public void cacheLockPoint(ProceedingJoinPoint proceed) {
MethodSignature signature = (MethodSignature) proceed.getSignature();
Method method = signature.getMethod();
TaskLock taskLock = method.getAnnotation(TaskLock.class);
if (StringUtils.isBlank(taskLock.lockKed())) {
log.info(taskLock.lockKed() + "is no lockKey");
}
boolean lock = redisLock.lock(LOCK_VALUE + taskLock.lockKed(), taskLock.expireTime() * 1000);
if (lock) {
log.info(taskLock.lockKed() + "获取锁成功");
try {
proceed.proceed();
// 不释放,让其自动过期,已次来保证定时任务绝对不被重复执行, 避免定时任务业务代码执行过快, 第二个jvm执行时锁已经被释放的情况发送
} catch (Throwable e) {
log.error("method:{},运行错误!", method, e);
}
} else {
log.error(taskLock.lockKed() + "获取锁失败");
}
}
}
2.3、编写定时任务
创建定时任务,
- lockKed 的 锁名称 设置为相同的, 即lockKed 相同的方法将会进行抢锁
- 分布式部署,一个定时任务的 lockKed 是相同的,哪个jvm 先获取到哪个 jvm 就执行定时任务
下方代码模拟 3个 jvm 执行同一个定时任务,并命名为 ABC, 进行模拟测试
@Component
@Configuration
@EnableScheduling
@Slf4j
public class TaskTest {
public final static String cron = "0/2 * * * * ?";
@Scheduled(cron = TaskTest.cron)
@TaskLock(lockKed = "task1", expireTime = 1)
public void executeTask1() {
System.out.println( LocalDateTime.now()+ "hello world!" + "--B" );
}
@Scheduled(cron = TaskTest.cron)
@TaskLock(lockKed = "task1", expireTime = 1)
public void executeTask2() {
System.out.println( LocalDateTime.now()+ "hello world!" + "--A" );
}
@Scheduled(cron = TaskTest.cron)
@TaskLock(lockKed = "task1", expireTime = 1)
public void executeTask3() {
System.out.println( LocalDateTime.now()+ "hello world!" + "--C" );
}
}
2.4、测试代码
测试如下,每次只有一个线程获取锁 并输出内容, 另外两个线程没有获取到锁,将不执行业务代码
-
以上部分内容来自于蚂蚁课堂 http://www.mayikt.com/
-
个人开源项目(通用后台管理系统)–> https://gitee.com/wslxm/spring-boot-plus2 , 喜欢的可以看看
III、redis 基础提问
1、reids 的应用场景
2、reids 如何实现分布式锁(setNX)
3、reids 有几种数据类型, 分别是什么
4、reids 单个key 支持的最大容量是多少
5、reids 如果宕机了数据会丢失嘛
6、reids 和 memcache 的区别
7、reids 和 mysql 的区别
8、redis 如何防止内存穿透 (使用数据库不存在的id )
9、redis 如果内存满了会怎么样(无法写入,自动执行淘汰策略 )
10、redis 在win下和linux下的最大区别
- 本文到此结束,如果觉得有用,动动小手点赞或关注一下呗,将不定时持续更新更多的内容…,感谢大家的观看!