版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/kaaosidao/article/details/85790797
1. 先编写 TestForeachWriter,它需要继承(extends)ForeachWriter[Row],代码如下:
/**
 * Structured Streaming `ForeachWriter` that inserts every result row into the
 * MySQL table `stream_word` over JDBC.
 *
 * `open` establishes the connection and prepares the insert, `process` is
 * invoked once per row, and `close` releases the JDBC resources.
 *
 * The insert is parameterized, so values containing quotes (or any other SQL
 * metacharacters) are stored verbatim instead of breaking or altering the
 * statement.
 */
class TestForeachWriter extends ForeachWriter[Row] with Serializable {
  var connection: Connection = _
  // Kept with its original type for existing readers; refers to the same
  // object as `insert` once open() has succeeded.
  var statement: Statement = _
  // Parameterized insert reused for every row handled by this writer.
  private var insert: java.sql.PreparedStatement = _

  // Connection settings. NOTE(review): the blank values look like placeholders
  // that must be filled in before running; confirm.
  val ip = " "
  val port = "3306"
  val user = " "
  val database = " "
  val password = ""
  val url = s"jdbc:mysql://$ip:$port/$database?useUnicode=true&characterEncoding=UTF-8"

  /**
   * Loads the MySQL driver, opens the connection and prepares the insert.
   *
   * @param partitionId id of the partition being written (unused here)
   * @param version     id of the batch/epoch being written (unused here)
   * @return always `true`, i.e. every row is passed to `process`
   */
  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection(url, user, password)
    try {
      insert = connection.prepareStatement("insert into stream_word values (?, ?)")
      statement = insert
      true
    } catch {
      // Do not leak the freshly opened connection if preparing fails.
      case scala.util.control.NonFatal(e) =>
        connection.close()
        throw e
    }
  }

  /**
   * Inserts one row: column 0 and column 1 of `value` are bound, in that
   * order, to the two placeholders of the insert.
   *
   * @param value the row to persist; must have at least two columns
   */
  override def process(value: Row): Unit = {
    insert.setObject(1, value.getAs[AnyRef](0))
    insert.setObject(2, value.getAs[AnyRef](1))
    insert.executeUpdate()
  }

  /**
   * Closes the statement and then the connection. Either may still be unset
   * if `open` failed part-way, hence the null guards; the connection is
   * closed even when closing the statement throws.
   *
   * @param errorOrNull the error that stopped processing, or `null` on success
   */
  override def close(errorOrNull: Throwable): Unit = {
    try Option(statement).foreach(_.close())
    finally Option(connection).foreach(_.close())
  }
}
2. 监控目录中的文件,并将统计结果插入 MySQL
/**
 * Entry point of the streaming job.
 *
 * Reads text files appearing under `D:\tmp\streaming\struct` as a stream,
 * splits every line on single spaces, counts the occurrences of each word and
 * hands the counts to a [[TestForeachWriter]] every 10 seconds. Blocks until
 * the streaming query terminates.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val session = SparkSession.builder().master("local[1]").getOrCreate()
  import session.implicits._

  // Stage 1: one String per input line.
  val lines = session.readStream.text("D:\\tmp\\streaming\\struct").as[String]
  // Stage 2: one String per space-separated token.
  val words = lines.flatMap(line => line.split(" "))
  // Stage 3: running count per distinct token (column name is "value").
  val wordCounts = words.groupBy("value").count()

  val sink = wordCounts.writeStream
    .foreach(new TestForeachWriter())
    .outputMode("complete") // modes noted by the author: complete, append
    .trigger(ProcessingTime("10 seconds"))

  sink.start().awaitTermination()
}
3. 向监控目录写入文件(以下为 Java 代码)
/**
 * Writes a small sample text file into the directory watched by the
 * streaming job, so that the job has new input to pick up.
 *
 * The stream is opened in a try-with-resources statement so that it is
 * flushed and closed even when the write fails.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    try (FileOutputStream fos = new FileOutputStream("D:\\tmp\\streaming\\struct\\c.txt")) {
        fos.write("c c c c c\r\nb a b\r\nc a a".getBytes());
    } catch (IOException e) {
        // Also covers FileNotFoundException (a subclass of IOException)
        // raised when the target file cannot be created or opened.
        e.printStackTrace();
    }
}