最近在集群上跑spark时发现有些reduceByKey操作结果不符合预期,大致伪代码如下(公司统一用java,就没写成scala,用了scala的简写节省字数)。就是类似WordCount的简单计算,DimType是一个枚举类
JavaPairRDD<DimType, Long> rawRdd=...; JavaPairRDD<DimType, Long> reducedRdd = entryPairRDD .reduceByKey(_+_); List<Tuple2<DimType, Long>> results = reducedRdd.collect(); for (Tuple2<DimType, Long> tuple2 : results) { logger.info("Result: " + tuple2); ...; }
脚本在单节点运行正常,但是设置多个Executor(如spark.executor.instances=2)结果就发生重复项,输出大致如下这样:
Result: (A,1) Result: (A,2) Result: (B,3) Result: (C,3) Result: (B,2) Result: (C,4)
所有枚举项都出现了两次(正好等于executor的实例数),就好像各个Executor之间没有进行reduce一样
出现这个情况的原因比较tricky,因为spark的Shuffle过程会根据key的hashCode来判定相等,而恰恰Enum类的hashCode比较特殊,系统写死了就等于内存地址
public final int hashCode() { return super.hashCode(); }
这就导致在同不同进程里的枚举项被当成了不同的key,于是没有聚合起来
本来重写hashCode就可以解决问题,但坑爹的是Enum.hashCode()还被定义成final方法,无法被子类覆盖。所以只能自己在外面再封装一层对象,然后重新hashCode(),例如用Enum.name().hashCode()。或者干脆就不要用枚举类来做RDD的Key,以免发生类似问题
另外如果用其他自定义类做key的时候,一定要记得重写hashCode和equals,否则跨进程的时候也会发生类似问题