package com.suyun.cache.dao.impl;
import com.suyun.common.lang.Tuple;
import com.suyun.common.lang.Tuple3;
import com.suyun.common.utils.FunctionalUtil;
import com.suyun.vehicle.model.VehicleDataMap;
import com.suyun.vehicle.model.VehicleDataValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
 * Batch reader for vehicle status values kept in Redis hashes.
 *
 * <p>Each vehicle is stored under the hash key {@code vehicle:status:<vehicleId>}. Lookups for
 * several vehicles are sent through a Redis pipeline so that one query type costs a single
 * round trip regardless of how many vehicles are requested.
 */
@Component
public class VehicleDataUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(VehicleDataUtil.class);

    /** Hash field that stores the time of a vehicle's last report. */
    private static final String TIMESTAMP_FIELD = "timestamp";

    /** Hash operations view of the {@code redisTemplate} bean; not referenced by the methods below. */
    @Resource(name = "redisTemplate")
    private HashOperations<String, String, VehicleDataValue> hops;

    /** Template used for the pipelined reads and as the source of the key/hash serializers. */
    @Autowired
    private RedisTemplate redisTemplateForHash;

    /**
     * Returns the latest cached values for the requested vehicles.
     *
     * @param paramMap vehicle id mapped to the list of data codes (hash fields) to read
     * @return vehicle id mapped to its data map; {@code null} when {@code paramMap} is
     *         {@code null} (kept for callers that test for {@code null})
     */
    public Map<String, VehicleDataMap> getLatestDataMap(Map<String, List<String>> paramMap) {
        if (paramMap == null) {
            return null;
        }
        return queryVDMRedis(paramMap);
    }

    /**
     * Reads the requested codes and the report timestamp of every vehicle and assembles one
     * {@link VehicleDataMap} per vehicle.
     *
     * <p>Entries whose code list is {@code null} or empty are skipped (HMGET needs at least one
     * field). Vehicles without a stored timestamp are left out of the result, and codes with no
     * stored value are left out of that vehicle's data map.
     *
     * @param paramMap vehicle id mapped to the data codes to read; must not be {@code null}
     * @return vehicle id mapped to its data map, never {@code null}
     */
    private Map<String, VehicleDataMap> queryVDMRedis(Map<String, List<String>> paramMap) {
        // Snapshot ids and code lists in a single pass so that position i of both lists matches
        // position i of the pipeline replies.
        List<String> busIds = new ArrayList<>(paramMap.size());
        List<List<String>> codeLists = new ArrayList<>(paramMap.size());
        for (Map.Entry<String, List<String>> entry : paramMap.entrySet()) {
            List<String> codes = entry.getValue();
            if (codes == null || codes.isEmpty()) {
                continue;
            }
            busIds.add(entry.getKey());
            codeLists.add(codes);
        }

        Map<String, VehicleDataMap> vehicleDataMaps = new HashMap<>();
        if (busIds.isEmpty()) {
            return vehicleDataMaps;
        }

        List<List<VehicleDataValue>> valueLists = fetchValues(busIds, codeLists);
        List<VehicleDataValue> timestamps = fetchTimestamps(busIds);

        for (int i = 0, len = busIds.size(); i < len; i++) {
            List<VehicleDataValue> values = valueLists.get(i);
            VehicleDataValue timestamp = timestamps.get(i);
            if (values == null || timestamp == null) {
                continue;
            }
            String busId = busIds.get(i);
            VehicleDataMap vehicleDataMap = new VehicleDataMap(busId, (long) timestamp.getValue());

            // HMGET answers in field order, so codes and values are walked in lockstep.
            Iterator<String> codeIterator = codeLists.get(i).iterator();
            Iterator<VehicleDataValue> valueIterator = values.iterator();
            while (codeIterator.hasNext() && valueIterator.hasNext()) {
                String code = codeIterator.next();
                VehicleDataValue value = valueIterator.next();
                if (value != null) {
                    Double reading = value.getValue();
                    vehicleDataMap.put(code, reading, value.getExtraString());
                }
            }
            vehicleDataMaps.put(busId, vehicleDataMap);
        }
        return vehicleDataMaps;
    }

    /**
     * Issues one pipelined HMGET per vehicle.
     *
     * @param busIds    vehicle ids, in the order the replies are expected
     * @param codeLists hash fields to read for the vehicle at the same index; each non-empty
     * @return one list of values per vehicle, in {@code busIds} order
     */
    @SuppressWarnings("unchecked")
    private List<List<VehicleDataValue>> fetchValues(List<String> busIds, List<List<String>> codeLists) {
        // Replies only become available when the pipeline is closed; they are decoded with the
        // hash value serializer because the payloads are hash values.
        return redisTemplateForHash.executePipelined((RedisCallback<Object>) connection -> {
            for (int i = 0, len = busIds.size(); i < len; i++) {
                List<String> codes = codeLists.get(i);
                byte[][] rawHashKeys = new byte[codes.size()][];
                int counter = 0;
                for (String code : codes) {
                    rawHashKeys[counter++] = rawHashKey(code);
                }
                connection.hMGet(rawKey(redisKey(busIds.get(i))), rawHashKeys);
            }
            // executePipelined requires the callback itself to return null.
            return null;
        }, hashValueSerializer());
    }

    /**
     * Issues one pipelined HGET of the {@code timestamp} field per vehicle.
     *
     * @param busIds vehicle ids, in the order the replies are expected
     * @return one entry per vehicle in {@code busIds} order; {@code null} where nothing is stored
     */
    @SuppressWarnings("unchecked")
    private List<VehicleDataValue> fetchTimestamps(List<String> busIds) {
        final byte[] rawTimestampField = rawHashKey(TIMESTAMP_FIELD);
        return redisTemplateForHash.executePipelined((RedisCallback<Object>) connection -> {
            for (String busId : busIds) {
                connection.hGet(rawKey(redisKey(busId)), rawTimestampField);
            }
            // executePipelined requires the callback itself to return null.
            return null;
        }, hashValueSerializer());
    }

    /**
     * Builds the Redis key of a vehicle's status hash.
     *
     * @param vehicleId vehicle id
     * @return {@code "vehicle:status:" + vehicleId}
     */
    private String redisKey(String vehicleId) {
        return "vehicle:status:" + vehicleId;
    }

    /**
     * Determines whether the worker has finished, by spinning until {@code map} holds
     * {@code len} entries.
     *
     * <p>NOTE(review): nothing in this class calls this method, and the loop busy-waits while
     * logging at ERROR level on every pass — confirm whether it is still needed.
     *
     * @param len expected number of entries
     * @param map map being filled
     * @return {@code true} once the sizes match, {@code false} if an exception interrupted the wait
     */
    private boolean isEndExecutor(int len, Map<String, List<VehicleDataValue>> map) {
        boolean flag = false;
        try {
            do {
                LOGGER.error("执行的Map【" + map.hashCode() + "】map的长度" + map.size());
            } while (len != map.size());
            flag = true;
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            LOGGER.error(e.getMessage(), e);
        }
        return flag;
    }

    /**
     * Gets the time of the last reported message for each vehicle.
     *
     * @param busIds vehicle ids to look up; {@code null} or empty yields an empty map
     * @return vehicle id mapped to its stored timestamp; vehicles without one are omitted
     */
    public Map<String, Long> getLastReportTime(List<String> busIds) {
        Map<String, Long> reportTimes = new HashMap<>();
        if (busIds == null || busIds.isEmpty()) {
            return reportTimes;
        }
        List<VehicleDataValue> timestamps = fetchTimestamps(busIds);
        for (int i = 0, len = busIds.size(); i < len; i++) {
            VehicleDataValue timestamp = timestamps.get(i);
            if (timestamp != null) {
                reportTimes.put(busIds.get(i), Double.valueOf(timestamp.getValue()).longValue());
            }
        }
        return reportTimes;
    }

    /**
     * Serializes a Redis key with the template's key serializer.
     *
     * @param key key to serialize; must not be {@code null}
     * @return the raw key bytes; a {@code byte[]} key is passed through when no serializer is set
     */
    byte[] rawKey(Object key) {
        Assert.notNull(key, "non null key required");
        if (keySerializer() == null && key instanceof byte[]) {
            return (byte[]) key;
        }
        return keySerializer().serialize(key);
    }

    /**
     * Serializes a hash field name with the template's hash key serializer.
     *
     * @param hashKey field name; must not be {@code null}
     * @return the raw field bytes; a {@code byte[]} is passed through when no serializer is set
     */
    <HK> byte[] rawHashKey(HK hashKey) {
        Assert.notNull(hashKey, "non null hash key required");
        if (hashKeySerializer() == null && hashKey instanceof byte[]) {
            return (byte[]) hashKey;
        }
        return hashKeySerializer().serialize(hashKey);
    }

    /**
     * Deserializes one raw hash value with the template's hash value serializer.
     *
     * @param value raw bytes
     * @return the deserialized value, or the raw bytes themselves when no serializer is set
     */
    @SuppressWarnings("unchecked")
    <HV> HV deserializeHashValue(byte[] value) {
        if (hashValueSerializer() == null) {
            return (HV) value;
        }
        return (HV) hashValueSerializer().deserialize(value);
    }

    /**
     * Deserializes a list of raw hash values with the template's hash value serializer.
     *
     * @param rawValues raw values
     * @return the deserialized values, or {@code rawValues} itself when no serializer is set
     */
    @SuppressWarnings("unchecked")
    <T> List<T> deserializeHashValues(List<byte[]> rawValues) {
        if (hashValueSerializer() == null) {
            return (List<T>) rawValues;
        }
        return SerializationUtils.deserialize(rawValues, hashValueSerializer());
    }

    /** @return the key serializer configured on the template */
    RedisSerializer keySerializer() {
        return this.redisTemplateForHash.getKeySerializer();
    }

    /** @return the hash key serializer configured on the template */
    RedisSerializer hashKeySerializer() {
        return this.redisTemplateForHash.getHashKeySerializer();
    }

    /** @return the hash value serializer configured on the template */
    RedisSerializer hashValueSerializer() {
        return this.redisTemplateForHash.getHashValueSerializer();
    }
}
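// Redis使用Pipeline进行批量查询 (Batch queries against Redis using a pipeline)
// 猜你喜欢 (page residue: "You may also like")
// 转载自blog.csdn.net/xionglangs/article/details/81065691 (reposted from this article)
// 今日推荐 (page residue: "Today's recommendations")
// 周排行 (page residue: "Weekly ranking")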