什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
懒加载
RDD五大特性
RDD编程
RDD的创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
首先我们先创建一个maven工程
1.创建maven工程,在pom文件中添加spark-core的依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
2.如果不希望运行时打印大量日志,可以在resources文件夹中添加log4j.properties文件,并添加日志配置信息
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{
yy/MM/dd HH:mm:ss} %p %c{
1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
从集合中创建RDD
代码实现:
public class Test01_list {
public static void main(String[] args) {
//1. 创建spark配置
SparkConf conf = new SparkConf().setAppName("SparkCore").setMaster("local[*]");
//2.创建SparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//3.编写代码
//parallelize(List,分区数)
JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4,5),2);
javaRDD.collect().forEach(System.out::println);
//4.关闭资源
sc.stop();
}
}
分区规则:
上述代码分区规则:
0号分区:1,2
1号分区:3,4,5
利用整数除机制 左闭右开
先求商,剩余的余数从后往前分配
从外部存储创建RDD
代码实现:
public class Test02_file {
public static void main(String[] args) {
//1. 创建spark配置
SparkConf conf = new SparkConf().setAppName("SparkCore").setMaster("local[*]");
//2.创建SparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//3.编写代码
JavaRDD<String> javaRDD = sc.textFile("Input",2);
javaRDD.collect().forEach(System.out::println);
//4.关闭资源
sc.stop();
}
}
分区规则
具体的分区个数需要经过公式计算
首先获取文件的总长度 totalSize
计算平均长度 goalSize = totalSize / numSplits
获取块大小 128M
计算切分大小 splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
最后使用splitSize 按照1.1倍原则切分整个文件 得到几个分区就是几个分区
实际开发中 只需要看文件总大小 / 填写的分区数 和块大小比较 谁小拿谁进行切分
扫描二维码关注公众号,回复:
15745433 查看本文章
从其他RDD创建。
主要是通过一个RDD运算完后,再产生新的RDD。使用转换算子,详见我的下一篇博客