1、在maven中,添加依赖
<!-- Redis dependencies -->
<!-- Jedis client library, pinned to version 2.7.3 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
<!-- Spring Data Redis; no version is declared here (presumably managed by the parent POM/BOM - TODO confirm) -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<!-- Spring Boot starter for Redis; no version is declared here (presumably managed by the parent POM/BOM - TODO confirm) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、在Spring Boot启动时,随之加载Redis配置
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
/**
 * Spring configuration that declares the Redis-backed {@link CacheManager} and the
 * {@link RedisTemplate} bean used by the application.
 */
@Configuration
// The caching annotation below is left disabled, exactly as in the original configuration.
//@EnableCaching
public class RedisConfig {

    /**
     * Creates the cache manager.
     *
     * @param factory connection factory used by the cache writer
     * @return a cache manager built on a non-locking Redis cache writer whose default
     *         entry time-to-live is one hour
     */
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisCacheWriter cacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(factory);
        // Cache entries expire after one hour unless a cache overrides the defaults.
        RedisCacheConfiguration defaults =
                RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(1));
        return RedisCacheManager.builder(cacheWriter)
                .cacheDefaults(defaults)
                .build();
    }

    /**
     * Creates the {@code RedisTemplate} bean: a {@link StringRedisTemplate} whose value
     * serializer is replaced by a Jackson JSON serializer.
     *
     * @param factory connection factory backing the template
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        // Mapper that sees every property regardless of visibility and embeds type
        // information for non-final types.
        // NOTE(review): enableDefaultTyping is reported as deprecated in newer Jackson
        // releases in favour of activateDefaultTyping - confirm against the Jackson
        // version in use before upgrading.
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        Jackson2JsonRedisSerializer<Object> valueSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        valueSerializer.setObjectMapper(mapper);

        StringRedisTemplate template = new StringRedisTemplate(factory);
        template.setValueSerializer(valueSerializer);
        template.afterPropertiesSet();
        return template;
    }
}
import org.springframework.context.annotation.Configuration;
@Configuration用于定义配置类,可替换xml配置文件,被注解的类内部包含有一个或多个被@Bean注解的方法,这些方法将会被AnnotationConfigApplicationContext或AnnotationConfigWebApplicationContext类进行扫描,并用于构建bean定义,初始化Spring容器。
Spring Boot不需要在xml中配置注解扫描,只需保证Spring Boot的main启动类位于这些配置类的上层包即可。
@Configuration等价于XML配置中的<beans></beans>
@Bean等价于XML配置中的<bean></bean>
3、包含Redis的5种数据结构操作方法的工具类
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
/**
 * Convenience wrapper around {@link RedisTemplate} that exposes simple helpers for
 * Redis strings, hashes, lists, sets and sorted sets, plus a few key-level operations.
 */
