Kafka
Prepare the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
Official API documentation
http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
Note
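The Kafka source exposes a fixed schema (see the documentation linked above): key, value, topic, partition, offset, timestamp and timestampType, and both key and value arrive as binary. A minimal sketch of decoding them, assuming the same SparkSession (spark) and topic as the program below:
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092")
  .option("subscribe", "spark_kafka")
  .load()
// cast the binary key/value columns to strings and keep a few of the metadata columns
kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset")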
Code implementation
package cn.hanjiaxiaozhi.structedstream
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* Author hanjiaxiaozhi
* Date 2020/7/27 10:22
* Desc StructuredStreaming integration with Kafka: consume data from Kafka
*/
object StructuredStreaming_Kafka {
def main(args: Array[String]): Unit = {
//1. Prepare the environment
val spark: SparkSession = SparkSession.builder.appName("wc").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
//2. Connect to Kafka and read the data
val df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node01:9092")//kafka集群地址
.option("subscribe", "spark_kafka")//要订阅的主题
.load()
//df now contains the data consumed from Kafka and has a fixed schema; the value column is binary, so convert it to String as follows
val ds: Dataset[String] = df.selectExpr("CAST(value AS STRING)") //deserialize the binary value column to String
.as[String]//convert the DataFrame to a Dataset with a type parameter
//3. Process the data
val result: Dataset[Row] = ds.flatMap(_.split(" ")).groupBy("value").count().orderBy($"count".desc)
//4. Output the result - to the console for now
result.writeStream
.format("console")
.outputMode("complete")
.option("truncate",false)//表示列如果过长不会以...省略
.start()
.awaitTermination()
// /export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
}
}
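For fault tolerance across restarts, the same console query can also be given a checkpoint location. A minimal sketch, using a local ./ckp directory purely for illustration (in production this would typically be a reliable path such as one on HDFS):
result.writeStream
  .format("console")
  .outputMode("complete")
  .option("truncate", false)
  .option("checkpointLocation", "./ckp") // illustrative path, not part of the original program
  .start()
  .awaitTermination()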
MySQL
Prepare the MySQL table
CREATE TABLE `t_word` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(255) NOT NULL,
`count` bigint(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `word` (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
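The UNIQUE KEY on word is what lets the sink in the code below upsert: the REPLACE INTO statement replaces the existing row for a word instead of inserting a duplicate, so rewriting the full result table in complete mode keeps exactly one row per word.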
Notes from the official docs and blog
- The blog of Databricks, the company behind Spark, has an example program for code like the above
- Databricks is a commercial company founded by the creators of Apache Spark
Code implementation
package cn.hanjiaxiaozhi.structedstream
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}
/**
* Author hanjiaxiaozhi
* Date 2020/7/27 10:22
* Desc StructuredStreaming integration with other systems:
* consume data from Kafka, do a WordCount, and write the result to MySQL
*/
object StructuredStreaming_Kafka_MySQL{
def main(args: Array[String]): Unit = {
//1. Prepare the environment
val spark: SparkSession = SparkSession.builder.appName("wc").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
//2. Connect to Kafka and read the data
val df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node01:9092")//kafka集群地址
.option("subscribe", "spark_kafka")//要订阅的主题
.load()
//df now contains the data consumed from Kafka and has a fixed schema; the value column is binary, so convert it to String as follows
val ds: Dataset[String] = df.selectExpr("CAST(value AS STRING)") //deserialize the binary value column to String
.as[String]//convert the DataFrame to a Dataset with a type parameter
//3. Process the data
val result: Dataset[Row] = ds.flatMap(_.split(" ")).groupBy("value").count().orderBy($"count".desc)
//4. Output the result - to MySQL
val jdbcSink = new JdbcSink("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
result.writeStream
.foreach(jdbcSink)
.outputMode("complete")
.start()
.awaitTermination()
}
class JdbcSink(url: String, username: String, password: String) extends ForeachWriter[Row]{
var conn: Connection = null
var ps: PreparedStatement = null
//open the connection
override def open(partitionId: Long, version: Long): Boolean = {
conn = DriverManager.getConnection(url, username, password)
val sql:String =
"""
|REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);
|""".stripMargin
ps = conn.prepareStatement(sql)
true //returning true means the connection was opened successfully and this partition will be processed
}
//process each row / save the data to MySQL
override def process(value: Row): Unit = {
val word: String = value.getAs[String](0)
val count: Long = value.getAs[Long](1)
ps.setString(1,word)
ps.setLong(2,count)
ps.executeUpdate()
}
//release resources
override def close(errorOrNull: Throwable): Unit = {
if(ps != null) ps.close()
if(conn != null) conn.close()
}
}
}
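One caveat of the REPLACE INTO approach: MySQL implements REPLACE as a delete followed by an insert, so the auto-increment id of a word changes every time its count is rewritten. If stable ids matter, an upsert relying on the same UNIQUE KEY on word could be prepared in open() instead; a sketch of such a statement (an alternative, not what the code above uses):
val sql: String =
  """
    |INSERT INTO `t_word` (`word`, `count`) VALUES (?, ?)
    |ON DUPLICATE KEY UPDATE `count` = VALUES(`count`)
    |""".stripMargin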