SPARK学习RDD(一)
import org.apache.spark.SparkContext._
常见的RDD操作就是从文件系统中读取数据,然后进行常见的转化和行动操作,之所以叫他为弹性分布式系统,是因为它的失败重算功能。
常见读取文件操作:
Val test = sc.textFile(“readme.md”);
Val lines = test.filter(lamba line: “python” in line )
Lines.persist(); //进行数据结果缓存
Lines.first();//找到存在python字符串的第一行数据
Lines.count();
由于spark是惰性求值,在转化操作阶段并不进行任何操作,只有在行动操作是才会进行数据计算,并且它的计算数据并不会存储,每次都需要重新计算,我们可以使用persist()方法对其存储结果进行缓存(如果需要重复使用到这个数据结果);
常见创建数据集RDD的方法:①读取外部数据源②在驱动程序中对一个集合进行并行化
这种用法不常见
Val lines = sc.parallelize(List(“pandas”,”I like pandas”)); //括号内是集合,set,map,list等
JavaRDD<String> lines = sc.parallelize(Arrays.asList(“pandas”,”I like pandas”));
一个简单的例子:从日志文件中读取包含error和warnning的信息
//转化操作
Val lines = sc.textFile(“../log.txt”);
Val errorlines = lines.filter(line =>line.contains(“error”));
Val warning = lines.filter(line =>line.contains(“warning”));
Val message = errorlines.union(warning);
//行动操作
Println(message.count());//输出数据的条数
//我想遍历其中的前十条数据,可以如下
badLinesRDD.take(10).foreach(println) //书上示例程序
For lineRDD <- message.take(10) //自己认为的scala程序
Println(lineRDD);
如果希望将请求结果保存起来的话,可以使用
saveAsTextFile();
saveAsSequenceFile(); 等来保存数据
向spark中传递函数,注意如果存在局部变量或成员变量时,传递对象中的变量时应该先将他赋值给临时变量,因为直接传递对象引用变量不安全。
Class searchFunction(val query: String){
def ismatch(a :String) : Boolean= {
a.contains(query);
}
Def getMatchFunctionReference(rdd:RDD[String]) : RDD[String] = {
rdd.Map(ismatch);
}
Def getMatchNoReference(rdd:RDD[String]) : RDD[String] = {
Val query_ = this.query; //将引用的变量赋值给一个临时变量,直接使用会传递整个对象不安全。
Rdd.Map(x=>x.split(query_));
}
}
常见的两个转化操作map和filter,map和filter都可以传递函数,只不过map是将函数应用于所有rdd元素并返回一个新的rdd,而filter是将rdd中符合的元素进行筛选返回一个新的rdd;
inputRDD {1,2,3,4} Mapped(x= x*x) MappedRDD {1,4,9,16}
inputRDD {1,2,3,4} Filtered(x=>x!=1) FilteredRDD{2,3,4}
map()操作不需要输入和输出参数类型相同
一个简单的例子(对输入的元素值求平方):
Val inputFile = sc.parallelize(List[1,2,3,4]);
Val result = inputFile.map(x=>x*x);
Println(result.collect().mkString(“,”))
有时我们希望一个输入对应多个输出,可以使用flatMap(),和map类似,也是对rdd中所有元素进行操作,
一般用于数据单词切分等;
举例:
Val inputFile = sc.parallelize(List(“hello world”,”haha”));
Val words = inputFile.flatMap(x=>x.split(“ “));
Words.first();
他的返回结果是一个序列,如果我使用for循环,是否可以遍历出来(经测试可以计算出来)
For word <- words
Println(word);
Map和flatMap本质区别,map返回一个列表List的RDD,而flatMap则类似一个将List元素重新组成的RDD。
RDD中经常存在一些重复的数据,我们进行去重可以使用RDD.distinct()方法,但是这种去重有一个混洗过程,内存开销较大。不建议这样做。
Distinct(去重),union(并集),intersection(交集),substract(差集)
集卡尔积,大数据运用开销巨大:
行动操作,常见的有reduce操作,它的参数为一个函数,常见的如累加操作:
Val sum = rdd.reduce((x,y)=>x+y); //操作数据类型和返回数据类型相同
Flod方法和reduce方法类似,flod()方法会提供一个初始值,这个初始值是操作参数的单位元素,就是执行函数操作值不会改变,例如 + 0,*1等操作
aggregate() 函数则把我们从返回值类型必须与所操作的 RDD 类型相同的限制中解放出
来。与 fold() 类似,使用 aggregate() 时,需要提供我们期待返回的类型的初始值。然后
通过一个函数把 RDD 中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加
的,最终,还需要提供第二个函数来将累加器两两合并
示例:累加求平均数
Val result = rdd.aggregate((0,0),(
(acc,value)=>(acc._1+value,acc._2+1),
(acc1,acc2)=>(acc1._1+acc2._1,acc2._1+acc2._2));
//第一个参数(0,0)是初始值(acc,value)
//第二个参数是让第一个参数累加,并且每次计数加一
//第二个参数首先是对前一个参数的第一个acc._1的数据进行合并累加,后一个是对前一个参数的第二个参数进行累加,最终返回一个类似(k,v)的数据rdd结构
Val avg = result._1/result._2.toDouble;
持久化(缓存),持久化函数persist()和解除持久化函数unpersist()来解除缓存
Val result = rdd.map(x=>x*x);
Input.persist(StorageLevel.DISK_ONLY);
Println(result.count());
Println(result.collect().mkString(“,”)); //将result中的数据用逗号分隔显示
在平常使用到聚合,分组等操作时,需要进行键值对数据操作,下面详细介绍相关键值对操作
Spark(二)键值对操作
我们通常把经常访问的数据放在不同的分区中,这样可以有利于提高数据的访问性能
PairRDD(reducebykey,join等方法)通常我们将一个普通的RDD转化为pairRDD可以使用map()方法来实现
例如:val result = rdd.map(x=>(x.split(“ ”)(0),x));
因为pairRDD也是RDD,因此它也支持RDD所支持的所有函数,例如:
我们求一个值长度小于10的value;pairs为上一个pairsRDD。
Pairs.filter{case(key,value)=>value.length<10}
平常如果我们只想访问键值对的value部分,我们可以用mapValues(func)函数来操作, 功能类似于 map{case (x, y): (x,func(y))}
求每个键对应值的平均值;
rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2));
使用scala进行单词计数:
Val input = sc.textFile(“文件路径”);
Val words= input.flatMap(x=>x.split(“ “));
Val result = words.Map(x=>(x,1)).reduceByKey((x,y) => x +y)
//也可以使用countByValue函数直接计数
Val words = input.flatMap(x => x.split(“ “)).countByValue();
combineByKey是最基本的基于键聚合的函数,和aggregate一样,允许用户返回和参数不同类型的值
使用combineByKey来求每个键对应的平均数:
val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().map(println(_))
并行度调优:
val data = Seq(("a", 3), ("b", 4), ("a", 1))
sc.parallelize(data).reduceByKey((x, y) => x + y) // 默认并行度
sc.parallelize(data).reduceByKey((x, y) => x + y,3) // 自定义并行度
有时,我们希望在除分组操作和聚合操作之外的操作中也能改变 RDD 的分区。对于
这样的情况,Spark 提供了 repartition() 函数。它会把数据通过网络进行混洗,并创
建出新的分区集合。切记,对数据进行重新分区是代价相对比较大的操作。Spark 中也
有一个优化版的 repartition() ,叫作 coalesce() 。你可以使用 Java 或 Scala 中的 rdd.
partitions.size() 以及 Python 中的 rdd.getNumPartitions 查看 RDD 的分区数,并确保调
用 coalesce() 时将 RDD 合并到比现在的分区数更少的分区中
cogroup:对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。
利用scala模拟内连接,左外连接和右外连接:
storeAddress = {
(Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
(Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")}
storeRating = {
(Store("Ritual"), 4.9), (Store("Philz"), 4.8))}
storeAddress.join(storeRating) == {
(Store("Ritual"), ("1026 Valencia St", 4.9)),
(Store("Philz"), ("748 Van Ness Ave", 4.8)),
(Store("Philz"), ("3101 24th St", 4.8))}
storeAddress.leftOuterJoin(storeRating) ==
{(Store("Ritual"),("1026 Valencia St",Some(4.9))),
(Store("Starbucks"),("Seattle",None)),
(Store("Philz"),("748 Van Ness Ave",Some(4.8))),
(Store("Philz"),("3101 24th St",Some(4.8)))}
storeAddress.rightOuterJoin(storeRating) ==
{(Store("Ritual"),(Some("1026 Valencia St"),4.9)),
(Store("Philz"),(Some("748 Van Ness Ave"),4.8)),
(Store("Philz"), (Some("3101 24th St"),4.8))}
数据排序,我们利用sortByKey()函数接收一个ascending参数,默认为true,升序排序
Implicit是隐式转换关键字
val input: RDD[(Int, Venue)] = ...
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.toString.compare(b.toString)//重写compare函数
}
rdd.sortByKey()
PairRDD的行动操作:
数据分区:
// 初始化代码;从HDFS商的一个Hadoop SequenceFile中读取用户信息
// userData中的元素会根据它们被读取时的来源,即HDFS块所在的节点来分布
// Spark此时无法获知某个特定的UserID对应的记录位于哪个节点上
Val sc = new SparkContext(….) //参数是配置文件conf
Val userData = sc.sequenceFile[UserID,UserInfo](“HDFS://…”).partitionBy(new HashPartitioner(100)).persist();
Def processNewLogs(logFileName:String){
Val events = sc.sequenceFile[UserID,LinkInfo](logFileName)
Val joined = userData.join(events)
Val offTopicVisits = joined.filter{
Case(UserId,(userInfo,linkInfo))=>!userInfo.topics.contains(linkInfo.topics)
}.count()
Println(“number of visits to non-subscribed topics:”+ offTopicVisits)
}
获取RDD的分区方式:
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at
<console>:12
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
这里列出了所有会为生成的结果 RDD 设好分区方式的操作: cogroup() 、 groupWith() 、
join() 、 leftOuterJoin() 、 rightOuterJoin() 、 groupByKey() 、 reduceByKey() 、
combineByKey() 、 partitionBy() 、 sort() 、 mapValues() (如果父 RDD 有分区方式的话)、
flatMapValues() (如果父 RDD 有分区方式的话),以及 filter() (如果父 RDD 有分区方
式的话)。前面这些函数会保留父分区,其他所有的操作生成的结果都不会存在特定的分区方式。
对于二元操作,如果一个父RDD设置过分区,那么他的结果也会采用这种分区方式,如果两个父RDD都设置过分区方式,那么他的结果会采用第一个父分区分区方式。
分区的优点和好处就是减少shuffle(混洗)内存开销
下面会讨论示例:示例:PageRank和自定义分区方式
Spark(三)数据的读取与保存
动机:普通数据操作一般是集合和外部文本文件,数据量较小,如果对于数据量特别大的数据,如何读取与存储。
文件格式与文件系统
对于存储在本地的文件系统和分布式文件系统(NFS,HDFS等),Spark 可以访问很多种不同的文件格式,包括文本文件、JSON、SequenceFile,以及 protocol buffer,我们会展示几种常见格式的用法,以及 Spark 针对不同文件系统的配置和压缩选项。
读取文本文件:
val input = sc.textFile("file:///home/holden/repos/spark/README.md")
如果多个输入文件以一个包含数据所有部分的目录的形式出现,可以用两种方式来处
理。可以仍使用 textFile 函数,传递目录作为参数,这样它会把各部分都读取到 RDD
中。有时候有必要知道数据的各部分分别来自哪个文件(比如将键放在文件名中的时间
数据),有时候则希望同时处理整个文件。如果文件足够小,那么可以使用 SparkContext.
wholeTextFiles() 方法,该方法会返回一个 pair RDD,其中键是输入文件的文件名。
wholeTextFiles() 在每个文件表示一个特定时间段内的数据时非常有用。如果有表示不同
阶段销售数据的文件,则可以很容易地求出每个阶段的平均值,
例 5-4:在 Scala 中求每个文件的平均值
val input = sc.wholeTextFiles("file://home/holden/salesFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble
}
Spark 支持读取给定目录中的所有文件,以及在输入路径中使用通配字符,(如 part-*.txt )。大规模数据集通常存放在多个文件中,因此这一特性很有用,尤其是在同一目录中存在一些别的文件(比如成功标记文件)的时候。
保存文本文件:
result.saveAsTextFile(outputFile)
import org.apache.spark.{SparkConf, SparkContext}
object txtReader {
def main(args: Array[String]){
val conf = new SparkConf().setAppName("txtReader").setMaster("local");
val sc = new SparkContext(conf);
val input = sc.textFile("D:/json/testtxt.txt");
//val input = sc.wholeTextFiles("file:///home/common/coding/coding/Scala/word-count")读取目录下的所有文件
input.saveAsTextFile("D:/json/testtxt1.txt")
}
}
json数据的读取:
在 Scala 和 Java 中,通常将记录读入到一个代表结构信息的类中。在这个过程中可能还需
要略过一些无效的记录。下面以将记录读取为 Person 类作为一个例子。
例 5-7:在 Scala 中读取 JSON
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) // 必须是顶级类
...
// 将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None)
// 来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result = input.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Person]))
} catch {
case e: Exception => None
}})
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import org.apache.spark.{SparkConf, SparkContext}
case class Person(firstName: String, lastName: String, address: List[Address]) {
override def toString = s"Person(firstName=$firstName, lastName=$lastName, address=$address)"
}
case class Address(line1: String, city: String, state: String, zip: String) {
override def toString = s"Address(line1=$line1, city=$city, state=$state, zip=$zip)"
}
object WordCount {
def main(args: Array[String]) {
val inputJsonFile = "D:/json/test.json"
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val input5 = sc.textFile(inputJsonFile)
val dataObjsRDD = input5.map { myrecord =>
implicit val formats = DefaultFormats
// Workaround as DefaultFormats is not serializable
val jsonObj = parse(myrecord)
//val addresses = jsonObj \ "address"
//println((addresses(0) \ "city").extract[String])
jsonObj.extract[Person]
}
dataObjsRDD.saveAsTextFile("D:/json/test1.json")
}
}
//这种json文件读入的方式应该也是以textFile()函数读入,然后映射到类的字段信息中。
处理格式不正确的记录有可能会引起很严重的问题,尤其对于像 JSON 这样
的半结构化数据来说。对于小数据集来说,可以接受在遇到错误的输入时停
止程序(程序失败),但是对于大规模数据集来说,格式错误是家常便饭。如
果选择跳过格式不正确的数据,你应该尝试使用累加器来跟踪错误的个数。
保存json数据文件
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)
读取csv数据:他们有对应的库用来读取csv数据
Java和scala地址:http://opencsv.sourceforge.net/
例 5-13:在 Scala 中使用 textFile() 读取 CSV
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
例 5-16:在 Scala 中完整读取 CSV
case class Person(name: String, favoriteAnimal: String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x => Person(x(0), x(1)))
}
如果只有一小部分输入文件,你需要使用 wholeFile() 方法,可能还需要对
输入数据进行重新分区使得 Spark 能够更高效地并行化执行后续操作。
例 5-19:在 Scala 中写 CSV
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
import java.io.StringReader
import com.opencsv.CSVReader
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
/**
* Created by common on 17-4-3.
*/
object WordCounts {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("CSVReader").setMaster("local")
val sc = new SparkContext(conf)
val input = sc.textFile("D:/json/1.csv")
val result6 = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
result6.saveAsTextFile("D:/json/2.csv")
for(result <- result6){
for(re <- result){
println(re)
}
}
}
}
Maven和idea 和scala集成【具体参考https://blog.csdn.net/u012373815/article/details/53266301】
Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Helloworld</groupId>
<artifactId>Hello</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.0</scala.version>
<spark.version>2.3.1</spark.version>
<hadoop.version>2.7.0</hadoop.version>
<hbase.version>1.2.0</hbase.version>
</properties>
<dependencies>
<!--scala-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--hbase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
</dependencies>
</project>
Spark利用JDBC连接mysql数据库:查询
import org.apache.spark.sql.SparkSession
object MySqlSpark {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("MysqlQueryDemo").master("local").getOrCreate()
val jdbcDF = spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://127.0.0.1:3306/tech_convert?useUnicode=true&characterEncoding=utf-8")
.option("dbtable", "test_spark")
.option("user", "root")
.option("password", "222818")
.load()
jdbcDF.show()
}
}
Spark利用JDBC连接mysql数据库:更新
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import java.util.Properties
object saveMysqlSpark {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("MysqlInsertDemo").master("local").getOrCreate()
val df = spark.read.option("header", "true").csv("D:/json/1.csv")
df.show()
val url = "jdbc:mysql://127.0.0.1:3306/tech_convert?useUnicode=true&characterEncoding=utf-8"
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "222818")
df.write.mode(SaveMode.Append).jdbc(url, "test_spark", prop)
}
}