Usage support across Flink's API layers

DataSet API

Source


  • CRowValuesInputFormat
  • CollectionInputFormat
  • CsvInputFormat
  • IteratorInputFormat
  • JDBCInputFormat
  • ParallelIteratorInputFormat
  • PojoCsvInputFormat
  • PrimitiveInputFormat
  • ReplicatingInputFormat
  • RowCsvInputFormat
  • SerializedInputFormat
  • TextInputFormat
  • TextValueInputFormat
  • TupleCsvInputFormat
  • TypeSerializerInputFormat
  • ValuesInputFormat

Sink


  • BlockingShuffleOutputFormat
  • CsvOutputFormat
  • DiscardingOutputFormat
  • JDBCOutputFormat
  • JDBCUpsertOutputFormat
  • LocalCollectionOutputFormat
  • PrintingOutputFormat
  • ScalaCsvOutputFormat
  • SerializedOutputFormat
  • TextOutputFormat
  • TypeSerializerOutputFormat

Transformation

DataStream API

Source

File-based

  • readTextFile
  • readFile

Socket-based

  • socketTextStream

Collection-based

  • fromCollection
  • fromElements
  • fromParallelCollection
  • generateSequence


  • addSource - custom source: pass in a class that implements SourceFunction
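To make the addSource contract concrete without a cluster, here is a minimal stand-alone model of the SourceFunction pattern: `run(...)` pushes records into a context and keeps looping until it is done or `cancel()` clears a volatile flag. `MiniSourceFunction`, `MiniSourceContext` and `CountingSource` are hypothetical names for this sketch only; a real job would implement `org.apache.flink.streaming.api.functions.source.SourceFunction` (whose `run` may throw `Exception`) and hand it to `env.addSource(...)`.

```java
import java.util.ArrayList;
import java.util.List;

class SourceFunctionSketch {

    // Hypothetical stand-in for Flink's SourceFunction.SourceContext<T>.
    interface MiniSourceContext<T> {
        void collect(T element);
    }

    // Hypothetical stand-in for Flink's SourceFunction<T> (checked exceptions omitted).
    interface MiniSourceFunction<T> {
        void run(MiniSourceContext<T> ctx);
        void cancel();
    }

    // Emits 0, 1, 2, ... until 'limit' elements were emitted or cancel() is called.
    static class CountingSource implements MiniSourceFunction<Long> {
        private final long limit;
        // volatile: cancel() is normally called from a different thread than run()
        private volatile boolean running = true;

        CountingSource(long limit) {
            this.limit = limit;
        }

        @Override
        public void run(MiniSourceContext<Long> ctx) {
            long next = 0;
            while (running && next < limit) {
                ctx.collect(next++);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // Runs a source until it returns and collects everything it emitted.
    static <T> List<T> drain(MiniSourceFunction<T> source) {
        List<T> out = new ArrayList<>();
        source.run(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new CountingSource(5))); // prints [0, 1, 2, 3, 4]
    }
}
```

The volatile flag is the important design point: the runtime calls `cancel()` from another thread, so `run()` must observe the change and return promptly.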

Sink


  • print
  • printToErr

File-based

  • writeAsText
  • writeAsCsv

Socket-based

  • writeToSocket


  • addSink - custom sink: pass in a class that implements SinkFunction
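A custom sink, in turn, receives one `invoke` call per record between `open` and `close`. The sketch below models that lifecycle with a hypothetical `MiniSinkFunction` interface (simplified: Flink's `RichSinkFunction` passes a `Configuration` to `open` and a `Context` to `invoke`) and a hypothetical `BatchingSink` that writes buffered records whenever a batch fills up and once more on `close`.

```java
import java.util.ArrayList;
import java.util.List;

class SinkFunctionSketch {

    // Hypothetical, simplified stand-in for the RichSinkFunction lifecycle.
    interface MiniSinkFunction<T> {
        default void open() {}
        void invoke(T value);
        default void close() {}
    }

    // Buffers records and appends them to 'target' in batches of 'batchSize';
    // whatever is left over is written when close() is called.
    static class BatchingSink<T> implements MiniSinkFunction<T> {
        private final int batchSize;
        private final List<List<T>> target;
        private List<T> buffer;

        BatchingSink(int batchSize, List<List<T>> target) {
            this.batchSize = batchSize;
            this.target = target;
        }

        @Override
        public void open() {
            buffer = new ArrayList<>();
        }

        @Override
        public void invoke(T value) {
            buffer.add(value);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        @Override
        public void close() {
            flush();
        }

        private void flush() {
            if (!buffer.isEmpty()) {
                target.add(buffer);
                buffer = new ArrayList<>();
            }
        }
    }

    // Drives a sink the way the runtime would: open, one invoke per record, close.
    static <T> void runSink(MiniSinkFunction<T> sink, List<T> records) {
        sink.open();
        for (T r : records) {
            sink.invoke(r);
        }
        sink.close();
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        runSink(new BatchingSink<>(2, batches), List.of("a", "b", "c"));
        System.out.println(batches); // prints [[a, b], [c]]
    }
}
```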

Connector

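Besides the basic Source and Sink interfaces, Flink provides connectors: dedicated integrations with a variety of third-party systems. A connector can offer a source, a sink, or both, as marked in the list below.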

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

Some additional connectors are released through Apache Bahir, including:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

Transformation

Table API

Source


  • CsvTableSource
  • OrcTableSource
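import org.apache.flink.orc.OrcTableSource;
import org.apache.hadoop.conf.Configuration;

// create Hadoop Configuration
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
  // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  .path("file:///path/to/data")                 // placeholder path
  // schema of ORC files
  .forOrcSchema("struct<name:string,id:int>")   // placeholder schema
  // Hadoop configuration
  .withConfiguration(config)
  // build OrcTableSource
  .build();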
  • JDBCTableSource

Not yet documented in the official API docs, but present in the source code.

Sink


  • CsvTableSink
CsvTableSink sink = new CsvTableSink(
    path,                  // output path
    "|",                   // optional: use '|' as the field delimiter
    1,                     // optional: write to a single file
    WriteMode.OVERWRITE);  // optional: overwrite existing files

tableEnv.registerTableSink(
  "csvOutputTable",
  // specify table schema
  new String[]{"f0", "f1"},
  new TypeInformation[]{Types.STRING, Types.INT},
  sink);

Table table = ...
table.insertInto("csvOutputTable");
  • JDBCAppendTableSink
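JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")  // example: in-memory Derby database
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(Types.INT)                           // one type per '?' placeholder
  .build();

tableEnv.registerTableSink(
  "jdbcOutputTable",
  // specify table schema
  new String[]{"id"},
  new TypeInformation[]{Types.INT},
  sink);

Table table = ...
table.insertInto("jdbcOutputTable");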
  • JDBCUpsertTableSink

Not yet documented in the official API docs, but present in the source code.

  • CassandraAppendTableSink
ClusterBuilder builder = ... // configure Cassandra cluster connection

CassandraAppendTableSink sink = new CassandraAppendTableSink(
  builder,
  // the query must match the schema of the table
  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");

tableEnv.registerTableSink(
  "cassandraOutputTable",
  // specify table schema
  new String[]{"id", "name", "value"},
  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
  sink);

Table table = ...
table.insertInto("cassandraOutputTable");

Connector

  • File System Connector
.connect(
  new FileSystem()
    .path("file:///path/to/whatever")    // required: path to a file or directory
)
.withFormat(                             // required: the file system connector requires a format;
  ...                                    // currently only the OldCsv format is supported.
)                                        // Please refer to the Old CSV Format part of the Table Formats section for details.
  • Kafka Connector
.connect(
  new Kafka()
    .version("0.11")    // required: valid connector versions are
                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
    .topic("...")       // required: topic name from which the table is read

    // optional: connector specific properties
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
    .property("group.id", "testGroup")

    // optional: select a startup mode for Kafka offsets (pick one)
    .startFromEarliest()
    .startFromLatest()              //   or
    .startFromGroupOffsets()        //   or

    // optional: output partitioning from Flink's partitions into Kafka's partitions (pick one)
    .sinkPartitionerFixed()         // each Flink partition ends up in at most one Kafka partition (default)
    .sinkPartitionerRoundRobin()    //   or a Flink partition is distributed to Kafka partitions round-robin
    .sinkPartitionerCustom(MyCustom.class)    //   or use a custom FlinkKafkaPartitioner subclass
)
.withFormat(                                  // required: the Kafka connector requires a format;
  ...                                         // the supported formats are Csv, Json and Avro.
)                                             // Please refer to the Table Formats section for more details.
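The fixed and round-robin partitioning choices can be compared with a small simulation. Assumptions: "fixed" is modeled after Flink's `FlinkFixedPartitioner`, which picks `partitions[parallelInstanceId % partitions.length]`, and "round-robin" is modeled as a counter that cycles through the partitions; `fixedPartition` and `RoundRobin` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

class KafkaPartitioningSketch {

    // "fixed": each parallel sink instance always writes to one Kafka partition.
    static int fixedPartition(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    // "round-robin": a single sink instance cycles through all Kafka partitions.
    static class RoundRobin {
        private int next = 0;

        int partition(int[] partitions) {
            int p = partitions[next % partitions.length];
            next++;
            return p;
        }
    }

    public static void main(String[] args) {
        int[] kafkaPartitions = {0, 1, 2};

        // Four Flink sink instances, three Kafka partitions: instance 3 wraps around to partition 0.
        List<Integer> fixed = new ArrayList<>();
        for (int instance = 0; instance < 4; instance++) {
            fixed.add(fixedPartition(instance, kafkaPartitions));
        }
        System.out.println(fixed); // prints [0, 1, 2, 0]

        // Five records from one instance are spread over all partitions.
        RoundRobin rr = new RoundRobin();
        List<Integer> spread = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            spread.add(rr.partition(kafkaPartitions));
        }
        System.out.println(spread); // prints [0, 1, 2, 0, 1]
    }
}
```

One consequence of the fixed strategy: if the sink has fewer parallel instances than the topic has partitions, some partitions receive no data at all.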
  • Elasticsearch Connector
.connect(
  new Elasticsearch()
    .version("6")                      // required: valid connector versions are "6"
    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
    .index("MyUsers")                  // required: Elasticsearch index
    .documentType("user")              // required: Elasticsearch document type

    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)

    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
    .failureHandlerIgnore()        //   or ignores failures and drops the request
    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
    .failureHandlerCustom(...)     //   or custom failure handling with an ActionRequestFailureHandler subclass

    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (the sink then no longer
                                   //   guarantees at-least-once delivery)
    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
    .bulkFlushMaxSize("42 mb")     // optional: maximum size of buffered actions in bytes per bulk request
                                   //   (only MB granularity is supported)
    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in milliseconds)

    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
    .bulkFlushBackoffExponential() //   or use an exponential backoff type
    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)

    // optional: connection properties to be used during REST communication to Elasticsearch
    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in milliseconds) between retries
    .connectionPathPrefix("/v1")   // optional: prefix string to be added to every REST communication
)
.withFormat(                      // required: the Elasticsearch connector requires a format;
  ...                             // currently only the Json format is supported.
)                                 // Please refer to the Table Formats section for more details.
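`keyDelimiter` and `keyNullLiteral` matter when the sink derives document IDs from the query's key fields (upsert mode). Below is a sketch of that ID construction, assuming the key values are joined with the delimiter and null values are replaced by the literal; `documentId` is a hypothetical helper, not a Flink method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ElasticsearchKeySketch {

    // Builds a document ID from the key fields of a row: values are joined with
    // keyDelimiter, and null values are replaced by keyNullLiteral.
    static String documentId(List<?> keyValues, String keyDelimiter, String keyNullLiteral) {
        return keyValues.stream()
                .map(v -> v == null ? keyNullLiteral : v.toString())
                .collect(Collectors.joining(keyDelimiter));
    }

    public static void main(String[] args) {
        // Settings from the example above: keyDelimiter("$"), keyNullLiteral("n/a")
        System.out.println(documentId(Arrays.asList("KEY1", "KEY2", "KEY3"), "$", "n/a")); // prints KEY1$KEY2$KEY3
        // Defaults: "_" as delimiter, "null" as null literal
        System.out.println(documentId(Arrays.asList("KEY1", null, 42), "_", "null"));     // prints KEY1_null_42
    }
}
```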
  • HBase Connector
.connect(
  new HBase()
    .version("1.4.3")                      // required: currently only "1.4.3" is supported
    .tableName("hbase_table_name")         // required: HBase table name
    .zookeeperQuorum("localhost:2181")     // required: HBase ZooKeeper quorum configuration
    .zookeeperNodeParent("/test")          // optional: the root dir in ZooKeeper for the HBase cluster;
                                           //   the default value is "/hbase"
    .writeBufferFlushMaxSize("10mb")       // optional: writing option, maximum in-memory size of buffered
                                           //   rows to insert per round trip; can improve write performance
                                           //   to HBase. The default value is "2mb".
    .writeBufferFlushMaxRows(1000)         // optional: writing option, maximum number of rows to insert per
                                           //   round trip; can improve write performance to HBase. No default,
                                           //   i.e. by default flushing does not depend on the number of buffered rows.
    .writeBufferFlushInterval("2s")        // optional: writing option, interval at which buffered rows are flushed.
                                           //   The default value is "0s", which means no asynchronous
                                           //   flush thread is scheduled.
)
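The three write-buffer options combine roughly as "flush on whichever limit is hit first". The following is a simplified, hypothetical model of that decision logic (`FlushPolicy` is not a Flink or HBase class); time is passed in explicitly so the example stays deterministic, whereas a real sink would run the interval check on a background timer.

```java
import java.util.ArrayList;
import java.util.List;

class BufferedFlushSketch {

    // Simplified model of the three HBase write options:
    //   maxBytes       ~ writeBufferFlushMaxSize
    //   maxRows        ~ writeBufferFlushMaxRows  (0 = no row limit)
    //   intervalMillis ~ writeBufferFlushInterval (0 = no time-based flush)
    static class FlushPolicy {
        private final long maxBytes;
        private final int maxRows;
        private final long intervalMillis;

        private long bufferedBytes = 0;
        private int bufferedRows = 0;
        private long lastFlushMillis;
        // Number of rows written by each flush, in order (stands in for the actual HBase writes).
        final List<Integer> flushedBatchSizes = new ArrayList<>();

        FlushPolicy(long maxBytes, int maxRows, long intervalMillis, long nowMillis) {
            this.maxBytes = maxBytes;
            this.maxRows = maxRows;
            this.intervalMillis = intervalMillis;
            this.lastFlushMillis = nowMillis;
        }

        // Buffers one row; flushes as soon as the size limit or the row limit is reached.
        void add(long rowBytes, long nowMillis) {
            bufferedBytes += rowBytes;
            bufferedRows++;
            if (bufferedBytes >= maxBytes || (maxRows > 0 && bufferedRows >= maxRows)) {
                flush(nowMillis);
            }
        }

        // Periodic check; flushes buffered rows once the interval has passed.
        void tick(long nowMillis) {
            if (intervalMillis > 0 && bufferedRows > 0 && nowMillis - lastFlushMillis >= intervalMillis) {
                flush(nowMillis);
            }
        }

        private void flush(long nowMillis) {
            if (bufferedRows > 0) {
                flushedBatchSizes.add(bufferedRows);
            }
            bufferedBytes = 0;
            bufferedRows = 0;
            lastFlushMillis = nowMillis;
        }
    }

    public static void main(String[] args) {
        // 100-byte buffer, at most 3 rows per flush, time-based flush every 2 s
        FlushPolicy policy = new FlushPolicy(100, 3, 2_000, 0);
        policy.add(10, 0);
        policy.add(10, 0);
        policy.add(10, 0);   // third row hits maxRows -> flush of 3 rows
        policy.add(90, 100);
        policy.add(20, 100); // 110 bytes >= 100 -> flush of 2 rows
        policy.add(5, 200);
        policy.tick(1_000);  // only 900 ms since the last flush -> nothing happens
        policy.tick(2_100);  // 2000 ms since the last flush -> flush of 1 row
        System.out.println(policy.flushedBatchSizes); // prints [3, 2, 1]
    }
}
```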
  • JDBC Connector

The JDBC connector currently supports only the DDL (SQL) approach; there is no Java/Scala descriptor API for it.

  • Hive Connector
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf"; // a local path
String version         = "2.3.4";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
  • CSV Format
.withFormat(
  new Csv()

    // optional: define the schema explicitly using type information. This overrides the default
    // behavior that uses the table's schema as the format schema.
    .schema(Types.ROW(...))

    .fieldDelimiter(';')         // optional: field delimiter character (',' by default)
    .lineDelimiter("\r\n")       // optional: line delimiter ("\n" by default;
                                 //   otherwise "\r", "\r\n", or "" are allowed)
    .quoteCharacter('\'')        // optional: quote character for enclosing field values ('"' by default)
    .allowComments()             // optional: ignores comment lines that start with '#' (disabled by default);
                                 //   if enabled, make sure to also ignore parse errors to allow empty rows
    .ignoreParseErrors()         // optional: skip fields and rows with parse errors instead of failing;
                                 //   fields are set to null in case of errors
    .arrayElementDelimiter("|")  // optional: the array element delimiter string for separating
                                 //   array and row element values (";" by default)
    .escapeCharacter('\\')       // optional: escape character for escaping values (disabled by default)
    .nullLiteral("n/a")          // optional: null literal string that is interpreted as a
                                 //   null value (disabled by default)
)
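To see how `fieldDelimiter`, `quoteCharacter` and `nullLiteral` interact on a single line, here is a minimal parser. It is a simplified, hypothetical model (RFC 4180-style doubled quotes, no escape character, no multi-line fields), not Flink's actual CSV parser; `CsvLineSketch.parse` is a name made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

class CsvLineSketch {

    // Splits one CSV line into fields:
    // - a field enclosed in quoteChar may contain the delimiter;
    // - inside a quoted field, two consecutive quoteChars stand for one literal quoteChar;
    // - an unquoted field equal to nullLiteral becomes null (pass null to disable).
    static List<String> parse(String line, char delimiter, char quoteChar, String nullLiteral) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        boolean wasQuoted = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == quoteChar) {
                    if (i + 1 < line.length() && line.charAt(i + 1) == quoteChar) {
                        current.append(quoteChar); // doubled quote -> literal quote
                        i++;
                    } else {
                        inQuotes = false;          // closing quote
                    }
                } else {
                    current.append(c);
                }
            } else if (c == quoteChar) {
                inQuotes = true;
                wasQuoted = true;
            } else if (c == delimiter) {
                fields.add(finish(current, wasQuoted, nullLiteral));
                current.setLength(0);
                wasQuoted = false;
            } else {
                current.append(c);
            }
        }
        fields.add(finish(current, wasQuoted, nullLiteral));
        return fields;
    }

    private static String finish(StringBuilder field, boolean wasQuoted, String nullLiteral) {
        String value = field.toString();
        return (!wasQuoted && value.equals(nullLiteral)) ? null : value;
    }

    public static void main(String[] args) {
        // Same delimiter, quote character and null literal as the Csv() example above.
        System.out.println(parse("1;'a;b';n/a", ';', '\'', "n/a")); // prints [1, a;b, null]
    }
}
```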
  • JSON Format
.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: whether to fail if a field is missing (false by default)

    // optional: define the schema explicitly using type information. This overrides the default
    // behavior that uses the table's schema as the format schema.
    .schema(Types.ROW(...))

    // or use a JSON schema; here 'number' maps to DECIMAL and 'date-time' to TIMESTAMP.
    // This also overrides the default behavior.
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )
)
  • Apache Avro Format
.withFormat(
  new Avro()

    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)    // User: placeholder for an Avro-generated SpecificRecord class

    // or by using an Avro schema
    .avroSchema(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"test\"," +
      "  \"fields\" : [" +
      "    {\"name\": \"a\", \"type\": \"long\"}," +
      "    {\"name\": \"b\", \"type\": \"string\"}" +
      "  ]" +
      "}"
    )
)
  • Old CSV Format
.withFormat(
  new OldCsv()
    .field("field1", DataTypes.STRING())     // optional: declare ordered format fields explicitly. This
    .field("field2", DataTypes.TIMESTAMP(3)) //   overrides the default behavior that uses the table's schema as format schema.
    .fieldDelimiter(",")              // optional: field delimiter string, "," by default
    .lineDelimiter("\n")              // optional: line delimiter string, "\n" by default
    .quoteCharacter('"')              // optional: quote character for string values, empty by default
    .commentPrefix("#")               // optional: prefix that marks comment lines, empty by default
    .ignoreFirstLine()                // optional: skip the first line (not skipped by default)
    .ignoreParseErrors()              // optional: skip records with parse errors instead of failing (fails by default)
)
  • Table Schema
.withSchema(
  new Schema()
    .field("MyField1", DataTypes.INT())     // required: specify the fields of the table (in this order)
    .field("MyField2", DataTypes.STRING())
    .field("MyField3", DataTypes.BOOLEAN())
)

// time attributes and field aliasing:
.withSchema(
  new Schema()
    .field("MyField1", DataTypes.TIMESTAMP(3))
      .proctime()      // optional: declares this field as a processing-time attribute
    .field("MyField2", DataTypes.TIMESTAMP(3))
      .rowtime(...)    // optional: declares this field as an event-time attribute
    .field("MyField3", DataTypes.BOOLEAN())
      .from("mf3")     // optional: original field in the input that is referenced/aliased by this field
)
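Update Modes

For streaming queries, the update mode declares which kind of change messages are exchanged between a dynamic table and the external system. In DDL it is set in the WITH clause:

CREATE TABLE MyTable (
  ...
) WITH (
 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
)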
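The difference between the three modes shows up as soon as a query updates earlier results. The sketch simulates a continuous word count and lists the change messages a sink would receive: append mode only accepts inserts, retract mode sends a retraction of the old row followed by the new row, and upsert mode sends one upsert keyed by the word. The message strings and class names are made up for illustration; also note that Flink rejects an updating query for an append-only sink when the query is planned, whereas this sketch simply throws when the first update occurs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UpdateModeSketch {

    enum Mode { APPEND, RETRACT, UPSERT }

    // Continuous "SELECT word, COUNT(*) ... GROUP BY word" over the given words,
    // returning the change messages a sink would receive in the given update mode.
    static List<String> countChanges(List<String> words, Mode mode) {
        Map<String, Long> counts = new HashMap<>();
        List<String> changes = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            long updated = (old == null) ? 1 : old + 1;
            counts.put(w, updated);
            switch (mode) {
                case APPEND:
                    // Append mode can only represent new rows, never changes to old ones.
                    if (old != null) {
                        throw new IllegalStateException("append mode cannot update " + w);
                    }
                    changes.add("INSERT " + w + "=" + updated);
                    break;
                case RETRACT:
                    // An update becomes: retract the old row, then add the new row.
                    if (old != null) {
                        changes.add("RETRACT " + w + "=" + old);
                    }
                    changes.add("ADD " + w + "=" + updated);
                    break;
                case UPSERT:
                    // An update becomes a single message keyed by the unique key (the word).
                    changes.add("UPSERT " + w + "=" + updated);
                    break;
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "b", "a");
        System.out.println(countChanges(words, Mode.RETRACT)); // prints [ADD a=1, ADD b=1, RETRACT a=1, ADD a=2]
        System.out.println(countChanges(words, Mode.UPSERT));  // prints [UPSERT a=1, UPSERT b=1, UPSERT a=2]
        System.out.println(countChanges(List.of("a", "b"), Mode.APPEND)); // prints [INSERT a=1, INSERT b=1]
    }
}
```

Retract mode works for any sink but doubles the messages per update; upsert mode needs a unique key in the result but sends only one message per update.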

Transformation


Source

Sink

Transformation


