关键词:redis,队列,左进右出,线程池,多线程。
本文,
1、借用redis队列,按照“左进右出”的规则,实现数据的存和取。说明:对于redis来说,存取都是单线程的,即不同服务器不会取到相同的数据。
2、使用一个线程模拟生产数据,存放数据;
3、创建固定大小的线程池,开10个线程,实现多线程处理数据;从而模拟出10台服务器,对数据的分散处理。
4、读者可以根据实际情况,增加权重,大数据情况下的多key方案,hash路由key方案等。
package com.liuxd.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by Liuxd on 2018/9/8.
*/
public class DistributedComputing {
private static JedisPool jedisPool;
private final static String key = "IList";
private final static Random random = new Random();
static {
JedisPoolConfig config = new JedisPoolConfig();
// config.setMaxActive(5000);
config.setMaxIdle(256);
// config.setMaxWait(5000L);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setMinEvictableIdleTimeMillis(60000L);
config.setTimeBetweenEvictionRunsMillis(3000L);
config.setNumTestsPerEvictionRun(-1);
jedisPool = new JedisPool(config, "192.168.251.51", 6373, 60000);
}
public static void main(String[] args) {
// 模拟生产者生产数据
pushValue();
// 数据处理
execute();
}
/**
* 生产数据
*/
public static void pushValue() {
new Thread(new Runnable() {
@Override
public void run() {
int num = 0;
while (true) {
num++;
System.out.println("生产者服务器往队列放入消息:"+num);
lpush(key.getBytes(), String.valueOf("消息:" + num).getBytes());
}
}
}).start();
}
/**
* 模拟多态服务器处理数据
*/
public static void execute() {
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
pool.execute(new Thread(new Task("集群服务器" + i + "")));
}
pool.shutdown();
}
static class Task implements Runnable {
String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
byte[] array = rpop(key.getBytes());
while (true) {
try {
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (null != array) {
String value = new String(array);
System.out.println(name + "处理消息" + value);
} else {
System.out.println(name + "处理消息,消息为空,未做执行");
}
array = rpop(key.getBytes());
}
}
}
/**
* 存储REDIS队列 顺序存储
*
* @param key reids键名
* @param value 键值
*/
public static void lpush(byte[] key, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.lpush(key, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 获取队列数据
*
* @param key 键名
* @return
*/
public static byte[] rpop(byte[] key) {
byte[] bytes = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
bytes = jedis.rpop(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return bytes;
}
public static long llen(byte[] key) {
long len = 0;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.llen(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return len;
}
private static void close(Jedis jedis) {
try {
jedisPool.returnResource(jedis);
} catch (Exception e) {
if (jedis.isConnected()) {
jedis.quit();
jedis.disconnect();
}
}
}
}