首页 笔记 图片 查字 
所属分类:Flink
浏览:281
内容:

在 Flink 中,Watermark 是用于表示事件时间进展的机制,它可以用来告诉 Flink 在处理流式数据时,哪些事件已经到达了一定的时间阈值,可以被认为是已经处理完毕的数据。Flink 中的 Watermark 机制可以通过以下几种策略进行设置:

1. 周期性生成 Watermark:该策略会定期生成 Watermark,例如每隔一分钟生成一次,可以通过  AssignerWithPeriodicWatermarks  接口实现。
2. 事件触发生成 Watermark:该策略会在每个事件到达时生成 Watermark,可以通过  AssignerWithPunctuatedWatermarks  接口实现。
3. 自定义 Watermark 生成策略:用户可以根据自己的需求实现自定义的 Watermark 生成策略,例如基于一些外部因素的判断,可以通过  AscendingTimestampExtractor  接口实现。
 
在 Flink 中,可以通过以下方式设置 Watermark 策略:
DataStream<MyEvent> input = ...;

input
    .assignTimestampsAndWatermarks(new MyWatermarkStrategy())
    .keyBy(...)
    .window(...)
    .reduce(...);
在上述代码中, MyWatermarkStrategy  是用户自定义的 Watermark 策略,通过  assignTimestampsAndWatermarks  方法将其应用到输入数据流中。在 Watermark 策略中,需要实现  getCurrentWatermark  方法和  extractTimestamp  方法,分别用于生成 Watermark 和提取事件时间戳。

WatermarkStrategy  是 Flink 1.12 引入的新特性,它是一种更加灵活和易用的 Watermark 策略定义方式。相比于之前的 Watermark 策略定义方式, WatermarkStrategy  可以更好地支持多种 Watermark 生成策略,并且可以在一个地方定义所有的 Watermark 相关参数。
 
使用  WatermarkStrategy ,可以通过以下方式定义 Watermark 策略:
DataStream<MyEvent> input = ...;
input
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner(new MyTimestampAssigner()));
在上述代码中, forBoundedOutOfOrderness  方法指定了一个基于乱序数据的 Watermark 策略,即在事件时间戳比当前 Watermark 值大于 10 秒时,生成一个新的 Watermark。 withTimestampAssigner  方法指定了一个自定义的时间戳分配器,用于从数据中提取事件时间戳。
 
除了  forBoundedOutOfOrderness  方法, WatermarkStrategy  还支持以下几种 Watermark 策略的定义方式:
-  forMonotonousTimestamps :用于处理单调递增的事件时间戳,例如 Kafka 中的消息。
-  forGenerator :用于自定义 Watermark 生成器。
-  noWatermarks :用于关闭 Watermark 机制,即将所有数据视为实时数据。
 
通过  WatermarkStrategy ,可以更加方便地定义和配置 Watermark 策略,提高代码的可读性和可维护性。