项目需求使用Netty搭建Tcp服务器,实现与Http Web服务器一样的同步“请求-响应”的单工同步通信方式。虽然Netty提供了异步IO和同步IO的统一实现,但是我们的需求关键并不是IO的同步异步关系,而是实现请求-响应这种典型的一问一答交互方式。要实现这个需求,需要解决两个问题:
1.请求和响应的正确匹配
客户端发送消息后,服务端返回响应结果,那么此结果怎么和客户端的请求正确匹配呢,即一个消息请求如何对应一个消息响应呢?
解决思路:客户端程序中为每一条请求消息设置一个全局唯一id,服务端返回的响应结果中包含该id,这样客户端就可以通过id来正确匹配请求响应了。
2.请求线程和响应线程的通信。
Netty异步IO通信模型,请求消息发送线程和channel中channelRead()(接收响应)方法接收线程并不是同一线程,要实现请求线程在发出消息后,同步等待服务端的响应,就需要解决,Netty客户端在接受到响应之后,怎么通知请求线程结果。
解决思路:多线程Guarded Suspension设计模式(当现在并不适合马上执行某个操作时,就要求想要执行该操作的线程等待),实现等待—通知机制,即客户端线程在发送请求后,进入等待,服务器返回响应后,根据消息的唯一id来唤醒客户端的请求线程,并把结果返回给请求线程。
代码示例
项目中使用Netty+Protobuf进行Tcp通信。大体思路:客户端发送请求后将<请求ID,Future>的键值对保存到一个缓存中,这时候用Future等待结果,挂住请求线程;当Netty客户端收到服务端的响应后,响应线程根据请求ID从缓存中取出Future,然后设置响应结果到Future中。这个时候利用CountDownLatch的通知机制,通知请求线程。请求线程从Future中拿到响应结果,然后做业务处理。
HprotoFuture
异步转同步的核心,即发送请求线程与响应接收线程的同步协调者
public class HprotoFuture<T extends Message> {
/**
* message id
*/
private long uuid;
/**
* Response 协议
*/
private Response response;
/**
* 响应体实例
*/
private T entity;
/**
* 响应体类型
*/
private Class<T> responseType;
/**
* 线程同步,
*/
private CountDownLatch latch = new CountDownLatch(1);
/**
* 异常监听,处理服务端响应的异常信息
*/
private ExceptionHandler exceptionHandler;
public HprotoFuture(long uuid,Class<T> responseType, ExceptionHandler handler) {
this.uuid = uuid;
this.exceptionHandler = handler;
this.responseType = responseType;
}
/**
* 注入Response
* 如果服务端返回2000 Ok,解码响应实体Entity,唤醒get线程
* 否则处理异常,唤醒get线程,返回entity=null
* 业务响应异常交由此future处理,channel handler只处理底层协议相关逻辑
*
* @param response
*/
public void onResponse(Response response) {
this.response = response;
if (response != null) {
try {
Any any = response.getData();
if (response.getStatus() != StatusCode.OK.code()) {
//异常响应
exceptionHandler.caught(new ServerResponseException(response.getMessage()));
} else {
//正常响应
entity = any.unpack(responseType);
}
} catch (InvalidProtocolBufferException e) {
exceptionHandler.caught(e);
log.error("Type of the Any reflect exception");
} finally {
//唤醒get()线程,释放FutureContext资源
latch.countDown();
FutureContext.release(uuid);
}
}
}
/**
* 阻塞获取response
*
* @return
*/
public T get() {
if (entity == null) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return entity;
}
/**
* 阻塞获取response
* 超时等待,避免永久休眠
* @param timeout
* @param unit
* @return
* @throws TimeoutException
*/
public T get(long timeout, TimeUnit unit) throws TimeoutException {
if (entity == null) {
try {
if (!latch.await(timeout, unit)) {
throw new TimeoutException("get time out");
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
//释放FutureContext资源
FutureContext.release(uuid);
}
}
return entity;
}
public long getUuid() {
return uuid;
}
}
FutureContext
缓存与管理Future与Message的关系
public class FutureContext {
/**
* 一个请求对应一个HprotoFuture
*/
private static final ConcurrentHashMap<Long, HprotoFuture<?>> CONTAINER = new ConcurrentHashMap<>();
/**
* 记录Future
* @param future
*/
public static void apply(HprotoFuture<?> future) {
if (future == null) {
return;
}
CONTAINER.put(future.getUuid(), future);
}
/**
* 获取future
* @param uuid
* @return
*/
public static HprotoFuture get(Long uuid) {
if (uuid == null) {
return null;
}
return CONTAINER.get(uuid);
}
/**
* 释放future
* @param uuid
*/
public static void release(Long uuid) {
if (uuid == null) {
return;
}
CONTAINER.remove(uuid);
}
}
Client客户端
省略部分代码
public class HprotoClient {
/**
* Netty客户端属性
*/
.....
/**
* 成员方法
*
*/
......
/**
* 同步发送
* 每一条 request message,生成一个唯一HprotoFuture管理其response
* HprotoFuture 阻塞同步返回response
*
* @param message 信息 Google ProtoBuf Message子类
* @param responseType 响应体类型
* @param <T>
* @return 响应超时返回null
*/
public <T extends Message> T send(ProtocolPattern message, Class<T> responseType) {
HprotoFuture<T> future = new HprotoFuture<>(message.getUuid(),responseType,exceptionHandler);
FutureContext.apply(future);
try {
netClient.send(message);
return future.get(readTimeout, readUnit);
} catch (TimeoutException e) {
exceptionHandler.caught(e);
} catch (IOException e2) {
exceptionHandler.caught(e2);
}
return null;
}
}
响应Handler
public class ClientResponseHandler extends ChannelInboundHandlerAdapter {
/**
* 异常监听类,处理客服端异常
*/
private ExceptionHandler exceptionHandler;
public ClientResponseHandler(ExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
/**
* 读取响应
* 将Response放入对应Future,FutureContext上下文释放Future
* 保障一条message 消费一个response
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ProtocolPattern protocol = (ProtocolPattern) msg;
Response response = (Response) protocol.getBody();
Long uuid=protocol.getUuid();
HprotoFuture future=FutureContext.get(uuid);
if (future != null) {
future.onResponse(response);
}
}finally {
ReferenceCountUtil.release(msg);
}
}
/**
* 异常监听处理异常
* 关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (exceptionHandler != null) {
exceptionHandler.caught(cause);
}
ctx.close();
}