Curator应用场景(一)-分布式计数器DistributedAtomicInteger

curator-recipes功能简介

curator-recipes包中包含了对zookeeper场景应用场景的封装,好的项目源码让人从包名就能看出其功能,下面先看下recipes的包结构
在这里插入图片描述
简单介绍下不同包及其对应功能

包名 功能简介
atomic 分布式计数器(DistributedAtomicLong),能在分布式环境下实现原子自增
barriers 分布式屏障(DistributedBarrier),使用屏障来阻塞分布式环境中进程的运行,直到满足特定的条件
cache 监听机制,分为NodeCache(监听节点数据变化),PathChildrenCache(监听节点的子节点数据变化),TreeCache(既能监听自身节点数据变化也能监听子节点数据变化)
leader leader选举
locks 分布式锁
nodes 提供持久化节点(PersistentNode)服务,即使客户端与zk服务的连接或者会话断开
queue 分布式队列(包括优先级队列DistributedPriorityQueue,延迟队列DistributedDelayQueue等)
shared 分布式计数器SharedCount

在介绍curator的分布式原子计数器之前,先抛出一个经典的面试问题,i++在多线程环境下是否存在问题?

先来看一段测试代码,其中CountDownLatch是为了让主线程阻塞直到所有子线程执行完,其应用场景如下:CountDownLatch调用await()方法的线程将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0(每次调用countDown方法计数器减一)为止。例子里每个子线程自增100000次后调用countDown()方法将计数器减一,初始化数值10,10个线程全部跑完自增后,主线程await方法不再阻塞,输出count值

    static int count = 0;
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    public static void main(String[] args) {
        //创建10个线程 每个线程内部自增100000次
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100000; j++) {
                    count++;
                }
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }

执行几次后,发现结果不总是10*100000,故可以证明i++确实存在线程安全问题

Java中i++自增线程安全问题的由来

先来了解下几个基本概念

Java内存模型

  1. Java内存模型规定所有的变量都是存在主存中
  2. 每个线程都有自己的工作内存
  3. 线程对变量的操作都必须在线程内部工作内存中进行,不能直接对主存中变量进行操作

网上一张图描绘的很形象
在这里插入图片描述
现在可以联想到i++操作,线程对共享变量的修改总是分为 读-改-写 这3步,即

  1. 线程先从主存中读取变量到本地工作内存中(读)
  2. 线程在本地工作内存中对变量进行+1操作(改)
  3. 线程将本地内存中变量的值写回主存(写)

再想,多线程又是可以并行执行的

假设初始count值为0,如果n个线程同时读到了0这个值,那么更新后count值是多少?

这就是i++出现线程安全问题的核心原因 ,i++操作分为三步,其中只有读取和写入操作是原子的,读改写三步一起执行时,无法保证原子性

原子性

原子性就是指该操作是不可再分的,要么全部执行,要么全部不执行。最经典的就是银行转账问题,对应到i++问题中就是上面所述的:只有读取和写入操作是原子的,而读改写合并操作不能保证原子性。
Java中保证原子性操作的有:

  1. Synchronized和Lock,加锁使得同一时刻只有一个线程能访问共享变量,操作自然是原子的
  2. java.util.concurrent.atomic下的原子操作类,如AtomicInteger,AtomicReference,基于Cas算法实现了类似乐观锁版本更新控制的原子操作

这两种方法,下面会分别介绍如何使用来解决i++原子性问题

内存可见性

可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改后变量的值

Java里提供了volatile关键字来修饰变量,使得线程对一个变量值的修改会立马更新到主存之中,其它线程会重新从主存中去获取最新的变量值。

那么volatile关键字是如何使得其它线程能立马获取到最新的变量值呢?

如果对声明了Volatile变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存,如果其他(线程)处理器缓存的值还是旧的,再执行计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里

核心就是两点

  1. Lock前缀指令会引起处理器缓存回写到内存
  2. 一个处理器的缓存回写到内存会导致其他处理器的缓存无效(嗅探机制)

那么问题来了,volatile关键字是否能解决i++自增的线程安全问题呢?不是每个线程修改后的值都能立马被其他线程看到么?

看官们自己先思考3分钟?

