一 问题
老项目的redis 要切换到redis集群,所以客户端从leettuce的RedisAsyncCommands切换到到redisson.
一个常见的hash增长:
public long hincrby(String key,String field,Long amount){
//新旧代码开关
if( 老的Redis){
老代码
}else{
新代码
}
}
redisClient.hincrby(key,field,amount).get()
查看下官网的对应操作手册:
HDEL | RMap.fastRemove(), RMap.fastRemoveAsync(), RMapReactive.fastRemove(); |
HEXISTS | RMap.containsKey(), RMap.containsKeyAsync(), RMapReactive.containsKey(); |
HGET | RMap.get(), RMap.getAsync(), RMapReactive.get(); |
HSTRLEN | RMap.valueSize(), RMap.valueSizeAsync(), RMapReactive.valueSize(); |
HGETALL | RMap.readAllEntrySet(), RMap.readAllEntrySetAsync(), RMapReactive.readAllEntrySet(); |
HINCRBY | RMap.addAndGet(), RMap.addAndGetAsync(), RMapReactive.addAndGet(); |
HINCRBYFLOAT | RMap.addAndGet(), RMap.addAndGetAsync(), RMapReactive.addAndGet(); |
HKEYS | RMap.readAllKeySet(), RMap.readAllKeySetAsync(), RMapReactive.readAllKeySet(); |
HLEN | RMap.size(), RMap.sizeAsync(), RMapReactive.size(); |
HMGET | RMap.getAll(), RMap.getAllAsync(), RMapReactive.getAll(); |
HMSET | RMap.putAll(), RMap.putAllAsync(), RMapReactive.putAll(); |
HSET | RMap.put(), RMap.putAsync(), RMapReactive.put(); |
HSETNX | RMap.fastPutIfAbsent(), RMap.fastPutIfAbsentAsync, RMapReactive.fastPutIfAbsent(); |
那就是Rmap.addandGet().
RMap<String, Long> map = redissonCilent.getMap(key);
result = map.addAndGet(field,amount);
如果只是api级别的简单替换,没啥问题。
但是桌子涉及业务场景的时候,新老redis替换,数据要保证平滑。就是原来假设某个field 有值是9了,再调用就该返回10.
这时候切成新的redis集群,没同步就是从0开始,返回1了,这显然有问题。
因此考虑到平滑的问题,先给对应的值同步老数据。
map.put(field, oldnum );
这时候,打开开关切换到新的redisson集群,测试就报错了。
二 分析
错误很长,放到最后。看半天还是Google终于找到问题:看下RedissonMap 的底层实现
@Override
public V addAndGet(K key, Number value) {
return get(addAndGetAsync(key, value));
}
@Override
public RFuture<V> addAndGetAsync(final K key, Number value) {
checkKey(key);
checkValue(value);
final RFuture<V> future = addAndGetOperationAsync(key, value);
if (hasNoWriter()) {
return future;
}
MapWriterTask<V> listener = new MapWriterTask<V>() {
@Override
public void execute() {
options.getWriter().write(key, future.getNow());
}
};
return mapWriterFuture(future, listener);
}
//核心
protected RFuture<V> addAndGetOperationAsync(K key, Number value) {
ByteBuf keyState = encodeMapKey(key);
RFuture<V> future = commandExecutor.writeAsync(getName(key), StringCodec.INSTANCE,
new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
getName(key), keyState, new BigDecimal(value.toString()).toPlainString());
return future;
}
核心代码 就是 addAndGetOperationAsync
这里参数很多。调用了CommandAsyncExecutor 的
<T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
这样看就清晰了,这里调用的RedisCommand命令是HINCRBYFLOAT,对应的编码是 StringCodec.INSTANCE。
引用下相关背景知识:
HINCRBYFLOAT key field increment
为哈希表 key 中的域 field 加上浮点数增量 increment 。
如果哈希表中没有域 field ,那么 HINCRBYFLOAT 会先将域 field 的值设为 0 ,然后再执行加法操作。
如果键 key 不存在,那么 HINCRBYFLOAT 会先创建一个哈希表,再创建域 field ,最后再执行加法操作。
当以下任意一个条件发生时,返回一个错误:
- 域 field 的值不是字符串类型(因为 redis 中的数字和浮点数都以字符串的形式保存,所以它们都属于字符串类型)
- 域 field 当前的值或给定的增量 increment 不能解释(parse)为双精度浮点数(double precision floating point number)
HINCRBYFLOAT 命令的详细功能和 INCRBYFLOAT 命令类似,请查看 INCRBYFLOAT 命令获取更多相关信息。
可用版本:
>= 2.6.0
时间复杂度:
O(1)
返回值:
执行加法操作之后 field 域的值。
我的入参是:Long 类型,我使用了默认编码:JsonJacksonCodec 。所以才报这个错误。
改为指定的编码,StringCodec 就可以了。
贴一下:redisson的编码集
Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了以下几种的对象编码应用,以供大家选择:
编码类名称 | 说明 |
---|---|
org.redisson.codec.JsonJacksonCodec |
Jackson JSON 编码 默认编码 |
org.redisson.codec.AvroJacksonCodec |
Avro 一个二进制的JSON编码 |
org.redisson.codec.SmileJacksonCodec |
Smile 另一个二进制的JSON编码 |
org.redisson.codec.CborJacksonCodec |
CBOR 又一个二进制的JSON编码 |
org.redisson.codec.MsgPackJacksonCodec |
MsgPack 再来一个二进制的JSON编码 |
org.redisson.codec.IonJacksonCodec |
Amazon Ion 亚马逊的Ion编码,格式与JSON类似 |
org.redisson.codec.KryoCodec |
Kryo 二进制对象序列化编码 |
org.redisson.codec.SerializationCodec |
JDK序列化编码 |
org.redisson.codec.FstCodec |
FST 10倍于JDK序列化性能而且100%兼容的编码 |
org.redisson.codec.LZ4Codec |
LZ4 压缩型序列化对象编码 |
org.redisson.codec.SnappyCodec |
Snappy 另一个压缩型序列化对象编码 |
org.redisson.client.codec.JsonJacksonMapCodec |
基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[] 遇到的问题。 |
org.redisson.client.codec.StringCodec |
纯字符串编码(无转换) |
org.redisson.client.codec.LongCodec |
纯整长型数字编码(无转换) |
org.redisson.client.codec.ByteArrayCodec |
字节数组编码 |
org.redisson.codec.CompositeCodec |
用来组合多种不同编码在一起 |
最后贴一下错误日志:
hgetFromNewRedis error,key=ee
org.redisson.client.RedisException: Unexpected exception while processing command
at org.redisson.command.CommandAsyncService.convertException(CommandAsyncService.java:324) ~[redisson-3.5.6.jar:na]
at org.redisson.command.CommandAsyncService.get(CommandAsyncService.java:167) ~[redisson-3.5.6.jar:na]
at org.redisson.RedissonObject.get(RedissonObject.java:75) ~[redisson-3.5.6.jar:na]
at org.redisson.RedissonMap.get(RedissonMap.java:251) ~[redisson-3.5.6.jar:na]
at com.benmu.matador.workflow.RedisCommandProxy.hget(RedisCommandProxy.java:220) ~[matador-workflow-1.0.21.jar:na]
at com.benmu.matador.integration.web.controller.DoorController.hget(DoorController.java:150) [DoorController.class:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_71]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_71]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) [spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) [spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:817) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:731) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:959) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:893) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:968) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:859) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:621) [servlet-api.jar:na]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:844) [spring-webmvc-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:728) [servlet-api.jar:na]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:305) [catalina.jar:7.0.47]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.47]
at com.benmu.matador.integration.web.security.PrivilegeFilter.doFilter(PrivilegeFilter.java:52) [PrivilegeFilter.class:na]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.47]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.47]
at com.benmu.matador.integration.web.security.LoginFilter.doFilter(LoginFilter.java:47) [LoginFilter.class:na]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.47]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.47]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) [tomcat7-websocket.jar:7.0.47]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.47]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.47]
at com.benmu.ServletWatcher.doFilter(ServletWatcher.java:137) [common-core-1.3.3.jar:na]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.47]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.47]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:121) [spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.47]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.47]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:222) [catalina.jar:7.0.47]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:123) [catalina.jar:7.0.47]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [catalina.jar:7.0.47]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:171) [catalina.jar:7.0.47]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:100) [catalina.jar:7.0.47]
at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:953) [catalina.jar:7.0.47]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:118) [catalina.jar:7.0.47]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408) [catalina.jar:7.0.47]
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1041) [tomcat-coyote.jar:7.0.47]
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:603) [tomcat-coyote.jar:7.0.47]
at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:312) [tomcat-coyote.jar:7.0.47]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(1): SlicedAbstractByteBuf(ridx: 0, widx: 1, cap: 1/1, unwrapped: PooledUnsafeDirectByteBuf(ridx: 7, widx: 7, cap: 512))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1411) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:764) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at org.redisson.codec.SnappyCodec$3.decode(SnappyCodec.java:74) ~[redisson-3.5.6.jar:na]
at org.redisson.client.handler.CommandDecoder.decode(CommandDecoder.java:257) ~[redisson-3.5.6.jar:na]
at org.redisson.client.handler.CommandDecoder.decode(CommandDecoder.java:101) ~[redisson-3.5.6.jar:na]
at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:367) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1320) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:905) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:563) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:504) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:418) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:390) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145) ~[netty-all-4.1.1.Final.jar:4.1.1.Final]
... 1 common frames omitted
我一开始围绕错误日志搜关键词,netty的数组越界问题,搜出来都是不太想干的问题。还是去看下底层代码更有收获。
希望对你有所帮助。
参考: