Copyright notice: when reposting, please link to https://blog.csdn.net/DPnice/article/details/85329765
http://www.alluxio.com/blog/effective-spark-rdds-with-alluxio
Spark RDDs are themselves held in memory, but once a dataset outgrows the available memory, caching it with Spark's own cache degrades performance significantly.
Alluxio allows larger datasets to be kept in memory, enabling faster Spark applications. Alluxio also supports sharing the same in-memory dataset across multiple Spark applications, which can improve the performance of the whole cluster.
Storing an RDD in Alluxio memory is simple: just save the RDD to Alluxio as a file. Two common methods for saving RDDs as files, saveAsTextFile and saveAsObjectFile, work with Alluxio. An RDD saved in Alluxio can then be read back (from memory) with sc.textFile or sc.objectFile, respectively.
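A minimal sketch of both round trips (the Alluxio master address and the /rdd_text and /rdd_obj paths below are placeholders, not taken from the demo that follows):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AlluxioRddRoundTrip {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("AlluxioRddRoundTrip").setMaster("local[*]"));
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));

        // As text: elements are written via toString(), so they come back as strings.
        rdd.saveAsTextFile("alluxio://master:19998/rdd_text");
        JavaRDD<String> textBack = sc.textFile("alluxio://master:19998/rdd_text");

        // As serialized objects: the element type survives the round trip.
        rdd.saveAsObjectFile("alluxio://master:19998/rdd_obj");
        JavaRDD<Integer> objBack = sc.objectFile("alluxio://master:19998/rdd_obj");

        System.out.println(textBack.count() + " / " + objBack.count());
        sc.stop();
    }
}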
Demo:
package com.dhhy.spark.demo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Map;
import java.util.Properties;
/**
* @author DPn!ce
* @date 2018/12/28.
*/
public class SparkPostgreSQLJdbcWc {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("SparkPostgreSQLJdbcWc");
        conf.setMaster("local[*]");
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
        // JDBC connection properties
        Properties connectionProperties = new Properties();
        // Set the database user name (user) and password (password), and specify the PostgreSQL driver (driver)
        connectionProperties.put("user", "");
        connectionProperties.put("password", "");
        connectionProperties.put("driver", "org.postgresql.Driver");
        connectionProperties.put("fetchsize", "10000");
        String persistPath = "alluxio://ip:19998/spark/persist";
        String url = "jdbc:postgresql://ip:15432/dscdb";
        // Must be a numeric column of the table
        String columnName = "cast(origin_id AS NUMERIC)";
        // String table = "iot.dhhsh_m_fire_control";
        String table = "(SELECT * FROM iot.dhhsh_m_fire_control limit 10 ) dhhsh_m_fire_control";
        // Rows with origin_id below the lower bound all land in the first partition
        Long lowerBound = 1568L;
        // Rows with origin_id above the upper bound all land in the last partition
        Long upperBound = 6709L;
        // Split the read into four partitions
        int numPartitions = 4;
        /*
         * Spark splits the bound range into numPartitions strides of
         * (upperBound - lowerBound) / numPartitions each. With the values above
         * (stride = 1285), the generated predicates are roughly:
         *   partition 1: origin_id < 2853 (or origin_id IS NULL)
         *   partition 2: 2853 <= origin_id < 4138
         *   partition 3: 4138 <= origin_id < 5423
         *   partition 4: origin_id >= 5423
         * The bounds only steer partitioning; no rows are filtered out.
         */
        // Read the PostgreSQL table through Spark's JDBC data source
        Dataset<Row> jdbcDS = spark.read()
                // the table name must include its schema
                .jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties);
        long readTimeStart = System.currentTimeMillis();
        // Show a sample of the jdbcDS contents
        jdbcDS.show(1);
        System.out.println(System.currentTimeMillis() - readTimeStart + " ms");
        // Check the number of partitions
        // int size = jdbcDS.toJavaRDD().partitions().size();
        // System.out.println(size);
        JavaRDD<Row> rowJavaRDD = jdbcDS.toJavaRDD();
        long saveTimeStart = System.currentTimeMillis();
        // Save to Alluxio; each Row is written via toString(), i.e. as "[v1,v2,...]"
        rowJavaRDD.saveAsTextFile(persistPath);
        // rowJavaRDD.saveAsTextFile("C:\\Users\\Lenovo\\Desktop\\p");
        System.out.println(System.currentTimeMillis() - saveTimeStart + " ms");
        long countTimeStart = System.currentTimeMillis();
        // Read the data back from Alluxio
        RDD<String> stringRDD = spark.sparkContext().textFile(persistPath, numPartitions);
        // RDD<String> stringRDD = spark.sparkContext().textFile("C:\\Users\\Lenovo\\Desktop\\p", numPartitions);
        Map<String, Long> stringLongMap = stringRDD.toJavaRDD().mapToPair(
                str -> {
                    // naive comma split of Row.toString(); breaks if a field itself contains a comma
                    String[] split = str.split(",");
                    return new Tuple2<>(split[3], split[0]);
                })
                .countByKey();
        System.out.println("group-and-count took: " + (System.currentTimeMillis() - countTimeStart) + " ms");
        stringLongMap.forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
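Note: for the alluxio:// scheme to resolve, the Alluxio client jar has to be on the classpath of both the Spark driver and the executors (for example via --jars or the spark.driver.extraClassPath / spark.executor.extraClassPath settings); the exact jar name depends on your Alluxio version. After a run, the persisted files can be inspected from the Alluxio shell, e.g. alluxio fs ls /spark/persist.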