etcd事件监听

    etcd是CoreOS开发的一个高可用的键值存储系统,主要用于共享配置和服务发现。它使用Go语言编写,并通过Raft一致性算法处理日志复制以保证强一致性。etcd目前的最新版本是2.2.0。

    和zookeeper的二进制接口不同,它提供了HTTP/JSON的rest api接口,所以对使用它的客户端来说是很友好的,几乎每种编程语言都有较成熟的http client开发包,基于这些开发包很容易编写etcd客户端。在java中已经有个比较好的客户端etcd4j(https://github.com/jurmous/etcd4j),类似zookeeper有好用的curator库。

   

   etcd事件

    etcd中和数据变化(包括目录&值key、value变化)相关的事件类型有:set, delete, update, create, compareAndSwap、compareAndDelete。在etcd http response (json格式)action属性就是事件类型。

     etcd的事件watch监听接口也是使用http访问,它提供两种监听模式,一种是一次性监听,类似zookeeper的事件watch,监听到一次事件后,需要客户端重新发起监听,比较繁琐。另一种是持久监听(stream),当有事件时,会连续触发,不需要客户端重新发起监听。

     和zookeeper一样也存在客户端丢失监听事件的可能,它不保证每个事件客户端都能监听到,不过我们在实际使用过程中,通常是在应用启动时,在开始监听之后或之前先查询etcd,将需要的数据全量加载到内存中,后续才是根据监听事件来增量更新内存中的数据或全量刷新数据。另外目前版本的etcd最多只保存1千条事件历史,满1千条后,如果有新的事件产生,事件历史库中最老的事件将被丟弃,这个可以理解为etcd服务端事件丢失。由于有1千条的限制,在有事件浪涌时(在单位时间内例如1秒内产生2千条事件),事件监听处理较慢的时候或未监听时也会发生客户端事件丢失。   

 

 

    一次性watch监听

   可用curl发送http请求来进行演示,假设在本机安装有etcd,客户端访问端口为2379(旧版本监听端口为4001),监听key为/application。一次性监听命令如下:

    curl "http://127.0.0.1:2379/v2/keys/application?wait=true"

    查询字符串中的wait=true表示监听,还可加查询参数recursive=true表示附加监听/application的子目录和后辈目录,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true" 。

    启动监听后etcd如有变化事件发生,则返回json格式的事件,结果类似如下:

    HTTP/1.1 200 OK

    Content-Type: application/json

    X-Etcd-Cluster-Id: e88d54f6225f06ad

    X-Etcd-Index: 271

    X-Raft-Index: 872202

    X-Raft-Term: 5

    Date: Sat, 26 Sep 2015 08:43:17 GMT

    Transfer-Encoding: chunked

 

    {"action":"set","node":{"key":"/application/store","value":"        {...}","modifiedIndex":271,"createdIndex":271},"prevNode":{...}}

   上面的curl返回结果中显示了http头,是加了curl的-i选项。

   从结果可看出action属性为set事件,modifiedIndex是该事件的index,为271。X-Etcd-Index头表示启动监听时etcd的当前index。etcd的index是单调递增的正整数,该整数取值空间是etcd中的所有key共享的。

    由于是一次性监听,所以curl会退出,继续监听要重新运行curl。

    和zookeeper事件监听一样,上面的http请求只能监听到命令发出之后产生的事件(更精确的说是在etcd服务器收到监听请求之后),之前发生的事件是监听不到的。另外在前一个监听得到触发开始(中间包括事件处理耗时)到启动下一次监听之间(并且下一次监听是etcd服务器收到监听请求才真正开始生效,这中间还有网络延)肯定存在时间间隔,如果在这个时间间隔(区间)内有事件发生,客户端是监听不到这些事件的,这个也和zookeeper类似。也就是如果两个事件并行发生或在发生时间上相距很近(例如update一个key后立即delete),后一事件的发生刚好落在这个间隔内,那这个事件在采用一次性监听模式的客户端中会丢失,这些事件对客户端监听不可见。

    

 

    那是不是就一定不能获取先前的事件了?这个要看情况,只要事件还在etcd的事件历史中,并且知道事件的index或起始index,还是可以取到单个事件的。在查询字符串中加waitIndex参数,其值是该事件的index或起始index。例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&waitIndex=269" ,表示查询/application目录或后辈目录中index等于或大于269的事件,如果有则返回一个最接近269的事件,否则会挂起curl,直到有满足条件的事件发生。这个其实是查询事件,已经不是正规的事件监听了。所以如果知道起始index,可以每次index加1来遍历查询事件,遍历查询时可以根据返回事件中的index来调整后续查询条件中使用的index。 

    如果waitIndex对应的事件已被etcd丢弃(见前面提到的etcd事件历史库),此时如果获取该事件,etcd将返回http状态码400( Bad Request),http body类似如下:

    {"errorCode":401,"message":"The event in requested index is outdated and cleared","cause":"the requested history has been cleared [3305/1]","index":4304} 

    另外可以监听根目录(根key),例如curl "http://127.0.0.1:2379/v2/keys?wait=true&recursive=true" 。

 

    持久监听(流式watch)

    要在查询字符串中加stream=true,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&stream=true"

    发出上述命令后,会和etcd服务器之间建立一个http长连接(此处的长连接不是指http keep alive,虽然也需要keep alive),其应答http头中的Transfer-Encoding为chunked,类似server push和comet中的multipart/x-mixed-replace,其http response body不结束,后续的每个事件作为http body的一部分,在该长连接中以一个或多个body chunk块的形式推送给客户端。可以看到在流式持久监听中curl即使收到事件也不会退出,它一直在等待后续将要发生的事件。一次性监听http头中也是chunked,但收到一个事件后,etcd会发送一个结束chunk(大小为0,表示http response结束),因此收到该chunk后curl会退出。

     相比一次性监听,除了更简便,持久监听也比一次性监听可靠性高,不会出现上面提到的在时间间隔内监听不到事件的情况。

      持久监听也是监听从命令发出之后发生的事件,先前的事件是监听不到的,不像JMS的持久订阅可以收到启动之前的jms消息。

      如果在持久监听中加waitIndex参数,分两种情况:一种是waitIndex的值小于或等于启动监听时etcd的当前index(这个值在X-Etcd-Index http头中可看到),则curl接收到满足条件的事件后挂起(不退出),后续再也收不到其他事件,即最多只能收到一个事件。第二种是waitIndex的值大于启动监听时etcd的当前index,则waitIndex参数无效,持久监听的行为同没有waitIndex参数一样。

 

 

   使用etcd4j进行事件监听

   在使用etcd4j进行事件监听时,有个注意事项:如果在同一个jvm虚拟机中既有修改查询etcd数据的操作,也有监听etcd事件,监听事件使用的EtcdClient实例和修改&查询操作使用的EtcdClient实例应分开,不能共用同一个实例,否则目前的etcd4j(2.7.0版本)会出现问题。多个key可以共用一个EtcdClient实例来进行监听,也可每个key使用一个单独的实例来进行监听。

  • 一次性监听

  //创建etcd客户端实例

  EtcdClient   etcdClient = new EtcdClient (new URI[]{new URI("127.0.0.1:2379")});

  /*setRetryPolicy设置重试策略

  * recursive()监听/application目录和其后辈目录中的事件,设置查询字符串recursive=true.

  *  waitForChange()监听,设置查询字符串wait=true.

 * -1表示永不超时,send()发送http请求进行监听

  */

  EtcdResponsePromise<EtcdKeysResponse> promise = etcd.get("/application").setRetryPolicy(

                        new RetryNTimes(300,Integer.MAX_VALUE)).recursive().waitForChange().

                        timeout(-1, TimeUnit.SECONDS).send();

    //增加listener

    promise.addListener(new IsSimplePromiseResponseHandler<EtcdKeysResponse>()

    {

         @Override

         public void onResponse(ResponsePromise<EtcdKeysResponse> promise) {

                   //从promise中取响应,检查是否有异常

                   EtcdKeysResponse response = promise.getNow();

                   Throwable  exception = promise.getException();

                    ... 处理事件

                    //再次发起http请求进行监听

                    ...

         }   

     }

   );

 

     还可使用waitForChange(long waitIndex)来监听指定的事件。

 

  • 持久监听

     etcd4j的当前版本2.7.0不支持持久监听,可以使用

Java异步async httpclient(https://github.com/AsyncHttpClient/async-http-client)来实现持久监听,最挫最原始的持久监听实验代码如下:

     AsyncHttpClient asyncHttpClient = ...;

      //发送http get异步请求,不超时,且提供一个匿名的异步请求完成handler

      asyncHttpClient.prepareGet("http://127.0.0.1:2379/v2/keys/application?   wait=true&recursive=true&stream=true").setRequestTimeout(-1).execute(

       new AsyncHandler<String>() {

           //这个异常回调方法可类比为zookeeper的会话过期

           @Override

           public void onThrowable(Throwable t) {

                  // TODO Auto-generated method stub

                  //todo:异常,重新重试发起监听

            }

             

             //在会话期,有事件时会回调onBodyPartReceived方法

           @Override

           public State  onBodyPartReceived(HttpResponseBodyPart  bodyPart)

throws Exception {

                 // TODO Auto-generated method stub

                // bodyPart对应一个http chunk块,一个事件的数据可能由多个chunk块组成

                System.out.println("event="+new String(bodyPart.getBodyByteBuffer().array(),"utf-8"));

                //todo:将chunk中的数据加入缓冲区,待收到完整的事件数据后将json格式

                //的数据解析成java对象

               ...

               return State.CONTINUE;

         }

  

        @Override

        public  State onStatusReceived(HttpResponseStatus  responseStatus)

throws Exception {

               // TODO Auto-generated method stub

               return State.CONTINUE;

         }

            

            //一个会话期只调用一次,这个回调方法可类比为zookeeper会话建立

         @Override

         public  State onHeadersReceived(HttpResponseHeaders headers)

throws Exception {

               // TODO Auto-generated method stub

               return State.CONTINUE;

          }

 

         @Override

          public String onCompleted() throws Exception {

                    // TODO Auto-generated method stub

                   return "endWatch";

          }

   

    });

   

     

    

 

    

    

    

   

    

 

 

    

猜你喜欢

转载自wanshi.iteye.com/blog/2246352