(Prerequisites: a running Hadoop cluster, ZooKeeper cluster, and Spark cluster, with firewalls disabled)
Scenario: pull data from Kafka with Spark Streaming and consume it.
(1) Feeding data into Kafka (producer code below)
The Maven pom.xml is configured as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>SSO-Scala</artifactId>
<groupId>org.jy.data.yh.bigdata.drools.scala</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>repos</id>
<name>Repository</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>repos</id>
<name>Repository</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</pluginRepository>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<groupId>org.jy.data.bigdata.drools.scala</groupId>
<artifactId>SSO-SparkStreaming-kafka</artifactId>
<name>SSO-SparkStreaming-kafka</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.version>2.4.0</spark.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<dependency> <!-- Spark -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency> <!-- Spark Streaming -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency><!-- Spark Streaming Kafka integration; the artifact's _2.11 suffix must match the Scala version -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- scala-library must match the Scala version the Spark artifacts are built for (2.11 here) -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
<build>
<finalName>SSO-KafkaProduct</finalName>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- Mixed Scala/Java compilation -->
<plugin><!-- Scala compiler plugin -->
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source><!-- Java source level -->
<target>1.8</target>
</configuration>
</plugin>
<plugin><!-- Bundle all dependencies into a single (fat) jar -->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef><!-- predefined fat-jar descriptor -->
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.jy.data.bigdata.drools.scala.producer.KafkaProduct</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin><!-- Maven jar plugin -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath><!-- add classpath entries to the manifest -->
<!-- Program entry class: org.jy.data.bigdata.drools.scala.producer.KafkaProduct for the
producer build, org.jy.data.bigdata.drools.scala.consumer.KafkaOperationConsumer for the consumer build -->
<mainClass>org.jy.data.bigdata.drools.scala.producer.KafkaProduct</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
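With this pom in place, the producer fat jar is built with a standard Maven invocation (run from the module directory once the code below is written). Since appendAssemblyId is false and finalName is SSO-KafkaProduct, the runnable jar lands at target/SSO-KafkaProduct.jar:
mvn clean package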
(2) The complete producer code:
package org.jy.data.bigdata.drools.scala.producer
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Random
/**
 * Kafka producer: writes mock user data into the topic.
 */
object KafkaProduct {
def main(args: Array[String]): Unit = {
// The topic could be read from the program arguments instead:
//val topic = args(0) // which topic to send messages to
val topic = "kafkaOperation"
// Likewise for the broker list:
//val brokers = args(1)
// Random generator used to vary the send pacing
val rnd = new Random()
// Producer configuration
val props = new Properties() // Kafka configuration items
// Broker list of the Kafka cluster
props.put("bootstrap.servers","node-1:9092,node-2:9092,node-3:9092")
// Client name
props.put("client.id","kafkaGenerator")
// Key/value serializers
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
// Create the producer (establishes the connection to the brokers)
val producer = new KafkaProducer[String,String](props)
val t = System.currentTimeMillis() // start time, used for the throughput report below
// Mock name -> address data
val nameAddrs =Map("bob" -> "shanghai#200000","amy" -> "beijing#100000","alice" -> "shanghai#200000",
"tom" -> "beijing#100000","lulu" -> "hangzhou#310000","nick" -> "shanghai#200000"
)
// Mock name -> phone data
val namePhones = Map("bob" -> "1300079421","amy" -> "18700074423","alice" -> "17730079427",
"tom" -> "16700379451", "lulu" -> "18800074423","nick" -> "14400033426"
)
// Generate mock records of the form (name, addr, type:0)
for(nameAddr <- nameAddrs){
val data = new ProducerRecord[String,String](topic,nameAddr._1,s"${nameAddr._1}\t${nameAddr._2}\t0") // type 0 = address record
producer.send(data)
if(rnd.nextInt(100) < 50) Thread.sleep(rnd.nextInt(10)) // brief random pause for roughly half the sends
}
// Generate mock records of the form (name, phone, type:1)
for(namePhone <- namePhones){
val data = new ProducerRecord[String,String](topic,namePhone._1,s"${namePhone._1}\t${namePhone._2}\t1")
producer.send(data)
if(rnd.nextInt(100) < 50) Thread.sleep(rnd.nextInt(10))
System.out.println("Sent message: "+data.value())
}
System.out.println("send per second: "+(nameAddrs.size+namePhones.size)*1000 / (System.currentTimeMillis()-t))
producer.close() // 执行玩完就会关闭
}
}
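Before running the producer, make sure the topic exists (unless the brokers have topic auto-creation enabled). A minimal sketch using Kafka's stock CLI from its bin/ directory; the ZooKeeper address (node-1:2181) and the partition/replication counts are illustrative assumptions:
kafka-topics.sh --create \
--zookeeper node-1:2181 \
--replication-factor 2 \
--partitions 3 \
--topic kafkaOperation
On Kafka 2.2+ the same command takes --bootstrap-server node-1:9092 instead of --zookeeper.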
When packaging the consumer, adjust the <build> section of the pom.xml as follows (only the finalName and the entry class change):
<build>
<finalName>SSO-KafkaConsumer</finalName>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- Mixed Scala/Java compilation -->
<plugin><!-- Scala compiler plugin -->
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source><!-- Java source level -->
<target>1.8</target>
</configuration>
</plugin>
<plugin><!-- Bundle all dependencies into a single (fat) jar -->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef><!-- predefined fat-jar descriptor -->
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.jy.data.bigdata.drools.scala.consumer.KafkaOperationConsumer</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin><!-- Maven jar plugin -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath><!-- add classpath entries to the manifest -->
<!-- Program entry class: org.jy.data.bigdata.drools.scala.consumer.KafkaOperationConsumer for the
consumer build, org.jy.data.bigdata.drools.scala.producer.KafkaProduct for the producer build -->
<mainClass>org.jy.data.bigdata.drools.scala.consumer.KafkaOperationConsumer</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
The consumer code is as follows:
package org.jy.data.bigdata.drools.scala.consumer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
/**
 * Kafka consumer implemented with Spark Streaming.
 */
object KafkaOperationConsumer {
def main(args: Array[String]): Unit = {
val log = LoggerFactory.getLogger(KafkaOperationConsumer.getClass)
// Spark configuration
val sparkConf = new SparkConf()
.setMaster("spark://centoshadoop1:7077,centoshadoop2:7077") // Spark standalone HA masters
.set("spark.local.dir","./tmp")
.set("spark.streaming.kafka.maxRatePerPartition","10")
// Create the streaming context with a 4-second batch interval
val ssc = new StreamingContext(sparkConf,Seconds(4))
// Kafka parameters for the direct (receiver-less) stream
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> "node-1:9092,node-2:9092,node-3:9092", // Kafka broker list
"key.deserializer" -> classOf[StringDeserializer], // key/value deserializers
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kafkaOperationGroup", // consumer group
"auto.offset.reset" -> "latest", // start from the latest offset
"enable.auto.commit" -> (false:java.lang.Boolean)) // disable offset auto-commit
log.info("======================211111111111=====")
// 获取kafkaDStream
val kafkaDirectStream = KafkaUtils.createDirectStream[String,String](
ssc,
PreferConsistent,
Subscribe[String,String](List("kafkaOperation"),kafkaParams)) // 主题名称
// From the received Kafka records, extract the (name, address) DStream
val nameAddrStream = kafkaDirectStream.map(_.value).filter(record =>{
// records are tab-separated
val tokens = record.split("\t")
// type 0 = address record
tokens(2).toInt == 0
}).map( record => {
val tokens = record.split("\t")
(tokens(0),tokens(1))
})
log.info("======================2222222222222222222222=====")
System.out.println("====================================111111111111=开始执行了吗=================================")
// From the received Kafka records, extract the (name, phone) DStream
val namePhoneStream = kafkaDirectStream.map(_.value).filter(record => {
val tokens = record.split("\t")
// type 1 = phone record
tokens(2).toInt == 1
}).map(record =>{
val tokens = record.split("\t")
(tokens(0),tokens(1))
})
log.info("======================33333333333333=====")
System.out.println("====================================111111111111=执行中间过程了吗=================================")
// Join address and phone by user name and format the result.
// Note: DStream.join works batch by batch, so an address record and a phone
// record only pair up if they arrive within the same batch interval.
val nameAddrPhoneStream = nameAddrStream.join(namePhoneStream).map(record => {
s"Name: ${record._1}, Address: ${record._2._1}, Phone: ${record._2._2}"
})
// Print the joined output
nameAddrPhoneStream.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
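Because enable.auto.commit is false, this job never persists its offsets, so a restart falls back to auto.offset.reset. If offsets should survive restarts, the spark-streaming-kafka-0-10 API provides commitAsync. A minimal sketch of that standard pattern (the processing body is left as a placeholder):
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaDirectStream.foreachRDD { rdd =>
  // Read the Kafka offset ranges covered by this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // Asynchronously commit the offsets back to Kafka once the batch is handled
  kafkaDirectStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}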
(3) Command to run the consumer (adjust paths to match your environment):
nohup /home/hadoop/spark/spark-2.4.5-bin-hadoop2.7/bin/spark-submit \
--class org.jy.data.bigdata.drools.scala.consumer.KafkaOperationConsumer \
--num-executors 4 \
--driver-memory 1G \
--executor-memory 1G \
--conf spark.default.parallelism=1000 \
/home/hadoop/tools/SSO-KafkaConsumer.jar > SSO-KafkaConsumer.file 2>&1 &
(4) From the Spark installation directory, follow the consumer's log:
cd /home/hadoop/spark/spark-2.4.5-bin-hadoop2.7
tail -f SSO-KafkaConsumer.file
(5) Command to run the producer (feeds data into the topic):
/home/hadoop/spark/spark-2.4.5-bin-hadoop2.7/bin/spark-submit \
--class org.jy.data.bigdata.drools.scala.producer.KafkaProduct \
--num-executors 2 \
--driver-memory 1G \
--executor-memory 1G \
--conf spark.default.parallelism=1000 \
/home/hadoop/tools/SSO-KafkaProduct.jar
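To verify that the messages actually reached the topic, independently of Spark, the stock console consumer can be used (assuming the same broker list):
kafka-console-consumer.sh --bootstrap-server node-1:9092 --topic kafkaOperation --from-beginning
Since the producer is a plain Kafka client rather than a Spark job, running the fat jar directly with java -jar would work just as well as spark-submit here.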
Then check the consumer's live log again:
the consumer pulls the Kafka data and consumes it in real time.
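In the driver log, each batch printed by nameAddrPhoneStream.print() should look roughly like this (illustrative; actual batch times and row order will differ):
-------------------------------------------
Time: <batch time> ms
-------------------------------------------
Name: bob, Address: shanghai#200000, Phone: 1300079421
Name: amy, Address: beijing#100000, Phone: 18700074423
...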
That's it!