RPC调用超时中断机制的实现

在如今互联网的架构趋势下,微服务已经成为一个不可或缺的服务架构了。将一个大的服务拆分若干子服务,然后远程调用,已应对大流量、高并发的系统场景,如今开源的优秀RPC框架很多,例如 thrift、dubbo 、grpc等

本人公司也有两套自主研发的RPC框架,通读之后受益匪浅,下面分享一下,远程调用第三方服务超时中断机制的实现。在调用第三方服务时,如果服务提供方处理过于缓慢,会拖垮调用方,使调用方夯住,所以调用超时中断机制很有必要,是保证服务的可用性的重要手段

典型的微服务项目,一次用户请求,可能在后台的调用流程会历经多个服务,每个服务的可靠性是整个调用流程的前提

客户端调用服务端流程:

本文不再过多的讲解RPC调用流程,直接讲解客户端调用超时中断的代码实现。

原理也不复杂,利用ReentrantLock的Condition进行等待阻塞,等待相应的超时时间后,发现依然没有收到服务端的响应结果后,判断为超时!

代码实现:

首先定义一个netty客户端,用于请求服务端,获取返回结果

public class InvokerClient {

    private static Channel channel;

    public void init() throws Exception {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 处理来自服务端的返回结果
                        socketChannel.pipeline().addLast(new ReceiveHandle());
                    }
                });

        ChannelFuture cf = bootstrap.connect("127.0.0.1", 3344).sync();
        channel = cf.channel();
    }

    //请求服务端
    public Object call(Request request) {
        //此类是保证调用超时中断的核心类
        RequestTask requestTask = new RequestTask();
        //将请求放入请求工厂,使用请求唯一标识seq,用于辨识服务端返回的对应的响应结果
        RequestFactory.put(request.getSeq(), requestTask);
        channel.writeAndFlush("hello");

        //此步是返回response,超时即中断
        return requestTask.getResponse(request.getTimeOut());
    }
}
其中Request是请求参数,里面有timeout超时时间,以及向服务端请求的参数
public class Request {

    private static final UUID uuid = UUID.randomUUID();

    private String seq = uuid.toString();

    private Object object;

    private long timeOut;

    public Object getObject() {
        return object;
    }

    public Request setObject(Object object) {
        this.object = object;
        return this;
    }

    public String getSeq() {
        return seq;
    }

    public long getTimeOut() {
        return timeOut;
    }

    public Request setTimeOut(long timeOut) {
        this.timeOut = timeOut;
        return this;
    }
}

核心的RequestTask类,用于接受服务端的返回结果,超时中断

public class RequestTask {

    private boolean isDone = Boolean.FALSE;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    Object response;

    //客户端请求服务端后,立即调用此方法获取返回结果,timeout为超时时间
    public Object getResponse(long timeOut) {
        if (!isDone) {
            try {
                lock.lock();
                //此步等待timeout时间,阻塞,时间达到后,自动执行,此步是超时中断的关键步骤
                if (condition.await(timeOut, TimeUnit.MILLISECONDS)) {
                    if (!isDone) {
                        return new TimeoutException();
                    }
                    return response;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        return response;
    }

    public RequestTask setResponse(Object response) {
        lock.lock();
        try{
           //此步是客户端收到服务端的响应结果后,写入response
           this.response = response;
           //并唤起上面方法的阻塞状态,此时阻塞结束,结果正常返回
           condition.signal();
           isDone = true;
        }finally{
           lock.unlock();
        }
        return this;
    }

    public boolean isDone() {
        return isDone;
    }

    public RequestTask setDone(boolean done) {
        isDone = done;
        return this;
    }

}
ReceiveHandle客户端接收到服务端的响应结果处理handle
public class ReceiveHandle extends SimpleChannelInboundHandler {

    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

        Response response = (Response) o;

        //通过seq从请求工厂找到请求的RequestTask
        RequestTask requestTask = RequestFactory.get(response.getSeq());
        
        //将响应结果写入RequestTask
        requestTask.setResponse(response);
    }
}

RequestFactory请求工厂

public class RequestFactory {

    private static final Map<String, RequestTask> map = new ConcurrentHashMap<String, RequestTask>();

    public static void put(String uuid, RequestTask requestTask) {
        map.put(uuid, requestTask);
    }

    public static RequestTask get(String uuid) {
        return map.get(uuid);
    }
}

注: 本人利用业余时间手写了一套轻量级的rpc框架,里面有用到
https://github.com/zhangta0/bigxiang

猜你喜欢

转载自blog.csdn.net/CSDNzhangtao5/article/details/103075755