创建流式的DataSet和DataFrame
方式一:通过JavaBean方式
Java语言实现:
package com.kfk.spark.structuredstreaming;
import java.sql.Date;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/21
* @time : 10:38 下午
*/
public class DeviceData {
// device: string, type: string, signal: double, time: DateType
private String device;
private String deviceType;
private double signal;
private Date deviceTime;
public String getDevice() {
return device;
}
public void setDevice(String device) {
this.device = device;
}
public String getDeviceType() {
return deviceType;
}
public void setDeviceType(String deviceType) {
this.deviceType = deviceType;
}
public double getSignal() {
return signal;
}
public void setSignal(double signal) {
this.signal = signal;
}
public Date getDeviceTime() {
return deviceTime;
}
public void setDeviceTime(Date deviceTime) {
this.deviceTime = deviceTime;
}
public DeviceData(String device, String deviceType, double signal, Date deviceTime) {
this.device = device;
this.deviceType = deviceType;
this.signal = signal;
this.deviceTime = deviceTime;
}
public DeviceData(){
}
}
package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.sql.Date;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/21
* @time : 10:35 下午
*/
/**
 * Structured Streaming demo (JavaBean variant): reads comma-separated device
 * records from a socket, maps each line to a {@link DeviceData} bean, and
 * counts records per deviceType where signal > 10, printing updates to the
 * console.
 */
public class StruStreamingDFOper {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();

        // Input table: one raw string per line from the socket source.
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());

        // Convert each CSV line ("device,deviceType,signal,yyyy-MM-dd")
        // into a DeviceData bean using the bean encoder.
        Dataset<DeviceData> dfdevice = dflines.map(new MapFunction<String, DeviceData>() {
            @Override
            public DeviceData call(String value) throws Exception {
                String[] lines = value.split(",");
                return new DeviceData(lines[0], lines[1],
                        Double.parseDouble(lines[2]), Date.valueOf(lines[3]));
            }
        }, ExpressionEncoder.javaBean(DeviceData.class));

        // Result table.
        // FIX: apply the "signal > 10" filter BEFORE projecting the column
        // away. The original select(...).where("signal > 10") referenced a
        // column that the projection had already dropped and only worked
        // because Spark's analyzer re-resolves missing references; filtering
        // first is explicit and does not rely on that behavior.
        Dataset<Row> dffinal = dfdevice
                .where("signal > 10")
                .select("device", "deviceType")
                .groupBy("deviceType")
                .count();

        // Output: emit changed aggregate rows to the console on each trigger.
        StreamingQuery query = dffinal.writeStream()
                .outputMode("update")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
方式二:构造Schema的方式
package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/21
* @time : 10:35 下午
*/
/**
 * Structured Streaming demo (explicit-schema variant): reads comma-separated
 * device records from a socket, maps each line to a generic Row against a
 * hand-built StructType, and counts records per deviceType where signal > 10,
 * printing updates to the console.
 */
public class StruStreamingDFOper2 {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();

        // Input table: one raw string per line from the socket source.
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());

        // Build the row schema by hand: device/deviceType are strings,
        // signal is a double, deviceTime is a SQL date. All nullable.
        List<StructField> fields = new ArrayList<StructField>();
        StructField device = DataTypes.createStructField("device", DataTypes.StringType, true);
        StructField deviceType = DataTypes.createStructField("deviceType", DataTypes.StringType, true);
        StructField signal = DataTypes.createStructField("signal", DataTypes.DoubleType, true);
        StructField deviceTime = DataTypes.createStructField("deviceTime", DataTypes.DateType, true);
        fields.add(device);
        fields.add(deviceType);
        fields.add(signal);
        fields.add(deviceTime);
        StructType scheme = DataTypes.createStructType(fields);

        // Convert each CSV line into a Row matching the schema above;
        // signal and date are parsed so the Row types agree with the schema.
        Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() {
            @Override
            public Row call(String value) throws Exception {
                String[] lines = value.split(",");
                return RowFactory.create(lines[0], lines[1],
                        Double.parseDouble(lines[2]), Date.valueOf(lines[3]));
            }
        }, RowEncoder.apply(scheme));

        // Result table.
        // FIX: filter on "signal" BEFORE projecting it away. The original
        // select(...).where("signal > 10") referenced a dropped column and
        // only worked via the analyzer's missing-reference resolution;
        // filtering first is explicit and portable.
        Dataset<Row> dffinal = dfdevice
                .where("signal > 10")
                .select("device", "deviceType")
                .groupBy("deviceType")
                .count();

        // Output: emit changed aggregate rows to the console on each trigger.
        StreamingQuery query = dffinal.writeStream()
                .outputMode("update")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
