redis cluster使用pipeline
为什么cluster无法使用pipeline
主要是因为redis-cluster采用hash分片。每条redis命令都会根据key计算出一个槽位(slot),然后到负责该槽位的redis节点上执行。因此pipeline中的每个操作,都需要先根据key计算槽位(JedisClusterCRC16.getSlot(key)),再到对应的节点执行命令。也就是说,一次pipeline操作可能会用到多个redis节点的连接,而目前JedisCluster并不支持这种用法。
基于redisCluster整合pipeline
设计思路
- 首先创建RedisCluster客户端 -------------- getCluster()
- 获取RedisCluster的哈希槽分布情况 -------------- getSlotHostMap(params)
- 通过key计算出所属的redis节点,并获取该节点的redis客户端 -------------- getJedisByKey(params)
- 通过redis客户端完成pipeline操作 -------------- main(args)
代码实现
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
/**
* nifiDefined
* Created by NightWatch on 2018/12/10.
*/
public class PipelineTest {
public static void main(String[] args) throws InterruptedException, IOException {
String key ="key";
Jedis jedis = getJedisByKey(key);
Pipeline pipeline = jedis.pipelined();
pipeline.somecommend...
pipeline.sync();
pipelined.close();
jedis.close();
}
private static TreeMap<Long, String> getSlotHostMap(String anyHostAndPortStr) {
TreeMap<Long, String> tree = new TreeMap<>();
String parts[] = anyHostAndPortStr.split(":");
HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1]));
try{
Jedis jedis = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort());
jedis.auth("RedisPass");
List<Object> list = jedis.clusterSlots();
//TODO:list 结构
// 1) 1) (integer) 5461
// 2) (integer) 10922
// 3) 1) "10.1.2.159"
// 2) (integer) 27002
// 3) "4689bdf95317bb61dfa6bf67f292a0be5c574aa4"
// 4) 1) "10.1.2.159"
// 2) (integer) 27005
// 3) "673ae869ef903d485d75804cf20881fd706fe7ae"
// 2) 1) (integer) 0
// 2) (integer) 5460
// 3) 1) "10.1.2.159"
// 2) (integer) 27001
// 3) "82ca45fd7e195c4a966a17b0b830a6fa44d04688"
// 4) 1) "10.1.2.159"
// 2) (integer) 27004
// 3) "e005323689107d78c7c9396c2fdafce4bf07e586"
// 3) 1) (integer) 10923
// 2) (integer) 16383
// 3) 1) "10.1.2.159"
// 2) (integer) 27003
// 3) "4ce08f73db3258b8b4b4abfb7a79dd4838cd3729"
// 4) 1) "10.1.2.159"
// 2) (integer) 27006
// 3) "668691597f9b990cd62354b34819b935f522f0ca"
//TODO:list 结构
for (Object object : list) {
List<Object> masterAndSlave = (List<Object>) object;
List<Object> master = (List<Object>) masterAndSlave.get(2);
String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
tree.put((Long) masterAndSlave.get(0), hostAndPort);
tree.put((Long) masterAndSlave.get(1), hostAndPort);
}
jedis.close();
}catch(Exception e){
e.printStackTrace();
}
return tree;
}
private static JedisCluster getCluster() {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(10);
Set<HostAndPort> hostAndPorts = new HashSet<>();
hostAndPorts.add(new HostAndPort("localhost", 27006));
hostAndPorts.add(new HostAndPort("localhost", 27005));
hostAndPorts.add(new HostAndPort("localhost", 27004));
hostAndPorts.add(new HostAndPort("localhost", 27003));
hostAndPorts.add(new HostAndPort("localhost", 27002));
hostAndPorts.add(new HostAndPort("localhost", 27001));
JedisCluster cluster = new JedisCluster(hostAndPorts, 1000, 1000, 5, "RedisPass", config);
return cluster;
}
private static Jedis getJedisByKey(String key) {
JedisCluster cluster = getCluster();
Map<String, JedisPool> nodes = cluster.getClusterNodes();
String anyHost = nodes.keySet().iterator().next();
TreeMap<Long, String> slotHostMap = getSlotHostMap(anyHost);
int slot = JedisClusterCRC16.getSlot(key);
Map.Entry<Long, String> entry = slotHostMap.lowerEntry(Long.valueOf(slot));
Jedis jedis = nodes.get(entry.getValue()).getResource();
return jedis;
}
}