举个例子,假设volatile变量count初始值为0,A,B线程同时对变量count进行+1操作

  1. A线程读取到了count值,然后进行+1操作
  2. A线程还没将变量写入主存时(即没有触发volatile关键字特性时),B线程读取到了主存中的值(还是0)
  3. B也对count进行+1,然后A,B分别将变量写入主存,count值可能就为1。

所以,volatile关键字不能保证原子性,保证原子性还得通过synchronized,Lock和
java.util.concurrent.atomic下的原子操作类

当然Java里通过synchronized和Lock也能够保证可见性,synchronized和Lock能保证同一时刻只有一个线程获取锁然后执行同步代码,并且在释放锁之前会将对变量的修改刷新到主存当中,这样下个线程获取变量值时,就能获取到最小的值

单机版线程安全原子自增

Synchronized

将i++例子修改如下,发现结果输出每次都是10*100000

static int count = 0;
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    static Object lock = new Object();
    public static void main(String[] args) {
        //创建10个线程 每个线程内部自增100000次
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100000; j++) {
	                //锁住lock对象,lock对象所有线程共享
                    synchronized (lock) {
                        count++;
                    }
                }
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }

然后问题又来了,synchronized关键字又是如何实现只有一个线程进行同步操作的呢?

先写一个小demo,例子如下

public class SynchronizedDemo {
    public static void main(String[] args) {
    	//锁住SynchronizedDemo类对象
        synchronized (SynchronizedDemo.class) {
        }
        //调用SynchronizedDemo类中静态方法,也是锁住SynchronizedDemo类对象
        method();
    }
    private static synchronized void method() {
    
    }
}

两步操作,第一步synchronized (SynchronizedDemo.class),锁当前类对象,执行完后再执行synchronized静态方法,锁的也是静态对象

下面来看看这部分代码的字节码文件,编译文件后,在target目录下找到SynchronizedDemo.class文件所在位置,执行

javap -v SynchronizedDemo.class 

结果输出如下

 public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=3, args_size=1
         0: ldc           #2                  // class com/company/project/core/SynchronizedDemo
         2: dup
         3: astore_1
         4: monitorenter
         5: aload_1
         6: monitorexit
         7: goto          15
        10: astore_2
        11: aload_1
        12: monitorexit
        13: aload_2
        14: athrow
        15: invokestatic  #3                  // Method method:()V
        18: return

可以看到字节码中有monitorentermonitorexit两个指令,指令功能如下

  • monitorenter 获取对象监视器
  • monitorexit 释放对象监视器

任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取该对象的监视器才能进入同步块和同步方法(monitorenter),退出后释放监视器(monitorexit),这就是synchronized关键字能实现同步的原理

Lock

代码如下

static int count = 0;
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        //创建10个线程 每个线程内部自增100000次
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100000; j++) {
                    lock.lock();
                    count++;
                    lock.unlock();
                }
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }

至于ReentrantLock实现原理,比较复杂,本文不做介绍,有兴趣的可以看下JDK中AbstractQueuedSynchronizer源码。ReentrantLock相比Synchronized,主要区别有以下几点

比较点 Synchronized ReentrantLock
实现机制 JVM监视器 JDK API实现
是否公平 非公平锁 支持公平、非公平锁,默认公平锁
是否可中断 不可中断 ReentrantLock.lockInterruptibly()允许在等待时由其它线程调用等待线程的Thread.interrupt方法来中断
超时特性 阻塞等待 tryLock(long timeout, TimeUnit unit),超时则放弃等待

AtomicInteger

将count变量设为AtomicInteger,使用incrementAndGet方法也能实现,变量原子自增

static CountDownLatch countDownLatch = new CountDownLatch(10);
    static AtomicInteger count = new AtomicInteger(0);
    public static void main(String[] args) {
        //创建10个线程 每个线程内部自增100000次
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100000; j++) {
                    count.incrementAndGet();
                }
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count.get());
    }

问题又来了,哈哈,AtomicInteger又是如何实现原子自增的呢?

让我们来看下incrementAndGet方法源码(基于JDK1.8 1.7及之前代码略有不同,思路类似)