@Component
public class RedisUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);

    // NOTE(review): kept as a raw type on purpose. Adding type arguments would make
    // injection depend on the generic signature of the declared template bean -
    // confirm which bean is injected before narrowing this type.
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Writes a value without an expiry.
     *
     * @param key   cache key
     * @param value value to store
     * @return {@code true} if the write succeeded, {@code false} if it failed
     *         (the failure is logged)
     */
    public boolean set(final String key, Object value) {
        try {
            ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
            operations.set(key, value);
            return true;
        } catch (Exception e) {
            // Previously swallowed silently; keep the best-effort contract but leave a trace.
            logger.error("Failed to set key {}", key, e);
            return false;
        }
    }

    /**
     * Writes a value together with its time-to-live.
     *
     * <p>The value and the expiry are sent in a single set-with-timeout call, so a
     * failure can no longer leave the key stored without an expiry.
     *
     * @param key        cache key
     * @param value      value to store
     * @param expireTime time-to-live in seconds; a {@code null} value makes the call
     *                   fail and return {@code false}
     * @return {@code true} if the write succeeded, {@code false} if it failed
     *         (the failure is logged)
     */
    public boolean set(final String key, Object value, Long expireTime) {
        try {
            ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
            operations.set(key, value, expireTime, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            logger.error("excepiton", e);
            return false;
        }
    }

    /**
     * Deletes several keys, one after another.
     *
     * @param keys keys to delete; {@code null} is treated as "nothing to delete"
     */
    public void remove(final String... keys) {
        if (keys == null) {
            return;
        }
        for (String key : keys) {
            remove(key);
        }
    }

    /**
     * Deletes every key matching a pattern.
     *
     * @param pattern key pattern passed to {@code RedisTemplate.keys}
     */
    public void removePattern(final String pattern) {
        Set<Serializable> keys = redisTemplate.keys(pattern);
        // keys may be null or empty; in both cases there is nothing to delete.
        if (!CollectionUtils.isEmpty(keys)) {
            redisTemplate.delete(keys);
        }
    }

    /**
     * Reads the string value of every key matching a pattern.
     *
     * @param pattern key pattern passed to {@code RedisTemplate.keys}
     * @return the values in key-iteration order (an entry is {@code null} when the
     *         lookup for that key returns nothing); an empty list if no key matches
     */
    public List getAllValuesByKeysPatten(String pattern) {
        Set<String> keys = redisTemplate.keys(pattern);
        if (CollectionUtils.isEmpty(keys)) {
            return new ArrayList<>();
        }
        ValueOperations<Serializable, Object> valueOperations = redisTemplate.opsForValue();
        List<Object> values = new ArrayList<>(keys.size());
        for (String key : keys) {
            values.add(valueOperations.get(key));
        }
        return values;
    }

    /**
     * Deletes a single key.
     *
     * @param key key to delete
     */
    public void remove(final String key) {
        // No existence pre-check: deleting a missing key changes nothing, and skipping
        // the check saves one round trip.
        redisTemplate.delete(key);
    }

    /**
     * Tells whether a key exists.
     *
     * @param key key to look up
     * @return {@code true} only if the template reports the key as present
     */
    public boolean exists(final String key) {
        // hasKey returns a Boolean object; guard against null before unboxing.
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    /**
     * Reads a string value.
     *
     * @param key cache key
     * @return the stored value as returned by the template's value operations
     */
    public Object get(final String key) {
        ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
        return operations.get(key);
    }

    /**
     * Puts one field into a hash.
     *
     * <p>NOTE(review): this replaces the hash-value serializer of the shared template on
     * every call, which also affects all later hash operations made through this class.
     * That looks unsafe when callers use different classes concurrently - confirm and
     * consider a dedicated template. Behaviour is kept as-is here.
     *
     * @param key       hash key
     * @param hashKey   field name
     * @param value     field value
     * @param newCleass class handed to the {@code FastJsonRedisSerializer} used for hash values
     */
    public void hmSet(String key, Object hashKey, Object value, Class newCleass) {
        redisTemplate.setHashValueSerializer(new FastJsonRedisSerializer<>(newCleass));
        HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();
        hash.put(key, hashKey, value);
    }

    /**
     * Puts all entries of a map into a hash.
     *
     * @param key   hash key
     * @param value field/value pairs to store
     */
    public void hmSetMap(String key, Map value) {
        HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();
        hash.putAll(key, value);
    }

    /**
     * Reads one field of a hash.
     *
     * @param key     hash key
     * @param hashKey field name
     * @return the field value as returned by the template's hash operations
     */
    public Object hmGet(String key, Object hashKey) {
        HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();
        return hash.get(key, hashKey);
    }

    /**
     * Reads every field of a hash.
     *
     * @param key hash key
     * @return all field/value pairs of the hash
     */
    public Map<Object, Object> hmGetAll(String key) {
        HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();
        return hash.entries(key);
    }

    /**
     * Appends a value to a list.
     *
     * <p>Despite its name this method calls {@code rightPush}, i.e. the value is added
     * at the tail of the list. Callers that trim with negative indices rely on this.
     *
     * @param k list key
     * @param v value to append
     */
    public void lPush(String k, Object v) {
        ListOperations<String, Object> list = redisTemplate.opsForList();
        list.rightPush(k, v);
    }

    /**
     * Reads a range of a list.
     *
     * @param k  list key
     * @param l  start index
     * @param l1 end index
     * @return the elements in the requested range
     */
    public List<Object> lRange(String k, long l, long l1) {
        ListOperations<String, Object> list = redisTemplate.opsForList();
        return list.range(k, l, l1);
    }

    /**
     * Trims a list so that only the given range remains.
     *
     * @param k  list key
     * @param l  start index of the range to keep
     * @param l1 end index of the range to keep
     */
    public void lTrim(String k, long l, long l1) {
        ListOperations<String, Object> list = redisTemplate.opsForList();
        list.trim(k, l, l1);
    }

    /**
     * Adds a member to a set.
     *
     * @param key   set key
     * @param value member to add
     */
    public void add(String key, Object value) {
        SetOperations<String, Object> set = redisTemplate.opsForSet();
        set.add(key, value);
    }

    /**
     * Reads all members of a set.
     *
     * @param key set key
     * @return the members of the set
     */
    public Set<Object> getMembers(String key) {
        SetOperations<String, Object> set = redisTemplate.opsForSet();
        return set.members(key);
    }

    /**
     * Adds a member to a sorted set.
     *
     * @param key    sorted-set key
     * @param value  member to add
     * @param scoure score of the member
     */
    public void zAdd(String key, Object value, double scoure) {
        ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
        zset.add(key, value, scoure);
    }

    /**
     * Reads the members of a sorted set whose score lies in a range.
     *
     * @param key     sorted-set key
     * @param scoure  lower score bound
     * @param scoure1 upper score bound
     * @return the members within the score range
     */
    public Set<Object> rangeByScore(String key, double scoure, double scoure1) {
        ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
        return zset.rangeByScore(key, scoure, scoure1);
    }

    /**
     * Renames a key.
     *
     * @param oldKey current key name
     * @param newKey new key name
     * @return {@code true} if the rename succeeded, {@code false} if it failed
     *         (the failure is logged)
     */
    public boolean reKey(String oldKey, String newKey) {
        try {
            redisTemplate.rename(oldKey, newKey);
            return true;
        } catch (Exception e) {
            logger.warn("Failed to rename key {} to {}", oldKey, newKey, e);
            return false;
        }
    }

    /**
     * Reads every field of a hash, returning an empty map on failure.
     *
     * <p>Despite its name this method reads hash entries, not set members.
     *
     * @param key hash key
     * @return all field/value pairs of the hash, or an empty map if the read failed
     *         (the failure is logged)
     */
    public Map smembers(String key) {
        try {
            HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();
            return hash.entries(key);
        } catch (Exception e) {
            logger.warn("Failed to read hash entries for key {}", key, e);
            return new HashMap();
        }
    }

    /**
     * Lists the keys matching a pattern.
     *
     * @param pattern key pattern passed to {@code RedisTemplate.keys}
     * @return the matching keys as returned by the template
     */
    public Set<String> getKeys(String pattern) {
        return redisTemplate.keys(pattern);
    }
}
4、RedisUtil的生产者消费者的消息队列应用
import com.alibaba.fastjson.JSONObject;
import com.a.b.c.commons.util.RedisUtil;
import com.a.b.c.service.trend.curve.data.process.GModelFactory;
import com.a.b.c.service.trend.curve.websocket.server.TWebSocket;
import com.a.b.c.view.model.RTimeData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * <b><code>GTIcatorMessageReceiver</code></b>
 * <p>
 * Receives raw indicator messages, pushes the resulting real-time data to the matching
 * WebSocket subscribers and caches trend-curve points in Redis lists.
 * </p>
 * <b>Create Time:</b> 2019/12/10 17:54
 *
 * @author ong
 * @version 0.0.1
 * @since Score-be 0.0.1
 */
@Component
public class GTIcatorMessageReceiver {

    /**
     * The logger.
     */
    private static final Logger logger = LoggerFactory.getLogger(GTIcatorMessageReceiver.class);

    /**
     * Query types that are always registered, whether or not a client asked for them.
     * todo add auto config
     */
    private static final String[] DEFAULT_QUERY_TYPES = {
        "boiler_dongjin,trendCurve,120",
        "boiler_dongjin,trendCurve,130",
        "boiler_dongjin,trendCurve,140",
        "boiler_dongjin,trendCurve,all_steam_pressure"
    };

    /** Trim range start for cached lists of "all_" query types (the last 2700 entries are kept). */
    private static final long ALL_TRIM_START = -2700;

    /** Trim range start for cached lists of every other query type (the last 900 entries are kept). */
    private static final long DEVICE_TRIM_START = -900;

    @Autowired
    private GModelFactory modelFactory;

    @Autowired
    RedisUtil redisUtil;

    /**
     * Handles one incoming message.
     *
     * <p>The message is a list of records separated by {@code ;}. Each record has the
     * form {@code deviceId,timestamp,content}, where {@code content} may itself contain
     * commas. Records that do not match this shape are skipped with a warning instead of
     * aborting the whole message.
     *
     * @param message raw message; {@code null} or empty messages are ignored
     */
    public void receiveMessage(String message) {
        Set queryTypes = TWebSocket.getQueryTypes();
        for (String defaultType : DEFAULT_QUERY_TYPES) {
            queryTypes.add(defaultType);
        }
        if (message == null || message.isEmpty()) {
            return;
        }
        for (String record : message.split(";")) {
            processRecord(record, queryTypes);
        }
    }

    /** Parses a single record, updates the model and dispatches to every matching query type. */
    private void processRecord(String record, Set queryTypes) {
        int firstComma = record.indexOf(',');
        int secondComma = firstComma < 0 ? -1 : record.indexOf(',', firstComma + 1);
        if (secondComma < 0) {
            logger.warn("Skipping malformed record (expected deviceId,timestamp,content): {}", record);
            return;
        }
        String deviceId = record.substring(0, firstComma);
        String timestamp = record.substring(firstComma + 1, secondComma);
        String dataContent = record.substring(secondComma + 1);

        if (timestamp.isEmpty()) {
            logger.warn("Skipping record without timestamp: {}", record);
            return;
        }
        long date;
        try {
            // Values shorter than 11 characters get "000" appended
            // (presumably seconds converted to milliseconds - confirm with the sender).
            date = Long.parseLong(timestamp.length() < 11 ? timestamp + "000" : timestamp);
        } catch (NumberFormatException e) {
            logger.warn("Skipping record with non-numeric timestamp: {}", record, e);
            return;
        }

        RTimeData realTimeData = new RTimeData();
        realTimeData.setDate(date);
        if (!modelFactory.updateSourceData(dataContent)) {
            return;
        }
        for (Object candidate : queryTypes) {
            if (candidate == null) {
                // The shared set may hold a null entry (connection without query string).
                continue;
            }
            String queryType = candidate.toString();
            if (queryType.contains(deviceId) || queryType.contains("all_")) {
                dispatch(realTimeData, queryType, deviceId);
            }
        }
    }

    /** Fills the data object for one query type, sends it to subscribers and caches trend-curve points. */
    private void dispatch(RTimeData realTimeData, String queryType, String deviceId) {
        realTimeData.setQueryType(queryType);
        realTimeData.setDeviceId(deviceId);
        Map<String, Object> realData = modelFactory.getRealTimedataByqueryType(queryType);
        realTimeData.setRealData(realData);
        try {
            List<RTimeData> sendData = new ArrayList<RTimeData>();
            sendData.add(realTimeData);
            // Send the data to the subscribed WebSocket clients.
            TWebSocket.sendInfo(queryType, JSONObject.toJSONString(sendData));
            // Cache real-time curve data, keeping only the most recent entries.
            if (queryType.contains("trendCurve")) {
                redisUtil.lPush(queryType, JSONObject.toJSONString(realTimeData));
                long trimStart = queryType.contains("all_") ? ALL_TRIM_START : DEVICE_TRIM_START;
                redisUtil.lTrim(queryType, trimStart, -1);
            }
        } catch (IOException e) {
            logger.error("实时数据redis to websocket 发送失败", e);
        }
    }
}
其中用Redis列表实现了生产者/消费者式的消息缓存:push + ltrim = Capped Collection(有限集合)。注意:上面代码中的lPush实际调用的是rightPush(从列表尾部追加),再配合ltrim(-N, -1)只保留最新的N条记录。
5、操作List的lrange运用
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSONObject;
import com.a.b.c.commons.util.ApplicationContextRegister;
import com.a.b.c.commons.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
/**
 * <b><code>TWebSocket</code></b>
 * <p>
 * WebSocket endpoint for trend-curve data. Each connection subscribes to the query type
 * given in its query string, can request cached history and then receives pushed data.
 * </p>
 * <b>Create Time:</b> 2019/12/9 16:18
 *
 * @author ong
 * @version 0.0.1
 * @since core-be 0.0.1
 */
@ServerEndpoint(value = "/trend/curve")
@Component
public class TWebSocket {
    /**
     * The constant LOG.
     */
    private static final Logger LOG = LoggerFactory.getLogger(TWebSocket.class);

    /** Trim/range start used for "all_" query types. */
    private static final long ALL_HISTORY_START = -2700;

    /** Trim/range start used for every other query type. */
    private static final long DEVICE_HISTORY_START = -900;

    // Looked up lazily from the application context on first use (see loadHistory).
    private RedisUtil redisUtil;

    // Public static field kept for compatibility; it is never assigned in this class.
    public static TWebSocket TWebSocket;

    // Number of currently open connections; only accessed through the synchronized static methods below.
    private static int onlineCount = 0;

    // Thread-safe set holding the endpoint instance of every connected client.
    private static CopyOnWriteArraySet<TWebSocket> webSocketSet = new CopyOnWriteArraySet<TWebSocket>();

    // Every query type that has been requested so far.
    // NOTE(review): entries are never removed on close, so this set only grows - confirm that is intended.
    private static CopyOnWriteArraySet<String> queryTypes = new CopyOnWriteArraySet<String>();

    // Session of this client connection; used to send data to the client.
    private Session session;

    // Data type requested by this client (the connection's query string).
    private String queryType;

    // "wait" until the client asks for data, then "request".
    // Volatile because it is written by the connection's thread and read by sendInfo callers.
    private volatile String status;

    /**
     * Called when a connection has been established.
     *
     * @param session the new client session
     */
    @OnOpen
    public void onOpen(Session session) {
        // Initialise all per-connection state BEFORE publishing this instance in
        // webSocketSet, so sendInfo never sees a half-initialised endpoint.
        this.session = session;
        this.queryType = session.getQueryString();
        this.status = "wait";
        webSocketSet.add(this);
        addOnlineCount();
        if (this.queryType != null) {
            // A connection without a query string must not put null into the shared set.
            queryTypes.add(this.queryType);
        }
        LOG.info("有新连接加入!当前在线人数为" + getOnlineCount());
        try {
            sendMessage("连接成功");
            LOG.info("请求数据类型为:${}", session.getQueryString());
        } catch (IOException e) {
            LOG.error("websocket IO异常", e);
        }
    }

    /**
     * Called when a connection is closed.
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        subOnlineCount();
        LOG.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * Called when a message arrives from the client.
     *
     * <p>{@code "request_history"} sends the cached history as a JSON array and then
     * enables pushes; {@code "request"} only enables pushes. Other messages are ignored.
     *
     * @param message message sent by the client
     * @param session the client session
     * @throws IOException if sending the history to the client fails
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        LOG.info("来自客户端的消息:" + message);
        if ("request_history".equals(message)) {
            this.sendMessage(JSONObject.toJSONString(loadHistory()));
            this.status = "request";
        } else if ("request".equals(message)) {
            this.status = "request";
        }
    }

    /** Reads the cached history for this connection's query type and parses each entry as JSON. */
    private List<Object> loadHistory() {
        List<Object> parsed = new ArrayList<Object>();
        String type = this.queryType;
        if (type == null) {
            // No query string was supplied, so there is no list to read.
            return parsed;
        }
        if (redisUtil == null) {
            ApplicationContext act = ApplicationContextRegister.getApplicationContext();
            redisUtil = act.getBean(RedisUtil.class);
        }
        long start = type.contains("all_") ? ALL_HISTORY_START : DEVICE_HISTORY_START;
        List<Object> historyData = redisUtil.lRange(type, start, -1);
        if (historyData == null) {
            return parsed;
        }
        int skipped = 0;
        Exception lastFailure = null;
        for (Object record : historyData) {
            try {
                parsed.add(JSONObject.parse(record.toString()));
            } catch (Exception e) {
                // Best effort: an unreadable entry must not prevent the rest from being sent.
                skipped++;
                lastFailure = e;
            }
        }
        if (skipped > 0) {
            LOG.warn("Skipped {} unparsable history records for {}", skipped, type, lastFailure);
        }
        return parsed;
    }

    /**
     * Called when an error occurs on the connection.
     *
     * @param session the client session
     * @param error   the error that occurred
     */
    @OnError
    public void onError(Session session, Throwable error) {
        LOG.error("发生错误", error);
    }

    /**
     * Sends a text message to this client.
     *
     * @param message text to send
     * @throws IOException if the message cannot be sent
     */
    public void sendMessage(String message) throws IOException {
        // Serialise writes on the same session.
        synchronized (session) {
            this.session.getBasicRemote().sendText(message);
        }
    }

    /**
     * Sends a message to every connection that subscribed to the given query type and
     * has asked to receive data. A failure on one connection is logged and does not stop
     * delivery to the others.
     *
     * @param queryType query type whose subscribers should receive the message
     * @param message   text to send
     * @throws IOException declared for compatibility with existing callers; send failures are handled here
     */
    public static void sendInfo(String queryType, String message) throws IOException {
        if (queryType == null) {
            return;
        }
        for (TWebSocket item : webSocketSet) {
            // Comparison is written so that a connection without a query type cannot cause an NPE.
            if (queryType.equals(item.queryType) && "request".equals(item.status)) {
                try {
                    item.sendMessage(message);
                } catch (IOException e) {
                    LOG.error("消息数据发送失败 websocker to brower!", e);
                }
            }
        }
    }

    /**
     * @return the number of currently open connections
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * @return the live, shared set of requested query types (callers may add to it)
     */
    public static synchronized Set getQueryTypes() {
        return queryTypes;
    }

    /** Increments the open-connection counter. */
    public static synchronized void addOnlineCount() {
        onlineCount++;
    }

    /** Decrements the open-connection counter. */
    public static synchronized void subOnlineCount() {
        onlineCount--;
    }
}
Redis中每个列表最多可存储 2^32 - 1(约42亿)个元素,List的lrange操作如下: