Flink-1.12 API上手实践(源码层面)
官方地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
流批一体官方原话说明
Apache Flink’s unified approach to stream and batch processing means that a DataStream application executed over bounded input will produce the same final results regardless of the configured execution mode. It is important to note what final means here: a job executing in
STREAMING
mode might produce incremental updates (think upserts in a database) while aBATCH
job would only produce one final result at the end. The final result will be the same if interpreted correctly but the way to get there can be different.By enabling
BATCH
execution, we allow Flink to apply additional optimizations that we can only do when we know that our input is bounded. For example, different join/aggregation strategies can be used, in addition to a different shuffle implementation that allows more efficient task scheduling and failure recovery behavior. We will go into some of the details of the execution behavior below.
官方举例说明什么时候适合使用流批一体,batch execution mode的好处:
- One obvious outlier is when you want to use a bounded job to bootstrap some job state that you then want to use in an unbounded job. For example, by running a bounded job using
STREAMING
mode, taking a savepoint, and then restoring that savepoint on an unbounded job. This is a very specific use case and one that might soon become obsolete when we allow producing a savepoint as additional output of aBATCH
execution job.- Another case where you might run a bounded job using
STREAMING
mode is when writing tests for code that will eventually run with unbounded sources. For testing it can be more natural to use a bounded source in those cases.
1.流批执行模式的配置
STREAMING
: The classic DataStream execution mode (default)BATCH
: Batch-style execution on the DataStream APIAUTOMATIC
: Let the system decide based on the boundedness of the sources
命令行模式提交:
1 | bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar |
代码中配置:
1 | StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |