Flink 事件时间处理和 Kafka 案例
in Note with 0 comment
Flink 事件时间处理和 Kafka 案例
in Note with 0 comment

事件时间

由于在大多数现实世界的用例中,消息到达是无序的,我们处理事件时必须考虑那些迟到的消息。 Flink 有强大的事件时间处理接口,协助开发者更好地完成业务需求。

如下图,我们可以清晰地看到关于事件的三种时间:1、Event Time ;2、Ingestion Time;3、Processing Time。

1551686758157-1471f0a9-8b64-4fe2-b401-968c434c0bf2-image.png

Event Time 是事件在现实世界中发生的时间,Ingestion Time 是事件摄入到系统的时间,Processing Time 是Flink系统处理该事件的时间。

watermark

一般情况下,我们选择用 Event Time 作为事件处理时间,来保证程序处理是符合预期的。

但我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。但是对于 late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是 watermark。

更多关于 watermark 的介绍可以查看:http://vishnuviswanath.com/flink_eventtime.html

kafka 例子

kafka 作为数据源,我们要选择消息里的某个字段作为 EventTime,配置 watermark 来保证数据处理的顺序性:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)

图例:

1551690999643-08deaca8-f6ae-4828-9551-da38bdff549a-image-resized.png

从图中,我们可以看到每个分区都会生成watermark,以及在这种情况下 watermark 是如何在数据处理流中传递。

注意事项:

如果 watermark 依赖从 kafka 读取的消息里的字段,则需要所有 topic 和 partition 都要有连续的消息流,否则,整个应用的 watermark 不能前进,而且基于时间操作的算子都不会进行工作,如 windows 算子。

目前这个问题将在 1.8.0 版本中解决:https://issues.apache.org/jira/browse/FLINK-5479

我们集群是1.7.2,如果使用kafka作为数据源,需要保证每个分区都有消息流。

参考链接

Responses