SparkStreaming编程
1 Transformation 高级算子
1.1 updateStateByKey
/**
* 单词计数
*
* Driver服务:
* 上一次 运行结果,状态
* Driver服务
* 新的数据
*
*/
object UpdateStateBykeyWordCount {

  /**
   * Stateful word count over a socket stream.
   *
   * updateStateByKey keeps a running total per key across batches; the
   * previous state is restored from the checkpoint directory, so the driver
   * can pick up where the last run left off.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    // Checkpointing is mandatory for updateStateByKey: the state lives here.
    ssc.checkpoint("hdfs://hadoop1:9000/streamingcheckpoint")

    // Input: lines of comma-separated words from the socket.
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1", 9999)

    // For each key, `values` holds this batch's counts (e.g. List(1, 1) when
    // the word arrived twice) and `state` the accumulated total from earlier
    // batches: None the first time a word is seen, Some(n) afterwards.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val batchSum = values.sum
      val previousTotal = state.getOrElse(0)
      Some(batchSum + previousTotal)
    }

    val wordCountDStream = dstream
      .flatMap(_.split(","))
      .map(word => (word, 1))
      .updateStateByKey(updateFunc)

    // Output: print the running totals for each batch.
    wordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
1.2 mapWithState
/**
* 性能更好
*/
object MapWithStateAPITest {

  /**
   * Stateful word count using mapWithState — better performance than
   * updateStateByKey because only keys present in the current batch
   * (plus timing-out keys) are touched.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))
    // Checkpointing is required for mapWithState.
    ssc.checkpoint("hdfs://master:9999/streaming/checkpoint")

    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordsDStream = words.map(x => (x, 1))

    // Seed state: "dummy" and "source" start with non-zero counts.
    val initialRDD = sc.parallelize(List(("dummy", 100L), ("source", 32L)))

    // currentBatchTime: time of the current batch
    // key:              the key whose state is being updated
    // value:            this batch's value for the key (None when the key is only timing out)
    // currentState:     the accumulated state for the key
    val stateSpec = StateSpec.function((currentBatchTime: Time, key: String, value: Option[Int], currentState: State[Long]) => {
      val sum = value.getOrElse(0).toLong + currentState.getOption.getOrElse(0L)
      val output = (key, sum)
      // The state of a key that is timing out must not be updated.
      if (!currentState.isTimingOut()) {
        currentState.update(sum)
      }
      Some(output)
    }).initialState(initialRDD).numPartitions(2).timeout(Seconds(30)) // timeout: a key idle for 30s is removed together with its state

    val result = wordsDStream.mapWithState(stateSpec)
    result.print()
    result.stateSnapshots().print()

    // Start the streaming computation...
    ssc.start()
    // ...and block until it is stopped externally.
    // BUG FIX: the original called ssc.stop(false) immediately after start()
    // and before awaitTermination(), which shut the stream down before any
    // batch could run.
    ssc.awaitTermination()
  }
}
1.3 Transform
/**
* 黑名单过滤
*/
object WordBlack {

  /**
   * Word count with blacklist filtering, implemented with DStream.transform
   * and an RDD leftOuterJoin.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // A hand-made blacklist. Note: in a real system this would normally be
    // loaded from MySQL, Redis or HBase rather than hard-coded here.
    val wordBlackList = ssc.sparkContext.parallelize(List("?", "!", "*"))
      .map(param => (param, true))
    val blackList = wordBlackList.collect()
    val blackListBroadcast = ssc.sparkContext.broadcast(blackList)

    // Input: lines of comma-separated words from the socket.
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1", 9999)

    val wordOneDStream = dstream
      .flatMap(_.split(","))
      .map((_, 1))

    // transform exposes each batch as a plain RDD (the same operations used
    // in Spark Core / Spark SQL apply); the function must return an RDD.
    val wordCountDStream = wordOneDStream.transform { rdd =>
      // Rebuild the blacklist RDD from the broadcast value for this batch.
      val filterRDD: RDD[(String, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      // After the join each element is (word, (1, Option[Boolean])):
      //   Some(true) -> the word is blacklisted (joined)
      //   None       -> the word is clean (did not join) — these we keep
      val resultRDD: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(filterRDD)
      resultRDD
        .filter { case (_, (_, blacklisted)) => blacklisted.isEmpty }
        .map { case (word, _) => word }
    }.map((_, 1)).reduceByKey(_ + _)

    // Output
    wordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
1.4 Window操作
/**
*
* 需求:
* 实现一个 每隔4秒,统计最近6秒的单词计数的情况。
*
* reduceByKeyAndWindow
*/
object WindowOperatorTest {

