注入redisTemplate
// String-keyed, String-valued Redis client template injected by Spring.
@Autowired
private RedisTemplate<String,String> redisTemplate;
0 数字自增自减
// Atomic counter on a Redis string key. The delta argument adds when positive
// and subtracts when negative; the returned Long is the value AFTER the change.
// (The original declared `Long number` three times in one scope, which does not compile.)
Long afterPlusOne = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2", 1);
Long afterPlusFive = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2", 5);
Long afterMinusFive = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2", -5);
// Remove the counter key entirely.
redisTemplate.delete("SAVE_APP_USER_RECORD2");
1 保存和读取Set
// Add three members to the Redis set "set1", then read the whole set back.
SetOperations<String, String> setOps = redisTemplate.opsForSet();
setOps.add("set1", "22");
setOps.add("set1", "33");
setOps.add("set1", "44");
Set<String> resultSet = redisTemplate.opsForSet().members("set1");
System.out.println("resultSet:" + resultSet);
运行结果为:
resultSet:[44, 33, 22](Set 是无序集合,元素顺序不固定)
2、Hash结构,保存和读取map:
// Store a map into the Redis hash "map1", then read it back several ways.
Map<String, String> map = new HashMap<String, String>();
map.put("key1", "value1");
map.put("key2", "value2");
map.put("key3", "value3");
map.put("key4", "value4");
map.put("key5", "value5");
redisTemplate.opsForHash().putAll("map1", map);
// Unbounded opsForHash() yields HashOperations<String, Object, Object>, so
// entries/values/keys return Object-typed collections (the original
// Map<String,String>/List<String>/Set<String> assignments do not compile).
Map<Object, Object> resultMap = redisTemplate.opsForHash().entries("map1");
List<Object> resultMapList = redisTemplate.opsForHash().values("map1");
Set<Object> resultMapSet = redisTemplate.opsForHash().keys("map1");
String value = (String) redisTemplate.opsForHash().get("map1", "key1");
System.out.println("value:" + value);
System.out.println("resultMapSet:" + resultMapSet);
System.out.println("resultMap:" + resultMap);
System.out.println("resultMapList:" + resultMapList);
// HDEL requires at least one field, so delete individual fields like this:
redisTemplate.opsForHash().delete("map1", "key1");
// To drop the entire hash, delete the key itself instead:
redisTemplate.delete("map1");
运行结果为:
value:value1
resultMapSet:[key1, key2, key5, key3, key4]
resultMap:{key3=value3, key2=value2, key1=value1, key5=value5, key4=value4}
resultMapList:[value1, value2, value5, value3, value4]
3、保存和读取list
// Build two lists and store their ELEMENTS in Redis lists.
List<String> list1 = new ArrayList<String>();
list1.add("a1");
list1.add("a2");
list1.add("a3");
List<String> list2 = new ArrayList<String>();
list2.add("b1");
list2.add("b2");
list2.add("b3");
// With RedisTemplate<String,String>, each pushed value must be a String, so
// push the elements with leftPushAll/rightPushAll — leftPush(K, V) cannot take
// a List<String> (the original did not compile). Note leftPushAll pushes at the
// head one by one, so "listkey1" ends up in reverse order [a3, a2, a1].
redisTemplate.opsForList().leftPushAll("listkey1", list1);
redisTemplate.opsForList().rightPushAll("listkey2", list2);
// leftPop/rightPop remove and return ONE element; to read the whole list
// without consuming it, use range(key, 0, -1).
List<String> resultList1 = redisTemplate.opsForList().range("listkey1", 0, -1);
List<String> resultList2 = redisTemplate.opsForList().range("listkey2", 0, -1);
System.out.println("resultList1:" + resultList1);
System.out.println("resultList2:" + resultList2);
运行结果:
resultList1:[a1, a2, a3]
resultList2:[b1, b2, b3]
这里需要解释一下:不管是 leftPush 还是 rightPush 存入的值,都可以用 leftPop 或者 rightPop 取到,区别只是取出的方向:leftPop 从列表头部弹出,rightPop 从尾部弹出。Redis 的 list 是双向链表,从任意一端弹出都是 O(1) 操作,效率并无差别。组合方式决定语义:leftPush + rightPop 是先进先出的队列,leftPush + leftPop 则是后进先出的栈,按需要选择即可。
4、保存和读取String(最常用的)
System.out.println("缓存正在设置。。。。。。。。。");
redisTemplate.opsForValue().set("key1","value1");
redisTemplate.opsForValue().set("key2","value2");
redisTemplate.opsForValue().set("key3","value3");
redisTemplate.opsForValue().set("key4","value4");
System.out.println("缓存已经设置完毕。。。。。。。");
String result1=redisTemplate.opsForValue().get("key1").toString();
String result2=redisTemplate.opsForValue().get("key2").toString();
String result3=redisTemplate.opsForValue().get("key3").toString();
System.out.println("缓存结果为:result:"+result1+" "+result2+" "+result3);
5 redis 锁 + redis hash结构以及BlockingQueue队列代码
package com.allianity.config;
import com.allianity.config.thread.MyRejectedExecutionHandler;
import com.allianity.config.thread.ThreadExceptionHandle;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* 线程池 配置
*
* @Author YJX
* @Date 2018/3/24
*/
/**
 * Thread pool configuration.
 *
 * <p>All three pools share the same sizing policy; {@link #buildExecutor(String, int)}
 * centralizes it so the bean definitions cannot drift apart.
 *
 * @Author YJX
 * @Date 2018/3/24
 */
