Spring Data Redis
Lettuce 能够支持 Reactive 方式,但是Jedis不支持。另外,在springboot中,默认的已经为Lettuce 而不是Jedis。
Spring Data Redis 中主要的支持
- ReactiveRedisConnection
通过ReactiveRedisConnection建立了一个reactIve的连接,这个连接通过ReactiveRedisConnectionFactory来构造的 - ReactiveRedisConnectionFactory
- ReactiveRedisTemplate
• opsForXxx()
具体操作代码如下:
@Slf4j
@SpringBootApplication
public class RedisReactiveTestApplication implements ApplicationRunner {
private static final String KEY = "COFFEE_MENU";
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private ReactiveStringRedisTemplate redisTemplate;
public static void main(String[] args) { SpringApplication.run(RedisReactiveTestApplication.class, args); }
@Bean
ReactiveStringRedisTemplate reactiveStringRedisTemplate(ReactiveRedisConnectionFactory factory) {
return new ReactiveStringRedisTemplate(factory);
} //springboot 为我们默认提供了的reactiveObjectRedisTemplate
@Override
public void run(ApplicationArguments args) throws Exception {
ReactiveHashOperations<String, String, String> hash = redisTemplate.opsForHash(); //取出我对hash的ops
CountDownLatch cd1 = new CountDownLatch(1);
//CountDownLatch允许一个或者多个线程一直等待,直到一组其它操作执行完成。在使用CountDownLatch时,需要指定一个整数值,此值是线程将要等待的操作数。当某个线程为了要执行这些操作而等待时,需要调用await方法。await方法让线程进入休眠状态直到所有等待的操作完成为止。当等待的某个操作执行完成,它使用countDown方法来减少CountDownLatch类的内部计数器。当内部计数器递减为0时,CountDownLatch会唤醒所有调用await方法而休眠的线程们。
List<Coffee> list = jdbcTemplate.query(
"select * from t_coffee" , (rs, i) -> //将对应的每一个对象进行处理
Coffee.builder()
.id(rs.getLong("id"))
.name(rs.getString("name"))
.price(rs.getLong("price"))
.build()
);
Flux.fromIterable(list)
.publishOn(Schedulers.single())
.doOnComplete(()->log.info("list ok"))
.flatMap(c->{
log.info("try to put {},{}",c.getName(),c.getPrice());
return hash.put(KEY,c.getName(),c.getPrice().toString());//保存到Redis中
})
.doOnComplete(()->log.info("set ok"))
.concatWith(redisTemplate.expire(KEY, Duration.ofMinutes(1)))//设置有效期(1分钟)
.doOnComplete(()->log.info("expire ok"))
.onErrorResume(e->{
log.error("exception {}",e.getMessage());
return Mono.just(false); //处理异常
})
.subscribe(b->log.info("Boolean {}",b), //打印每个元素的Boolean值
e->log.error("Exception {}",e.getMessage()), //有异常的时候 打印异常信息
()->cd1.countDown());
//内部计数器减一,如果计数达到零,唤醒所有等待的线程。
log.info("Waiting");
cd1.await();
}
}
结果如下:
Redis结果如下: