RedisTemplate的各种操作(set、hash、list、string)以及BlockingQueue队列处理 高并发

常用基础方法

注入redisTemplate

@Autowired
private RedisTemplate<String,String> redisTemplate;

0 数字自增自降

        Long number = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2", 1);
        Long number = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2", 5);
        Long number = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2", -5);
        //第二个参数传正数就是+多少,传负数就是减多少
        redisTemplate.delete("SAVE_APP_USER_RECORD2");

1 保存和读取Set

SetOperations<String, String> set = redisTemplate.opsForSet();
		set.add("set1","22");  
		set.add("set1","33");  
		set.add("set1","44");  
		Set<String> resultSet =redisTemplate.opsForSet().members("set1");  
		System.out.println("resultSet:"+resultSet); 

运行结果为:

resultSet:[[set3, set2, set1]] jedis

2、Hash结构,保存和读取map:

Map<String,String> map=new HashMap<String,String>();  
map.put("key1","value1");  
map.put("key2","value2");  
map.put("key3","value3");  
map.put("key4","value4");  
map.put("key5","value5");  
redisTemplate.opsForHash().putAll("map1",map);  
Map<String,String> resultMap= redisTemplate.opsForHash().entries("map1");  
List<String>reslutMapList=redisTemplate.opsForHash().values("map1");  
Set<String>resultMapSet=redisTemplate.opsForHash().keys("map1");  
String value=(String)redisTemplate.opsForHash().get("map1","key1");  
System.out.println("value:"+value);  
System.out.println("resultMapSet:"+resultMapSet);  
System.out.println("resultMap:"+resultMap);  
System.out.println("resulreslutMapListtMap:"+reslutMapList); 

redisTemplate.opsForHash().delete("map1");
redisTemplate.opsForHash().delete("map1", "key1");

运行结果为:

value:value1

resultMapSet:[key1, key2, key5, key3, key4]

resultMap:{key3=value3, key2=value2, key1=value1, key5=value5, key4=value4}

resulreslutMapListtMap:[value1, value2, value5, value3, value4]

3、保存和读取list

List<String> list1=new ArrayList<String>();  
 list1.add("a1");  
list1.add("a2");  
 list1.add("a3");  
 
 List<String> list2=new ArrayList<String>();  
  list2.add("b1");  
 list2.add("b2");  
 list2.add("b3");  
redisTemplate.opsForList().leftPush("listkey1",list1);  
redisTemplate.opsForList().rightPush("listkey2",list2);  
List<String> resultList1=(List<String>)redisTemplate.opsForList().leftPop("listkey1");  
 List<String> resultList2=(List<String>)redisTemplate.opsForList().rightPop("listkey2");  
System.out.println("resultList1:"+resultList1);  
System.out.println("resultList2:"+resultList2);  

运行结果:

resultList1:[a1, a2, a3]

resultList2:[b1, b2, b3]

这里需要解释一下:不管是leftPush还是rightPush都可以用leftPop或者rightPoP任意一种获取到其中的值,不过就是获取的遍历方向不一样。有学过数据结构的人都知道里面循环链表是可以前后遍历的,就和这里的场景是一样的。如果还有不懂的话可以去看看这部分的源代码,其实就是遍历方向不同,所以效率也不同。所以最好leftPush用leftPoP遍历,rightPush用rightPoP遍历

4、保存和读取String(最常用的)

System.out.println("缓存正在设置。。。。。。。。。");  
redisTemplate.opsForValue().set("key1","value1");  
redisTemplate.opsForValue().set("key2","value2");  
redisTemplate.opsForValue().set("key3","value3");  
redisTemplate.opsForValue().set("key4","value4");  
System.out.println("缓存已经设置完毕。。。。。。。");  
String result1=redisTemplate.opsForValue().get("key1").toString();  
String result2=redisTemplate.opsForValue().get("key2").toString();  
String result3=redisTemplate.opsForValue().get("key3").toString();  
System.out.println("缓存结果为:result:"+result1+"  "+result2+"   "+result3);  

5 redis 锁 + redis hash结构以及BlockingQueue队列代码

package com.allianity.config;

import com.allianity.config.thread.MyRejectedExecutionHandler;
import com.allianity.config.thread.ThreadExceptionHandle;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * 线程池 配置
 *
 * @Author YJX
 * @Date 2018/3/24
 */
@Configuration
@EnableAsync
public class ExecutorConfig {
    //@Autowired
    //private TraceableThreadFactory traceableThreadFactory;

    static {
        Thread.setDefaultUncaughtExceptionHandler(ThreadExceptionHandle.INSTANCE);
    }

    /**
     * 默认的线程池
     *
     * @return
     */
    @Bean
    @Primary
    @Qualifier("defaultExecutor")
    public Executor defaultExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);/*核心线程数*/
        executor.setMaxPoolSize(13);/*最大线程数*/
        executor.setQueueCapacity(10000);/*队列大小*/
        executor.setKeepAliveSeconds(60);/* 某线程空闲超过1分钟,就回收该线程*/
        executor.setAllowCoreThreadTimeOut(true);   // KeepAliveSeconds 设置也作用于【核心线程数】
        executor.setThreadNamePrefix("defaultExecutor-");
        //executor.setThreadFactory(traceableThreadFactory);
        //executor.setAwaitTerminationSeconds(3);
        executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    /**
     * App用户埋点持久化线程池
     *
     * @return
     */
    @Bean
    @Qualifier("appUsereRecordExecutor")
    public Executor appUsereRecordExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);/*核心线程数*/
        executor.setMaxPoolSize(13);/*最大线程数*/
        executor.setQueueCapacity(30000);/*队列大小*/
        executor.setKeepAliveSeconds(60);/* 某线程空闲超过1分钟,就回收该线程*/
        executor.setAllowCoreThreadTimeOut(true);   // KeepAliveSeconds 设置也作用于【核心线程数】
        executor.setThreadNamePrefix("appUsereRecordExecutor-");
        executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    /**
     * App用户埋点持久化线程池
     *
     * @return
     */
    @Bean
    @Qualifier("appUsereRecordExecutor2")
    public Executor appUsereRecordExecutor2() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);/*核心线程数*/
        executor.setMaxPoolSize(13);/*最大线程数*/
        executor.setQueueCapacity(30000);/*队列大小*/
        executor.setKeepAliveSeconds(60);/* 某线程空闲超过1分钟,就回收该线程*/
        executor.setAllowCoreThreadTimeOut(true);   // KeepAliveSeconds 设置也作用于【核心线程数】
        executor.setThreadNamePrefix("appUsereRecordExecutor2-");
        executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}


/*
 * Copyright 2020 Wicrenet, Inc. All rights reserved.
 */
package com.allianity.modules.cms.service.impl;

import com.alibaba.fastjson.JSON;
import com.allianity.common.learning.entity.AppUserRecordEntity;
import com.allianity.common.learning.enums.RedisRouteKeyEnum;
import com.allianity.modules.cms.dao.AppUserRecordDao;
import com.allianity.modules.cms.service.AppUserRecordService2;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * 【埋点统一接口】
 *
 * @author YJX
 * Created on 2020/2/15 18:18
 */
@Service("app_user_record_service_impl2")
public class AppUserRecordServiceImpl2 extends ServiceImpl<AppUserRecordDao, AppUserRecordEntity> implements AppUserRecordService2, InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(AppUserRecordServiceImpl2.class);

    //持久化的队列
    private volatile BlockingQueue appUserRecordEntitieQueue2;

    @Autowired
    @Qualifier("appUsereRecordExecutor2")
    private Executor         executor2;
    @Autowired
    private AppUserRecordDao appUserRecordDao;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Override
    public Boolean saveAppUserRecord2(List<AppUserRecordEntity> appUserRecordEntity) {

        try {
//          心跳处理时间逻辑
            appUserRecordEntity.stream()
                    .filter(item -> item.getMatterId() != null)
                    .forEach(item -> {
                        switch (item.getEventaction().toString()) {
//                          0-开始、1-结束、2-心跳
                            case "0":
                                //存Queue
                                item.setStaringTime(new Date());
                                item.setEndTime(new Date());
                                item.setDuration(0L);
                                appUserRecordEntitieQueue2.add(item);
                                break;
                            case "1":
                                //存Queue
                                item.setEndTime(new Date());
                                appUserRecordEntitieQueue2.add(item);
                                break;
                            case "2":
                                //心跳处理存入redis并实时更新,5分钟未刷新结束时间则加入持久化队列
                                String aperationCode = (String) redisTemplate.opsForHash().get(RedisRouteKeyEnum.APP_USER_RECORD.getKey(), item.getAperationCode());
                                if (StringUtils.isNotBlank(aperationCode)) {
                                    AppUserRecordEntity recordEntity = JSON.parseObject(aperationCode, AppUserRecordEntity.class);
                                    recordEntity.setDuration(recordEntity.getDuration() + 10);
                                    redisTemplate.opsForHash().put(RedisRouteKeyEnum.APP_USER_RECORD.getKey(), item.getAperationCode(), JSON.toJSONString(recordEntity));
                                } else {
                                    redisTemplate.opsForHash().put(RedisRouteKeyEnum.APP_USER_RECORD.getKey(), item.getAperationCode(), JSON.toJSONString(item));
                                }
                                break;
                            default:
                                logger.error("未知的事件类型:Eventaction:{}", JSON.toJSONString(item));
                        }

                    });
            return true;
        } catch (Exception e) {
            logger.error("用户记录埋点数据进入队列异常:{}", e);
            return false;
        }
    }

    /**
     * 定时持久化队列的  appUserRecordEntities数据
     * 每30s执行一次
     */
    @Scheduled(cron = "0/30 * * * * ? ")
    public void execute() {
//      redis数据处理:结束时间超过5分钟则将数据加入持久化队列
        List<Object> objects = redisTemplate.opsForHash().values(RedisRouteKeyEnum.APP_USER_RECORD.getKey());
        if (CollectionUtils.isNotEmpty(objects)) {
            //获取锁,
            boolean lock = getLock(RedisRouteKeyEnum.APP_USER_RECORD_LOCK_ID.getKey(), RedisRouteKeyEnum.APP_USER_RECORD_LOCK_ID.getTimeToLive());
            if (lock) {
                Long size = redisTemplate.opsForHash().size(RedisRouteKeyEnum.APP_USER_RECORD.getKey());
                objects.forEach(item -> {
                    AppUserRecordEntity appUserRecordEntity = JSON.parseObject(item.toString(), AppUserRecordEntity.class);
                    if (new LocalDateTime(appUserRecordEntity.getEndTime()).plusMinutes(5).toDate().getTime() <= LocalDateTime.now().toDate().getTime()) {
                        //加入队列
                        appUserRecordEntitieQueue2.add(appUserRecordEntity);
                        redisTemplate.opsForHash().delete(RedisRouteKeyEnum.APP_USER_RECORD.getKey(), appUserRecordEntity.getAperationCode());
                    }
                });
                Long size2 = redisTemplate.opsForHash().size(RedisRouteKeyEnum.APP_USER_RECORD.getKey());
                logger.info("用户记录埋点心跳数据redis中有:{}条数据,本次处理:{}条,剩余:{}条", size, size - size2, size2);
                //释放锁
                releaseLock(RedisRouteKeyEnum.APP_USER_RECORD_LOCK_ID.getKey());
            }
        }

        if (appUserRecordEntitieQueue2.size() <= 0) {
            return;
        }
        ArrayList<AppUserRecordEntity> list = new ArrayList<>();
        long l = System.currentTimeMillis();
        appUserRecordEntitieQueue2.drainTo(list, 3000);
        this.appUserRecordDao.saveBatchAppUserRecordEntity(list);//自己写的批量插入,要做比对哪个效率高些//100条数据835毫秒  //500条数据5558毫秒  //1000条数据 4191毫秒 // 2000条数据 7032毫秒
        logger.info("用户记录埋点队列长度: {},耗时:{}豪秒,处理了: {}条数据,队列还剩: {}", (appUserRecordEntitieQueue2.size() + list.size()), (System.currentTimeMillis() - l), list.size(), appUserRecordEntitieQueue2.size());
    }

    @Override
    public void afterPropertiesSet() {
        ThreadPoolTaskExecutor poolTaskExecutor = (ThreadPoolTaskExecutor) this.executor2;
        this.appUserRecordEntitieQueue2 = poolTaskExecutor.getThreadPoolExecutor().getQueue();
    }

    /**
     * 获得锁
     */
    public boolean getLock(String lockId, long millisecond) {
        Boolean success = redisTemplate.opsForValue().setIfAbsent(lockId, "lock",
                millisecond, TimeUnit.MILLISECONDS);
        return success != null && success;
    }

    /**
     * 释放锁
     */
    public void releaseLock(String lockId) {
        redisTemplate.delete(lockId);
    }
}

            String value = (String) redisTemplate.opsForHash().get(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey(), appNews.getNewsId() + "");
            if (value == null) {
                redisTemplate.opsForHash().put(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey(), appNews.getNewsId() + "", "1");
            } else {
                redisTemplate.opsForHash().put(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey(), appNews.getNewsId() + "", (Long.parseLong(value) + 1L) + "");
            }


    /**
     * 【 课程观看次数更新, 】
     *
     * @author yangjunxiong
     * @date 2020/2/25 02:42
     **/
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void execute() {
        //课程观看次数持久化数据库
        Map<Object, Object> courseWatchNumberIncrement = redisTemplate.opsForHash().entries(RedisRouteKeyEnum.COURSE_WATCH_NUMBER_INCREMENT.getKey());
        for (Map.Entry<Object, Object> entry : courseWatchNumberIncrement.entrySet()) {
            redisTemplate.opsForHash().delete(RedisRouteKeyEnum.COURSE_WATCH_NUMBER_INCREMENT.getKey(),entry.getKey());
            CourseEntity byId = courseService.getById(entry.getKey().toString());
            byId.setWatchNumber(byId.getWatchNumber() + Long.parseLong(entry.getValue().toString()));
            courseService.updateById(byId);
        }
        //资讯观看次数持久化数据库
        Map<Object, Object> newsWatchNumberIncrement = redisTemplate.opsForHash().entries(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey());
        for (Map.Entry<Object, Object> entry : newsWatchNumberIncrement.entrySet()) {
            redisTemplate.opsForHash().delete(RedisRouteKeyEnum.NEWS_WATCH_NUMBER_INCREMENT.getKey(),entry.getKey());
            com.allianity.modules.cms.entity.NewsEntity byId = appNewsDao.getEntity(Long.parseLong(entry.getValue().toString()));

            byId.setViewCount(byId.getViewCount() + Integer.parseInt(entry.getValue().toString()));
            appNewsDao.updateEntity(byId);
        }

    }
``
发布了71 篇原创文章 · 获赞 3 · 访问量 8740

猜你喜欢

转载自blog.csdn.net/qq_40250122/article/details/104402664