@Configuration
@EnableAsync
public class ExecutorConfig {
    static {
        // Report uncaught exceptions from pool threads instead of losing them silently.
        Thread.setDefaultUncaughtExceptionHandler(ThreadExceptionHandle.INSTANCE);
    }

    /**
     * Default thread pool, used when no qualifier is specified.
     *
     * @return the primary executor
     */
    @Bean
    @Primary
    @Qualifier("defaultExecutor")
    public Executor defaultExecutor() {
        return buildExecutor("defaultExecutor-", 10000);
    }

    /**
     * Thread pool for persisting App user tracking events.
     *
     * @return executor dedicated to user-record persistence
     */
    @Bean
    @Qualifier("appUsereRecordExecutor")
    public Executor appUsereRecordExecutor() {
        return buildExecutor("appUsereRecordExecutor-", 30000);
    }

    /**
     * Second thread pool for persisting App user tracking events.
     *
     * @return executor dedicated to user-record persistence (variant 2)
     */
    @Bean
    @Qualifier("appUsereRecordExecutor2")
    public Executor appUsereRecordExecutor2() {
        return buildExecutor("appUsereRecordExecutor2-", 30000);
    }

    /**
     * Builds a pool with the shared sizing policy.
     *
     * @param threadNamePrefix prefix for worker thread names
     * @param queueCapacity    bounded work-queue capacity
     * @return an initialized {@link ThreadPoolTaskExecutor}
     */
    private ThreadPoolTaskExecutor buildExecutor(String threadNamePrefix, int queueCapacity) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);              // core thread count
        executor.setMaxPoolSize(13);              // maximum thread count
        executor.setQueueCapacity(queueCapacity); // work-queue size
        executor.setKeepAliveSeconds(60);         // reclaim threads idle for over 1 minute
        executor.setAllowCoreThreadTimeOut(true); // keep-alive also applies to core threads
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}
/*
* Copyright 2020 Wicrenet, Inc. All rights reserved.
*/
package com.allianity.modules.cms.service.impl;
import com.alibaba.fastjson.JSON;
import com.allianity.common.learning.entity.AppUserRecordEntity;
import com.allianity.common.learning.enums.RedisRouteKeyEnum;
import com.allianity.modules.cms.dao.AppUserRecordDao;
import com.allianity.modules.cms.service.AppUserRecordService2;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
* 【埋点统一接口】
*
* @author YJX
* Created on 2020/2/15 18:18
*/
/**
 * Unified tracking-event persistence service.
 *
 * <p>Incoming events are buffered in an in-memory queue and flushed to the
 * database in batches every 30 seconds. Heartbeat events (action "2") are
 * accumulated in a Redis hash and swept into the queue once their end time is
 * more than five minutes old.
 *
 * @author YJX
 * Created on 2020/2/15 18:18
 */
@Service("app_user_record_service_impl2")
public class AppUserRecordServiceImpl2 extends ServiceImpl<AppUserRecordDao, AppUserRecordEntity> implements AppUserRecordService2, InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(AppUserRecordServiceImpl2.class);

    // Buffer of records awaiting batch persistence.
    // NOTE(review): afterPropertiesSet() wires this to the executor's OWN work
    // queue, so non-Runnable entities sit in a ThreadPoolExecutor queue; this
    // only works because no tasks are ever submitted to that executor — confirm
    // before reusing the pool for actual tasks.
    private volatile BlockingQueue appUserRecordEntitieQueue2;

    @Autowired
    @Qualifier("appUsereRecordExecutor2")
    private Executor executor2;

    @Autowired
    private AppUserRecordDao appUserRecordDao;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Accepts a batch of tracking events and routes each one by its event action.
     *
     * @param appUserRecordEntity events to process (items without a matterId are skipped)
     * @return true when every event was routed, false when any exception occurred
     */
    @Override
    public Boolean saveAppUserRecord2(List<AppUserRecordEntity> appUserRecordEntity) {
        try {
            appUserRecordEntity.stream()
                    .filter(item -> item.getMatterId() != null)
                    .forEach(item -> {
                        // Event actions: 0 = start, 1 = end, 2 = heartbeat.
                        switch (item.getEventaction().toString()) {
                            case "0":
                                // Start: stamp both times, zero the duration, queue for persistence.
                                item.setStaringTime(new Date());
                                item.setEndTime(new Date());
                                item.setDuration(0L);
                                appUserRecordEntitieQueue2.add(item);
                                break;
                            case "1":
                                // End: stamp the end time and queue for persistence.
                                item.setEndTime(new Date());
                                appUserRecordEntitieQueue2.add(item);
                                break;
                            case "2":
                                // Heartbeat: keep the record fresh in Redis; records that stop
                                // refreshing are swept into the queue by execute() after 5 minutes.
                                String aperationCode = (String) redisTemplate.opsForHash().get(RedisRouteKeyEnum.APP_USER_RECORD.getKey(), item.getAperationCode());
                                if (StringUtils.isNotBlank(aperationCode)) {
                                    AppUserRecordEntity recordEntity = JSON.parseObject(aperationCode, AppUserRecordEntity.class);
                                    // Each heartbeat adds 10 (presumably seconds — confirm the unit with the client).
                                    recordEntity.setDuration(recordEntity.getDuration() + 10);
                                    redisTemplate.opsForHash().put(RedisRouteKeyEnum.APP_USER_RECORD.getKey(), item.getAperationCode(), JSON.toJSONString(recordEntity));
                                } else {
                                    redisTemplate.opsForHash().put(RedisRouteKeyEnum.APP_USER_RECORD.getKey(), item.getAperationCode(), JSON.toJSONString(item));
                                }
                                break;
                            default:
                                logger.error("未知的事件类型:Eventaction:{}", JSON.toJSONString(item));
                        }
                    });
            return true;
        } catch (Exception e) {
            // Pass the throwable as the final argument with no "{}" placeholder so
            // SLF4J logs the full stack trace (the old ":{}" printed it poorly).
            logger.error("用户记录埋点数据进入队列异常", e);
            return false;
        }
    }

    /**
     * Scheduled flush, every 30 seconds:
     * 1) sweep heartbeat records idle for more than 5 minutes from Redis into the queue;
     * 2) drain up to 3000 queued records and batch-insert them.
     */
    @Scheduled(cron = "0/30 * * * * ? ")
    public void execute() {
        List<Object> objects = redisTemplate.opsForHash().values(RedisRouteKeyEnum.APP_USER_RECORD.getKey());
        if (CollectionUtils.isNotEmpty(objects)) {
            // Distributed lock so only one instance sweeps the shared hash.
            boolean lock = getLock(RedisRouteKeyEnum.APP_USER_RECORD_LOCK_ID.getKey(), RedisRouteKeyEnum.APP_USER_RECORD_LOCK_ID.getTimeToLive());
            if (lock) {
                try {
                    Long size = redisTemplate.opsForHash().size(RedisRouteKeyEnum.APP_USER_RECORD.getKey());
                    objects.forEach(item -> {
                        AppUserRecordEntity appUserRecordEntity = JSON.parseObject(item.toString(), AppUserRecordEntity.class);
                        if (new LocalDateTime(appUserRecordEntity.getEndTime()).plusMinutes(5).toDate().getTime() <= LocalDateTime.now().toDate().getTime()) {
                            // Stale heartbeat: queue for persistence and remove from the hash.
                            appUserRecordEntitieQueue2.add(appUserRecordEntity);
                            redisTemplate.opsForHash().delete(RedisRouteKeyEnum.APP_USER_RECORD.getKey(), appUserRecordEntity.getAperationCode());
                        }
                    });
                    Long size2 = redisTemplate.opsForHash().size(RedisRouteKeyEnum.APP_USER_RECORD.getKey());
                    logger.info("用户记录埋点心跳数据redis中有:{}条数据,本次处理:{}条,剩余:{}条", size, size - size2, size2);
                } finally {
                    // Always release the lock, even when the sweep throws; the original
                    // leaked the lock until its TTL expired on any exception.
                    releaseLock(RedisRouteKeyEnum.APP_USER_RECORD_LOCK_ID.getKey());
                }
            }
        }
        if (appUserRecordEntitieQueue2.size() <= 0) {
            return;
        }
        ArrayList<AppUserRecordEntity> list = new ArrayList<>();
        long l = System.currentTimeMillis();
        // Cap each run at 3000 records to bound the batch size.
        appUserRecordEntitieQueue2.drainTo(list, 3000);
        // Hand-written batch insert; earlier timings: 100 rows 835 ms, 500 rows 5558 ms,
        // 1000 rows 4191 ms, 2000 rows 7032 ms.
        this.appUserRecordDao.saveBatchAppUserRecordEntity(list);
        logger.info("用户记录埋点队列长度: {},耗时:{}豪秒,处理了: {}条数据,队列还剩: {}", (appUserRecordEntitieQueue2.size() + list.size()), (System.currentTimeMillis() - l), list.size(), appUserRecordEntitieQueue2.size());
    }

    /**
     * Wires the record buffer to the injected executor's work queue.
     */
    @Override
    public void afterPropertiesSet() {
        ThreadPoolTaskExecutor poolTaskExecutor = (ThreadPoolTaskExecutor) this.executor2;
        this.appUserRecordEntitieQueue2 = poolTaskExecutor.getThreadPoolExecutor().getQueue();
    }

    /**
     * Acquires a simple Redis lock (SET key value NX with expiry).
     *
     * @param lockId      lock key
     * @param millisecond lock time-to-live in milliseconds
     * @return true when the lock was acquired
     */
    public boolean getLock(String lockId, long millisecond) {
        Boolean success = redisTemplate.opsForValue().setIfAbsent(lockId, "lock",
                millisecond, TimeUnit.MILLISECONDS);
        return success != null && success;
    }

    /**
     * Releases the lock by deleting its key.
     * NOTE(review): deletes unconditionally, so a holder whose TTL already expired
     * can delete another instance's lock; use a per-holder token plus
     * compare-and-delete (Lua) if strict mutual exclusion matters.
     */
    public void releaseLock(String lockId) {
        redisTemplate.delete(lockId);
    }
}
String value = (String) redisTemplate.opsForHash().get(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey(), appNews.getNewsId() + "");
if (value == null) {
redisTemplate.opsForHash().put(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey(), appNews.getNewsId() + "", "1");
} else {
redisTemplate.opsForHash().put(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey(), appNews.getNewsId() + "", (Long.parseLong(value) + 1L) + "");
}
/**
 * Flushes accumulated course/news view-count increments from Redis to the
 * database. Runs every minute. Hash layout: field = entity id, value = number
 * of views accumulated since the last flush.
 *
 * NOTE(review): the field is deleted BEFORE the database update, so a failed
 * update loses that increment batch — confirm this best-effort tradeoff is intended.
 *
 * @author yangjunxiong
 * @date 2020/2/25 02:42
 **/
