!!! The difference between the two approaches: with poll, Flume delivers data to a SparkSink running on the Linux node itself and Spark pulls it from there; with push, Flume pushes data to the Windows machine where the Spark program runs. The two sinks are therefore different, while in the Scala code only the host and port differ.
1. Poll mode
Flume configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/flume/flume-poll
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000
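The poll approach relies on Spark's custom SparkSink, so the corresponding jars must be visible to the Flume agent, and the Spark application needs the spark-streaming-flume integration on its classpath. A minimal build sketch, with versions that are only illustrative (match them to your Spark and Scala versions):
// build.sbt (sketch -- versions are assumptions, adjust to your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "2.3.0",
  "org.apache.spark" %% "spark-streaming-flume" % "2.3.0"
)
// For poll mode, the Flume agent itself also needs the spark-streaming-flume-sink jar
// (plus scala-library and commons-lang3) copied into Flume's lib directory so that
// org.apache.spark.streaming.flume.sink.SparkSink can be loaded.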
Code implementation
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
object flume_pull {
  // Running total: sum the counts in the current batch and add the previous state (if any)
  def updateFunc(inputStream: Seq[Int], resultPut: Option[Int]): Option[Int] = {
    val result = inputStream.sum + resultPut.getOrElse(0)
    Option(result)
  }

  // Use the poll approach to pull data from Flume's SparkSink
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("flume-poll"))
    // Checkpoint directory is required because updateStateByKey keeps state across batches
    context.setCheckpointDir("./check_point")
    context.setLogLevel("WARN")
    val stream = new StreamingContext(context, Seconds(5))
    val host = "node03"
    val port = 8888

    /**
     * createPollingStream parameters:
     *   ssc: StreamingContext
     *   hostname: String   -- host where the SparkSink is running
     *   port: Int          -- port the SparkSink listens on
     *   storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 (default)
     * Every record is wrapped in a SparkFlumeEvent.
     */
    val flume_poll: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(stream, host, port)

    // Extract each event body as a line of text
    val line = flume_poll.map(x => {
      // x is a SparkFlumeEvent; it wraps the underlying Flume Event (headers + body)
      val array: Array[Byte] = x.event.getBody.array()
      val str = new String(array)
      str
    })

    // Word count with running totals across batches
    val value = line.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)
    value.print()

    stream.start()
    stream.awaitTermination()
  }
}
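Since the spooldir source sets fileHeader = true, each event also carries the path of the file it came from in its headers. The sketch below is a hedged variant of the map above (it reuses the flume_poll stream from the code; the "file" header key is the spooldir source's default and is an assumption if you changed fileHeaderKey):
// Sketch: read the event headers alongside the body.
import scala.collection.JavaConverters._

val linesWithFile = flume_poll.map { x =>
  val headers = x.event.getHeaders.asScala
  // Header keys/values are Avro CharSequences, so compare on toString
  val file = headers.collectFirst { case (k, v) if k.toString == "file" => v.toString }
    .getOrElse("unknown")
  val body = new String(x.event.getBody.array())
  (file, body)
}
linesWithFile.print()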
2. Push mode
Flume configuration file
#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/flume/flume-push
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
# Note: the hostname here must be the IP of the machine running the Spark program (the Windows host in this setup);
# the Spark Streaming application must already be up and listening on this port for Flume to push data to it.
a1.sinks.k1.hostname = 192.168.*.*
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000
Code implementation
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
object flume_push {
  // Running total: sum the counts in the current batch and add the previous state (if any)
  def updateFunc(inputStream: Seq[Int], resultStream: Option[Int]): Option[Int] = {
    val result = inputStream.sum + resultStream.getOrElse(0)
    Option(result)
  }

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("flume-push"))
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    sparkContext.setLogLevel("WARN")
    // Checkpoint directory is required because updateStateByKey keeps state across batches
    sparkContext.setCheckpointDir("./check_point")

    /**
     * createStream parameters:
     *   ssc: StreamingContext
     *   hostname: String   -- address of the machine running this Spark program (the avro sink pushes here)
     *   port: Int          -- port the receiver listens on
     *   storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 (default)
     */
    val flume_push: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(streamingContext, "192.168.*.*", 8888)

    // Extract each event body as a line of text
    val line: DStream[String] = flume_push.map(x => {
      val array: Array[Byte] = x.event.getBody.array()
      val str = new String(array)
      str
    })

    // Word count with running totals across batches
    val value = line.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)
    value.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
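As the doc comments note, both createStream and createPollingStream store received events with StorageLevel.MEMORY_AND_DISK_SER_2 by default. If you want a different persistence level, it can be passed explicitly; a minimal sketch, reusing the streamingContext from the push example above:
// Sketch: same receiver, but with an explicitly chosen storage level.
import org.apache.spark.storage.StorageLevel

val flumeStream = FlumeUtils.createStream(
  streamingContext,
  "192.168.*.*",
  8888,
  StorageLevel.MEMORY_AND_DISK_SER) // single copy instead of the replicated default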