public class AtomicInteger extends Number implements java.io.Serializable {
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    
	static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    //将变量值定义为volatile,保证每个线程都能读到最新修改的值
    private volatile int value;

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

}

Unsafe类中getAndAddInt方法源码如下

//unsafe类 
public final int getAndAddInt(Object obj, long valueOffset, int delta) {
        int expect;
        //循环
        do {
            //根据Atomic对象及value偏移量调用native方法寻找主存中最新的value值
            expect = this.getIntVolatile(obj, valueOffset);
        	//CAS更新操作,旧值为当前主存中取到的值expect,新值为expect+delta(增量)
        } while(!this.compareAndSwapInt(obj, valueOffset, expect, expect + delta));
 
        //返回旧值
        return expect;
    }

sun.misc.Unsafe是JDK内部用的工具类。它通过暴露一些Java意义上说“不安全”的功能给Java层代码,来让JDK能够更多的使用Java代码来实现一些原本是平台相关的、需要使用native语言(例如C或C++)才可以实现的功能。该类不应该在JDK核心类库之外使用

可以看到incrementAndGet方法最终调用Unsafe类中getAndAddInt方法,getAndAddInt方法三个参数说明如下

obj:AtomicInteger对象
valueOffset:AtomicInteger中value在主存中的偏移量
delta:value变动值

下面分析下unsafe类中getAndAddInt执行流程

  1. 开始死循环
  2. expect = this.getIntVolatile(obj, valueOffset); 根据对象及value在对象中的偏移量获取当前主存中value最新值
  3. 如果compareAndSwapInt(旧值为expect,新值为expect+delta)不成功,继续取主存中当前值,继续cas操作,如果成功,退出循环,返回当前值

可以看到这就是一种cas更新,即类似乐观锁版本更新的一种操作思路(同时可以有多个线程操作变量,但只会有一个成功,增加了cpu开销),而synchronized则是一种悲观锁更新策略(同时只有一个线程能操作变量)

分布式线程安全原子自增

上面介绍了3种在单机环境下,多线程之间如何实现线程安全自增(计数)的实现方法及原理,那么如果是在分布式环境中呢?自然synchronized,lock,atomicInteger等基于Java的方法不能满足,因为这些都只能在当前JVM环境中生效,而分布式环境中多个JVM实例是很正常的事

下面介绍Curator基于Zookeeper实现的分布式计数器

Curator recipes包下实现了DistributedAtomicIntegerDistributedAtomicLong等分布式原子自增计数器

基本用法

先简单通过一个例子来介绍下用法

public class Zookeeper {
    static CountDownLatch countDownLatch = new CountDownLatch(10);

    public static void main(String[] args) throws Exception {
        CuratorFramework zkClient = getZkClient();
		//指定计数器存放路径 及重试策略
        DistributedAtomicInteger distributedAtomicInteger = new DistributedAtomicInteger(zkClient, "/counter", new ExponentialBackoffRetry(1000, 3));
        //多线程自增10*100次
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    try {
                    	//调用add方法自增
                        AtomicValue<Integer> result = distributedAtomicInteger.add(1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                countDownLatch.countDown();

            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //查看结果
        System.out.println("多线程自增结果" + distributedAtomicInteger.get().postValue());
    }

    private static CuratorFramework getZkClient() {
        String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkServerAddress)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        zkClient.start();
        return zkClient;
    }

}

按道理说,自增完后,结果是不是应该是10*100?,道友们运行几次后会发现实际值不全是1000。为什么呢?这玩意是假的?

我们看下AtomicValue result = distributedAtomicInteger.add(1)这行代码中,add()方法源码

/**
     * Add delta to the current value and return the new value information. Remember to always
     * check {@link AtomicValue#succeeded()}.
     *
     * @param delta amount to add
     * @return value info
     * @throws Exception ZooKeeper errors
     */
    @Override
    public AtomicValue<Integer>    add(Integer delta) throws Exception
    {
        return worker(delta);
    }

其中写道 Remember to always check {@link AtomicValue#succeeded()}. 也就是说,这个方法的自增是不一定会成功的,在前面初始化分布式机器数对象时,传入了重试策略,如果分布式环境中出现了并发自增的情况,会不断重试,如果重试后还失败,则结果返回失败

原理分析

自增add方法中调用了woker(delta)方法,下面看下该方法源码

private AtomicValue<Integer>   worker(final Integer addAmount) throws Exception
    {
        Preconditions.checkNotNull(addAmount, "addAmount cannot be null");

        MakeValue               makeValue = new MakeValue()
        {
            @Override
            public byte[] makeFrom(byte[] previous)
            {
                int        previousValue = (previous != null) ? bytesToValue(previous) : 0;
                int        newValue = previousValue + addAmount;
                return valueToBytes(newValue);
            }
        };

        AtomicValue<byte[]>     result = value.trySet(makeValue);
        return new AtomicInteger(result);
    }

再跟踪到trySet中

AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);
		
