实现：连接某个端口，实时获取该端口发送过来的内容（并统计其中各单词的出现次数）。
package org.feng.stream
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark._
import org.apache.spark.streaming._
/**
 * Streaming word count fed by a custom socket data source (自定义数据源).
 *
 * Created by Feng on 2019/12/2 15:52
 * CurrentProject's name is spark
 */
object MyDefine {

  /**
   * Attaches a [[CustomReceiver]] to localhost:12345 and, for every 3-second batch, splits each
   * received line on the single-space character and prints the per-word counts.
   * Blocks until the streaming context terminates.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // At least two local threads: the receiver permanently occupies one, the rest process batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("MyDefine")
    val ssc = new StreamingContext(conf, Seconds(3))

    ssc
      .receiverStream(new CustomReceiver("localhost", 12345))
      .flatMap(_.split(" "))
      .map(token => (token, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * A custom Spark Streaming receiver that connects to the TCP endpoint `host:port` and stores
 * every UTF-8 text line it reads as one record (kept with StorageLevel.MEMORY_AND_DISK_2).
 *
 * When the connection ends or fails while the receiver is still running, a restart is requested
 * so that Spark reconnects.
 *
 * @param host host name to connect to
 * @param port TCP port to connect to
 */
class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  /** Spawns the thread that performs the blocking reads; onStart itself must return quickly. */
  override def onStart(): Unit = {
    val worker = new Thread("CustomRecei") {
      override def run(): Unit = receive()
    }
    // Daemon, so a thread blocked in readLine() cannot keep the JVM alive after shutdown.
    worker.setDaemon(true)
    worker.start()
  }

  /**
   * Nothing to release here: the receiving thread polls isStopped() between lines and closes its
   * own socket and reader when it exits.
   */
  override def onStop(): Unit = ()

  /**
   * Connects to host:port and stores lines until the stream ends, the receiver is stopped, or an
   * error occurs. The reader and socket are always closed; unless the receiver has been stopped,
   * a restart is requested so that Spark tries to connect again.
   */
  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      try {
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        try {
          // readLine() returns null at end of stream; stop early if Spark stopped the receiver.
          Iterator
            .continually(reader.readLine())
            .takeWhile(line => !isStopped() && line != null)
            .foreach(line => store(line))
        } finally {
          reader.close()
        }
      } finally {
        // Runs even when getInputStream/readLine throw, so the socket never leaks.
        socket.close()
      }
      // A deliberate stop must not turn into a reconnect.
      if (!isStopped()) restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // Server not reachable (yet): restart, which retries the connection.
        if (!isStopped()) restart(s"Error connecting to $host:$port", e)
      case NonFatal(t) =>
        // Fatal JVM errors (OOM, etc.) are deliberately left to propagate.
        if (!isStopped()) restart("Error receiving data", t)
    }
  }
}
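注意：
这个流处理程序作为客户端连接本地的 12345 端口，netcat 则以监听模式在该端口上等待连接；在 netcat 窗口中输入的每一行文本都会被程序接收并统计（若程序启动时 netcat 尚未开始监听，接收器会自动重启并重试连接）。在 Windows 本地运行时，需要安装 Windows 版的 netcat（https://eternallybored.org/misc/netcat/）：点击这里的链接，下载 1.12 版本，解压该压缩包后将其中的所有文件复制到 System32 文件夹下（或把解压目录加入 PATH 环境变量），就可以在 cmd 中使用了。监听命令是 nc -l -p [port]，我这里使用的是 12345 端口，即 nc -l -p 12345。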
原文链接:https://blog.csdn.net/FBB360JAVA/article/details/103410629