  /**
   * Windowed word count: every 4 seconds, report the counts over the most
   * recent 6 seconds, using reduceByKeyAndWindow.
   *
   * reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration):
   *   - reduceFunc:      (V, V) => V, how two values for a key are combined
   *   - windowDuration:  size of the window (6s here)
   *   - slideDuration:   how often the window fires (4s here)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Input — not wired to a production source yet.
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1", 9999)

    // Processing: split on commas, pair each word with 1, then sum per key
    // inside the sliding window.
    val resultWordCountDStream = dstream
      .flatMap(line => line.split(","))
      .map(word => (word, 1))
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(6), Seconds(4))

    // Output — print() is intended for testing only.
    resultWordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
2 Output
2.1 foreachRDD
核心算子讲解
/**
* WordCount程序,Spark Streaming消费TCP Server发过来的实时数据的例子:
*
* 1、在master服务器上启动一个Netcat server
* `$ nc -lk 9998` (如果nc命令无效的话,我们可以用yum install -y nc来安装nc)
*
*
* create table wordcount(ts bigint, word varchar(50), count int);
*
* spark-shell --total-executor-cores 4 --executor-cores 2 --master spark://master:7077 --jars mysql-connector-java-5.1.44-bin.jar,c3p0-0.9.1.2.jar,spark-streaming-basic-1.0-SNAPSHOT.jar
*
*
*/
object NetworkWordCountForeachRDD {

  /**
   * Word count over a TCP socket, demonstrating seven progressively better
   * ways of saving results to MySQL from foreachRDD.
   *
   * Setup:
   *   1. Start a Netcat server on master: `nc -lk 9998`
   *      (if nc is missing, install it with: yum install -y nc)
   *   2. create table wordcount(ts bigint, word varchar(50), count int);
   *   3. spark-shell --total-executor-cores 4 --executor-cores 2 --master spark://master:7077 --jars mysql-connector-java-5.1.44-bin.jar,c3p0-0.9.1.2.jar,spark-streaming-basic-1.0-SNAPSHOT.jar
   */
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("NetworkWordCountForeachRDD").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))
    // Receiver (ReceiverInputDStream) for data sent to a socket on one machine
    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    // The processing logic: a simple word count
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // Save to MySQL, variant (1).
    // WARNING: broken on a real cluster — conn/statement are created on the
    // driver but used inside rdd.foreach on the executors, so closure
    // serialization fails (java.io.NotSerializableException). Kept here as
    // the didactic starting point of the progression.
    wordCounts.foreachRDD { (rdd, time) =>
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "root")
      val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
      rdd.foreach { record =>
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
      }
      statement.close()
      conn.close()
    }

    // Variant (2): connection created per record on the executor. Correct,
    // but opening a connection for every record is extremely expensive.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreach { record =>
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "root")
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
        statement.close()
        conn.close()
      }
    }

    // Variant (3): one connection per partition — far fewer connections.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "root")
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        conn.close()
      }
    }

    // Variant (4): borrow the per-partition connection from a pool instead
    // of opening a fresh one.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        ConnectionPool.returnConnection(conn)
      }
    }

    // Variant (5): additionally batch the inserts (addBatch/executeBatch)
    // so there is one round-trip per partition instead of per record.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        ConnectionPool.returnConnection(conn)
      }
    }

    // Variant (6): additionally wrap the partition in a single transaction
    // (manual commit) so the batch is applied atomically.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    // Variant (7): additionally flush and commit every 500 records to bound
    // the size of a single batch/transaction on large partitions.
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
          if (index != 0 && index % 500 == 0) {
            statement.executeBatch()
            conn.commit()
          }
        }
        statement.executeBatch()
        statement.close()
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    // BUG FIX: the original called ssc.start() and then ssc.stop(false)
    // BEFORE registering variants (2)-(7). Outputs registered after start()
    // never run, and stop() shut the stream down immediately. All outputs are
    // now registered first, then the context is started exactly once.
    ssc.start()
    // Wait for the streaming computation to terminate
    ssc.awaitTermination()
  }
}
连接池代码编写:
pom.xml文件添加如下内容:
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
/**
* 连接池
*/
/**
 * c3p0-backed JDBC connection pool shared by the foreachRDD examples.
 * Configuration is hard-coded and applied once in the static initializer.
 */
