1. KafkaSink
package com.qu.sink
import java.util.Properties
import com.qu.source.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
object KafkaSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Source: Kafka
// To test, produce records from a console producer:
// ./bin/kafka-console-producer.sh --broker-list cdh-master:9092 --topic sensor
val property = new Properties()
property.setProperty("bootstrap.servers", "113.143.100.155:9092")
property.setProperty("group.id", "consumer-group")
property.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
property.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
property.setProperty("auto.offset.reset", "latest")
val stream = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), property))
.map{ line =>
// Parse each CSV line into a SensorReading, then serialize back to String for the producer
val ss = line.split(",")
SensorReading(ss(0), ss(1).toLong, ss(2).toDouble).toString
}
// The sink is Kafka as well
// To verify consumption: ./bin/kafka-console-consumer.sh --bootstrap-server cdh-master:9092 --topic sensor01
stream.addSink( new FlinkKafkaProducer[String]("cdh-master:9092", "sensor01", new SimpleStringSchema()))
env.execute("kafka test")
}
}
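All four examples import SensorReading from com.qu.source; its definition is not shown here. A minimal sketch of what that case class presumably looks like, inferred from the field accesses in these examples (an assumption, not the original source):
// Assumed shape of the shared case class, reconstructed from usage
case class SensorReading(id: String, timestamps: Long, temperature: Double)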
2. RedisSink
package com.qu.sink
import com.qu.source.SensorReading
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object RedisSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read a local file
val inputPath = "/Users/xietong/IdeaProjects/FlinkTutorial/src/main/resources/1.txt"
val inputDataSet = env.readTextFile(inputPath)
// map with an anonymous function: split each line once, then build a SensorReading
val map1 = inputDataSet.map{ line =>
val ss = line.split(",")
SensorReading(ss(0), ss(1).toLong, ss(2).toDouble)
}
// FlinkJedisPoolConfig is constructed via its Builder (add .setPassword if the server requires auth)
val redisConf = new FlinkJedisPoolConfig.Builder().setHost("cdh-master").setPort(6379).build()
map1.addSink( new RedisSink[SensorReading](redisConf, new MyRedisMapper()))
env.execute("redis sink test")
}
}
class MyRedisMapper() extends RedisMapper[SensorReading]{
// Define the Redis command used to save the data
override def getCommandDescription: RedisCommandDescription = {
// Store sensor readings via HSET into the hash named 'sensor_tbl'
new RedisCommandDescription(RedisCommand.HSET, "sensor_tbl")
}
// Define the key under which each record is stored
override def getKeyFromData(data: SensorReading): String = data.id
// Define the value stored for each record
override def getValueFromData(data: SensorReading): String = data.temperature.toString
}
}
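With this mapper, each sensor id becomes one field of the 'sensor_tbl' hash, holding its latest temperature. A quick way to verify, assuming redis-cli can reach the server on cdh-master:
redis-cli -h cdh-master HGETALL sensor_tbl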
3. ESSink
package com.qu.sink
import java.util
import com.qu.source.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
object ESSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read a local file
val inputPath = "/Users/xietong/IdeaProjects/FlinkTutorial/src/main/resources/1.txt"
val inputDataSet = env.readTextFile(inputPath)
// map with an anonymous function: split each line once, then build a SensorReading
val map1 = inputDataSet.map{ line =>
val ss = line.split(",")
SensorReading(ss(0), ss(1).toLong, ss(2).toDouble)
}
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("cdh-master", 9200))
// Create an ElasticsearchSink builder
val esBuilder = new ElasticsearchSink.Builder[SensorReading](
httpHosts,
new ElasticsearchSinkFunction[SensorReading] {
override def process(data: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
// Wrap the incoming record in a map (or JSON object)
val json = new util.HashMap[String, String]()
json.put("sensor_id", data.id)
json.put("timestamps", data.timestamps.toString)
json.put("temperature", data.temperature.toString)
// Build an index request carrying the data
val indexRequest = Requests.indexRequest()
.index("sensor")
.`type`("sensor")
.source(json)
// Hand the request to the RequestIndexer for (batched) sending
indexer.add(indexRequest)
}
})
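// Optional tuning for local testing (an addition, not in the original): flush each
// record to ES immediately instead of batching, otherwise small test inputs may
// sit in the bulk buffer; setBulkFlushMaxActions is part of the Builder API.
esBuilder.setBulkFlushMaxActions(1)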
// Attach the sink
map1.addSink( esBuilder.build())
env.execute("es sink test")
}
}
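To check what the sink wrote, assuming Elasticsearch answers on cdh-master:9200:
curl "http://cdh-master:9200/sensor/_search?pretty"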
4. JDBCSink
package com.qu.sink
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.qu.source.SensorReading
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
object MysqlSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read a local file
val inputPath = "/Users/xietong/IdeaProjects/FlinkTutorial/src/main/resources/1.txt"
val inputDataSet = env.readTextFile(inputPath)
// map with an anonymous function: parse each CSV line into a SensorReading
val map1 = inputDataSet.map{ line =>
val ss = line.split(",")
SensorReading(ss(0), ss(1).toLong, ss(2).toDouble)
}
map1.addSink( new MysqlSink())
env.execute("mysql sink test")
}
}
class MysqlSink extends RichSinkFunction[SensorReading]{
var conn: Connection = _
var insertPstmt: PreparedStatement = _
var updatePstmt: PreparedStatement = _
// Lifecycle start: open the connection once per task
override def open(parameters: Configuration): Unit = {
// Create a JDBC connection (the MySQL driver must be on the classpath)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/project_crowd", "root", "123456")
insertPstmt = conn.prepareStatement("insert into sensor (sensor_id, temp) values(?, ?)")
updatePstmt = conn.prepareStatement("update sensor set temp=? where sensor_id=?")
}
override def invoke(data: SensorReading, context: SinkFunction.Context[_]): Unit = {
// Upsert: try an update first; if no row matched, fall back to an insert
updatePstmt.setDouble(1, data.temperature)
updatePstmt.setString(2, data.id)
updatePstmt.execute()
if(updatePstmt.getUpdateCount == 0){
insertPstmt.setString(1, data.id)
insertPstmt.setDouble(2, data.temperature)
insertPstmt.execute()
}
}
// Lifecycle end: close the statements and the connection
override def close(): Unit = {
insertPstmt.close()
updatePstmt.close()
conn.close()
}
}
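The two prepared statements imply a simple two-column table. A minimal DDL sketch, inferred from the SQL above rather than taken from the original project:
-- Hypothetical table matching the insert/update statements
CREATE TABLE sensor (
  sensor_id VARCHAR(64) PRIMARY KEY,
  temp DOUBLE
);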