环境:WIN10+JDK1.8+IDEA+Maven3.6.3+FlinkSQL1.13
INSERT 到 connector=print的表
pom.xml
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- FlinkSQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Java代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Minimal Flink SQL demo: creates a table backed by the {@code print} connector
 * and inserts a single row into it, so the row shows up in the console output.
 */
public class Hi {
    /**
     * Entry point.
     *
     * @param args unused
     * @throws Exception if waiting for the INSERT job fails or is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Create the stream and table execution environments (parallelism 1)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        // Create the sink table; it uses the 'print' connector (not MySQL)
        tbEnv.executeSql("CREATE TABLE t1(a STRING,b INT)WITH ('connector' = 'print')");
        // Insert one row. The INSERT is submitted as a job, so block until it
        // finishes instead of relying on the JVM staying alive long enough.
        tbEnv.executeSql("INSERT INTO t1 VALUES('ab',23)").await();
    }
}
-
打印结果
-
+I[ab, 23]
FlinkSQL连MySQL
pom.xml
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<mysql.version>8.0.31</mysql.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- FlinkSQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 'connector' = 'jdbc' -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 'driver' = 'com.mysql.cj.jdbc.Driver' -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
</dependencies>
MySQL建视图
CREATE OR REPLACE VIEW db1.v AS
SELECT 'ab' AS a,23 AS b,6.78 AS c;
Java代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Flink SQL demo that maps the MySQL object {@code db1.v} through the JDBC
 * connector and prints the result of a full scan.
 */
public class Hi {
    public static void main(String[] args) {
        // Set up the streaming environment first, then wrap it in a table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);

        // DDL for a temporary table backed by the JDBC connector.
        // Connection settings (URL, user, password) are hard-coded for this local demo.
        String createTableDdl = String.join("\n",
                "CREATE TEMPORARY TABLE temp_tb (",
                " a STRING,",
                " b INT,",
                " c DECIMAL(3,2))",
                "WITH (",
                " 'connector' = 'jdbc',",
                " 'driver' = 'com.mysql.cj.jdbc.Driver',",
                " 'url' = 'jdbc:mysql://localhost:3306/db1',",
                " 'username' = 'root',",
                " 'password' = '123456',",
                " 'table-name' = 'v'",
                ")");
        tbEnv.executeSql(createTableDdl);

        // Run the query and print every row to the console
        String selectAll = "SELECT * FROM temp_tb";
        tbEnv.sqlQuery(selectAll).execute().print();
    }
}
FlinkSQL连Kafka
Kafka => FlinkSQL => Kafka
创建主题
kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1 --partitions 1 \
--topic t1
kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1 --partitions 1 \
--topic t2
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.1</version>
</dependency>
<!-- 流 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.13.1</version>
</dependency>
<!-- FlinkSQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.1</version>
</dependency>
<!-- 'format'='json' -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.13.1</version>
</dependency>
<!-- 'connector'='kafka' -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.1</version>
</dependency>
Java代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Kafka => Flink SQL => Kafka demo: reads JSON records from topic {@code t1}
 * and writes them unchanged to topic {@code t2}.
 *
 * <p>The topic names match the topics created with {@code kafka-topics} and
 * used by the console producer/consumer in this document.
 */
public class K2K {
    /**
     * Entry point.
     *
     * @param args unused
     * @throws Exception if waiting for the INSERT job fails or is interrupted
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the stream environment and the streaming table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2. Register the source table (reads topic t1, starting from the latest offset)
        tableEnv.executeSql("CREATE TABLE tb1 (id STRING, ts BIGINT, vc INT) WITH ("
                + "'connector' = 'kafka',"
                + "'topic' = 't1',"
                + "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
                + "'properties.group.id' = 'g1',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");
        // 3. Register the sink table (writes topic t2)
        tableEnv.executeSql("CREATE TABLE tb2 (id STRING, ts BIGINT, vc INT) WITH ("
                + "'connector' = 'kafka',"
                + "'topic' = 't2',"
                + "'properties.bootstrap.servers' = 'hadoop105:9092,hadoop106:9092,hadoop107:9092',"
                + "'format' = 'json'"
                + ")");
        // 4. Copy rows from the source table to the sink table. The INSERT is
        //    submitted as a job; await() keeps main attached to it while it runs.
        tableEnv.executeSql("INSERT INTO tb2 SELECT * FROM tb1 WHERE 1=1").await();
    }
}
生产Kafka和消费Kafka
kafka-console-producer \
--broker-list hadoop105:9092 \
--topic t1
kafka-console-consumer \
--bootstrap-server hadoop105:9092 \
--topic t2
开两个终端窗口,分别运行上面的生产者和消费者,查看效果