1.RPC的解释
RPC的全称是 Remote Process Call,即远程过程调用,它应用广泛,实现方式也很多,拥有RMI和WebService等诸多成熟的方案,在业界的到了广泛的使用。
2.对象序列化
无论是何种数据类型,最重都得转换W为二进制流在网络上传输。
- 将对象转化为二进制流的过程称为序列化。
- 将二进制流恢复对象的过程称为反序列化。
在java世界中序列化和反序列化的工具很多,在此笔者不再一一介绍了,请读者自行学习了。以下代码实现基于java原生API。
2.1 RPC的实现
RPC服务实现:
public class RpcServerTest { public static void main(String[] args) { IStudent iStudent = new IStudentImpl(); //发布服务 RpcServer.publish(iStudent, 9999); } }
接口与实现类:
public interface IStudent { String getNameById(int id); }
public class IStudentImpl implements IStudent { @Override public String getNameById(int id) { String st = "id:" + id + " name:lojzes"; return st; } }
RpcServer:发布服务
public class RpcServer { //线程池,用来处理客户端的请求 private static final ExecutorService executorService = Executors.newCachedThreadPool(); public static void publish(Object service, int port) { try { ServerSocket serverSocket = new ServerSocket(port); System.out.println("server started....."); //循环等待客户端连接 while (true) { Socket socket = serverSocket.accept(); //处理请求 executorService.execute(new ThreadHandler(service, socket)); } } catch (Exception e) { e.printStackTrace(); } finally { } } }ThreadHandler:
/** * 备注:处理请求任务 */ public class ThreadHandler implements Runnable { private Object service; private Socket socket; public ThreadHandler(Object service, Socket socket) { this.service = service; this.socket = socket; } @Override public void run() { ObjectInputStream objectInputStream = null; ObjectOutputStream outputStream = null; try { //反序列化获取RpcRequest objectInputStream = new ObjectInputStream(socket.getInputStream()); RpcRequest request = (RpcRequest) objectInputStream.readObject(); String methodName = request.getMethodName(); Class[] argsTypes = request.getArgsTypes(); Object[] args = request.getArgs(); Method method = service.getClass().getMethod(methodName, argsTypes); Object obj = method.invoke(service, args); outputStream = new ObjectOutputStream(socket.getOutputStream()); //输出给客户端结果 outputStream.writeObject(obj); outputStream.flush(); } catch (Exception e) { e.printStackTrace(); } finally { if (outputStream != null) { try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (objectInputStream != null) { try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
RPC客户端的实现:
public class RpcClientTest { public static void main(String[] args) { //生成代理 IStudent proxy = RpcClientProxy.proxy(IStudent.class, "127.0.0.1", 9999); //远程调用 String str = proxy.getNameById(2018); System.out.println("str = " + str); } }
RpcClientProxy:对客户端接口的代理,用于调用远程方法
public class RpcClientProxy { public static <T> T proxy(Class<T> interfaces,String host,int port){ return (T)Proxy.newProxyInstance(interfaces.getClassLoader(), new Class[]{interfaces}, new ClientInterfaceHandler(host,port)); } }
ClientInterfaceHandler:代理处理类
public class ClientInterfaceHandler implements InvocationHandler { private String host; private int port; public ClientInterfaceHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { TCPTransportUtils tcpTransportUtils = new TCPTransportUtils(host,port); RpcRequest request = new RpcRequest(); request.setMethodName(method.getName()); request.setArgsTypes(method.getParameterTypes()); request.setArgs(args); request.setClassName(method.getDeclaringClass().getName()); return tcpTransportUtils.send(request); } }TCPTransportUtils:对Tcp封装
public class TCPTransportUtils { private String host; private int port; public TCPTransportUtils(String host, int port) { this.host = host; this.port = port; } public Socket newSocket() { Socket socket = null; try { socket = new Socket(host, port); } catch (IOException e) { e.printStackTrace(); } return socket; } public Object send(RpcRequest rpcRequest) { Socket socket = newSocket(); ObjectOutputStream objectOutputStream = null; ObjectInputStream objectInputStream = null; try { objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(rpcRequest); objectOutputStream.flush(); //从服务端返回 objectInputStream = new ObjectInputStream(socket.getInputStream()); Object o = objectInputStream.readObject(); return o; } catch (Exception e) { e.printStackTrace(); } finally { if (objectOutputStream != null) { try { objectOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (objectInputStream != null) { try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } return null; } }
RpcRequest:对请求参数的封装
/** * 备注:对请求参数的封装 */ public class RpcRequest implements Serializable { private static final long serialVersionUID = 5523770308345478012L; //发布的服务 private String className; //方法名称 private String methodName; //方法参数类型 private Class[] argsTypes; //方法参数 private Object[] args; public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getArgs() { return args; } public void setArgs(Object[] args) { this.args = args; } public Class[] getArgsTypes() { return argsTypes; } public void setArgsTypes(Class[] argsTypes) { this.argsTypes = argsTypes; } @Override public String toString() { return "RpcRequest{" + "className='" + className + '\'' + ", methodName='" + methodName + '\'' + ", argsTypes=" + Arrays.toString(argsTypes) + ", args=" + Arrays.toString(args) + '}'; } }