SparkStreaming整合Flume有俩种方式
详细学习文档地址:https://spark.apache.org/docs/latest/streaming-flume-integration.html
方式一:
Push方式整合
步骤一:编写flume配置文件
Flume Agent的编写: flume_push_streaming.conf
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = 192.168.199.203
simple-agent.sinks.avro-sink.port = 41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
步骤2:创建测试类(FlumePushWordCount)
--------------------------------------------------------------------------------------------------------------------------------------
package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Spark Streaming整合Flume的第一种方式
*/
object FlumePushWordCount {
def main(args: Array[String]): Unit = {
//为了适应生产环境,我们一般不将hostname port写死,而是通过参数判断的形式
if(args.length != 2) {//为什么是2 ,一个代表hostname 一个代表port
System.err.println("Usage: FlumePushWordCount <hostname> <port>")
System.exit(1)
}
val Array(hostname, port) = args
val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("FlumePushWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//TODO... 如何使用SparkStreaming整合Flume
val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
flumeStream.map(x=> new String(x.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
注意:如何在idea中传入参数呢 , 在右上角Edit Configuration中的,选择programe arguments输入 0.0.0.0 44444(注意中间有空格),代表要传入俩个参数
---------------------------------------------------------------------------------------------------------------------------------------
pom.xml核心依赖(注意版本一致性)
---------------------------------------------------------------------------------------------------------------------------------------
<!-- Spark Streaming 依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming整合Flume 依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
---------------------------------------------------------------------------------------------------------------------------------------
注意:本地测试和线上测试的差异性
本地测试:
flume配置文件:
simple-agent.sinks.avro-sink.hostname = 192.168.199.203 (本地IP)
测试类:hostname改为0.0.0.0(代表本地)
//TODO... 如何使用SparkStreaming整合Flume
val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", port.toInt)
-------------------------------------------------------------------------------------
Flume启动(hadoop000上)
flume-ng agent \
--name simple-agent \--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
当看到avro-sink start代表启动成功!
再在另一个客户端上hadoop000上输入telnet localhost 44444, 输入a 回车 b回车 b回车
再去idea控制台上看有没有输出
------------------------------》证明:一切流程是没有问题
hadoop000:是服务器的地址
local的模式进行Spark Streaming代码的测试 192.168.199.203
本地测试总结
1)启动sparkstreaming作业
2) 启动flume agent
3) 通过telnet输入数据,观察IDEA控制台的输出
-------------------------------------------------接下来开始线上测试------------------------------------------------------
在idea中输入mvn clean package -DiskipTests 完成打包
线上客户端执行:
客户端1:
spark-submit \
--class com.imooc.spark.FlumePushWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar \
hadoop000 41414
客户端2:
flume-ng agent \
--name simple-agent \--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
客户端3:
telnet localhost 44444
测试:在客户端3上输入a a a d d d c回车
在客户端1上看到输出结果代表成功!!