共享可变性设计中存在风险以及解决方法(五)

本篇是《Java虚拟机并发编程》第六章的阅读笔记

在(四)中,因为程序中不止一个与可变状态相关或依赖的变量,所以我们使用显示锁的来进行同步操作。

虽然是用了同步锁成功执行了代码,但同时也会产生许多的问题,因为同步本身就有缺陷。例如可能会产生死锁,活锁;可能会因为是人工加锁,所以会导致错误的概率增加,你需要一个个确认是否每个地方都做了适当的同步,开发效率低,等等。

这里我们通过使用软件事务内存STM模型来使得线程安全的处理共享可变变量

简单介绍STM

STM是一种多线程之间数据共享的同步机制。它是模拟数据库事务的并发控制机制来控制在并行工作时对共享内存的访问控制。它是锁的一种代替机制。它拥有了ACID中的ACI的特性。

最大的好处就是它提高了开发效率,灵活和扩展性。对于并行编程而言,只需将线程中需要访问共享内存的关键逻辑部分划分出来封装到一个事务中即可,不再需要关心相关的同步,锁产生的问题,全部交给事务内存系统来处理。

对于事务就不多说了。

在Java中使用STM有如下的选择:

  1. 直接在Java中使用Clojure STM
  2. 使用Multiverse的STM的注解形式
  3. 使用Akka,它既支持STM又支持角色的模型

在这里使用Akka中提供的Scala的包来使用STM

package com.stm;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;

public class EnergySource {
    private final long MAXLEVEL = 100;
    final Ref.View<Long> level = STM.newRef(MAXLEVEL);
    final Ref.View<Long> usageCount = STM.newRef(0L);
    final Ref.View<Boolean> keepRunning = STM.newRef(true);
    private static final ScheduledExecutorService replenishTimer = 
            Executors.newScheduledThreadPool(10, new ThreadFactory(){

                @Override
                public Thread newThread(Runnable runnable) {
                    Thread t = new Thread(runnable);
                    t.setDaemon(true);
                    return t;
                }

            });

    private EnergySource(){}

    private void init(){
        replenishTimer.schedule(new Runnable(){
            public void run(){
                replenish();
                if(keepRunning.get()){
                    replenishTimer.schedule(this, 1, TimeUnit.SECONDS);
                }
            }
        }, 1, TimeUnit.SECONDS);
    }

    public static EnergySource create(){
        final EnergySource energySource = new EnergySource();
        energySource.init();
        return energySource;
    }

    public void stopEnergySource(){
        keepRunning.swap(false);
    }

    public Long getUnitsAvailable(){
        return level.get();
    }

    public Long getUsageCount(){
        return usageCount.get();
    }

    public boolean useEnergy(final long units){
        return STM.atomic(new Callable<Boolean>(){
            @Override
            public Boolean call() throws Exception {
                long currentLevel = level.get();
                if(units>0 && currentLevel >= units){
                    level.swap(currentLevel - units);
                    usageCount.set(usageCount.get() + 1);
                    return true;
                }else{
                    return false;
                }
            }
        });
    }

    private void replenish(){
        STM.atomic(new Runnable(){
            @Override
            public void run() {
                long currentLevel = level.get();
                if(currentLevel < MAXLEVEL){
                     level.swap(currentLevel + 1);
                }
            }
        });
    }

    private static final EnergySource es = EnergySource.create();

    public static void main(String[] args) throws InterruptedException{
        List<Callable<Object>> tasks = new ArrayList<>();
        System.out.println("Energy level at start: " + es.getUnitsAvailable());
        for(int i=0; i<10; i++){
            tasks.add(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    for(int j=0; j<7; j++)
                        es.useEnergy(1);
                    return null;
                }

            });
        }

        final ExecutorService service = Executors.newFixedThreadPool(10);
        service.invokeAll(tasks);

        System.out.println("Energy level at end: " + es.getUnitsAvailable());
        System.out.println("Usage: " + es.getUsageCount());

        es.stopEnergySource();
        service.shutdown();
    }
}
  1. 首先level和usageCount都被声明为托管引用(后面会介绍),并各自持有一个不可变的Long类型的值,虽然我们不能直接更改它的值,但是我们仍然可以通过更改托管引用使其安全指向新值。
  2. 由于我们会在useEnergy函数中同时修改电量和使用次数,所以useEnergy函数需要使用一个显示的事务来完成这些操作,以确保对这两个字段变更是原子的。用Callable接口将逻辑代码封装到一个事务里面。同理replenish函数也是一样。
  3. 在EnergySource上一个版本中,ScheduledExecutorService会周期性地(每秒钟一次)调用replenish()函数,直至整个任务结束,这就要求stopEnergySource()必须是同步的,因为有可能会有多个线程调用这个函数(假设有可能)
    • 在这个版本中,我们不用在周期性地调用replenish函数,而只会在对象实例初始化的时候执行一下调度操作。在每次调用replenish函数时,我们都会根据keepRunning的值来决定函数是否应该在1s之后被再度执行
    • 这一变化消除了stopEnergySource函数和调度器/计时器(timer)之间的耦合。现在,stopEnergySource函数只依赖于keepRunning这个标志。而该标志可以很容易地通过STM事务来管理

该版本和上一版本相比就简洁明了许多。

托管引用和不可变的值

在上面的例子中,最初托管引用是level,不可变的值是100。你可以想象level中有一个指针并指向100。

这样就相当于将可变实体(托管引用)和状态(不可变的值)分离了。为什么这么说,因为

  1. 对于普通变量来说,以前声明普通对象的时候是通过分配在栈上reference指向在堆中为该对象实例分配的那块内存。所以如果你要修改对象中的数据就可以直接访问该对象的的内存进行修改。
  2. 对于托管引用和不可变来说,虽然也是通过指针指向100这个数据分配的内存,但是100是不可变的,换句话说你不可以在这块内存中进行修改,要想修改数据,只能重新分配新的内存创建一个新的值然后将指针指向这个新的值。当然,修改指针的操作规定了是原子的,所以要么修改成功对所有线程可见,要么修改失败数据不变。这也是为什么在多线程的条件下,对于单独托管引用的访问和值的更新操作它是线程安全的。

猜你喜欢

转载自blog.csdn.net/qq_24986539/article/details/52887269