Spark杂记

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/yulei_qq/article/details/82463759

  参考spark2.3.0文档  http://spark.apache.org/docs/2.3.0/quick-start.html

 1、从spark 2.0开始,官网强烈推荐Dataset ,它比RDD拥有更好的性能. 

  2、启动spak-shell 

[spark@big-data-1 home]$ spark-shell 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/09/06 16:44:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://big-data-1:4041
Spark context available as 'sc' (master = local[*], app id = local-1536223462803).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0.2.6.5.0-292
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

  加载一个文件   ,返回一个Dataset 数据集

scala> val textFile = spark.read.textFile("test.txt")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

  如下方式返回的是一个RDD  

scala> val textFile = sc.textFile("test.txt")
textFile: org.apache.spark.rdd.RDD[String] = test.txt MapPartitionsRDD[1] at textFile at <console>:24

3、创建RDD。

Spark 提供了两种创建RDD的方式: 读取外部数据集,以及在驱动器程序中对一个集合进行并行化

Scala 中的 parallelize() 方法

scala> val lines =sc.parallelize(List("pandas","i like pandas"));
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24

Java 中的 parallelize() 方法

JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));

4、RDD操作

扫描二维码关注公众号,回复: 3165919 查看本文章

RDD支持两种操作:转换操作和行动操作 

转换操作是返回一个新的RDD操作,比如map()和filter。

行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count() 和first() .  行动操作返回的是其他数据类型.

package com.study;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.tools.ant.taskdefs.Java;

/**
 * @author :  yulei
 * @data :  2018/9/4 8:33
 * @Version :  1.0
 **/

public class Test1 {

    public static void main(String[] args) {

         SparkConf conf =new SparkConf();
         conf.setMaster("local") ;
         conf.setAppName("test");

         JavaSparkContext sc= new JavaSparkContext(conf);
         JavaRDD<String> rdd1 = sc.textFile("d:/logs/*");
         JavaRDD<String> rdd2 = rdd1.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("hello");
            }
        });
        System.out.println(rdd2.count());

    }

}

其中代码中的logs/* ,这样写会递归logs文件夹中的所有文件,包括it目录下的文件。如下:

一些基本操作:

4、持久化缓存

rdd3.persist(StorageLevel.MEMORY_AND_DISK());
rdd3.unpersist();

猜你喜欢

转载自blog.csdn.net/yulei_qq/article/details/82463759