public class ConnectionPool {
private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
static {
dataSource.setJdbcUrl("jdbc:mysql://master:3306/test");// JDBC URL of the target database
dataSource.setUser("root");// database user
dataSource.setPassword("root");// database password
dataSource.setMaxPoolSize(40);// maximum number of pooled connections
dataSource.setMinPoolSize(2);// minimum number of pooled connections
dataSource.setInitialPoolSize(10);// connections created at startup
dataSource.setMaxStatements(100);// maximum number of cached Statements
}
// Borrow a connection from the pool.
// NOTE(review): returns null when c3p0 throws SQLException — callers must
// handle a null result (or this should rethrow instead).
public static Connection getConnection() {
try {
return dataSource.getConnection();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// Return a connection to the pool. c3p0 hands out proxy connections whose
// close() returns them to the pool rather than closing the physical link.
public static void returnConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
3 Checkpoint
/**
* Driver HA
*/
object DriverHAWordCount {

  /**
   * Driver HA via checkpoint recovery: StreamingContext.getOrCreate either
   * rebuilds the context (including stateful word-count state) from the
   * checkpoint directory, or creates a fresh one with the factory below.
   */
  def main(args: Array[String]): Unit = {
    val checkpointDirectory: String = "hdfs://hadoop1:9000/streamingcheckpoint2";

    // Factory used only when no checkpoint exists yet. It must ONLY build
    // and return the context.
    // BUG FIX: the original called start()/awaitTermination()/stop() inside
    // this function, so it blocked forever, never returned the context to
    // getOrCreate, and would have started the context twice.
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("NetWordCount")
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Seconds(2))
      ssc.checkpoint(checkpointDirectory)
      val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1", 9999)
      // Running word count; state is carried across batches (and restarts)
      // through the checkpoint directory.
      val wordCountDStream = dstream.flatMap(_.split(","))
        .map((_, 1))
        .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.sum
          val lastCount = state.getOrElse(0)
          Some(currentCount + lastCount)
        })
      wordCountDStream.print()
      ssc
    }

    // Recover from the checkpoint if present, otherwise build a new context.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
4 SparkStreaming和SparkSQL整合
pom.xml里面添加
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.1</version>
</dependency>
/**
* WordCount程序,Spark Streaming消费TCP Server发过来的实时数据的例子:
*
* 1、在master服务器上启动一个Netcat server
* `$ nc -lk 9998` (如果nc命令无效的话,我们可以用yum install -y nc来安装nc)
*
*
*/
object NetworkWordCountForeachRDDDataFrame {

  /**
   * Spark Streaming + Spark SQL integration: each batch RDD is converted to
   * a DataFrame and processed with SQL / the DataFrame API.
   *
   * Start a Netcat server on master first: `nc -lk 9998`
   * (if nc is missing, install it with: yum install -y nc)
   */
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("NetworkWordCountForeachRDD")
    val sc = new SparkContext(sparkConf)
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))
    // Receiver (ReceiverInputDStream) for data sent to a socket on one machine
    val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    // The processing logic: a simple word count
    val words = lines.flatMap(_.split(" "))

    // Variant (1): register a temp view and run SQL, printing the result.
    words.foreachRDD { rdd =>
      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      // Convert RDD[String] to DataFrame
      val wordsDataFrame = rdd.toDF("word")
      // Create a temporary view
      wordsDataFrame.createOrReplaceTempView("words")
      // Do word count on DataFrame using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }

    // Variant (2): DataFrame API, appending results (with the batch
    // timestamp) to a parquet dataset on HDFS.
    words.foreachRDD { (rdd, time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      // Convert RDD[String] to DataFrame
      val wordsDataFrame = rdd.toDF("word")
      // Do word count with the DataFrame API and tag each row with the batch time
      val wordCountsDataFrame = wordsDataFrame.groupBy("word").count()
      val resultDFWithTs = wordCountsDataFrame.rdd.map(row => (row(0), row(1), time.milliseconds)).toDF("word", "count", "ts")
      resultDFWithTs.write.mode(SaveMode.Append).parquet("hdfs://master:9999/user/spark-course/streaming/parquet")
    }

    // BUG FIX: the original called ssc.start() and ssc.stop(false) between
    // the two foreachRDD registrations — the second output never ran and
    // stop() killed the stream immediately. Register all outputs first, then
    // start once and wait for termination.
    ssc.start()
    ssc.awaitTermination()
  }
}