This post implements Flume's poll-based flow, in which Spark Streaming actively pulls data from Flume. Versions used:
- scala: 2.11.8
- spark: 2.1.0
- flume: 1.7.0
- Relevant part of the pom.xml
<properties>
    <scala.version>2.11</scala.version>
    <spark.version>2.1.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
- Jar handling
Copy the jars under the Spark installation's jars/ directory into Flume's lib/ directory.
Copy spark-streaming-flume-sink_2.11-2.1.0.jar from the local Maven repository into Flume's lib/ directory.
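The two copy steps above can be sketched as shell commands. Real paths depend on your installs; here SPARK_HOME and FLUME_HOME fall back to temp dirs (with a stand-in jar) so the sketch runs anywhere, and the Maven repo path follows the standard local-repository layout:

```shell
# Sketch of the jar-copy step; paths are illustrative, not your real installs.
SPARK_HOME=${SPARK_HOME:-$(mktemp -d)}
FLUME_HOME=${FLUME_HOME:-$(mktemp -d)}
mkdir -p "$SPARK_HOME/jars" "$FLUME_HOME/lib"
touch "$SPARK_HOME/jars/example.jar"   # stand-in for Spark's bundled jars
# Step 1: copy every jar under Spark's jars/ into Flume's lib/
cp "$SPARK_HOME"/jars/*.jar "$FLUME_HOME/lib/"
# Step 2: copy the sink jar from the local Maven repo (standard layout)
SINK_JAR=~/.m2/repository/org/apache/spark/spark-streaming-flume-sink_2.11/2.1.0/spark-streaming-flume-sink_2.11-2.1.0.jar
if [ -f "$SINK_JAR" ]; then cp "$SINK_JAR" "$FLUME_HOME/lib/"; fi
```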
- Spark Streaming code
To implement Flume's push mode instead, use the FlumeUtils.createStream method; the poll mode shown below uses createPollingStream.
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

val conf = new SparkConf().setAppName("FlumePoll").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(3))
// Poll events from the SparkSink running inside the Flume agent
val flumeEventDStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils
  .createPollingStream(ssc, "bigdata11", 777, StorageLevel.MEMORY_AND_DISK_2)
val ds: DStream[String] = flumeEventDStream.map(e => new String(e.event.getBody.array()))
ds.print()
ssc.start()
ssc.awaitTermination() // keep the streaming context running
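For comparison, the push mode mentioned above can be sketched as follows. This is an assumption-laden sketch, not part of the original setup: with push, Spark Streaming acts as an Avro server and Flume's standard avro sink sends events to it, so the host/port here are placeholders and the Flume agent would need an avro sink instead of SparkSink:

```scala
// Sketch of the push variant; hostname/port are placeholders.
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePushExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePush").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    // Spark Streaming listens here; Flume's avro sink pushes events to this address
    val stream = FlumeUtils.createStream(ssc, "bigdata11", 7777, StorageLevel.MEMORY_AND_DISK_2)
    stream.map(e => new String(e.event.getBody.array())).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```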
- Flume configuration
exec_sparkSinks.conf
#bin/flume-ng agent -n a1 -f conf/agent/exec_sparkSinks.conf -c conf -Dflume.root.logger=INFO,console
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/test/exec_sparkSinks.txt
a1.sources.r1.shell = /usr/bin/bash -c
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.16.11
a1.sinks.k1.port = 777
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Run the Flume agent and the Spark Streaming job, then write to the file Flume is tailing (exec_sparkSinks.txt); the new lines appear in Spark Streaming's output:
echo "nihao" >> exec_sparkSinks.txt
echo "haha" >> exec_sparkSinks.txt
Note
Starting the Flume agent may fail with:
java.lang.IllegalStateException: begin() called when transaction is OPEN!
This is caused by having two scala-library-*.jar files under Flume's lib directory: one shipped with Flume and one copied over from Spark. Remove the one that shipped with Flume, then restart the Flume agent and the Spark job.
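A quick way to spot the conflict is to list the scala-library jars under Flume's lib. In this sketch FLUME_LIB defaults to a temp dir seeded with two stand-in jars (the version numbers are illustrative) so it runs anywhere; point it at your real Flume lib/ in practice:

```shell
# Check for duplicate scala-library jars in Flume's lib directory.
FLUME_LIB=${FLUME_LIB:-$(mktemp -d)}
touch "$FLUME_LIB/scala-library-2.10.5.jar"   # illustrative: jar shipped with Flume
touch "$FLUME_LIB/scala-library-2.11.8.jar"   # illustrative: jar copied from Spark
# More than one match means a conflict; delete the Flume-shipped jar.
ls "$FLUME_LIB"/scala-library-*.jar | wc -l
```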