package github.com.AllenDuke.rpc.customer; import github.com.AllenDuke.rpc.netty.NettyClient; import github.com.AllenDuke.rpc.publicSource.Calculator; import github.com.AllenDuke.rpc.publicSource.HelloService; /** * @description 客户端启动类,从NettyClient处获取代理对象,发起RPC * @contact [email protected] * @since 2020/2/11 */ public class ClientBootstrap { //netty消费者 private static final NettyClient customer = new NettyClient(); //自定义的消息协议为: #interfaceName#methodName#2#arg1#arg2 //这里用了fastjson public static void main(String[] args) throws Exception { //创建代理对象 HelloService service = (HelloService) customer.getServiceImpl(HelloService.class); String res = service.hello("Allen","Duke",2); Calculator calculator=(Calculator) customer.getServiceImpl(Calculator.class); calculator.add(1,2); calculator.multipy(3,6); } }
package github.com.AllenDuke.rpc.netty; import com.alibaba.fastjson.JSON; import github.com.AllenDuke.concurrentTest.threadPoolTest.ThreadPoolService; import github.com.AllenDuke.rpc.publicSource.Message; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.lang.reflect.Proxy; import java.util.concurrent.FutureTask; /** * @description netty消费者 * @contact [email protected] * @since 2020/2/11 */ public class NettyClient { //创建自己的线程池 private static ThreadPoolService executor = new ThreadPoolService(); public static NettyClientHandler clientHandler; /** * @description: 返回一个代理对象,其中该代理类中的InvokeHandler中的invoke方法(jdk动态代理的知识)的作用是, * 将调用信息封装成任务,提交到线程池,任务的返回值为 netty线程接收到的返回值 * @param serivceClass 要实现的接口 * @return: java.lang.Object * @date: 2020/2/12 */ public Object getServiceImpl(final Class<?> serivceClass) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serivceClass}, (proxy, method, args) -> { //lamda表达式,匿名内部类实现InvokeInhandler接口,重写invoke方法 if(method.getName().equals("toString")) { return "这是代理类的toString方法,避免递归调用"; } if(clientHandler==null) init();//懒加载 String className = serivceClass.getName(); className=className.substring(className.lastIndexOf(".")+1)+"Impl";//去掉包名 // //自定义消息协议 // String msg=className+"#"+method.getName()+"#"+args.length; // for (Object arg : args) { // if(arg!=null) msg+="#"+arg; // } //利用fastjson Message message=new Message(className,method.getName(),args); FutureTask<Object> task=new FutureTask<Object>(message); executor.execute(task); Object result=task.get();//主线程在这阻塞,等待结果 return JSON.parseObject((String) result,method.getReturnType());//将json文本转为对象 }); } //初始化netty客户端 private static void init() { clientHandler=new NettyClientHandler(); //创建EventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder());//inbound编码器 pipeline.addLast(new StringEncoder());//outbound解码器 pipeline.addLast(clientHandler);//业务处理器 } } ); try { bootstrap.connect("127.0.0.1", 7000).sync(); } catch (Exception e) { e.printStackTrace(); } } }
package github.com.AllenDuke.rpc.netty; import com.alibaba.fastjson.JSON; import github.com.AllenDuke.rpc.publicSource.Message; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @description netty消费者的业务处理器 * @contact [email protected] * @since 2020/2/11 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter{ private ChannelHandlerContext context; private Object result; //与服务器的连接创建后,就会被调用, 这个方法是第一个被调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.context=ctx; } //netty线程收到消息,调用handler的chaanneltRead方法,唤醒线程池中的工作线程 @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg; notify(); //唤醒等待的工作线程 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } public synchronized void sendMsg(Message message) throws InterruptedException { context.writeAndFlush(JSON.toJSONString(message));//netty线程发送json文本 wait(); //工作线程阻塞wait,等待channelRead方法获取到服务器的结果后,唤醒 System.out.println("服务端返回结果:"+result); } public Object getResult(){ return result; } }
package github.com.AllenDuke.rpc.provider; import github.com.AllenDuke.rpc.netty.NettyServer; /** * @description 服务端启动类,启动netty服务提供者 * @contact [email protected] * @since 2020/2/11 */ public class ServerBootstrap { public static void main(String[] args) { NettyServer.startServer("127.0.0.1", 7000); } }
package github.com.AllenDuke.rpc.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * @description netty服务提供者 * @contact [email protected] * @since 2020/2/11 */ public class NettyServer { public static void startServer(String hostName, int port) { startServer0(hostName, port); } private static void startServer0(String hostname, int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder());//inbound编码器 pipeline.addLast(new StringEncoder());//outbound解码器 pipeline.addLast(new NettyServerHandler());//业务处理器 } } ); ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync(); System.out.println("服务提供方开始提供服务~~"); channelFuture.channel().closeFuture().sync();//同步方法,直到有结果才往下执行 } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package github.com.AllenDuke.rpc.netty; import com.alibaba.fastjson.JSON; import github.com.AllenDuke.rpc.publicSource.Message; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; /** * @description netty服务提供者的业务处理器 * @contact [email protected] * @since 2020/2/11 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { //实现类所在的包名,可把类都先加载到一个HashMap中 private static String packageName = "github.com.AllenDuke.rpc.serviceImpl."; //key为实现方法的全限定名 private static final Map<String, Method> serviceMap = new HashMap<>(); //key为实现类的全限定名 private static final Map<String, Class> classMap = new HashMap<>(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送的消息,并调用服务 System.out.println("服务器收到信息:" + msg + " 准备解码,调用服务"); //将json文本转换为对象 Message message=JSON.parseObject((String) msg, Message.class); // //解析自定义消息协议 // StringBuilder message = new StringBuilder(msg.toString()); // //解析类名 // int classIndex = message.indexOf("#"); // String className = message.substring(0, classIndex) + "Impl"; // message.delete(0, classIndex + 1); // //解析方法名 // int methodIndex = message.indexOf("#"); // String methodName = message.substring(0, methodIndex); // message.delete(0, methodIndex + 1); // //解析参数个数 // int argNumIndex = message.indexOf("#"); // int argNum=Integer.valueOf(message.substring(0,argNumIndex)); // message.delete(0,argNumIndex+1); // Object[] args=new Object[argNum]; // //解析参数,类型转换? // for (int i = 0; i < argNum; i++) { // if(i==argNum-1) args[i]=message.toString(); // else{ // int argIndex=message.indexOf("#"); // args[i]=message.substring(0,argIndex); // message.delete(0,argIndex+1); // } // } String className=message.getClassName(); String methodName=message.getMethodName(); Object[] args=message.getArgs(); Object result = null;//返回结果 Class serviceImpl = classMap.get(packageName + className); //这里forName去寻找类,也可以一开始就把包下的类都加载进来 if(serviceImpl==null) serviceImpl= Class.forName(packageName + className); //类中对应的方法 Method service = serviceMap.get(packageName + className + "." + methodName); if (service == null) for (Method method : serviceImpl.getMethods()) { if (method.getName().equals(methodName)) { service=method; serviceMap.put(packageName + className + "." + methodName, method);//找到后加入hashMap break; } } result = service.invoke(serviceImpl.newInstance(), args );//serviceImpl无参构造要public ctx.writeAndFlush(JSON.toJSONString(result));//转换为json文本 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
package github.com.AllenDuke.rpc.publicSource; import github.com.AllenDuke.rpc.netty.NettyClient; import github.com.AllenDuke.rpc.netty.NettyClientHandler; import java.util.concurrent.Callable; /** * @description 消息体 * @contact [email protected] * @since 2020/2/11 */ public class Message implements Callable { private static NettyClientHandler handler= NettyClient.clientHandler; private String className;//要调用的类名 private String methodName;//要调用的方法名 private Object[] args;//方法的参数 public Message(String className,String methodName,Object[] args){ this.className=className; this.methodName=methodName; this.args=args; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getArgs() { return args; } public void setArgs(Object[] args) { this.args = args; } //封装成任务后,由线程池的工作线程调用 public Object call() throws Exception { handler.sendMsg(this); return handler.getResult(); } }
package github.com.AllenDuke.rpc.publicSource; /** * @description sayHello服务 * @contact [email protected] * @since 2020/2/11 */ public interface HelloService { String hello(String name1,String name2,Integer num); }
package github.com.AllenDuke.rpc.publicSource; /** * @description 计算器服务 * @contact [email protected] * @since 2020/2/11 */ public interface Calculator { int add(int a,int b); int multipy(int a,int b); }
package github.com.AllenDuke.rpc.serviceImpl; import github.com.AllenDuke.rpc.publicSource.Calculator; /** * @description 计算器服务实现 * @contact [email protected] * @since 2020/2/11 */ public class CalculatorImpl implements Calculator { @Override public int add(int a, int b) { return a+b; } @Override public int multipy(int a, int b) { return a*b; } }
线程池的代码在前两篇文章中。
总结:
此次实现总体上看难度不大,但有三个点可能不容易想到。
1.动态代理,这里要站在框架使用者的角度思考,应该做到用户无感知,像是Mybatis的getMapper,还要注意invoke方法的实现。
2.一条消息也是一个任务,工作线程既是执行任务也是发送消息。将消息封装成一个任务,要从接口方法的有返回值来思考。
3.使用fastjson,一开始是自定义消息协议,然后解析,参数还要类型转换,而fastjson刚好可以做到。
当然,实现的方法有千百种,但在可以实现了的条件下,应该考虑怎样的设计看起来更合理,更容易理解,本次模拟实现肯定有很多不妥的地方,还望朋友不吝赐教,共同进步。