实现自定义采集器类
import java.io.{
BufferedReader, InputStreamReader}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{
DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{
Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
/**
 * Custom Spark Streaming receiver that reads UTF-8 text lines from a TCP socket
 * and stores each line into Spark.
 *
 * Receiving ends when the peer closes the connection, when a line equal to
 * "end" is read, or when the receiver is stopped.
 *
 * @param host host name or IP address of the socket server to connect to
 * @param port TCP port of the socket server
 */
class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  import scala.util.control.NonFatal

  // Written by the receiving thread and read/cleared by onStop() on a different
  // thread, hence volatile.
  @volatile var socket:java.net.Socket=null

  /**
   * Connects to host:port and stores every received line until end-of-stream,
   * the "end" sentinel line, or a stop request.
   *
   * Non-fatal errors are reported through `restart` unless the receiver has
   * already been stopped (closing the socket in onStop() makes a blocked
   * `readLine()` fail, which is expected and therefore not reported).
   */
  def receive(): Unit = {
    try {
      // Work on a local reference: onStop() may null the field concurrently.
      val conn = new java.net.Socket(host,port)
      socket = conn
      try {
        val reader = new BufferedReader(new InputStreamReader(conn.getInputStream,"UTF-8"))
        // In Scala an assignment evaluates to Unit, so the Java idiom
        // `while ((line = reader.readLine()) != null)` never terminates;
        // iterate over readLine() results explicitly and stop on null (EOF).
        Iterator
          .continually(reader.readLine())
          .takeWhile(line => !isStopped() && line != null && line != "end")
          .foreach(line => store(line))
      } finally {
        // Closing the socket also closes its input stream.
        conn.close()
      }
    } catch {
      case NonFatal(e) =>
        if (!isStopped()) {
          restart(s"Error receiving data from $host:$port", e)
        }
    }
  }

  /** Starts a background thread that runs [[receive]]; returns immediately. */
  override def onStart(): Unit = {
    new Thread("MyReceiver-socket-reader") {
      override def run(): Unit = receive()
    }.start()
  }

  /** Closes the socket, if one is open, which unblocks the receiving thread. */
  override def onStop(): Unit = {
    val current = socket
    socket = null
    if (current != null) {
      current.close()
    }
  }
}
object MyReceiver{
  /**
   * Runs a local streaming word count fed by [[MyReceiver]].
   *
   * Lines received from the socket are split on whitespace, each word is
   * paired with 1, and the pairs are summed per word within every 5-second
   * batch. The per-batch counts are printed to stdout.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("myreceiver")
    val context = new StreamingContext(sparkconf,Seconds(5))

    val receiverStream: ReceiverInputDStream[String] =
      context.receiverStream(new MyReceiver("192.168.**.**",7777))

    val words: DStream[String] = receiverStream.flatMap(_.split("\\s+"))
    val wordStream: DStream[(String, Int)] = words.map(word => (word, 1))
    val reduceStream: DStream[(String, Int)] = wordStream.reduceByKey(_ + _)

    // Print the aggregated counts. Printing the raw receiver stream would leave
    // the word-count pipeline without an output operation, so it would never run.
    reduceStream.print()

    context.start()
    context.awaitTermination()
  }
}
启动SparkStreaming
启动端口
- 启动命令:
nc -lk 7777
- 输入数据:
hello spark
hello world
hello java
hello spark
查看SparkStreaming输出数据