1.介绍
现在都喜欢用什么高大上的东西,弄出一堆框架来。RPC即为远程过程调用协议,让两个终端之间不需再关注网络传输的实现。
这里以实现简单聊天室为目的,一步一步搭建属于自己的RPC架构。这里起名为everyw,意为eyerywhere,在任何地方都可以使用。
服务器,提供服务者;客户端,使用服务者。
2.服务端功能
聊天服务器最基本的服务就是注册用户,发送聊天信息,这里新建一个表示服务器功能的接口
/** * 服务器功能 */ public interface Api { /** * 注册用户 * @param name * @return */ Message<Void> regist(String name); /** * 发送聊天信息 * @param message */ Message<Void> sendMessage(String message); }
为了显现服务器网络连通等出现的异常,这里提供了一个类似Optional的类Message
/** * 消息体封装类 */ public class Message<T> implements java.io.Serializable { private T value; private String error; private boolean hashError; public Message( ) { } public Message(T obj) { this.value=obj; } public Message(T obj,String error) { this.value=obj; this.error=error; hashError=true; } /** * 是否有异常 * * @return */ public boolean hasError() { return hashError; } /** * 返回默认信息 * * @return */ public String getError() { return error; } /** * 有网络异常时调用 * @param action * @return */ public Message<T> error(Consumer<String> action) { if(hashError) { action.accept(error); } return this; } /** * 成功返回时调用 * @param action * @return */ public Message<T> success(Consumer<Void> action) { if(error==null) { action.accept(null); } return this; } /** * 有返回值时返回 * @param action * @return */ public Message<T> value(Consumer<T> action) { if(value!=null) { action.accept(value); } return this; } public T getValue() { return value; } /** * @param error */ public void setError(String error) { this.error=error; hashError=true; } }
为了让Message可以传输,添加了java.io.Serializable接口。
3.客户端能力
客户端有时需要向服务器展现自己的能力,方便服务器调用。一个简单的聊天室就具有接收消息,处理消息的能力。这里新建一个表示客户端能力的接口
/** * 客户端能力 */ public interface Listener { /** * 接收到消息 */ void onMessage(String msg); }
5.客户端实现
作为一个客户端,除了需要知道服务器的地址,就不需要关注其具体通信了。
public static void main(String[] args) { String url = "127.0.0.1:8080"; ApplicationContext context = ApplicationContext.getContext(url); Api api = context.getService(Api.class); Listener listener = message -> { System.out.println("消息:" + message); }; context.registListener(listener); try (Scanner br = new Scanner(System.in)) { System.out.println("请输入你的名字:"); String line = br.nextLine(); api.regist(line).error(e -> { System.err.println("注册失败:" + e); System.exit(1); }); while (true) { line = br.nextLine(); api.sendMessage(line); } } }
代码量很少很简洁,也无任何网络通信的代码。
ApplicationContext 将是重点要实现的内容,这里通过getContext获取单例应用上下文,getService获取具体服务实现,registListener则是向服务展现客户端能力。
6.服务端实现
首先需要服务的实现者,任何类只需要实现服务Api接口即可。
/** * 服务实现者 */ public static class ServerWoker implements Api { private ClientContext client; private String name; /** * * @param client * 当前客户端上下文 */ public ServerWoker(ClientContext client) { this.client = client; } @Override public Message<Void> regist(String name) { Logger.d(client +"注册用户名:"+name); this.name = name; List<ClientContext> clients = client.getServerContext().getClientContexts(); if (clients != null) { for (ClientContext c : clients) { if (name.equals(c.getSession(true).get("name"))) { return new Message<Void>(null, "用户名已存在"); } } } // 储存用户名 client.getSession(true).put("name", name); return new Message<Void>(); } @Override public Message<Void> sendMessage(String message) { Logger.d("客户端["+client +"]发送消息:"+name+":"+message); List<ClientContext> clients = client.getServerContext().getClientContexts(); if (clients != null) { clients.forEach(c -> { Listener listener = c.getListener(Listener.class); if (listener != null) { listener.onMessage(name + ":" + message); } }); } return new Message<Void>(); } }
业务逻辑也非常的清晰明了。
服务端启动代码也非常简洁容易明白。
public static void main(String[] args) { //创建服务上下文 ServerContext context = ServerContext.create(8080); //为每个客户端绑定能力 context.bindService(client -> { return new ServerWoker(client); }); context.start();// 启动服务 }
这里ServerContext为服务器上下文,包括了创建、启动服务,绑定能力,获取所有客户端上下文等功能。
ClientContext客户端上下文,可以获取使用获取客户端能力,保存使用客户信息等功能。
7.项目组织结构
在实现具体代码之前,先看一下项目结构,如下
依赖关系图,
8.客户端架构具体实现过程
ApplicationContext首先需要通过socket连接,和创建一个读取线程
private ApplicationContext(String url) { int i = url.lastIndexOf(":"); if (i == -1) { this.url = url; this.port = 80; } else { this.url = url.substring(0, i); this.port = Integer.parseInt(url.substring(i + 1)); } isStart = false; listeners=new ArrayList<>(); map=new HashMap<>(); } /** * 启动连接 * @throws IOException @throws */ private synchronized void start() { try { socket = new Socket(url, port); writer = new DataOutputStream(socket.getOutputStream()); reader = new DataInputStream(socket.getInputStream()); new ReadWorker().start(); isStart = true; } catch (IOException e) { e.printStackTrace(); } }
ApplicationContext#getService 传入的是Class对象,并不是具体实现对象。Proxy 提供用于创建动态代理类和实例的静态方法,它还是由这些方法创建的所有动态代理类的超类,通过代理反射的方式可以将抽象接口在运行时转变为具体实现类。
/** * 获取api服务 * * @param clz * @param url * @return */ public <T> T getService(Class<T> clz) { if (!isStart) { start(); } InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Exception { try { // 向服务端发送数据 long bid=System.currentTimeMillis();//消息块id StringBuffer head = new StringBuffer("clz=" + clz.getName() + ";mth=" + method.getName()+";type="+method.getGenericReturnType() + ";length=" + args.length + ";bid="+bid+";"); Logger.d(head); if (args.length != 0) { head.append(Object2String(args)); } writer.writeUTF(head.toString()); writer.flush(); Logger.d("发送完消息"); //等待来接服务器的消息 String data =getAndwait(bid); Logger.d("接收消息"); return String2Object(data); // 服务端接收数据 } catch (Throwable ex) { ex.printStackTrace(); Message result = (Message) method.getReturnType().newInstance(); result.setError(ex.getMessage()); return result; } } }; return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[] { clz }, handler); }
将请求方法,参数封装为协议,通过DataOutputStream将实际请求通过发送出去,并在读取线程中从DataInputStream获取响应数据并转换为返回对象。
Object2String,String2Object这里采用ObjectStream的方式进行转换。
/** * 转换为Object * * @param data * @return * @throws IOException * @throws ClassNotFoundException */ public static Object String2Object(String data) throws IOException, ClassNotFoundException { byte[] bytes = Base64.getDecoder().decode(data); ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(bytes)); return oi.readObject(); } /** * 转换为字符串 * * @param obj * @return * @throws IOException */ public static String Object2String(Object[] objs) throws IOException { return Base64.getEncoder().encodeToString(Object2Bytes(objs)); } /** * 转换为byte数组 * * @param obj * @return * @throws IOException */ public static byte[] Object2Bytes(Object[] objs) throws IOException { ByteArrayOutputStream bot = new ByteArrayOutputStream(); ObjectOutputStream ot = new ObjectOutputStream(bot); for (int i = 0; i < objs.length; i++) { ot.writeObject(objs[i]); } return bot.toByteArray(); }
registListener保存注册器,以等待服务器的调用
/** * 注册能力 */ public void registListener(Object listener) { listeners.add(listener); Class clz = listener.getClass(); long bid=System.currentTimeMillis();//消息块id StringBuffer head = new StringBuffer("clz=" + clz.getName() + ";bid="+bid+";"); try { writer.writeUTF(head.toString()); writer.flush(); } catch (IOException e) { e.printStackTrace(); } }
最后客户端读取线程读取数据,根据不同实现分发功能
public void run() { String line=null; try { while((line=reader.readUTF())!=null) { String[] datas = line.split(";"); if(datas.length==2)//服务端能力回调 { long bid=Long.parseLong(datas[0].split("=")[1]); map.put(bid, datas[1]); Logger.d("服务端能力回调:"+bid); synchronized(lock) { lock.notifyAll(); } }else//服务器调用客户端能力 { String cls = datas[0].split("=")[1]; Class clz = Class.forName(cls); String mth = datas[1].split("=")[1]; String type = datas[2].split("=")[1]; int alen = Integer.parseInt(datas[3].split("=")[1]); long bid = Long.parseLong(datas[4].split("=")[1]); Logger.d("调用客户端能力:"+datas[0]+";"+datas[1]); Object[] args = null; if (alen > 0) { args = String2Objects(datas[5], alen); } for (Object listener : listeners) { Class lcz = listener.getClass(); if (clz.isAssignableFrom(lcz)) { try { Method ath = findMethod(lcz, mth, type, alen); Logger.d("找到能力方法:"+ath); ath.invoke(listener, args); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { e.printStackTrace(); } } } } } } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } }
9.服务端架构具体实现过程
首先服务器开启ServerSocket监听,处理来接客户端的连接。
@Override public void run() { Logger.d("正在启动服务"); try (ServerSocket server = new ServerSocket(port);) { while (true) { Socket socket = server.accept(); Logger.d("新客户端连接"); ClientContext client = new ClientContext(this, socket,funs); clients.add(client); } } catch (IOException e) { e.printStackTrace(); } }
bindService保存服务具体实现对象,连接时以传递给每个新客户端上下文
/** * 暴露服务给客户端 * * @param fun */ public void bindService(Function<ClientContext, Object> fun) { this.funs.add(fun); }
ClientContext客户端上下文,包括了处理读写信息的转换,服务能力的回调,创建读取处理线程等
/** * @param serverContext * @param socket * @param funs * @throws IOException */ ClientContext(ServerContext serverContext, Socket socket, List<Function<ClientContext, Object>> funs) throws IOException { this.serverContext = serverContext; this.socket = socket; writer = new DataOutputStream(socket.getOutputStream()); reader = new DataInputStream(socket.getInputStream()); // 初始化能力 apis = new ArrayList<>(); for (Function<ClientContext, Object> fun : funs) { Object api = fun.apply(this); apis.add(api); } new ReadWoker().start(); }
getListener获取客户端能力,然服务端并没有客户端具体的实现对象,同样是通过代理方式实现远程的调用
/** * 返回客户端能力 * * @param <T> * @param clz */ public <T> T getListener(Class<T> clz) { Logger.d("返回能力"); // 检验客户端是否有这能力 InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 向客户端发送数据 Logger.d("向客户端发送消息"); long bid = System.currentTimeMillis();// 消息块id StringBuffer head = new StringBuffer("clz=" + clz.getName() + ";mth=" + method.getName() + ";type=" + method.getGenericReturnType() + ";length=" + args.length + ";bid=" + bid + ";"); Logger.d(head); if (args.length != 0) { head.append(Object2String(args)); } writer.writeUTF(head.toString()); writer.flush(); // 为了简化,客户端能力不返回数据 return null; } }; return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[] { clz }, handler); }
最后在读取线程里,分发处理来自客户端不同的请求协议,调用服务端功能或者注册客户端能力
@Override public void run() { try { String line = null; while ((line = reader.readUTF()) != null) { Logger.d("接收指令"); String[] datas = line.split(";"); String cls = datas[0].split("=")[1]; if (datas.length >= 4)// 调用服务功能 { Class clz = Class.forName(cls); String mth = datas[1].split("=")[1]; String type = datas[2].split("=")[1]; int alen = Integer.parseInt(datas[3].split("=")[1]); long bid = Long.parseLong(datas[4].split("=")[1]); Logger.d("调用服务器功能:" + datas[0] + ";" + datas[1] + ";" + datas[2] + ";" + datas[3] + ";" + datas[3]); Object[] args = null; if (alen > 0) { args = String2Objects(datas[5], alen); } for (Object api : apis) { Class apz = api.getClass(); if (clz.isAssignableFrom(apz)) { try { Method ath = findMethod(apz, mth, type, alen); Object robj = ath.invoke(api, args); writer.writeUTF("bid=" + bid + ";" + Object2String(new Object[] { robj })); Logger.d("写入完毕!"); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { e.printStackTrace(); // TODO 异常放入返回? } break; } } } else// 注册客户端功能 { long bid = Long.parseLong(datas[1].split("=")[1]); Logger.d("注册客服端能力:" + cls); } } } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }finally { serverContext.getClientContexts().remove(ClientContext.this); } }
10.运行结构
运行demo服务端,运行多个demo客户端,实现简单聊天室功能