Zookeeper学习笔记-Curator基本操作及应用场景

Curator是什么?

Apache Curator是针对Zookeeper开发的一个Java客户端类库,封装了Zookeeper底层的一些开发细节,如连接重试、重复注册Watcher、NodeExistsException等。

除此之外,Curator还提供了各种常见应用场景的抽象封装,如分布式锁、Master选举、和分布式计数器。

先附上Curator官网地址

http://curator.apache.org/

使用Curator时,从兼容性考虑,Zookeeper版本最好为3.5+

Maven依赖简介

GroupId ArtifactId 描述
org.apache.curator curator-recipes 常见应用场景(分布式锁、master选举)的封装,依赖client与framework包
org.apache.curator curator-async Asynchronous DSL with O/R modeling, migrations and many other features(不知咋翻译)
org.apache.curator curator-framework 基于client包,提供了一些高级特性(链接重试等)
org.apache.curator curator-client 封装了zk的基本API
org.apache.curator curator-test 提供一些测试功能
org.apache.curator curator-examples curator的使用例子
org.apache.curator curator-x-discovery 基于framework,实现服务发现功能
org.apache.curator curator-x-discovery-server 和discovery包一并使用,提供了restful风格的服务器

如果只想使用Curator操作Zookeeper增伤改查,则使用curator-client包,及curator-framework包足以,maven依赖如下,由于我安装的zk版本是3.4.12,而curator从3.x版本开始需要zk3.5.x版本,否则会报错,故下文curator版本使用2.13.0

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.13.0</version>
</dependency>

如果想使用Curator实现分布式锁、master选举,则依赖curator-recipes包

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.13.0</version>
</dependency>

客户端基本操作

会话创建

调用org.apache.curator.framework.CuratorFrameworkFactory中newClient静态方法创建客户端对象

	public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
        return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
    }

    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
        return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
    }

参数说明如下

参数名 说明
connectString zk服务器地址,多个逗号分隔,如host1:port1,host2:port2
sessionTimeoutMs 会话超时时间,DEFAULT_SESSION_TIMEOUT_MS=60*1000毫秒
connectionTimeoutMs 链接超时时间,DEFAULT_CONNECTION_TIMEOUT_MS=15*1000毫秒
retryPolicy 重试策略

先介绍下ExponentialBackoffRetry,其构造函数如下

/**
     * @param baseSleepTimeMs 初始重试等待时间
     * @param maxRetries 最大重试次数
     */
    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
    {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    /**
     * @param baseSleepTimeMs 初始重试等待时间
     * @param maxRetries 最大重试次数
     * @param maxSleepMs 最大重试等待时间
     */
    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
    {
        super(validateMaxRetries(maxRetries));
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

其每次获取等待时间代码如下,基本思路就是随着重试次数增加,重试时间间隔也会增加,如果重试时间间隔大于最大重试时间间隔,则返回最大重试时间间隔

protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        // copied from Hadoop's RetryPolicies.java
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        if ( sleepMs > maxSleepMs )
        {
            log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }

下面创建客户端对象

public static void main(String[] args) {
        String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000,3,5000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkServerAddress)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        //很重要 一定要调用start来创建session链接
        zkClient.start();
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

结果如下
在这里插入图片描述

新增节点-create()

调用客户端类org.apache.curator.framework.CuratorFramework#create方法可以创建node节点,该方法返回CreateBuilder对象,该类中几个关键方法说明如下

  1. forPath
/**
     * 根据路径创建节点,并塞入指定数据
     */
    public T        forPath(String path, byte[] data) throws Exception;

    /**
     * 根据路径创建节点,并塞入空数据
     */
    public T        forPath(String path) throws Exception;

执行

public static void main(String[] args) {
        CuratorFramework zkClient = getZkClient();
        try {
            zkClient.create().forPath("/createNode");
            zkClient.create().forPath("/createNodeWithData","chenyin".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

结果如下
在这里插入图片描述

如果想创建一个二级目录节点的话,如/createNode1/createNode2
执行如下代码

public static void main(String[] args) {
        CuratorFramework zkClient = getZkClient();
        try {
            zkClient.create().forPath("/createNode1/createNode2");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

发现报错,信息如下,NoNodeException

org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /createNode1/createNode2
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
	at org.apache.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:740)
	at org.apache.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:723)
	at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
	at org.apache.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:720)
	at org.apache.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:484)
	at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:474)
	at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:454)
	at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:44)
	at com.company.project.core.Zookeeper.main(Zookeeper.java:15)

