根据一个具体案例来分析ByteBuf申请和释放时的线程并发安全问题和非法引用问题
HTTP响应Body获取时异常
HTTP客户端示例代码,采用同步HTTP调用方式:
public class HttpClient{
private Channel channel;
HttpClientHandler handler = new HttpClientHandler();
private void connect(String host, int port) throws Exception{
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
Bootstrap b = new Bootstrap();
b.group(workerGroup)
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpObjectAggregator(Short.MAX_VALUE));
ch.pipeline().addLast(handler);
}
});
ChannelFuture f = b.connect(host, port).sync();
channel = f.channel();
}
private HttpResponse blockSend(FullHttpRequest request) throws InterruptedException, ExecutionException{
request.headers().set(HttpHeaderNames.CONNECT_LENGTH, request.content().readableBytes());
DefaultPromise<HttpResponse> respPromise = new DefaultPromise<HttpResponse>(channel.eventLoop());
handler.setRespPromise(respPromise);
channel.writeAndFlush(request);
HttpResponse response = respPromise.get();
if(response != null)
System.out.println("The client received http response, the body is:" + new String(response.body()));
return response;
}
public static void main(String[] args) throws Exception{
HttpClient client = new HttpClient();
client.connect("127.0.0.1", "8888");
ByteBuf body = Unpooled.wrappedBuffer("Http message!".getBytes("UTF-8"));
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"http://127.0.0.1/user?id=101", body);
HttpResponse response = client.blockSend(request);
}
}
服务端处理正常,客户端运行时报异常:java.lang.UnsupportedOperationException: direct buffer
对HttpResponse代码进行分析,发现消息体获取来源是Netty FullHttpResponse的content字段,相关代码如下(HttpResponse类,上面blockSent()方法的返回类型):
private FullHttpResponse response;
public HttpResponse(FullHttpResponse response){
this.header = response.headers();
this.response = response;
}
//blockSend()打印语句里面调用此处
public byte [] body(){
return body = response.content() != null ? response.content().array() : null;
}
response.content().array()底层调用的是PooledUnsafeDirectByteBuf,它并不支持array()方法,相关代码如下(PooledUnsafeDirectByteBuf类):
public byte[] array(){
//之前的客户端异常正是此处异常
throw new UnsupportedOperationException("direct buffer");
}
为提升性能,Netty默认的I/O Buffer使用直接内存DirectByteBuf,可以减少JVM用户态到内核态Socket读写的内存拷贝(零拷贝),由于是直接内存,无法直接转换成堆内存,因此它并不支持array(),用户需要自己做内存拷贝操作。
对body()修改,采用字节拷贝方式将HTTP Body拷贝到byte[] 数组中,代码如下(HttpResponse类):
public byte [] body(){
body = new byte[response.content().readableBytes()];
response.content().getBytes(0, body);
return body;
}
再次发生异常:io.netty.util.IllegalReferenceCountException: refCnt:0 表示操作了已经被释放的对象。
ByteBuf非法引用问题
ByteBuf实现ReferenceCounted接口,所以每次操作ByteBuf之前,都需要对ByteBuf的生命周期状态进行判断,如果已经被释放,则抛出引用计数异常。
对业务代码进行分析,探寻抛出异常原因。在收到一个HTTP响应消息后,调用respPromise的setSuccess方法,唤醒业务线程继续执行,相关代码如下(HttpClientHandler类):
pubilc class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse>{
DefaultPromise<HttpResponse> respPromise;
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception{
if(msg.decoderResult().isFailure(0){
throw new Exception("Decode HttpResponse error: " + msg.decoderResult().cause());
HttpResponse response = new HttpResponse(msg);
respPromise.setSuccess(response);
}
}
}
在获取HTTP响应时,抛出非法引用异常,说明HTTP Body已经被释放,业务代码并没有主动释放ByteBuf,ByteBuf究竟是被谁释放的? – HttpClientHandler集成自SimpleChannelInboundHandler,ChannelRead0方法被调用后,Netty会自动释放FullHttpResponse,源码如下(SimpleChannelInboundHandler类):
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
boolean release = true;
try{
if(acceptInboundMessage(msg)){
I imsg = (I) msg;
channelRead0(ctx, msg);
}else{
release = false;
ctx.firechannelRead(msg);
}
}finally{
if(autoRelease && release){
ReferenceCountUtil.release(msg);
}
}
}
由于执行完channelRead0滞后Netty的NioEventLoop线程就会调用ReferenceCountUtil.release(msg)释放内存,所有后续业务调用方的线程再访问FullHttpResponse就会出现非法引用问题。
优化代码,在channelRead0中初始化HTTP Body,此时FullHttpResponse的content并没有被释放,可以被访问,代码如下(HttpResponse类):
public HttpResponse(FullHttpResponse response){
this.header = response.headers();
this.response = response;
if(response.content() != null){
body = new byte[response.content().readableBytes()];
response.content().getBytes(0, body);
}
public byte [] body(){
return body;
}
}
修改之后,问题解决。
ByteBuf使用注意事项:
- ByteBuf的线程并发安全问题,特别要防止Netty NioEventLoop线程与应用线程并发操作ByteBuf
- ByteBuf的申请和释放,注意避免忘记释放和重复释放,以及释放之后继续访问。重点关注一下ByteBuf的隐式释放情况,例如应用申请了ByteBuf,但被Netty隐式释放了,当应用继续访问或者释放ByteBuf时就会发生异常。
- 注意性能问题。通常的get和set之类获取成员变量的方法不会带来性能问题,但是在此类方法中做复杂的操作,就可能会带来严重的性能问题,例如(每次获取body都做一次内存到堆内存拷贝,如果业务频繁访问body方法,则会带来严重的性能问题):
public byte [] body(){
if(response.content() == null)
return null;
body = new byte[response.content().readableBytes()];
response.content().getBytes(0, body);
return body;
}