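Written: 2019-12-01
Spring Boot: 2.1, JDK: 1.8, IDE: IntelliJ IDEA
1. Overview
This article uses reactive programming to operate on Redis. The Lettuce client supports the reactive style.
The main reactive support in Spring Data Redis: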
ReactiveRedisConnection
ReactiveRedisConnectionFactory
ReactiveRedisTemplate
opsForXxx()
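For the concepts behind reactive programming, see:
Part 30: SpringBoot Reactor reactive programming introduction
Part 31: SpringBoot Reactor reactive programming in practice (1)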
2. Starting Redis with Docker
List all containers, including stopped ones, to find the Redis container that was created earlier:
% docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8eb8d32453bb redis "docker-entrypoint.s…" 2 months ago Up 19 minutes 0.0.0.0:6379->6379/tcp redis
Start the Redis container:
% docker start redis
redis
Start the Redis client and list all keys. At this point there are none.
% docker exec -it redis redis-cli
127.0.0.1:6379> keys *
(empty list or set)
For a detailed explanation, see:
Part 23: SpringBoot Docker getting started
Part 32: Redis Docker getting started
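3. Creating the project
Follow the tutorial [SpringBoot 2.1 | Part 1: Building your first SpringBoot project] to create a new Spring Boot project named RPCClient. Under src/main/resources, find the configuration file application.properties and rename it to application.yml.
Select Spring Boot version 2.1.10 and check the following dependencies:
Developer Tools > Lombok
SQL > Spring Data JDBC, H2 Database
NoSQL > Spring Data Reactive Redis
4. The application.yml configuration file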
spring:
  redis:
    host: "localhost"
5. Creating the table and initializing the data
In the same directory as application.yml, create the file schema.sql:
drop table t_coffee if exists;
create table t_coffee (
id bigint auto_increment,
create_time timestamp,
update_time timestamp,
name varchar(255),
price bigint,
primary key (id)
);
In the same directory as application.yml, create the file data.sql:
insert into t_coffee (name, price, create_time, update_time) values ('espresso', 2000, now(), now());
insert into t_coffee (name, price, create_time, update_time) values ('latte', 2500, now(), now());
insert into t_coffee (name, price, create_time, update_time) values ('capuccino', 2500, now(), now());
insert into t_coffee (name, price, create_time, update_time) values ('mocha', 3000, now(), now());
insert into t_coffee (name, price, create_time, update_time) values ('macchiato', 3000, now(), now());
6. Creating the Model
com.zgpeace.reactive.Coffee
package com.zgpeace.reactive;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Coffee {
    private Long id;
    private String name;
    private Long price;
}
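For readers who have not used Lombok, the following hand-written sketch approximates what @Builder and the constructor annotations provide for a class like Coffee. The class name PlainCoffee is hypothetical and the code is an illustration only, not Lombok's actual generated output. @Data additionally generates setters, equals, hashCode and toString, which are omitted here for brevity.

```java
public class PlainCoffee {
    private Long id;
    private String name;
    private Long price;

    // Counterpart of @NoArgsConstructor
    public PlainCoffee() {
    }

    // Counterpart of @AllArgsConstructor
    public PlainCoffee(Long id, String name, Long price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    // A subset of what @Data generates: the getters
    public Long getId() { return id; }
    public String getName() { return name; }
    public Long getPrice() { return price; }

    // Counterpart of @Builder: a static factory plus a fluent builder class
    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private Long id;
        private String name;
        private Long price;

        public Builder id(Long id) { this.id = id; return this; }
        public Builder name(String name) { this.name = name; return this; }
        public Builder price(Long price) { this.price = price; return this; }

        public PlainCoffee build() {
            return new PlainCoffee(id, name, price);
        }
    }

    public static void main(String[] args) {
        PlainCoffee latte = PlainCoffee.builder().id(2L).name("latte").price(2500L).build();
        System.out.println(latte.getName() + " costs " + latte.getPrice()); // prints "latte costs 2500"
    }
}
```

The fluent builder calls have the same shape as the Coffee.builder() chain used in the row mapper of the application class.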
7. Controller
package com.zgpeace.reactive;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;

@SpringBootApplication
@Slf4j
public class ReactiveApplication implements ApplicationRunner {
    // Redis key of the hash that holds the menu (field = coffee name, value = price)
    private static final String KEY = "COFFEE_MENU";

    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private ReactiveStringRedisTemplate redisTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }

    @Bean
    ReactiveStringRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
        return new ReactiveStringRedisTemplate(factory);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        ReactiveHashOperations<String, String, String> hashOps = redisTemplate.opsForHash();
        // Counted down by the subscriber's completion callback so that run() can wait for it
        CountDownLatch cdl = new CountDownLatch(1);

        // Blocking JDBC query: map every row of t_coffee to a Coffee
        List<Coffee> list = jdbcTemplate.query(
                "select * from t_coffee", (rs, i) ->
                        Coffee.builder()
                                .id(rs.getLong("id"))
                                .name(rs.getString("name"))
                                .price(rs.getLong("price"))
                                .build()
        );

        Flux.fromIterable(list)
                // Run the downstream operators on the single scheduler thread instead of main
                .publishOn(Schedulers.single())
                .doOnComplete(() -> log.info("list ok"))
                // Write each coffee into the Redis hash; each put emits a Boolean
                .flatMap(c -> {
                    log.info("try to put {},{}", c.getName(), c.getPrice());
                    return hashOps.put(KEY, c.getName(), c.getPrice().toString());
                })
                .doOnComplete(() -> log.info("set ok"))
                // After all puts have completed, let the key expire in 1 minute
                .concatWith(redisTemplate.expire(KEY, Duration.ofMinutes(1)))
                .doOnComplete(() -> log.info("expire ok"))
                // On error, log it and emit false as a fallback value
                .onErrorResume(e -> {
                    log.error("exception {}", e.getMessage());
                    return Mono.just(false);
                })
                .subscribe(b -> log.info("Boolean: {}", b),
                        e -> log.error("Exception {}", e.getMessage()),
                        () -> cdl.countDown());

        log.info("Waiting");
        // Block the main thread until the pipeline signals completion
        cdl.await();
    }
}
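Explanation:
jdbcTemplate.query: queries the rows in the database and maps them into a List<Coffee>.
Flux.fromIterable(list): turns the list into a Flux that emits each element in turn.
publishOn(Schedulers.single()): runs the downstream operators on the single scheduler thread (single-1 in the log) rather than on main.
flatMap: writes each coffee into the Redis hash with hashOps.put, using the name as the field and the price as the value.
concatWith(redisTemplate.expire(KEY, Duration.ofMinutes(1))): after all puts have completed, sets the key to expire in 1 minute.
CountDownLatch: blocks the main thread in cdl.await() until the subscriber's completion callback calls cdl.countDown(), so that run() does not return before the asynchronous pipeline has finished. It is not a mutual-exclusion lock.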
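The role of CountDownLatch can be seen in isolation in the minimal sketch below. It uses only the JDK, with CompletableFuture.runAsync standing in for the reactive pipeline. The names LatchSketch and runAndWait are hypothetical, and the 5-second timeout is an addition for safety; the application above waits without a timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchSketch {

    // Starts work on another thread and blocks the caller until that work signals completion.
    static String runAndWait() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>("not finished");

        // Stand-in for the asynchronous pipeline: runs on a pool thread, not on the caller
        CompletableFuture.runAsync(() -> {
            result.set("done");
            latch.countDown(); // plays the role of the completion callback in subscribe(...)
        });

        try {
            // The caller waits here, like cdl.await() in run()
            boolean completed = latch.await(5, TimeUnit.SECONDS);
            return completed ? result.get() : "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait()); // prints "done"
    }
}
```

Without the latch, the calling thread could reach the end of the method while the asynchronous work is still running.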
Running ReactiveApplication produces the following output:
Bootstrapping Spring Data repositories in DEFAULT mode.
[ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 1ms. Found 0 repository interfaces.
[ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
[ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
[ main] c.zgpeace.reactive.ReactiveApplication : Started ReactiveApplication in 1.279 seconds (JVM running for 1.779)
[ main] c.zgpeace.reactive.ReactiveApplication : Waiting
[ single-1] c.zgpeace.reactive.ReactiveApplication : try to put espresso,2000
[ single-1] io.lettuce.core.EpollProvider : Starting without optional epoll library
[ single-1] io.lettuce.core.KqueueProvider : Starting without optional kqueue library
[ single-1] c.zgpeace.reactive.ReactiveApplication : try to put latte,2500
[ single-1] c.zgpeace.reactive.ReactiveApplication : try to put capuccino,2500
[ single-1] c.zgpeace.reactive.ReactiveApplication : try to put mocha,3000
[ single-1] c.zgpeace.reactive.ReactiveApplication : try to put macchiato,3000
[ single-1] c.zgpeace.reactive.ReactiveApplication : list ok
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication : set ok
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication : Boolean: true
[ioEventLoop-4-1] c.zgpeace.reactive.ReactiveApplication : expire ok
[ Thread-5] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
[ Thread-5] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
8. Viewing the Redis data
View the Redis data from the redis-cli session in the Terminal:
127.0.0.1:6379> keys *
1) "COFFEE_MENU"
127.0.0.1:6379> hgetall COFFEE_MENU
1) "espresso"
2) "2000"
3) "latte"
4) "2500"
5) "capuccino"
6) "2500"
7) "mocha"
8) "3000"
9) "macchiato"
10) "3000"
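The hgetall reply above is a flat list that alternates field and value, and every value is a string because the application stored c.getPrice().toString(). The sketch below shows how such a flat list maps back to a name-to-price menu. It is plain Java with no Redis connection, and the names HashReplySketch and toMenu are hypothetical. In the application itself, the hash operations of Spring Data Redis already return entries as key/value pairs, so this only illustrates the layout.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HashReplySketch {

    // Converts a flat [field, value, field, value, ...] list into an ordered name -> price map.
    static Map<String, Long> toMenu(List<String> flatReply) {
        if (flatReply.size() % 2 != 0) {
            throw new IllegalArgumentException("expected an even number of elements");
        }
        Map<String, Long> menu = new LinkedHashMap<>();
        for (int i = 0; i < flatReply.size(); i += 2) {
            // Even index: field (coffee name); odd index: value (price stored as a string)
            menu.put(flatReply.get(i), Long.parseLong(flatReply.get(i + 1)));
        }
        return menu;
    }

    public static void main(String[] args) {
        List<String> reply = Arrays.asList(
                "espresso", "2000", "latte", "2500", "capuccino", "2500",
                "mocha", "3000", "macchiato", "3000");
        // prints {espresso=2000, latte=2500, capuccino=2500, mocha=3000, macchiato=3000}
        System.out.println(toMenu(reply));
    }
}
```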
9. Code download
https://github.com/zgpeace/Spring-Boot2.1/tree/master/reactor/ReactiveRedisDemo
10. References
https://github.com/geektime-geekbang/geektime-spring-family