mllib是spark的机器学习库
依赖:
mllib的底层实现采用数值计算库Breeze和基础线性代数库BLAS
RDD操作(scala):
创建:
val data = Array(1,2,3,4,5,6,7,8,9)
val distData = sc.parallelize(data,3)
第一个参数是数据集;第二个参数是分区(数据集划分成分片的数量),
典型情况下每个cpu运行2~4个分区。
读取外部数据源:
val distFile1 = sc.textFile("hdfs://192.168.80.139:9000/test/data.txt", 3)
可以读取多个:
val distFile1 = sc.textFile("/test/1.txt, /test/2.txt")
转换操作:
map:
对RDD中的元素执行函数
val rdd1 = sc.parallelize(1 to 9)
val rdd2 = rdd1.map(x => x*2)
rdd2.collect //scala中可以不写括号
filter:
对RDD中的元素进行过滤
val rdd3 = rdd2.filter(x => x>10)
flatMap:
类似map,扁平化map
val rdd4 = rdd3.flatMap(x => x to 20)
此例:每个输入元素对应多个输出元素
mapPartitions、mapPartitionsWithIndex:
map的变种,针对分区,把分区的内容作为整体来处理
sample:
sample(withReplacement, fraction, seed)
采样、抽取样本
withReplacement:是否放回的抽样,fraction:比例,seed:随机种子(建议不填)
rdd4.sample(false, 0.1, 0)
union:
联合
val rdd8 = rdd1.union(rdd3)
intersection :
交集
val rdd9 = rdd8.intersection(rdd1)
distinct :
去重
val rdd10 = rdd8.union(rdd9).distinct
groupByKey :
根据key对value分组
val rdd0 = sc.parallelize(Array((1,1), (1,2), (1,3), (2,1), (2,2), (2,3)))
val rdd11 = rdd0.groupByKey()
rdd11.collect
res22: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(1, 2, 3)), (2,CompactBuffer(1, 2, 3)))
reduceByKey :
根据key,对value执行函数
val rdd12 = rdd0.reduceByKey((x, y) => x+y)
sortByKey :
根据key进行排序,默认升序
randomSplit:
样本划分
val rdd2 = rdd1.randomSplit(Array(0.3, 0.7))
rdd1(0).collect
substract :
减法,减去交集
val rdd1 = sc.parallelize(1 to 9)
val rdd2 = sc.parallelize(1 to 3)
val rdd3 = rdd1.substract(rdd2)
zip :
压缩操作,压缩成字典
val rdd1 = sc.parallelize(1 to 4)
val rdd2 = sc.parallelize(Array("a","b","c","d"))
val rdd3 = rdd1.zip(rdd2)
cache方法:
将数据内容保存到节点内存
rdd1.cache()
动作操作:
collect():
以array形式输出,scala中可以不写括号
count()、first()、take(5)、countByKey()
reduce(func):
对RDD所有元素执行聚集(func)函数
val rdd1 = sc.parallelize(1 to 9)
val rdd2 = rdd1.reduce(_ + _) //等同于val rdd2 = rdd1.reduce((x,y) => x+y)
rdd2: Int = 45
foreach(func):
对RDD每个元素执行func函数
saveAsTextFile :
保存为文件
持久化:
rdd1.peisist()
共享变量:
broadcast 广播变量:
广播出去的是变量不是RDD
scala> val x = 10
x: Int = 10
scala> val x1 = sc.broadcast(x)
x1: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(2)
accumulator 累加器:
accumulator有两个参数,
第一个是累加器的初值,第二个是在Spark UI上显示的名字(可选)。
主要意义是全局变量。
1、累计器全局(全集群)唯一,只增不减(Executor中的task去修改,即累加);
2、累加器是Executor共享
scala> val data = sc.parallelize(Array(1,2,3,4,5,6,6,7,8,8,9))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:43
scala> val sum = sc.accumulator(0)
sum: org.apache.spark.Accumulator[Int] = 0
scala> val result = data.foreach(item =>sum += item)
result: Unit = ()
scala> println(sum)
59