取材自官网 http://spark.apache.org
案例1:和集群搭配使用
package SparkStreaming
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
 * Receiver that connects to a TCP endpoint and stores every UTF-8 text line
 * it reads as one record.
 *
 * Received records are stored with `StorageLevel.MEMORY_AND_DISK_2`.
 *
 * @param host host name of the server to connect to
 * @param port TCP port of the server to connect to
 */
class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  /** Starts a background thread that runs the receive loop. */
  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /** No explicit cleanup is performed here. */
  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /**
   * Creates a socket connection and receives data until the receiver is
   * stopped or the connection is closed, then requests a restart.
   *
   * The socket is always closed before leaving, even when reading or storing
   * fails part-way through.
   */
  private def receive(): Unit = {
    try {
      // Connect to host:port
      val socket = new Socket(host, port)
      try {
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
        // Until stopped or connection broken continue reading
        Iterator
          .continually(reader.readLine())
          .takeWhile(line => !isStopped() && line != null)
          .foreach(line => store(line))
      } finally {
        // Closing the socket also closes its input stream
        socket.close()
      }
      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart(s"Error connecting to $host:$port", e)
      case scala.util.control.NonFatal(t) =>
        // restart on any other recoverable error; fatal JVM errors propagate
        restart("Error receiving data", t)
    }
  }
}
案例2:和集群搭配使用
package SparkStreaming
import org.apache.hadoop.fs.shell.Count
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Streaming word count that keeps a running total per word across batches.
 *
 * Lines are read through a [[CustomReceiver]], split on single spaces,
 * counted per batch and merged into the running totals by `updateFunction`.
 */
object NetworkSparkStreaming {

  /**
   * Builds the streaming job, starts it and blocks until it terminates.
   *
   * The streaming context is stopped in a `finally` block so that it is
   * released even when the job fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("nwc")
    val ssc = new StreamingContext(conf, Seconds(20))
    try {
      ssc.sparkContext.setLogLevel("ERROR")
      // Alternative input sources:
      //val lines = ssc.socketTextStream("lion",1234)
      //val lines = ssc.textFileStream("hdfs://192.168.33.136:9000/wc/input/")
      // Stateful operations such as updateStateByKey need a checkpoint directory
      ssc.checkpoint("d://123/eq")
      val lines = ssc.receiverStream(new CustomReceiver("lion", 55555))
      val words = lines.flatMap(_.split(" "))
      val pairs = words.map(word => (word, 1))
      // Per-batch counts, then merged into the running totals
      val wordcounts = pairs.reduceByKey(_ + _)
      val res = wordcounts.updateStateByKey(updateFunction)
      res.print()
      ssc.start()
      ssc.awaitTermination()
    } finally {
      ssc.stop()
    }
  }

  /**
   * Merges the counts of the current batch into the running total of a key.
   *
   * @param newValues    counts observed for the key in the current batch
   * @param runningCount total accumulated so far, `None` if the key is new
   * @return the updated running total (always defined)
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val batchTotal = newValues.sum
    val previousTotal = runningCount.getOrElse(0)
    Some(batchTotal + previousTotal)
  }
}