Spark-SQL-Java编程

认识:最核心的编程抽象就是DataFrame。

原理:将Spark SQL 转化为 RDD, 然后提交到集群执行。

SparkSession:统一的切入点,实质上是SQLContext和HiveContext的组合。允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

DataFrame:以RDD为基础的分布式数据集,二维表格。schemaRDD,比RDD多了一个表头信息(Schema)。DataFrame是用来处理结构化数据的

RDD:弹性分布式数据集.是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式。更简单些说,RDD 里面包含了数据和操作数据的方法。关键字:弹性,分布式,数据集

DataSet:对于RDD而言,DataSet做了强类型支持,在RDD的每一行都做了数据类型约束。RDD转换DataFrame后不可逆,但RDD转换Dataset是可逆的。

RDD转换成dataframe

  1. 反射:根据RDD对象的具体类型映射,推导出schema,也就是反射
  2. 通过显式的程序代码构造schema,然后将schema应用到RDD上,最后转换成DataSet
-----------------------------------------------------------------------------------------------------------------
        SparkSession sparkSession = SparkSession.builder().master("local")
                .appName("Java Spark SQL")
                .getOrCreate();

        Dataset<Row> dataset = sparkSession.read().json("URL");
        //只返回name字段
        dataset.select("name").show();
        //返回两个字段,所有age的value+1
        dataset.select(col("name"),col("age").plus(1)).show();
        //选择age大于21岁的人
        dataset.filter(col("age").gt(21)).show();
        //分组聚合,group age
        dataset.groupBy("age").count().show();

        /*以编程的方式运行SQL查询*/
        //注册临时表、临时视图!临时视图是session级别的,它会随着session的消息而消失
        dataset.createOrReplaceTempView("user");
        Dataset<Row> users = sparkSession.sql("SELECT * FROM user");
        users.show();

        // 全局临时视图的生命周期随spark程序的运行消失.全局临时变量是跨会话的!!!
        try {
            //创建全局临时视图
            dataset.createGlobalTempView("user");
            //全局临时视图绑定到系统保存的数据库“global_temp”
            Dataset<Row> globalUser = sparkSession.sql("SELECT * FROM global_temp.user");
            sparkSession.newSession().sql("SELECT * FROM global_temp.user");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
-----------------------------------------------------------------------------------------------------------------
        SparkSession sparkSession = SparkSession.builder().master("local")
                .appName("Java Spark SQL")
                .getOrCreate();
        Person person = new Person("spark",10);
        Encoder<Person> encoder = Encoders.bean(Person.class);
        Dataset<Person> dataset = sparkSession.createDataset(Collections.singletonList(person),encoder);
        dataset.show();
        //最终输出 {name:spark;age:10}


        /*常见类型的编码器*/
        Encoder<Integer> integerEncoder = Encoders.INT();
        Dataset<Integer> integerDataset = sparkSession.createDataset(Arrays.asList(1,2),integerEncoder);
        Dataset<Integer> result = integerDataset.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer call(Integer value) {
                return value+1;
            }
        },integerEncoder);
        result.collect();
        //最终输出 [2,3]
        
        
         /*通过提供一个类,可以将数据流转换为数据集。基于名称的映射*/
        String url = "/usr/local/text.json";
        Dataset<Person> personDataset = sparkSession.read().json(url).as(encoder);
        personDataset.show();
        //最终输出 name:...  age:,,,,
-----------------------------------------------------------------------------------------------------------------
// 根据RDD对象的具体类型映射,推导出schema,也就是反射
        SparkSession sparkSession = SparkSession.builder().master("local")
                .appName("Java Spark SQL")
                .getOrCreate();

        //定义一个RDD,转换成DataFrame
        JavaRDD<Person> personJavaRDD = sparkSession.read().textFile("URL")
                .javaRDD().map(new Function<String, Person>() {
                    @Override
                    public Person call(String v1) {
                        String[] param = v1.split(":");
                        Person person = new Person();
                        person.setName(param[0]);
                        person.setAge(Integer.valueOf(param[1].trim()));
                        return person;
                    }
                });
        
        //转换成DataFrame
        Dataset<Row> personDataset = sparkSession.createDataFrame(personJavaRDD,Person.class);
// 验证
         //创建临时视图
        personDataset.createOrReplaceTempView("user");
        Dataset<Row> result = sparkSession.sql("SELECT * FROM user");
        Encoder<String> encoder = Encoders.STRING();
        
        //第一种方式:通过下标获取value
        Dataset<String> dataset = result.map(new MapFunction<Row, String>() {
            @Override
            public String call(Row value) {
                return value.getString(0);
            }
        },encoder);
        dataset.show();
          //第二种方式:通过字段获取value
        Dataset<String> fieldValue = result.map(new MapFunction<Row, String>() {
            @Override
            public String call(Row value) {
                return value.getAs("name");
            }
        },encoder);
        fieldValue.show();
-----------------------------------------------------------------------------------------------------------------
// 代码显式的构造schema:代码案例
        SparkSession sparkSession = SparkSession.builder()
                .master("local")
                .appName("spark app")
                .getOrCreate();

        //创建普通的JavaRDD
        JavaRDD<String> javaRDD = sparkSession.sparkContext().textFile("URL", 1).toJavaRDD();
        //字符串编码的模式
        String schema = "name age";

        //根据模式的字符串生成模式
        List<StructField> structFieldList = new ArrayList<>();
        for (String fieldName : schema.split(" ")) {
            StructField structField = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
            structFieldList.add(structField);
        }
        StructType structType = DataTypes.createStructType(structFieldList);

        JavaRDD<Row> rowJavaRDD = javaRDD.map(new Function<String, Row>() {
            @Override
            public Row call(String v1) {
                String[] attirbutes = v1.split(",");
                return RowFactory.create(attirbutes[0], attirbutes[1].trim());
            }
        });

        //将模式应用于RDD
        Dataset<Row> dataset = sparkSession.createDataFrame(rowJavaRDD, structType);

        //创建临时视图
        dataset.createOrReplaceTempView("user");
        Dataset<Row> result = sparkSession.sql("select * from user");
        result.show();
-----------------------------------------------------------------------------------------------------------------
// 数据源
// parquet代码案例
// 强大的列式存储格式,它可以高效的存储嵌套格式的字段,SparkSQL提供了直接读取存储parquet格式文件的方法
        SparkSession sparkSession = SparkSession.builder()
                .master("local")
                .appName("spark app")
                .getOrCreate();

        Dataset<Row> rddDataset = sparkSession.read().parquet("usr/local/data.parquet");
        rddDataset.select("name","age").write().save("nameAndAge.parquet");

// JSON是一种半结构化/结构化的存储格式,如果兄弟萌的数据格式符合json的定义,那么sparkSQL就可以扫描j文件推测出结构信息,并且可以使我们通过名称访问字段
        Dataset<Row> jsonDataSet = sparkSession.read().json("usr/local/data.json");
        jsonDataSet.select("name","age").write().save("nameAndAge.parquet");


 //手动指定数据源
        Dataset<Row> customDataSource = sparkSession.read().format("json").load("usr/local/data.json");
customDataSource.select("name","age").write().format("json").save("nameAndAge.json");
-----------------------------------------------------------------------------------------------------------------

参考链接https://blog.csdn.net/youbitch1/article/details/88973028

发布了81 篇原创文章 · 获赞 118 · 访问量 24万+

猜你喜欢

转载自blog.csdn.net/dlphay/article/details/98765556