Scala语言实现:
package com.kfk.spark.structuredstreaming
import com.kfk.spark.common.CommSparkSessionScala
import java.sql.Date
import org.apache.spark.sql.{
Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/22
* @time : 7:31 下午
*/
/**
 * Structured Streaming demo (Scala): reads comma-separated device records
 * from a socket, converts them to a typed Dataset[DeviceData] via an explicit
 * schema, and counts records per deviceType, printing updates to the console.
 */
object StruStreamingDFOperScala {

  // Typed view of one device reading; field names/types match userSchema below.
  case class DeviceData(device: String, deviceType: String, signal: Double, deviceTime: Date)

  def main(args: Array[String]): Unit = {
    val spark = CommSparkSessionScala.getSparkSession()
    import spark.implicits._

    // Input table: one raw string per line from the socket source.
    val socketDF = spark
      .readStream
      .format("socket")
      .option("host", "bigdata-pro-m04")
      .option("port", 9999)
      .load().as[String]

    // Explicit row schema: signal is a double, deviceTime a SQL date.
    val userSchema = new StructType()
      .add("device", "string")
      .add("deviceType", "string")
      .add("signal", "double")
      .add("deviceTime", "date")

    // Convert each CSV line into a Row matching the schema.
    // FIX: the original packed the raw strings lines(2)/lines(3) into the Row
    // even though the schema declares double and date for those slots, which
    // fails at runtime when the RowEncoder serializes the Row. Parse them.
    val df_device = socketDF.map { line =>
      val fields = line.split(",")
      Row(fields(0), fields(1), fields(2).toDouble, Date.valueOf(fields(3)))
    }(RowEncoder(userSchema))

    // Reinterpret the generic rows as the typed case class.
    val df: Dataset[DeviceData] = df_device.as[DeviceData]

    // Result table: record count per deviceType.
    val df_final = df.groupBy("deviceType").count()

    // Output: emit changed aggregate rows to the console on each trigger.
    val query = df_final.writeStream
      .outputMode("update")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
测试数据:
aa,ss,100,2020-12-21
bb,ss,47,2020-12-21
cc,ss,3,2020-12-21
dd,ss,46,2020-12-21
运行结果:
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----+
|deviceType|count|
+----------+-----+
| ss| 3|
+----------+-----+
创建流式的DataSet和DataFrame也可以通过创建临时表来进行业务分析
package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/21
* @time : 10:35 下午
*/
/**
 * Structured Streaming demo (temp-view variant): reads comma-separated device
 * records from a socket, converts them to rows against an explicit schema,
 * registers the stream as a temporary view, and echoes every row to the
 * console via a SQL query in append mode.
 */
public class StruStreamingDFOper3 {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();

        // Input table: one raw string per line from the socket source.
        Dataset<String> rawLines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());

        // Row schema: two string columns, a double, and a SQL date; all nullable.
        List<StructField> schemaFields = new ArrayList<StructField>();
        schemaFields.add(DataTypes.createStructField("device", DataTypes.StringType, true));
        schemaFields.add(DataTypes.createStructField("deviceType", DataTypes.StringType, true));
        schemaFields.add(DataTypes.createStructField("signal", DataTypes.DoubleType, true));
        schemaFields.add(DataTypes.createStructField("deviceTime", DataTypes.DateType, true));
        StructType deviceSchema = DataTypes.createStructType(schemaFields);

        // Parse each CSV line ("device,deviceType,signal,yyyy-MM-dd") into a Row.
        Dataset<Row> deviceRows = rawLines.map(
                (MapFunction<String, Row>) value -> {
                    String[] parts = value.split(",");
                    return RowFactory.create(parts[0], parts[1],
                            Double.parseDouble(parts[2]), Date.valueOf(parts[3]));
                },
                RowEncoder.apply(deviceSchema));

        // Register the streaming DataFrame as a temp view so SQL can query it.
        deviceRows.createOrReplaceTempView("device");

        // Result table: plain pass-through query against the view.
        Dataset<Row> resultDF = spark.sql("select * from device");

        // Output: append newly arrived rows to the console on each trigger.
        StreamingQuery query = resultDF.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!