数据并行化

并行化流操作

可以通过parallel或者parallelStream方法实现数据并行化操作

通过手动使用线程模拟掷骰子事件

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 计算2次筛子投掷的和的概率
 */
public class ManualDiceRools {

    private static final int N = 100000000;

    private final double fraction;
    private final Map<Integer, Double> results;
    private final int numberOfThreads;
    private final ExecutorService executor;
    private final int workPerThread;

    public static void main(String[] args) {
        ManualDiceRools rools = new ManualDiceRools();
        rools.simulateDiceRoles();
    }

    public ManualDiceRools() {
        // 出现一次的概率
        this.fraction = 1.0 / N;
        this.results = new ConcurrentHashMap<>();
        // 获取可用处理器的Java虚拟机的数量
        this.numberOfThreads = Runtime.getRuntime().availableProcessors();
        this.executor = Executors.newFixedThreadPool(numberOfThreads);
        // 每个线程处理的数据量
        this.workPerThread = N / numberOfThreads;
    }

    public void simulateDiceRoles() {
        List<Future<?>> futures = submitJobs();
        awaitCompletion(futures);
        printResults();
    }

    /**
     * 打印结果
     */
    private void printResults() {
        results.entrySet().forEach(System.out::println);
    }

    /**
     * 执行线程
     *
     * @return
     */
    private List<Future<?>> submitJobs() {
        List<Future<?>> futures = new ArrayList<>();
        for (int i=0;i<numberOfThreads;i++) {
            futures.add(executor.submit(makeJob()));
        }
        return futures;
    }

    private Runnable makeJob() {
        return () -> {
            // 使用安全随机生成器
            ThreadLocalRandom random = ThreadLocalRandom.current();
            for (int i=0;i<workPerThread;i++) {
                int entry = twoDiceThrows(random);
                accumulateResult(entry);
            }
        };
    }

    /**
     * 计算生成的概率,即fraction + 之前的概率值
     *
     * @param entry
     */
    private void accumulateResult(int entry) {
        results.compute(entry, (key, previous) -> previous == null ? fraction : previous + fraction);
    }

    /**
     * 生成2次筛子随机的和
     *
     * @param random
     * @return
     */
    private int twoDiceThrows(ThreadLocalRandom random) {
        int firstThrow = random.nextInt(1, 7);
        int secondThrow = random.nextInt(1, 7);
        return firstThrow + secondThrow;
    }

    /**
     * get每个Future的值,获取每个线程计算的结果
     *
     * @param futures
     */
    private void awaitCompletion(List<Future<?>> futures) {
        try {
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        } finally {
            executor.shutdown();
        }
    }
}

使用并行化

ThreadLocalRandom random = ThreadLocalRandom.current();
double fraction = 1.0 / N;
Map<Integer, Double> collect = IntStream.range(0, N)
        .parallel()
        .mapToObj(n -> random.nextInt(1, 7) + random.nextInt(1, 7))// 计算随机产生的和
        .collect(Collectors.groupingBy(side -> side,
                Collectors.summingDouble(n -> fraction)));// 相同的值进行分组,并计算概率值,概率步长为fraction
collect.entrySet().forEach(System.out::println);

限制

之前调用reduce方法,初始值可以为任意值,为了让其在并行化时能正常工作,初始值必须为组合函数的恒等值(比如组合函数(acc, element) -> acc + element时,初始值必须为0,因为任何数字加0值不变)

reduce操作的另一个限制是组合操作必须符合结合律。意味着只要序列的值不变,组合操作的顺序不重要,比如:(4 + 2) +1 = 4 + (2 + 1) =7

要避免的是持有锁,流框架会在需要时,自己处理同步操作

还有个叫sequential的方法,串行的意思,在对流求值时,不能同时处于两种模式,要么是并行要么是串行,如果同时用了这个2个方法,最后调用的那个方法起效

性能

影响并行流性能的主要因素有5个:

  1. 数据大小
    只有数据足够大、每个数据处理管道话费的时间足够多时,并行化才有意义
  2. 源数据结构
    性能好:ArrayList、数组或IntStream.range,这些数据支持随机读取
    性能一般:HashSet、TreeSet,这些数据结构不易公平地被分解
    性能差:LinkedList、Streams.iterate或BufferedReader.lines,难于分解,可能要花O(N)的时间复杂度来分解问题
  3. 装箱
    处理基本类型比处理装箱类型要快
  4. 核的数量
    拥有的核数越多,获得潜在性能提升的幅度越大,在实践中,核的数量不单指你的机器有多少核,而是指运行时能使用多少核
  5. 单元处理开销
    花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显

在底层,并行流还是沿用了fork/join框架,fork递归式的分解问题,然后每段并行执行,最终由join合并返回结果

并行化数组操作

parallelSetAll:使用Lambda表达式更新数据元素

// 使用for循环初始化数组
public static double[] imperativeInitilize(int size) {
    double[] values = new double[size];
    for (int i=0;i<values.length;i++) {
        values[i] = i;
    }
    return values;
}

// 使用并行化数组操作初始化数组,它改变了传入的数组,但没有创建一个新的数组
public static double[] parallelInitialize(int size) {
    double[] values = new double[size];
    Arrays.parallelSetAll(values, i -> i);
    return values;
}

parallelPrefix:会更新一个数组,将每一个元素替换为当前元素和前驱元素的和,这里的“和”不一定是加法,可以说任意一个BinaryOperator

public static double[] simpleMovingAverage(double[] values, int n) {
    double[] sums = Arrays.copyOf(values, values.length);
    // 和前一个值进行相加
    Arrays.parallelPrefix(sums, Double::sum);
    return sums;
}

parallelSort:并行化对数组元素排序

猜你喜欢

转载自my.oschina.net/u/3198904/blog/1788768