版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/yulei_qq/article/details/82698699
文章内容参考地址:
http://spark.apache.org/docs/2.3.0/streaming-programming-guide.html#dataframe-and-sql-operations
你可以基于 Spark Streaming 所使用的 SparkContext 创建一个 SparkSession;每个 RDD 会被转换成一个 DataFrame,注册为临时表,然后就可以使用 SQL 进行查询。
创建三个java类
package com.study.sqlandstreaming;
/**
 * Serializable Java bean wrapping a single word.
 *
 * <p>Used as the row type when converting an {@code RDD<String>} into a DataFrame via
 * {@code SparkSession.createDataFrame(rdd, JavaRow.class)}, which reflects over the
 * getter/setter pair to derive the schema (a single {@code word} column).
 */
public class JavaRow implements java.io.Serializable {

    // Explicit serialVersionUID so the serialized form is stable across recompiles
    // (Spark ships instances of this class to executors via Java serialization).
    private static final long serialVersionUID = 1L;

    private String word;

    /** @return the wrapped word, or {@code null} if never set */
    public String getWord() {
        return word;
    }

    /** @param word the word to wrap */
    public void setWord(String word) {
        this.word = word;
    }
}
package com.study.sqlandstreaming;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
/**
 * Lazily creates and caches a single process-wide {@link SparkSession}.
 *
 * <p>Reusing one session across micro-batches avoids rebuilding it inside every
 * {@code foreachRDD} invocation.
 */
public class JavaSparkSessionSingleton {

    // Lazily initialized on first use. ('transient' was dropped: it has no effect
    // on static fields, which are never part of an instance's serialized form.)
    private static SparkSession instance = null;

    /**
     * Returns the shared SparkSession, building it from {@code sparkConf} on the first call.
     *
     * <p>{@code synchronized} guards the lazy initialization: without it, two threads
     * racing through the null check could each create a session.
     *
     * @param sparkConf configuration used only when the session is first created
     * @return the singleton SparkSession
     */
    public static synchronized SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf).getOrCreate();
        }
        return instance;
    }
}
package com.study.sqlandstreaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* @author : yulei
* @data : 2018/9/14 10:32
* @Version : 1.0
**/
/**
 * Streaming word count that runs a Spark SQL query over each micro-batch.
 *
 * <p>Reads lines from a TCP socket, splits them into words, converts each batch's RDD
 * into a DataFrame registered as a temp view, and prints a {@code GROUP BY} word count.
 *
 * @author : yulei
 * @data : 2018/9/14 10:32
 * @Version : 1.0
 **/
public class TestSparkSQLAndStreaming {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("JavaSqlNetworkWordCount");
        // 5-second micro-batch interval.
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        // NOTE(review): source host/port are hard-coded — adjust for your environment
        // (feed it with e.g. `nc -lk 9999` on that host).
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("192.168.44.128", 9999);
        // Split each line on single spaces into individual words.
        // (Lambda replaces the original anonymous FlatMapFunction inner class, matching
        // the lambda style already used in foreachRDD/map below.)
        JavaDStream<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

        // Convert RDDs of the words DStream to DataFrame and run SQL query
        /*************START****************/
        words.foreachRDD((rdd, time) -> {
            // Get the singleton instance of SparkSession
            SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);
            // Convert RDD[String] to RDD[JavaRow] so createDataFrame can infer the
            // schema (one "word" column) via bean reflection.
            JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
                JavaRow record = new JavaRow();
                record.setWord(word);
                return record;
            });
            Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
            wordsDataFrame.createOrReplaceTempView("words");
            // Do word count on table using SQL and print it
            Dataset<Row> wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word ");
            System.out.println("========= " + time + "=========");
            wordCountsDataFrame.show();
        });
        /*************END****************/

        ssc.start();
        ssc.awaitTermination();
    }
}
输入:
[hadoop@s201 ~]$ nc -lk 9999
he he sdf wd wd ss ss ss
输出结果大致如下:
========= 1536898010000 ms=========
+----+-----+
|word|total|
+----+-----+
+----+-----+
18/09/14 12:06:54 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/09/14 12:06:54 WARN BlockManager: Block input-0-1536898014600 replicated to only 0 peer(s) instead of 1 peers
========= 1536898015000 ms=========
+----+-----+
|word|total|
+----+-----+
| ss| 3|
| sdf| 1|
| wd| 2|
| he| 2|
+----+-----+
========= 1536898020000 ms=========
+----+-----+
|word|total|
+----+-----+
+----+-----+