@Scheduled(cron = "0 0/1 * * * ? ")
public void execute() {
    // Persist course view-count increments.
    Map<Object, Object> courseWatchNumberIncrement = redisTemplate.opsForHash().entries(RedisRouteKeyEnum.COURSE_WATCH_NUMBER_INCREMENT.getKey());
    for (Map.Entry<Object, Object> entry : courseWatchNumberIncrement.entrySet()) {
        redisTemplate.opsForHash().delete(RedisRouteKeyEnum.COURSE_WATCH_NUMBER_INCREMENT.getKey(), entry.getKey());
        CourseEntity course = courseService.getById(entry.getKey().toString());
        course.setWatchNumber(course.getWatchNumber() + Long.parseLong(entry.getValue().toString()));
        courseService.updateById(course);
    }
    // Persist news view-count increments.
    Map<Object, Object> newsWatchNumberIncrement = redisTemplate.opsForHash().entries(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey());
    for (Map.Entry<Object, Object> entry : newsWatchNumberIncrement.entrySet()) {
        redisTemplate.opsForHash().delete(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey(), entry.getKey());
        // BUG FIX: look up the entity by the hash FIELD (the news id), not by the
        // VALUE (the increment count) as the original did — that fetched the wrong
        // row (or none) and double-counted the value.
        com.allianity.modules.cms.entity.NewsEntity news = appNewsDao.getEntity(Long.parseLong(entry.getKey().toString()));
        news.setViewCount(news.getViewCount() + Integer.parseInt(entry.getValue().toString()));
        appNewsDao.updateEntity(news);
    }
}
``