今天尝试了一个新的算子 repartitionAndSortWithinPartitions , 遇到了一个问题。
具体异常报错如下:
java.lang.ArrayIndexOutOfBoundsException: -7
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
完整报错见文末:
通过以上报错信息,我们不难看出给出的报错信息是数组越界, 但是我查找了半天都没有找到如何越界的。最终, 在一步步的 调试与println 的帮助下 ,我定位到了错误的原因。
这个问题 与我 使用 repartitionAndSortWithinPartitions 相关 , 这个算子需要出传入一个自定义的分区器。错误就发生在我分区器的代码实现上,下面是错误的代码 :
class UserWatchPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[OrderKey]
k.basicKey.hashCode() % numPartitions
//Math.abs(k.basicKey.hashCode() % numPartitions)
}
}
错误就发生在这行代码上:
k.basicKey.hashCode() % numPartitions
获取分区信息的时候,由于 hashCode 有可能为负数,导致该 Key 被分配到了一个负的分区从而出错
下面是一个正确的分区器实现:
class UserWatchPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[OrderKey]
Math.abs(k.basicKey.hashCode() % numPartitions)
}
}
关于 repartitionAndSortWithinPartitions 的详细用法与示例,请参考我的这篇文章
完整报错:
19/09/18 21:47:05 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ArrayIndexOutOfBoundsException: -7
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/09/18 21:47:05 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: -7
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/09/18 21:47:05 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
19/09/18 21:47:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/09/18 21:47:05 INFO TaskSchedulerImpl: Cancelling stage 1
19/09/18 21:47:05 INFO DAGScheduler: ShuffleMapStage 1 (map at ETL_DWDEduWatchOrgDetailFact_2_DWDEduWatchCombDetailFact.scala:124) failed in 1.211 s due to Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: -7
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
19/09/18 21:47:05 INFO DAGScheduler: Job 1 failed: count at ETL_DWDEduWatchOrgDetailFact_2_DWDEduWatchCombDetailFact.scala:160, took 1.229507 s
19/09/18 21:47:05 INFO SparkUI: Stopped Spark web UI at http://LAPTOP-JEG7QNE0:4040
19/09/18 21:47:05 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/09/18 21:47:05 INFO MemoryStore: MemoryStore cleared
19/09/18 21:47:05 INFO BlockManager: BlockManager stopped
19/09/18 21:47:05 INFO BlockManagerMaster: BlockManagerMaster stopped
19/09/18 21:47:05 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/09/18 21:47:05 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: -7
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
at com.gaosi.spark.etl.log.front.client.ETL_DWDEduWatchOrgDetailFact_2_DWDEduWatchCombDetailFact$.main(ETL_DWDEduWatchOrgDetailFact_2_DWDEduWatchCombDetailFact.scala:160)
at com.gaosi.spark.etl.log.front.client.ETL_DWDEduWatchOrgDetailFact_2_DWDEduWatchCombDetailFact.main(ETL_DWDEduWatchOrgDetailFact_2_DWDEduWatchCombDetailFact.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -7
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)