Mini-Dubbo之服务消费
Consumer目录结构
消费者启动代码
/** * Created by cheng.xi on 2017-04-14 09:28. * 服务消费者端引用服务 */ public class TestApiConsumer { public static void main(String[] args) { DubboConsumer<HelloWorldService> reference = new DubboConsumer<HelloWorldService>(); //注入要调用的接口类对象 reference.setInterface(HelloWorldService.class); //根据上面注入的接口获取动态代理生成的对象(基于JDK动态代理生成) HelloWorldService helloWorldService = reference.get(); System.out.println(helloWorldService.sayHello("zhangsan")); } }
看DubboConsumer代码:
** * Created by cheng.xi on 2017-04-14 17:30. * 服务消费者端,获取代理 */ public class DubboConsumer<T> { //接口名字 private String interfaceName; //接口 private Class<?> interfaceClass; //代理类 private T ref;
//根据设置的接口名称获取代理对象 public T get(){ //获取代理 ref = new DubboConsumerProxy(interfaceClass).getProxy(); return ref; } //--------------------以下是get和set方法----------------// public String getInterfaceName() { return interfaceName; } public void setInterface(String interfaceName) { this.interfaceName = interfaceName; } //设置要调用的接口名称 public void setInterface(Class<?> interfaceClass) { this.interfaceClass = interfaceClass; setInterface(interfaceClass == null ? (String) null : interfaceClass.getName()); } public Class<?> getInterfaceClass() { return interfaceClass; } public void setInterfaceClass(Class<?> interfaceClass) { this.interfaceClass = interfaceClass; } }
看DubboConsumerProxy代码
/** * Created by cheng.xi on 2017-04-14 09:33. * 服务消费者端的代理,使用JDK动态代理 */ public class DubboConsumerProxy implements InvocationHandler{ private Class<?> interfaces; public DubboConsumerProxy(Class<?> interfaces){ this.interfaces = interfaces; } public <T> T getProxy(){ return (T) Proxy.newProxyInstance(interfaces.getClassLoader(),new Class[]{interfaces},this); } //代理执行方法 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object result; //组装请求体 Request request = new Request(); request.setInterfaceName(interfaces.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setArgs(args); //远程调用服务提供者 result = Transporters.connectAndExecute(request); return result; } }
Transporters类源码
public class Transporters { public static Object connectAndExecute(Request request) throws ExecutionException, InterruptedException { //处理器,用来处理返回的消息 NettyClientServiceHandler handler = new NettyClientServiceHandler(); //使用nettyclient来进行连接服务提供者和执行请求 new NettyClient().connectAndExecute(request,handler); //从处理器中获取返回的消息 return handler.getResponse().getResult(); } }
Transporters内部创建了NettyClient客户端,并调用其connectAndExecute方法:
public class NettyClient { public void connectAndExecute(Request request, final NettyClientServiceHandler handler) throws InterruptedException, ExecutionException { EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ObjectEncoder()); pipeline.addLast(new ObjectDecoder(1024*1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); pipeline.addLast(handler); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); ChannelFuture future = bootstrap.connect("127.0.0.1", 3347).sync(); Channel channel = future.channel(); channel.writeAndFlush(request).sync(); channel.closeFuture().sync(); }finally { group.shutdownGracefully(); } } }
最后从处理器中获取返回结果,一次请求调用完成。