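DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may, for example, write the data to files or to standard output (such as the command line terminal). Flink programs run in a variety of contexts, either standalone or embedded in other programs. Execution can happen in a local JVM or on a cluster of many machines.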
Anatomy of a Flink program:
- Obtain an execution environment
- Load/create the initial data (source)
- Specify transformations on this data (operator)
- Specify where to put the results of your computations (sink)
- Trigger the program execution with execute()
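The five steps above can be sketched as one small program. This is a minimal sketch, not a reference implementation: the class names AnatomySketch and Doubler, the job name, and the sample input are assumptions made for illustration, and it presumes the Flink 1.10 streaming dependencies are on the classpath.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AnatomySketch {

    /** Hypothetical transformation: doubles every incoming integer. */
    public static class Doubler implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value * 2;
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Obtain an execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Load/create the initial data (source). Sample values are made up.
        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

        // 3. Specify transformations on this data (operator).
        DataStream<Integer> doubled = numbers.map(new Doubler());

        // 4. Specify where to put the results (sink): here, standard output.
        doubled.print();

        // 5. Trigger the program execution. Nothing runs before this call.
        env.execute("anatomy-sketch");
    }
}
```

Steps 1 to 4 only build the dataflow graph; the job starts when execute() is called.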
Sources:
Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction).
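To make addSource(sourceFunction) concrete, here is a hedged sketch of a custom source. CountingSource and its limit parameter are hypothetical names invented for this example; only the SourceFunction interface and addSource come from Flink.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CountingSourceSketch {

    /** Hypothetical source that emits 0, 1, ..., limit - 1 and then finishes. */
    public static class CountingSource implements SourceFunction<Long> {
        private final long limit;
        // volatile because cancel() is called from a different thread than run().
        private volatile boolean running = true;

        public CountingSource(long limit) {
            this.limit = limit;
        }

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            for (long i = 0; i < limit && running; i++) {
                // Hold the checkpoint lock while emitting; this matters once
                // the source also keeps checkpointed state.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(i);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Attach the custom source and print whatever it emits.
        env.addSource(new CountingSource(5)).print();
        env.execute("counting-source-sketch");
    }
}
```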
Sinks:
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them.
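As an illustration of a sink, the sketch below attaches a custom SinkFunction with addSink. CollectingSink and its static VALUES list are assumptions made for this example; a static in-memory list is only meaningful for local, single-JVM runs and would not work on a cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CollectingSinkSketch {

    /** Hypothetical sink that appends every record to an in-memory list. */
    public static class CollectingSink implements SinkFunction<String> {
        // Static so that all parallel sink instances in the local JVM share it.
        public static final List<String> VALUES =
                Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) {
            VALUES.add(value);
        }
    }

    public static void main(String[] args) throws Exception {
        // Local environment, so the sink runs in this JVM.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        env.fromElements("a", "b", "c")
           .addSink(new CollectingSink());

        env.execute("collecting-sink-sketch");

        // Order of the collected values is not guaranteed with parallelism > 1.
        System.out.println(CollectingSink.VALUES);
    }
}
```

For quick debugging, calling print() on the stream is usually enough; a custom sink like this one is mostly useful when the results need to be inspected programmatically.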
Iterations:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/datastream_api.html
Execution Parameters:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/execution_configuration.html
Fault Tolerance:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/checkpointing.html
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
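Controlling Latency:
To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.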
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
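The rule behind the buffer timeout (send when the buffer is full, or when the timeout has elapsed, whichever comes first) can be modelled in a few lines of plain Java. This is a toy model for intuition only, not how Flink implements network buffers internally; the class BufferTimeoutSketch, its methods, and the explicit clock argument are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferTimeoutSketch {
    private final int capacity;
    private final long timeoutMillis;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long firstRecordAt = -1;

    public BufferTimeoutSketch(int capacity, long timeoutMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds a record; flushes immediately when the buffer becomes full. */
    public void add(String record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstRecordAt = nowMillis; // start the timeout clock
        }
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush();
        }
    }

    /** Called periodically; flushes a non-empty buffer once the timeout has elapsed. */
    public void tick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstRecordAt >= timeoutMillis) {
            flush();
        }
    }

    /** Returns every batch that has been sent so far. */
    public List<List<String>> flushedBatches() {
        return flushed;
    }

    private void flush() {
        flushed.add(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

A small timeout sends partially filled buffers more often, which lowers latency at the cost of throughput; a large timeout does the opposite.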
Operators:
DataStream Transformations:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/
Windows
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/windows.html
Joining
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/joining.html
Process Function(Low-level Operations)
Asynchronous I/O for external data access:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/asyncio.html
Streaming connectors:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
Additional streaming connectors for Flink are released through Apache Bahir, including:
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
Side Outputs:
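In addition to the main stream that results from DataStream operations, you can also produce any number of additional side output result streams.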
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(
                Integer value,
                Context ctx,
                Collector<Integer> out) throws Exception {
            // emit data to regular output
            out.collect(value);
            // emit data to side output
            ctx.output(outputTag, "sideout-" + String.valueOf(value));
        }
    });
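The snippet above emits to a side output but does not show how to read it back. The following self-contained sketch adds that step with getSideOutput. The class name, the tag id "odd-numbers", the describe helper, and the even/odd split are assumptions chosen for illustration.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {

    // Created as an anonymous subclass so the String element type is retained.
    public static final OutputTag<String> ODD_TAG = new OutputTag<String>("odd-numbers") {};

    /** Hypothetical helper that formats a value for the side output. */
    public static String describe(int value) {
        return "odd-" + value;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> evens = env
            .fromElements(1, 2, 3, 4)
            .process(new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    if (value % 2 == 0) {
                        out.collect(value);                   // main output
                    } else {
                        ctx.output(ODD_TAG, describe(value)); // side output
                    }
                }
            });

        // Retrieve the side output from the result of the process() call.
        DataStream<String> odds = evens.getSideOutput(ODD_TAG);

        evens.print("main");
        odds.print("side");

        env.execute("side-output-sketch");
    }
}
```

The side output must be read from the operator that emitted it, using the same OutputTag instance (or one with the same id and type).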