版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/yulei_qq/article/details/82463759
参考spark2.3.0文档 http://spark.apache.org/docs/2.3.0/quick-start.html
1、从spark 2.0开始,官网强烈推荐Dataset ,它比RDD拥有更好的性能.
2、启动spak-shell
[spark@big-data-1 home]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/09/06 16:44:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://big-data-1:4041
Spark context available as 'sc' (master = local[*], app id = local-1536223462803).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.0.2.6.5.0-292
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
加载一个文件 ,返回一个Dataset 数据集
scala> val textFile = spark.read.textFile("test.txt")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
如下方式返回的是一个RDD
scala> val textFile = sc.textFile("test.txt")
textFile: org.apache.spark.rdd.RDD[String] = test.txt MapPartitionsRDD[1] at textFile at <console>:24
3、创建RDD。
Spark 提供了两种创建RDD的方式: 读取外部数据集,以及在驱动器程序中对一个集合进行并行化
Scala 中的 parallelize() 方法
scala> val lines =sc.parallelize(List("pandas","i like pandas"));
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
Java 中的 parallelize() 方法
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
4、RDD操作
扫描二维码关注公众号,回复:
3165919 查看本文章
RDD支持两种操作:转换操作和行动操作
转换操作是返回一个新的RDD操作,比如map()和filter。
行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count() 和first() . 行动操作返回的是其他数据类型.
package com.study;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.tools.ant.taskdefs.Java;
/**
* @author : yulei
* @data : 2018/9/4 8:33
* @Version : 1.0
**/
public class Test1 {
public static void main(String[] args) {
SparkConf conf =new SparkConf();
conf.setMaster("local") ;
conf.setAppName("test");
JavaSparkContext sc= new JavaSparkContext(conf);
JavaRDD<String> rdd1 = sc.textFile("d:/logs/*");
JavaRDD<String> rdd2 = rdd1.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return s.contains("hello");
}
});
System.out.println(rdd2.count());
}
}
其中代码中的logs/* ,这样写会递归logs文件夹中的所有文件,包括it目录下的文件。如下:
一些基本操作:
4、持久化缓存
rdd3.persist(StorageLevel.MEMORY_AND_DISK()); rdd3.unpersist();