在如今互联网的架构趋势下,微服务已经成为一个不可或缺的服务架构了。将一个大的服务拆分若干子服务,然后远程调用,已应对大流量、高并发的系统场景,如今开源的优秀RPC框架很多,例如 thrift、dubbo 、grpc等
本人公司也有两套自主研发的RPC框架,通读之后受益匪浅,下面分享一下,远程调用第三方服务超时中断机制的实现。在调用第三方服务时,如果服务提供方处理过于缓慢,会拖垮调用方,使调用方夯住,所以调用超时中断机制很有必要,是保证服务的可用性的重要手段
典型的微服务项目,一次用户请求,可能在后台的调用流程会历经多个服务,每个服务的可靠性是整个调用流程的前提
客户端调用服务端流程:
本文不再过多的讲解RPC调用流程,直接讲解客户端调用超时中断的代码实现。
原理也不复杂,利用ReentrantLock的Condition进行等待阻塞,等待相应的超时时间后,发现依然没有收到服务端的响应结果后,判断为超时!
代码实现:
首先定义一个netty客户端,用于请求服务端,获取返回结果
public class InvokerClient { private static Channel channel; public void init() throws Exception { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 处理来自服务端的返回结果 socketChannel.pipeline().addLast(new ReceiveHandle()); } }); ChannelFuture cf = bootstrap.connect("127.0.0.1", 3344).sync(); channel = cf.channel(); } //请求服务端 public Object call(Request request) { //此类是保证调用超时中断的核心类 RequestTask requestTask = new RequestTask(); //将请求放入请求工厂,使用请求唯一标识seq,用于辨识服务端返回的对应的响应结果 RequestFactory.put(request.getSeq(), requestTask); channel.writeAndFlush("hello"); //此步是返回response,超时即中断 return requestTask.getResponse(request.getTimeOut()); } }
其中Request是请求参数,里面有timeout超时时间,以及向服务端请求的参数
public class Request { private static final UUID uuid = UUID.randomUUID(); private String seq = uuid.toString(); private Object object; private long timeOut; public Object getObject() { return object; } public Request setObject(Object object) { this.object = object; return this; } public String getSeq() { return seq; } public long getTimeOut() { return timeOut; } public Request setTimeOut(long timeOut) { this.timeOut = timeOut; return this; } }
核心的RequestTask类,用于接受服务端的返回结果,超时中断
public class RequestTask { private boolean isDone = Boolean.FALSE; private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); Object response; //客户端请求服务端后,立即调用此方法获取返回结果,timeout为超时时间 public Object getResponse(long timeOut) { if (!isDone) { try { lock.lock(); //此步等待timeout时间,阻塞,时间达到后,自动执行,此步是超时中断的关键步骤 if (condition.await(timeOut, TimeUnit.MILLISECONDS)) { if (!isDone) { return new TimeoutException(); } return response; } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } return response; } public RequestTask setResponse(Object response) { lock.lock(); try{ //此步是客户端收到服务端的响应结果后,写入response this.response = response; //并唤起上面方法的阻塞状态,此时阻塞结束,结果正常返回 condition.signal(); isDone = true; }finally{ lock.unlock(); } return this; } public boolean isDone() { return isDone; } public RequestTask setDone(boolean done) { isDone = done; return this; } }
ReceiveHandle客户端接收到服务端的响应结果处理handle
public class ReceiveHandle extends SimpleChannelInboundHandler { protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { Response response = (Response) o; //通过seq从请求工厂找到请求的RequestTask RequestTask requestTask = RequestFactory.get(response.getSeq()); //将响应结果写入RequestTask requestTask.setResponse(response); } }
RequestFactory请求工厂
public class RequestFactory { private static final Map<String, RequestTask> map = new ConcurrentHashMap<String, RequestTask>(); public static void put(String uuid, RequestTask requestTask) { map.put(uuid, requestTask); } public static RequestTask get(String uuid) { return map.get(uuid); } }
注: 本人利用业余时间手写了一套轻量级的rpc框架,里面有用到
https://github.com/zhangta0/bigxiang