服务器处理网关信息的队列——Redis

1.Redis安装

此次简单介绍redis的单点安装。
安装环境:ubuntu 16.04.5
安装步骤:

  1. 更新源
sudo apt-get update
  1. 安装redis
sudo apt-get install redis-server
  1. 查询是否开启
    redis在安装成功后会自动开启服务 默认端口 6379
ps aux|grep redis
  1. 修改配置文件
    redis配置文件路径 /etc/redis/redis.conf
    注释掉:bind 127.0.0.1,使远程可以访问
    在这里插入图片描述
    添加用户校验 requirepass wentaoZhang
    在这里插入图片描述

  2. 重启服务

service redis-server restart

2.应用场景

redis 的应用场景很多,排行榜、分布式锁、消息队列等。此次我们使用redis 的list 实现一个消息的消费队列,并赋予消息生命周期。
网关传送至服务器的数据有PUSH_DATA和PULL_DATA,需要不同的业务处理,为了应对大并发条件下的数据处理,利用redis的list 将PUSH_DATA和PULL_DATA放入两个消费队列中。

3.撸

我使用Redis 的java 实现 Jedis
添加依赖

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.0.1</version>
        </dependency>

参数硬编到代码


import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {
    private static String HOST = "192.168.237.128";
    private static int PORT = 6379;
    private static String PWD = "wentaoZhang";
    private static JedisPool jedisPool = null;
    private static Jedis jedis = null;

    /**
     * 初始化Redis连接池
     */
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        //最大连接数,如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
        config.setMaxTotal(50);
        //最大空闲数,控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
        config.setMaxIdle(50);
        //最小空闲数
        config.setMinIdle(10);
        //是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个
        config.setTestOnBorrow(true);
        //在return给pool时,是否提前进行validate操作
        config.setTestOnReturn(true);
        //在空闲时检查有效性,默认false
        config.setTestWhileIdle(true);
        //表示一个对象至少停留在idle状态的最短时间,然后才能被idle object evitor扫描并驱逐;
        //这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义
        config.setMinEvictableIdleTimeMillis(30000);
        //表示idle object evitor两次扫描之间要sleep的毫秒数
        config.setTimeBetweenEvictionRunsMillis(60000);
        //表示idle object evitor每次扫描的最多的对象数
        config.setNumTestsPerEvictionRun(1000);
        //等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
        config.setMaxWaitMillis(-1);

        jedisPool = new JedisPool(config, HOST, PORT, 10000, PWD);
    }

    public static Jedis getJedis(){
        jedis =  jedisPool.getResource();
        return jedis;
    }

    public static void returnJedis(){
        if(jedis!= null && jedisPool!= null){
            jedisPool.close();
        }
    }

    public static void lpush(String list,String value){
        if(jedis == null){
            getJedis();
        }
        Long lpush = jedis.lpush(list, value);
        returnJedis();
    }

    public static String rpop(String list){
        if(jedis == null){
            getJedis();
        }
        String rpop = jedis.rpop(list);
        returnJedis();

        return rpop;
    }
}

接Mqtt那篇代码,在回调函数讲不同topic 数据放入不同list。

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {

        System.out.println("Client topic : " + topic);
        System.out.println("Client Qos : " + message.getQos());
        System.out.println("Client msg : " + new String(message.getPayload()));

        RedisUtil.lpush(topic,new String(message.getPayload()));
        String rpop = RedisUtil.rpop(topic);
        System.out.println("rpop :"+rpop);


    }

控制台信息:

MqttSub connect
Client topic : PUSH_DATA
Client Qos : 1
Client msg : {"rxpk":[
	{
		"time":"2013-03-31T16:21:17.528002Z",
		"tmst":3512348611,
		"chan":2,
		"rfch":0,
		"freq":866.349812,
		"stat":1,
		"modu":"LORA",
		"datr":"SF7BW125",
		"codr":"4/6",
		"rssi":-35,
		"lsnr":5.1,
		"size":32,
		"data":"-DS4CGaDCdG+48eJNM3Vai-zDpsR71Pn9CPA9uCON84"
	},{
		"time":"2013-03-31T16:21:17.530974Z",
		"tmst":3512348514,
		"chan":9,
		"rfch":1,
		"freq":869.1,
		"stat":1,
		"modu":"FSK",
		"datr":50000,
		"rssi":-75,
		"size":16,
		"data":"VEVTVF9QQUNLRVRfMTIzNA=="
	},{
		"time":"2013-03-31T16:21:17.532038Z",
		"tmst":3316387610,
		"chan":0,
		"rfch":0,
		"freq":863.00981,
		"stat":1,
		"modu":"LORA",
		"datr":"SF10BW125",
		"codr":"4/7",
		"rssi":-38,
		"lsnr":5.5,
		"size":32,
		"data":"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass"
	}
]}

rpop :{"rxpk":[
	{
		"time":"2013-03-31T16:21:17.528002Z",
		"tmst":3512348611,
		"chan":2,
		"rfch":0,
		"freq":866.349812,
		"stat":1,
		"modu":"LORA",
		"datr":"SF7BW125",
		"codr":"4/6",
		"rssi":-35,
		"lsnr":5.1,
		"size":32,
		"data":"-DS4CGaDCdG+48eJNM3Vai-zDpsR71Pn9CPA9uCON84"
	},{
		"time":"2013-03-31T16:21:17.530974Z",
		"tmst":3512348514,
		"chan":9,
		"rfch":1,
		"freq":869.1,
		"stat":1,
		"modu":"FSK",
		"datr":50000,
		"rssi":-75,
		"size":16,
		"data":"VEVTVF9QQUNLRVRfMTIzNA=="
	},{
		"time":"2013-03-31T16:21:17.532038Z",
		"tmst":3316387610,
		"chan":0,
		"rfch":0,
		"freq":863.00981,
		"stat":1,
		"modu":"LORA",
		"datr":"SF10BW125",
		"codr":"4/7",
		"rssi":-38,
		"lsnr":5.5,
		"size":32,
		"data":"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass"
	}
]}

这样做我在做终端测试的时候会遇到一个问题,就是终端网关正常,服务器宕机后,redis的队列会一直增大,超过终端接收窗口的数据,终端不能接受到服务器的回复。在生产环境中由于数据上报间隔长,服务器分布式部署,加上自测lpush rpop 每秒十万+的处理能力,姑且认为没毛病。
我写的都是LoRaWAN NS服务器在设计过程中使用到的技术,菜鸟一枚,献丑了

做这个服务器真的是很麻烦,不过你可以跟着我慢慢来,下回我会处理下data":"ysgRl452xNLep9S1NTIg2lomKDxUgn3DJ7DE+b00Ass
老猴子应该一眼就看出了Base64
下回再唠吧

猜你喜欢

转载自blog.csdn.net/m0_38008027/article/details/93967753