描述:在HDFS上有订单数据order.txt文件,文件字段的分割符号",",样本数据如下:
Order_00001,Pdt_01,222.8
Order_00001,Pdt_05,25.8
Order_00002,Pdt_03,522.8
Order_00002,Pdt_04,122.4
Order_00002,Pdt_05,722.4
Order_00003,Pdt_01,222.8
其中字段依次表示订单id,商品id,交易额
问题:使用 Spark Core,求每个订单中成交额最大的商品id
代码如下:(示例为local方式)
package com.company.sparkcore
import org.apache.spark.{SparkConf, SparkContext}
object TopOrderItem {

  /**
   * For every order in order.txt (lines of "orderId,itemId,amount"),
   * print the itemId with the largest transaction amount.
   *
   * Runs in local mode and reads from a hard-coded path, matching the
   * original tutorial setup.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("top n order and item").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val orderData = sc.textFile("file:///e:/order.txt")

      // Parse each CSV line into (orderId, (itemId, amount)).
      // NOTE(review): assumes every line has exactly 3 well-formed fields,
      // as the original did — malformed lines would still throw here.
      val mapOrderData = orderData.map { line =>
        val fields = line.split(",")
        (fields(0), (fields(1), fields(2).toDouble))
      }

      // Keep only the max-amount item per order. reduceByKey combines
      // map-side before the shuffle, unlike the original
      // groupByKey + sortWith + take(1), which shipped every record
      // across the network and sorted the whole group just to pick one.
      val topOrderData = mapOrderData.reduceByKey { (a, b) =>
        if (a._2 >= b._2) a else b
      }

      // Output, one line per order (same message text as the original).
      // 例:最大成交额的订单ID为:Order_00002 ,对应的商品ID为:Pdt_05
      topOrderData.foreach { case (orderId, (itemId, _)) =>
        println("最大成交额的订单ID为:" + orderId + " ,对应的商品ID为:" + itemId)
      }
    } finally {
      // Always release the SparkContext — the original never stopped it.
      sc.stop()
    }
  }
}