		//先尝试乐观锁更新
        tryOptimistic(result, makeValue);
        //如果乐观锁更新失败,则加分布式锁,再进行更新
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

乐观锁更新

可以从方法名就可以看出实现思路,先来看下tryOptimistic(result, makeValue)方法,其中makeValue对象包含了根据当前值计算新值的计算逻辑

private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
    {
        long            startMs = System.currentTimeMillis();
        //重试次数
        int             retryCount = 0;
		//是否成功
        boolean         done = false;
        while ( !done )
        {
        	//增加统计次数
            result.stats.incrementOptimisticTries();
            //如果更新成功,直接返回
            if ( tryOnce(result, makeValue) )
            {
                result.succeeded = true;
                done = true;
            }
            else
            {
            	//如果更新不成功,看是否还允许重试(和定义的重试策略有关)
            	//如果不允许重试,直接返回
                if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
                {
                    done = true;
                }
            }
        }

        result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
    }

更新方法在tryOnce代码块中,源码如下

private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
    {
        Stat        stat = new Stat();
        //是否需要创建节点,如果节点存在,返回false,标识不需要创建,并且将节点stat信息存入stat中,如果节点不存在,返回true
        boolean     createIt = getCurrentValue(result, stat);

        boolean     success = false;
        try
        {
        	//计算期望新值
            byte[]  newValue = makeValue.makeFrom(result.preValue);
            if ( createIt )
            {
            	//节点不存在,则新建节点
                client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
            }
            else
            {
            	//节点已经存在的,根据节点stat中的dataVersion来进行乐观锁更新
                client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
            }
            result.postValue = Arrays.copyOf(newValue, newValue.length);
            success = true;
        }
        catch ( KeeperException.NodeExistsException e )
        {
            // do Retry
        }
        ...

        return success;
    }

分布式锁更新

如果尝试乐观锁形式更新失败后,会尝试进行加分布式锁对value进行更新,tryWithMutex(result, makeValue)代码如下

//分布式锁
private final InterProcessMutex mutex;

private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
     {
         long            startMs = System.currentTimeMillis();
         int             retryCount = 0;
		//抢锁成功
         if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) )
         {
             try
             {
                 boolean         done = false;
                 while ( !done )
                 {
                 	 //增加统计次数
                     result.stats.incrementPromotedTries();
                     //尝试乐观锁更新
                     if ( tryOnce(result, makeValue) )
                     {
                         result.succeeded = true;
                         done = true;
                     }
                     else
                     {
                         if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
                         {
                             done = true;
                         }
                     }
                 }
             }
             finally
             {
                 mutex.release();
             }
         }

         result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
    }

可以看到加锁形式,与乐观锁形式的区别就是先使用分布式锁锁住,其余内容一致。

下面理一理整体流程,流程不是很复杂,就不画流程图了

  1. 先尝试乐观锁形式更新
  2. 如果计数器zkPath对应节点不存在,新建节点并塞入最新值
  3. 如果计数器zkPath对应节点存在,则利用stata的dataversion进行乐观锁更新
  4. 若更新成功,返回
  5. 若更新失败,判断重试次数是否在重试策略允许范围内,若允许重试,重复2、3两步
  6. 如果1-5步乐观锁形式更新失败,尝试加锁,再在同步块内进行乐观锁更新
  7. 若加锁失败,返回失败
发布了43 篇原创文章 · 获赞 134 · 访问量 5万+

猜你喜欢

转载自blog.csdn.net/hosaos/article/details/88233109