并行化流操作
可以通过parallel或者parallelStream方法实现数据并行化操作
通过手动使用线程模拟掷骰子事件
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* 计算2次筛子投掷的和的概率
*/
public class ManualDiceRools {
private static final int N = 100000000;
private final double fraction;
private final Map<Integer, Double> results;
private final int numberOfThreads;
private final ExecutorService executor;
private final int workPerThread;
public static void main(String[] args) {
ManualDiceRools rools = new ManualDiceRools();
rools.simulateDiceRoles();
}
public ManualDiceRools() {
// 出现一次的概率
this.fraction = 1.0 / N;
this.results = new ConcurrentHashMap<>();
// 获取可用处理器的Java虚拟机的数量
this.numberOfThreads = Runtime.getRuntime().availableProcessors();
this.executor = Executors.newFixedThreadPool(numberOfThreads);
// 每个线程处理的数据量
this.workPerThread = N / numberOfThreads;
}
public void simulateDiceRoles() {
List<Future<?>> futures = submitJobs();
awaitCompletion(futures);
printResults();
}
/**
* 打印结果
*/
private void printResults() {
results.entrySet().forEach(System.out::println);
}
/**
* 执行线程
*
* @return
*/
private List<Future<?>> submitJobs() {
List<Future<?>> futures = new ArrayList<>();
for (int i=0;i<numberOfThreads;i++) {
futures.add(executor.submit(makeJob()));
}
return futures;
}
private Runnable makeJob() {
return () -> {
// 使用安全随机生成器
ThreadLocalRandom random = ThreadLocalRandom.current();
for (int i=0;i<workPerThread;i++) {
int entry = twoDiceThrows(random);
accumulateResult(entry);
}
};
}
/**
* 计算生成的概率,即fraction + 之前的概率值
*
* @param entry
*/
private void accumulateResult(int entry) {
results.compute(entry, (key, previous) -> previous == null ? fraction : previous + fraction);
}
/**
* 生成2次筛子随机的和
*
* @param random
* @return
*/
private int twoDiceThrows(ThreadLocalRandom random) {
int firstThrow = random.nextInt(1, 7);
int secondThrow = random.nextInt(1, 7);
return firstThrow + secondThrow;
}
/**
* get每个Future的值,获取每个线程计算的结果
*
* @param futures
*/
private void awaitCompletion(List<Future<?>> futures) {
try {
futures.forEach(future -> {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
} finally {
executor.shutdown();
}
}
}
使用并行化
ThreadLocalRandom random = ThreadLocalRandom.current();
double fraction = 1.0 / N;
Map<Integer, Double> collect = IntStream.range(0, N)
.parallel()
.mapToObj(n -> random.nextInt(1, 7) + random.nextInt(1, 7))// 计算随机产生的和
.collect(Collectors.groupingBy(side -> side,
Collectors.summingDouble(n -> fraction)));// 相同的值进行分组,并计算概率值,概率步长为fraction
collect.entrySet().forEach(System.out::println);
限制
之前调用reduce方法,初始值可以为任意值,为了让其在并行化时能正常工作,初始值必须为组合函数的恒等值(比如组合函数(acc, element) -> acc + element时,初始值必须为0,因为任何数字加0值不变)
reduce操作的另一个限制是组合操作必须符合结合律。意味着只要序列的值不变,组合操作的顺序不重要,比如:(4 + 2) +1 = 4 + (2 + 1) =7
要避免的是持有锁,流框架会在需要时,自己处理同步操作
还有个叫sequential的方法,串行的意思,在对流求值时,不能同时处于两种模式,要么是并行要么是串行,如果同时用了这个2个方法,最后调用的那个方法起效
性能
影响并行流性能的主要因素有5个:
- 数据大小
只有数据足够大、每个数据处理管道话费的时间足够多时,并行化才有意义 - 源数据结构
性能好:ArrayList、数组或IntStream.range,这些数据支持随机读取
性能一般:HashSet、TreeSet,这些数据结构不易公平地被分解
性能差:LinkedList、Streams.iterate或BufferedReader.lines,难于分解,可能要花O(N)的时间复杂度来分解问题 - 装箱
处理基本类型比处理装箱类型要快 - 核的数量
拥有的核数越多,获得潜在性能提升的幅度越大,在实践中,核的数量不单指你的机器有多少核,而是指运行时能使用多少核 - 单元处理开销
花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显
在底层,并行流还是沿用了fork/join框架,fork递归式的分解问题,然后每段并行执行,最终由join合并返回结果
并行化数组操作
parallelSetAll:使用Lambda表达式更新数据元素
// 使用for循环初始化数组
public static double[] imperativeInitilize(int size) {
double[] values = new double[size];
for (int i=0;i<values.length;i++) {
values[i] = i;
}
return values;
}
// 使用并行化数组操作初始化数组,它改变了传入的数组,但没有创建一个新的数组
public static double[] parallelInitialize(int size) {
double[] values = new double[size];
Arrays.parallelSetAll(values, i -> i);
return values;
}
parallelPrefix:会更新一个数组,将每一个元素替换为当前元素和前驱元素的和,这里的“和”不一定是加法,可以说任意一个BinaryOperator
public static double[] simpleMovingAverage(double[] values, int n) {
double[] sums = Arrays.copyOf(values, values.length);
// 和前一个值进行相加
Arrays.parallelPrefix(sums, Double::sum);
return sums;
}
parallelSort:并行化对数组元素排序