Flink 消费 Kafka 数据时需要注意版本对应关系,可以到 https://mvnrepository.com/ 查看 Flink Kafka 连接器与 Kafka 的对应版本。
如果在开发过程中出现了版本不对应的情况,那么一定要重新创建一个 Kafka topic,以防出现各种错误。
环境:
mysql
zookeeper:3.4.13
kafka:0.8_2.11
flink:1.7.2(pom.xml中)
启动zookeeper
bin/zkServer.sh start
启动kafka:(此时我的topic是lhqtest)
完整代码:
pom.xml:
代码FlinkKafkajson:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
/**
-
Created by luhaiqing on 2019/6/11.
*/
public class FlinkKafkajson {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);
tableEnvironment.connect(new Kafka().version(“0.8”).topic(“lhqtest”).startFromLatest()
.property(“bootstrap.servers”,“192.168.190.133:9092”)
.property(“zookeeper.connect”,“192.168.190.133:2181”)
.property(“group.id”, “lhqtest”))
.withFormat(new Json().failOnMissingField(true).deriveSchema())
.withSchema(new Schema()
.field(“id”, Types.INT)
.field(“name”, Types.STRING)
.field(“sex”, Types.STRING)) .inAppendMode() .registerTableSource("lhq_user"); Table table = tableEnvironment.scan("lhq_user").select("id,name,sex"); DataStream<Row> personDataStream = tableEnvironment.toAppendStream(table,Row.class); personDataStream.addSink(new MysqlSink()); env.execute("userPv from Kafka");
}
}
写入 MySQL 的代码(MysqlSink):
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Sink that writes each incoming {@link Row} of the form (id, name, sex) into the
 * MySQL table {@code flinkjsontest} using a {@code REPLACE INTO} statement.
 *
 * <p>Created by luhaiqing on 2019/6/5.
 */
public class MysqlSink extends RichSinkFunction<Row> {

    // JDBC handles are created in open(); transient because the function object itself
    // is serialized and these handles are not serializable.
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    /**
     * Loads the MySQL JDBC driver, opens the connection and prepares the statement
     * that is reused for every record.
     *
     * @param parameters configuration passed to the function on initialization
     * @throws Exception if the driver class is missing or the connection cannot be opened
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        String className = "com.mysql.jdbc.Driver";
        Class.forName(className);

        String url = "jdbc:mysql://localhost:3306/test";
        String user = "root";
        String password = "123456";
        connection = DriverManager.getConnection(url, user, password);

        String sql = "replace into flinkjsontest(id,name,sex) values(?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * Releases the prepared statement and the connection.
     *
     * <p>The nested {@code finally} blocks make sure the connection is still closed
     * when closing the statement throws.
     *
     * @throws Exception if closing any of the resources fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }

    /**
     * Writes one row to MySQL.
     *
     * @param value row whose fields 0, 1 and 2 hold the id (int), name and sex
     * @param context sink context supplied by the caller (unused)
     * @throws Exception if a field has an unexpected type or the SQL update fails
     */
    @Override
    public void invoke(Row value, Context context) throws Exception {
        int id = (Integer) value.getField(0);
        String name = (String) value.getField(1);
        String sex = (String) value.getField(2);
        System.out.print(id + ":" + name + ":" + sex);

        preparedStatement.setInt(1, id);
        preparedStatement.setString(2, name);
        preparedStatement.setString(3, sex);

        int affectedRows = preparedStatement.executeUpdate();
        if (affectedRows > 0) {
            System.out.println("value=" + value);
        }
    }
}