diff --git a/internals/Jobs and Scheduling.md b/internals/Jobs and Scheduling.md new file mode 100644 index 0000000..cb402e4 --- /dev/null +++ b/internals/Jobs and Scheduling.md @@ -0,0 +1,3190 @@ + + + + + + Apache Flink 1.6 Documentation: Jobs and Scheduling + + + + + + + + + + + + + + + + + + + +
+ +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ +
+ +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + +

任务和定时

+ + + + +

该文档简要的描述了Flink如何给任务定时,如何在任务管理器上展示和追踪任务的状态。

+ + + +

定时

+ +

Flink中的执行资源是通过Task Slots定义的。每个TaskManager都有一个或多个任务插槽,每个插槽可以运行一个并行任务管道。管道由多个连续的任务组成,例如MapFunction的 +第n个并行实例和ReduceFunction的第n个并行实例。请注意,Flink经常并发地执行连续的任务:对于流程序,这在任何情况下都会发生,但是对于批处理程序,它也经常发生。

+ +

下图说明了这一点。考虑一个具有数据源、MapFunctionReduceFunction的程序。源函数和MapFunction的并行度为4,而ReduceFunction的并行度为3。一个管道由序列源-映射-还原组成。在一个具有2个任务管理器(每个任务管理器都有3个插槽)的集群中,程序将按照如下所述执行。

+ +
+Assigning Pipelines of Tasks to Slots +
+ +

在内部,Flink通过SlotSharingGroupCoLocationGroup定义哪些任务可以共享一个插槽(许可),哪些任务必须严格地放在同一个插槽中。

+ +

作业管理器的数据结构

+ +

在作业执行期间,作业管理器跟踪分布式任务,决定何时调度下一个任务(或一组任务),并对完成的任务或执行失败的任务作出反应

+ +

做特管理器接受JobGraph这是由操作符JobVertex) +和中间结果 (IntermediateDataSet)组成的数据流的表示。每个运算符都有属性,比如并行性和它执行的代码。此外,JobGraph有一组附加的库,这些库是执行操作符代码所必需的。

+ +

作业管理器将JobGraph转换为ExecutionGraph。ExecutionGraph是JobGraph的并行版本:对于每个JobVertex,它包含每个并行子任务的ExecutionVertex。ExecutionVertex 并行度为100的运算符将有一个JobVertex和100个execution顶点。该ExecutionVertex会跟踪特定子任务的执行状态。 一个JobVertex中的所有execution顶点都保存在一个 +ExecutionJobVertex中,它会跟踪操作符的整体状态。除顶点外,ExecutionGraph 还包含 +IntermediateResultIntermediateResultPartition的划分。前者跟踪,IntermediateDataSet的状态,后者跟踪每个分区的状态。

+ +
+JobGraph and ExecutionGraph +
+ +

每个ExecutionGraph都有一个与之相关联的作业状态。此作业状态指示该作业执行的当前状态。

+ +

一个Flink作业首先处于created状态,然后切换到running状态,完成所有工作后切换到finished状态。在出现故障的情况下,作业首先切换到failing状态,并取消所有正在运行的作业。如果所有作业顶点都已达到最终状态,且作业不可重新启动,则作业转换为failed状态。如果作业可以重新启动,那么它将进入restarting状态。一旦作业完全重新启动,它将到达created状态。

+ +

如果用户取消作业,它将进入cancelling状态。这还需要取消所有当前正在运行的任务。一旦所有运行的任务都达到了一个最终状态,那么该作业的状态就会变为cancelled

+ +

finished, canceledfailed的状态不同的是,这三种状态表示全局终止的状态,因此会出发作业清理的功能,而suspended状态仅仅表示局部终止。局部终止味着作业的执行已经在相应的作业管理器上终止,但是Flink集群的另一个作业管理器可以从持久的HA存储中检索作业并重新启动它。因此,到达suspended状态的作业不会被完全清理。

+ +
+States and Transitions of Flink job +
+ +

在ExecutionGraph执行期间,每个并行任务经历多个阶段,从createdfinishedfailed。下面的图表说明了它们之间的状态和可能的转换。任务可以多次执行(例如在故障恢复过程中)。 +因此,在一个Execution中可以追踪一个ExecutionVertex 的执行过程。 每个ExecutionVertex都有一个并行执行和优先执行。

+ +
+States and Transitions of Task Executions +
+ +

Back to top

+ + +
+
+
+ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/monitoringlogging.md b/monitoringlogging.md new file mode 100644 index 0000000..3f4f41b --- /dev/null +++ b/monitoringlogging.md @@ -0,0 +1,114 @@ +
+ + + + + + + + + + + + + + + + + + + + + + + + + + +

如何使用日志

+ + + + +

Flink中的日志记录是使用slf4j日志记录接口实现的。Flink使用log4j作为底层日志记录框。我们还提供了logback配置文件,并将它们作为属性传递给JVM。愿意使用logback来代替log4j的用户可以排除掉log4j(或者从lib文件夹中删除它)。

+ + + +

配置 Log4j

+ +

Log4j是使用属性文件控制的。 在Flink中, 这个文件通常被称为 log4j.properties。 我们通过 -Dlog4j.configuration= 参数将文件名和文件的位置传递给JVM。

+ +

Flink附带以下默认属性文件:

+ + + +

配置 logback

+ +

对于用户和开发人员来说,控制日志框架非常重要。日志框架的配置完全由配置文件完成。配置文件必须通过设置环境变量 -Dlogback.configurationFile=<file> 或者在classpath中设置 logback.xml 来指定。 conf文件夹中包含了一个可修改的logback.xml 文件,如果Flink在IDE外启动并提供了启动脚本,则使用该配置文件。 +提供的 logback.xml文件的形式如下:

+ +
<configuration>
+    <appender name="file" class="ch.qos.logback.core.FileAppender">
+        <file>${log.file}</file>
+        <append>false</append>
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="file"/>
+    </root>
+</configuration>
+ +

例如,如果想要控制 org.apache.flink.runtime.jobgraph.JobGraph的日志等级,就必须添加如下语句到配置文件中

+ +
<logger name="org.apache.flink.runtime.jobgraph.JobGraph" level="DEBUG"/>
+ +

更多的logback的配置信息可以看LOGback’s manual.

+ +

开发人员的最佳实践

+ +

使用slf4j的日志记录器是通过调用创建的

+ +
import org.slf4j.LoggerFactory
+import org.slf4j.Logger
+
+Logger LOG = LoggerFactory.getLogger(Foobar.class)
+ +

为了从slf4j中获得最大利益,建议使用它的占位符机制。 +使用占位符可以避免不必要的字符串结构,以防日志记录级别设置得太高以致消息无法记录。 +占位符的语法如下:

+ +
LOG.info("This message contains {} placeholders. {}", 2, "Yippie");
+ +

占位符还可以与需要记录的异常一起使用。

+ +
catch(Exception exception){
+	LOG.error("An {} occurred.", "error", exception);
+}
+ +

Back to top

+ + +
\ No newline at end of file