1、什么是ELT
数据工程师,多数情况下接触过ETL,即提取(Extract)、转换(Transform)、加载(Load),随着越来越多的计算平台能力的崛起,很多时候,数据工程师按照ELT进行数据操作,即按照提取(Extract)、加载(Load)、转换(Transform),此好处就是,数据的转换可以借助于强大的计算平台,而数据同步工具只需要更多的关注数据的提取和加更加简单快捷的为开发者提高效率。
2、为什么选择Spark
a)在日益增长的业务数据同步过程中,很多传统ETL工具都是单机运行,搭建一套具备大规模数据处理能力的数据同步系统成为大数据分析系统中不可或缺的环节。由于Spark可运行于分布式平台并且对各种数据库的访问都实现了良好的支持,使用Spark来开发数据同步工具成为一个不错的选择;
b)Spark DataFrame提供了丰富的操作API且可以直接基于DataFrame进行SQL操作,在EL过程中且可以进行一些简单的数据转换;
c)Spark程序部署简单,只需要使用spark-submit命令提交代码即可。
2.1、Spark ETL不带T
本次实践不进行转换(Transform)实战,只进行简单的EL操作实战,旨在为了能够熟练使用Spark进行多种异构数源之间数据同步。
2.3、Java Spark Read on MySQL
public class MysqlReader {
public Dataset<Row> reader(SparkSession sparkSession){
Dataset<Row> rowDataset = sparkSession.read()
.format("jdbc")
.option("url", url)
.option("dbtable", tableName)
.option("user", userName)
.option("password", passwd)
.option("driver", "com.mysql.cj.jdbc.Driver").load();
return rowDataset;
}
}
如上代码会有一个弊端,当表数据量大的时候,因其是一个session一次性全量读取mysql数据,存在读取数据oom的风险。故而可以有如下第二种读法:
Dataset<Row> rowDataset = sparkSession.read()
.format("jdbc")
.option("url", url)
.option("dbtable", tableName)
.option("user", userName)
.option("password", passwd)
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("partitionColumn", columnName)
.option("lowerBound", 1)
.option("upperBound", 1000)
.option("fetchsize", 1000)
.option("numPartitions", 3)
.load();
查看官方文档可以发现,partitionColumn 配置项和numPartitions、lowerBound、upperBound这三个必须要同时出现。其中
partitionColumn :表示读取数据的时候按条件过滤的字段,一般情况下选择格式为int、datatime、timestamp类型的主键或索引字段;
numPartitions :表示读取的时候会分成几个分区来读,最终会把想要读取的数据读到Spark的几个分区中;
lowerBound :表示读取的时候,小于1的都一定在第一个分区中;
upperBound :表示读的时候,超过1000的都一定在最后一个分区中;
fetchsize :表示读取的时候,每次读取返回的最大条数,能够有效的控制读取mysql数据的速率,不能过快,过快会把mysql读崩掉;
如下图就是实际读取的时候,上面代码中的1、1000、3这几个数字的含义体现,1000/3=333,所以日志中,采用了334和667做为三个数据分区的存放规则