这是因为找不到createNode1父节点导致,下面介绍CreateBuilder下另外一个方法

  1. creatingParentsIfNeeded
/**
     * 在创建节点时,若父节点不存在,会递归创建节点的父节点
     */
    public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded();

下面运行,结果正常,不报错

特别需要注意的是,该方法递归创建的父节点一定为持久节点

public static void main(String[] args) {
        CuratorFramework zkClient = getZkClient();
        try {
            zkClient.create().creatingParentsIfNeeded().forPath("/createNode1/createNode2");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  1. withMode
 /**
     * 创建指定类型节点,默认节点类型为持久节点
     */
    public T withMode(CreateMode mode);

CreateMode枚举如下

  1. PERSISTENT(永久节点)
  2. PERSISTENT_SEQUENTIAL(永久有序节点)
  3. EPHEMERAL(临时节点)
  4. EPHEMERAL_SEQUENTIAL(临时有序节点)

具体代码不再演示

查询节点-getData()

获取节点数据代码如下
forPath返回byte[]数组,需要转换成自己需要的数据类型

 public static void main(String[] args) {
        CuratorFramework zkClient = getZkClient();
        try {
            byte[] bytes = zkClient.getData().forPath("/createNodeWithData");
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

如果要获取节点属性信息怎么做?获取属性方法如下

/**
     * 将节点属性信息存储到stat对象中
     */
    public T storingStatIn(Stat stat);

具体操作如下

public static void main(String[] args) {
        CuratorFramework zkClient = getZkClient();
        try {
            Stat stat = new Stat();
            byte[] bytes = zkClient.getData().storingStatIn(stat).forPath("/createNodeWithData");
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
            System.out.println(JSON.toJSONString(stat));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

打印的stat对象信息如下,属性字段解释说明在上篇文章中介绍过,不再赘述

{
  "aversion": 0,
  "ctime": 1551664130794,
  "cversion": 0,
  "czxid": 21474836487,
  "dataLength": 7,
  "ephemeralOwner": 0,
  "mtime": 1551664130794,
  "mzxid": 21474836487,
  "numChildren": 0,
  "pzxid": 21474836487,
  "version": 0
}

更新节点-setData()

  1. 使用默认数据更新节点信息
zkClient.setData().forPath(path)
  1. 使用指定数据更新节点信息
zkClient.setData().forPath(path,byte[] data)
  1. cas更新节点信息,其中version是节点属性中的dataVersion信息,每次更新version字段加一,即等于数据库中update node set data = newData where version = oldVersion,乐观锁更新操作
zkClient.setData().withVersion(version).forPath(path,byte[] data)

例子如下
1、创建一个名为updateNode的节点,并获取其版本号
2、两次使用第一次创建节点时的属性信息中的version来进行更新,查看更新结果

public static void main(String[] args) {
        CuratorFramework zkClient = getZkClient();
        try {
            String path = "/updateNode";
            String data = "data";
            zkClient.create().forPath(path, data.getBytes());

            Stat stat = new Stat();
            zkClient.getData().storingStatIn(stat).forPath(path);
            System.out.println("第一次创建节点,节点版本号:"+stat.getVersion());
            //第一次更新 此时version为0
            zkClient.setData().withVersion(stat.getVersion()).forPath(path);
            //第二次更新 此时version为1 再用0来更新 报错
            zkClient.setData().withVersion(stat.getVersion()).forPath(path);


        } catch (Exception e) {
            e.printStackTrace();
        }
    }

执行到第二次更新时,报错信息如下

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /updateNode

删除节点-delete()

  1. 删除叶子节点信息
zkClient.delete().forPath(path)
  1. 递归删除节点及其子节点
zkClient.delete().deletingChildrenIfNeeded().forPath(path)
  1. 强制删除节点(加入失败重试机制),zk客户端进行删除操作时,有可能操作失败,使用
    guaranteed()方法后,zk客户端会记录下失败的删除请求,会在会话有效期内不断重试,直到删除成功
zkClient.delete().guaranteed().forPath(path)

感冒严重,尽量坚持1周1更,接下来几篇文章会介绍基于curotor实现的几种典型应用场景,如分布式原子自增,分布式锁,master选举

贴上Curator应用场景分析地址

Curator应用场景(一)-分布式计数器
Curator应用场景(二)-Watch监听机制
Curator应用场景(三)-Master选举使用及原理分析

发布了43 篇原创文章 · 获赞 134 · 访问量 5万+

猜你喜欢

转载自blog.csdn.net/hosaos/article/details/88051525