参考
https://avro.apache.org/docs/current/gettingstartedjava.html
https://github.com/apache/avro
1、创建父子工程
ddshow-avro-test
ddshow-avro-test-api //配置接口协议文件,生成接口文件//客户端
ddshow-avro-test-core //服务端实现接口
2、ddshow-avro-test
pom 配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>happy-parent</artifactId> <groupId>com.youku.happy</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <groupId>com.laifeng</groupId> <artifactId>ddshow-avro-test</artifactId> <version>1.0-SNAPSHOT</version> <packaging>pom</packaging> <name>ddshow-avro-test</name> <url>http://www.laifeng.com</url> <modules> <module>ddshow-avro-test-core</module> <module>ddshow-avro-test-api</module> </modules> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> </project>
2、ddshow-avro-test-api
POM配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>ddshow-avro-test</artifactId> <groupId>com.laifeng</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>ddshow-avro-test-api</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>ddshow-avro-test-api</name> <url>http://www.laifeng.com/</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.laifeng</groupId> <artifactId>laifeng-rpc</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-ipc</artifactId> <version>1.7.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.7.7</version> <executions> <execution> <id>schemas</id> <phase>generate-sources</phase> <goals> <goal>schema</goal> <goal>protocol</goal> <goal>idl-protocol</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> </project>
3、编写schema文件
类型定义:
https://avro.apache.org/docs/current/spec.html#schema_primitive
Primitive Types The set of primitive type names is: null: no value boolean: a binary value int: 32-bit signed integer long: 64-bit signed integer float: single precision (32-bit) IEEE 754 floating-point number double: double precision (64-bit) IEEE 754 floating-point number bytes: sequence of 8-bit unsigned bytes string: unicode character sequence Primitive types have no specified attributes. Primitive type names are also defined type names. Thus, for example, the schema "string" is equivalent to: {"type": "string"} Complex Types Avro supports six kinds of complex types: records, enums, arrays, maps, unions and fixed. Records Records use the type name "record" and support three attributes: name: a JSON string providing the name of the record (required). namespace, a JSON string that qualifies the name; doc: a JSON string providing documentation to the user of this schema (optional). aliases: a JSON array of strings, providing alternate names for this record (optional). fields: a JSON array, listing fields (required). Each field is a JSON object with the following attributes: name: a JSON string providing the name of the field (required), and doc: a JSON string describing this field for users (optional). type: A JSON object defining a schema, or a JSON string naming a record definition (required). default: A default value for this field, used when reading instances that lack this field (optional). Permitted values depend on the field's schema type, according to the table below. Default values for union fields correspond to the first schema in the union. Default values for bytes and fixed fields are JSON strings, where Unicode code points 0-255 are mapped to unsigned 8-bit byte values 0-255. field default values avro type json type example null null null boolean boolean true int,long integer 1 float,double number 1.1 bytes string "\u00FF" string string "foo" record object {"a": 1} enum string "FOO" array array [1] map object {"a": 1} fixed string "\u00ff" order: specifies how this field impacts sort ordering of this record (optional). Valid values are "ascending" (the default), "descending", or "ignore". For more details on how this is used, see the the sort order section below. aliases: a JSON array of strings, providing alternate names for this field (optional). For example, a linked-list of 64-bit values may be defined with: { "type": "record", "name": "LongList", "aliases": ["LinkedLongs"], // old name for this "fields" : [ {"name": "value", "type": "long"}, // each element has a long {"name": "next", "type": ["null", "LongList"]} // optional next element ] }
3.1 example.avro
{ "namespace": "com.acme", "protocol": "HelloWorld", "doc": "Protocol Greetings", "types": [ {"name": "Greeting", "type": "record", "fields": [ {"name": "message", "type": "string"}]}, {"name": "Curse", "type": "error", "fields": [ {"name": "message", "type": "string"}]} ], "messages": { "hello": { "doc": "Say hello.", "request": [{"name": "greeting", "type": "Greeting" }], "response": "Greeting", "errors": ["Curse"] } } }
Avro的Schema文件有三种格式,依次是avdl,avpr,avsc,后两种是json格式,avdl可以转成avpr。
D:\IDEA\idea_project_new\ddshow-avro-test\ddshow-avro-test-api>mvn clean install
3.2
http://www.open-open.com/lib/view/open1369363962228.html
http://langyu.iteye.com/blog/708568
3.3 编写avpr文档
{ "protocol": "AnchorRecommendService", "namespace": "com.laifeng.anchor.api", "doc": "AnchorRecommendService API Protocol.", "types": [ { "type": "record", "name": "UserInfo", "fields": [ {"name": "userId","type": "int"} ] }, { "type": "record", "name": "AnchorInfo", "fields": [ {"name": "anchorId","type": "int"} ] }, { "type": "record", "name": "AnchorRecommendList", "fields": [ {"name": "AnchorRecommendList","type": ["null",{"type": "array", "items": "AnchorInfo","default":"null"}]} ] }, { "type": "error", "name": "AnchorRecommendServiceError", "fields": [ {"name": "code","type" : "int"}, {"name": "message","type": "string"} ] } ], "messages": { "getAnchorRecommendList": { "request": [ { "name": "userInfo", "type": "UserInfo" } ], "response": "AnchorRecommendList", "errors": [ "AnchorRecommendServiceError" ] } } }
3.4 通过命令生成aprv
http://mirrors.cnnic.cn/apache/avro/avro-1.7.7/java/
下载
avro-tools-1.7.7.jar
编写 AnchorRecommendService.txt 文件
/** * Messager AnchorRecommend API Protocol. * java -jar avro-tools-1.7.7.jar idl AnchorRecommendService.txt AnchorRecommendService.avpr */ @namespace("com.laifeng.anchor.api") protocol AnchorRecommendService { error AnchorRecommendServiceError { int code; string message; } array<int> getAnchorRecommendList(int userid) throws AnchorRecommendServiceError; }
通过命令
java -jar avro-tools-1.7.7.jar idl AnchorRecommendService.txt AnchorRecommendService.avpr
{ "protocol" : "AnchorRecommendService", "namespace" : "com.laifeng.anchor.api", "doc" : "* Messager AnchorRecommend API Protocol.", "types" : [ { "type" : "error", "name" : "AnchorRecommendServiceError", "fields" : [ { "name" : "code", "type" : "int" }, { "name" : "message", "type" : "string" } ] } ], "messages" : { "getAnchorRecommendList" : { "request" : [ { "name" : "userid", "type" : "int" } ], "response" : { "type" : "array", "items" : "int" }, "errors" : [ "AnchorRecommendServiceError" ] } } }
4、服务端写法
//zookeeper 连接初始化 String zookeeperConnection = configProperties.getProperty("rpc.zookeeper.connection"); if (StringUtils.isEmpty(zookeeperConnection)){ throw new IllegalArgumentException("Properties 'rpc.zookeeper.connection' is empty."); } curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnection, new RetryUntilElapsed(6 * 1000, 1000)); curatorFramework.start(); //注册zookeeper /laifeng节点 anchorRecommendServiceRegistry = new CuratorDiscoveryAvroServiceRegistry(curatorFramework); logger.info("AnchorRecommend server registry initialized."); int port = Integer.valueOf(configProperties.getProperty("anchorRecommend.server.port")); String host = configProperties.getProperty("anchorRecommend.server.host"); if (StringUtils.isEmpty(host)) { host = InetAddress.getLocalHost().getHostAddress(); } IP = host; anchorRecommendServer = new NettyServer(new SpecificResponder(AnchorRecommendService.class, anchorRecommendService), new InetSocketAddress(port)); ServiceDefination serviceDefination = ServiceDefination.newBuilder() .idPrefix(SERVER_PRE + NODE_ID) .appendUUID(false) .iface(AnchorRecommendService.class) .host(host) .port(port) .build(); anchorRecommendServiceRegistry.registerService(serviceDefination); System.out.println("Task anchorRecommendServiceRegistry start.");
5、客户端写法
public class ClientTest { AnchorRecommendService anchorRecommendService; public static void main(String[] args) throws Exception { //10.10.72.181:2181 dev String zookeeperConnection = System.getProperties().getProperty("rpc.zookeeper.connection", "10.10.72.181:2181"); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnection, new RetryUntilElapsed(6 * 1000, 1000)); curatorFramework.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println("Service reg changed client " + curatorFramework + " event " + curatorEvent + "."); } }); curatorFramework.start(); ServiceFactory serviceFactory = new CuratorDiscoveryAvroServiceFacotry(curatorFramework); ClientTest clientTest = new ClientTest(); clientTest.anchorRecommendService = serviceFactory.getService(AnchorRecommendService.class); try { List<Integer> list = clientTest.anchorRecommendService.getAnchorRecommendList(427757509); System.out.println("Task delay " + list.size() + "ms sended start."); if(CollectionUtils.isNotEmpty(list)){ for (int i = 0; i < list.size(); i++) { System.out.println("Anchorid" + list.get(i) + " sended."); } } } catch (AnchorRecommendServiceError e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } CloseableUtils.closeQuietly(serviceFactory); CloseableUtils.closeQuietly(curatorFramework); } }
7、本地运行服务端
VM options配置: -Dspring.profiles.active=dev -Xms1024M -Xmx1024M -Xss128k -XX:PermSize=256m -XX:MaxPermSize=256m
在zookeeper插件中会看到服务已注册
{"name":"com.laifeng.anchor.api.AnchorRecommendService","id":"AnchorRecommend-d8a62fba-4192-48d8-bcf4-54081bd8a6d8","address":"10.10.72.182","port":4356,"sslPort":null,"payload":null,"registrationTimeUTC":1440558189758,"serviceType":"DYNAMIC","uriSpec":{"parts":[{"value":"scheme","variable":true},{"value":"://","variable":false},{"value":"host","variable":true},{"value":":","variable":false},{"value":"port","variable":true}]}}
然后启动客户端
Service reg changed client org.apache.curator.framework.imps.CuratorFrameworkImpl@2de460dc event CuratorEventImpl{type=CLOSING, resultCode=0, path='null', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null}.
8、部署上线
mvn clean install
在D:\IDEA\idea_project_new\ddshow-avro-test\ddshow-avro-test-core\target目录下找到
ddshow-avro-test-core-1.0-SNAPSHOT-distribution.zip
在linux服务器上解压:
unzip ddshow-avro-test-core-1.0-SNAPSHOT-distribution.zip
tty:[0] jobs:[1] cwd:[/opt/cafe/ddshow-avro-test]
11:10 [[email protected]]$ ls
ddshow-avro-test-core ddshow-avro-test-core-1.0-SNAPSHOT ddshow-avro-test-core-1.0-SNAPSHOT-distribution.zip
9、启动命令
nohup java -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/opt/cafe/ddshow-avro-test/ddshow-avro-test-core/logs/startup_gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/cafe/ddshow-avro-test/ddshow-avro-test-core/logs/startup_oom.log -Dspring.profiles.active=test -cp /opt/cafe/ddshow-avro-test/ddshow-avro-test-core-1.0-SNAPSHOT/classes:/opt/cafe/ddshow-avro-test/ddshow-avro-test-core-1.0-SNAPSHOT/lib/*:/opt/cafe/ddshow-avro-test/ddshow-avro-test-core-1.0-SNAPSHOT com.laifeng.anchor.sever.AnchorRecommendServer >> /opt/cafe/ddshow-avro-test/ddshow-avro-test-core/logs/startup_ddshow-avro-test.log 2>&1 &
10、启动两个以上服务后 在zookeeper上
注册了2个服务:
AnchorRecommend-7c5f0566-80d2-43b7-8b86-c10727a9922e
AnchorRecommend-d8a62fba-4192-48d8-bcf4-54081bd8a6d8
在本地启动客户端测试即可。