使用Avro可实现如下几种方式的轻量级RPC, 每种方式都可用动态编码和静态编码来实现:
HTTP:
HttpServer
HttpTransceiver
UDP
DatagramServer
DatagramTransceiver
Netty
NettyServer
NettyTransceiver
TCP
SocketServer
SocketTransceiver
安全TCP
SaslSocketServer
SaslSocketTransceiver
1. 添加maven依赖包
<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.1</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-ipc</artifactId> <version>1.8.1</version> </dependency>
2. 静态方法实现Netty RPC
2.1 下载 avro-tools-1.8.1.jar
2.2 编译mail.avpr, 生成java代码Message.java 和 Mail.java
mail.avpr:
{"namespace": "org.hdp.practice.rpc.netty", "protocol": "Mail", "types": [ {"name": "Message", "type": "record", "fields": [ {"name": "to", "type": "string"}, {"name": "from", "type": "string"}, {"name": "body", "type": "string"} ] } ], "messages": { "send": { "request": [{"name": "message", "type": "Message"}], "response": "string" } } }
运行命令: java -jar avro-tools-1.8.1.jar compile protocol mail.avpr .
2.3 java代码
public class MyServer { public static class MailImpl implements Mail{ @Override public Utf8 send(Message message) { System.out.println("Sending message"); return new Utf8("Sending message to " + message.getTo().toString() + " from " + message.getFrom().toString() + " with body " + message.getBody().toString()); } } public static void main(String[] args) throws IOException { System.out.println("Starting server"); NettyServer server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111)); System.out.println("Server started"); } } public class MyClient { public static void main(String[] args) throws IOException { if (args.length != 3) { System.out.println("Usage: <to> <from> <body>"); System.exit(1); } NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111)); Mail proxy = (Mail) SpecificRequestor.getClient(Mail.class, client); System.out.println("Client built, got proxy"); Message message = new Message(); message.setTo(new Utf8(args[0])); message.setFrom(new Utf8(args[1])); message.setBody(new Utf8(args[2])); System.out.println("Calling proxy.send with message: " + message.toString()); System.out.println("Result: " + proxy.send(message)); client.close(); } }
3. 动态方法实现HTTP RPC
3.1 message.avpr
{ "namespace": "cn.slimsmart.avro.demo", "protocol": "messageProtocol", "doc": "This is a message.", "name": "Message", "types": [ {"name":"message", "type":"record", "fields":[ {"name":"name", "type":"string"}, {"name":"type", "type":"int"}, {"name":"price", "type":"double"}, {"name":"valid", "type":"boolean"}, {"name":"content", "type":"string"} ] } ], "messages": { "sendMessage":{ "doc" : "message test", "request" :[{"name":"message","type":"message" }], "response" :"message" } } }
3.2 java 代码
public class Server extends GenericResponder { private Protocol protocol = null; private int port; public Server(Protocol protocol, int port) { super(protocol); this.protocol = protocol; this.port = port; } @Override public Object respond(Message message, Object request) throws Exception { GenericRecord req = (GenericRecord) request; GenericRecord reMessage = null; if (message.getName().equals("sendMessage")) { GenericRecord msg = (GenericRecord)req.get("message"); System.out.print("接收到数据:"); System.out.println(msg); //取得返回值的类型 reMessage = new GenericData.Record(protocol.getType("message")); //直接构造回复 reMessage.put("name", "苹果"); reMessage.put("type", 100); reMessage.put("price", 4.6); reMessage.put("valid", true); reMessage.put("content", "最新上架货物"); } return reMessage; } public void run() { try { HttpServer server = new HttpServer(this, port); server.start(); server.join(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new Server(Utils.getProtocol(), 9090).run(); } } public class Client { private Protocol protocol = null; private String host = null; private int port = 0; private int count = 0; public Client(Protocol protocol, String host, int port, int count) { this.protocol = protocol; this.host = host; this.port = port; this.count = count; } public long sendMessage() throws Exception { GenericRecord requestData = new GenericData.Record(protocol.getType("message")); requestData.put("name", "香梨"); requestData.put("type", 36); requestData.put("price", 5.6); requestData.put("valid", true); requestData.put("content", "价钱便宜"); // 初始化请求数据 GenericRecord request = new GenericData.Record(protocol.getMessages().get("sendMessage").getRequest()); request.put("message", requestData); Transceiver t = new HttpTransceiver(new URL("http://" + host + ":" + port)); GenericRequestor requestor = new GenericRequestor(protocol, t); long start = System.currentTimeMillis(); for (int i = 0; i < count; i++) { Object result = requestor.request("sendMessage", request); if (result instanceof GenericData.Record) { GenericData.Record record = (GenericData.Record) result; System.out.println(record); } } long end = System.currentTimeMillis(); System.out.println((end - start)+"ms"); return end - start; } public long run() { long res = 0; try { res = sendMessage(); } catch (Exception e) { e.printStackTrace(); } return res; } public static void main(String[] args) throws Exception { new Client(Utils.getProtocol(), "127.0.0.1", 9090, 5).run(); } } public class Utils { public static Protocol getProtocol() { Protocol protocol = null; try { String url = Utils.class.getResource("").getPath()+"message.avpr"; protocol = Protocol.parse(new File(url)); } catch (IOException e) { e.printStackTrace(); } return protocol; } }
参考:
https://github.com/phunt/avro-rpc-quickstart/
https://my.oschina.net/tearsky/blog/509610
http://blog.jobbole.com/92290/