这篇文章简要介绍 Flink Window
Flink Window
数据源:传感器每 1S
上的数据按照每 5S 进行切割分桶,如
0~4S,5-9S. 一个桶中包含了同一个时间段内的所有数据;
窗口 API
Flink 窗口有两种类型:keyed streams 和 non-keyed streams.
它们的区别在于是否需要对数据进行分组(keyBy 操作),不进行 keyBy
操作是将所有数据作为一个逻辑上的 Stream, 所有的窗口计算会被同一个 task
完成,也就是 parallelism 为 1. 在调用方式上,keyed streams 要调用
keyBy(...) 后再调用 window(...) , 而 non-keyed streams 只用直接调用
Keyed Windows 1 2 3 4 5 6 7 8 9 10 stream .keyBy(...) <- 仅 keyed 窗口需要 .window(...) <- 必填项:"assigner" [.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger) [.evictor(...)] <- 可选项:"evictor" (省略则不使用 evictor) [.allowedLateness(...)] <- 可选项:"lateness" (省略则为 0 ) [.sideOutputLateData(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output) .reduce/aggregate/apply() <- 必填项:"function" [.getSideOutput(...)] <- 可选项:"output tag"
Non-Keyed Windows 1 2 3 4 5 6 7 8 stream .windowAll(...) <- 必填项:"assigner" [.trigger(...)] <- 可选项:"trigger" (else default trigger) [.evictor(...)] <- 可选项:"evictor" (else no evictor) [.allowedLateness(...)] <- 可选项:"lateness" (else zero) [.sideOutputLateData(...)] <- 可选项:"output tag" (else no side output for late data) .reduce/aggregate/apply() <- 必填项:"function" [.getSideOutput(...)] <- 可选项:"output tag"
或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness
” 时被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证,
比如全局窗口。 例如,对于一个基于 event time
如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1
分钟, 那么第一个元素落入 12:00 至 12:05 这个区间时,Flink
就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06
另外,每个窗口会设置自己的 Trigger 和 function
(ProcessWindowFunction、ReduceFunction、或 AggregateFunction)。该
function 决定如何计算窗口中的内容, 而 Trigger
决定何时窗口中的数据可以被 function 计算。 Trigger
的触发(fire)条件可能是“当窗口中有多于 4 条数据”或“当 watermark
越过窗口的结束时间”等。 Trigger 还可以在 window
这里的数据仅指窗口内的元素,不包括窗口的 meta data。也就是说,窗口在
purge 后仍然可以加入新的数据。
除此之外,也可以指定一个 Evictor (详见 Evictors),在 trigger
触发之后,Evictor 可以在窗口函数的前后删除数据。
keyBy 定义了是否对数据进行分组操作,类似于 SQL 语句中的
Window assigner
Window assigner 定义了 stream 中的元素如何被分发到各个窗口。 在
window(...)(用于 keyed streams)或 windowAll(...) (用于 non-keyed
streams)中可以指定一个 WindowAssigner。 WindowAssigner 负责将 stream
中的每个数据分发到一个或多个窗口中。 Flink
为最常用的情况提供了一些定义好的 window assigner,也就是 tumbling
windows、 sliding windows、 session windows 和 global
windows。另外,也可以继承 WindowAssigner 类来实现自定义的 window
assigner。 所有内置的 window assigner(除了 global
window)都是基于时间分发数据的,processing time 或 event time 均可。
基于时间的窗口用 start timestamp(包含)和 end
timestamp(不包含)描述窗口的大小。 在代码中,Flink
处理基于时间的窗口使用的是 TimeWindow, 它有查询开始和结束 timestamp
以及返回窗口所能储存的最大 timestamp 的方法 maxTimestamp()。
窗口函数(Window Functions)
窗口函数(Window Functions)定义了 Triggers
窗口函数有三种:ReduceFunction、AggregateFunction 或
ProcessWindowFunction。 前两者执行起来更高效,因为 Flink
可以在每条数据到达窗口后进行增量聚合(incrementally aggregate)。 而
ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的
Iterable,以及关于这个窗口的 meta-information。
使用 ProcessWindowFunction 的窗口转换操作没有其他两种函数高效,因为
Flink 在窗口触发前必须缓存里面的所有数据。 ProcessWindowFunction 可以与
ReduceFunction 或 AggregateFunction 合并来提高效率。
这样做既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction
接收窗口的 metadata。
Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。
1 2 3 4 5 6 7 8 9 10 DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .reduce(new ReduceFunction <Tuple2<String, Long>>() { public Tuple2<String, Long> reduce (Tuple2<String, Long> v1, Tuple2<String, Long> v2) { return new Tuple2 <>(v1.f0, v1.f1 + v2.f1); } });
ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction
输入数据的类型是输入流的元素类型,AggregateFunction 接口有如下几个方法:
与 ReduceFunction 相同,Flink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private static class AverageAggregate implements AggregateFunction <Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator () { return new Tuple2 <>(0L , 0L ); } @Override public Tuple2<Long, Long> add (Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2 <>(accumulator.f0 + value.f1, accumulator.f1 + 1L ); } @Override public Double getResult (Tuple2<Long, Long> accumulator) { return ((double ) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge (Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2 <>(a.f0 + b.f0, a.f1 + b.f1); } } DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate ());
ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable,
以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。
ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ataStream<Tuple2<String, Long>> input = ...; input .keyBy(t -> t.f0) .window(TumblingEventTimeWindows.of(Time.minutes(5 ))) .process(new MyProcessWindowFunction ()); public class MyProcessWindowFunction extends ProcessWindowFunction <Tuple2<String, Long>, String, String, TimeWindow> { @Override public void process (String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) { long count = 0 ; for (Tuple2<String, Long> in: input) { count++; } out.collect("Window: " + context.window() + "count: " + count); } }
上例使用 ProcessWindowFunction
增量聚合的 ProcessWindowFunction
ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction
将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从
ProcessWindowFunction` 中获得窗口的元数据。
使用 ReduceFunction 增量聚合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 DataStream<SensorReading> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .reduce(new MyReduceFunction (), new MyProcessWindowFunction ()); private static class MyReduceFunction implements ReduceFunction <SensorReading> { public SensorReading reduce (SensorReading r1, SensorReading r2) { return r1.value() > r2.value() ? r2 : r1; } } private static class MyProcessWindowFunction extends ProcessWindowFunction <SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> { public void process (String key, Context context, Iterable<SensorReading> minReadings, Collector<Tuple2<Long, SensorReading>> out) { SensorReading min = minReadings.iterator().next(); out.collect(new Tuple2 <Long, SensorReading>(context.window().getStart(), min)); } }
上例组合 ReduceFunction 与
使用 AggregateFunction 增量聚合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate (), new MyProcessWindowFunction ()); private static class AverageAggregate implements AggregateFunction <Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator () { return new Tuple2 <>(0L , 0L ); } @Override public Tuple2<Long, Long> add (Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2 <>(accumulator.f0 + value.f1, accumulator.f1 + 1L ); } @Override public Double getResult (Tuple2<Long, Long> accumulator) { return ((double ) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge (Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2 <>(a.f0 + b.f0, a.f1 + b.f1); } } private static class MyProcessWindowFunction extends ProcessWindowFunction <Double, Tuple2<String, Double>, String, TimeWindow> { public void process (String key, Context context, Iterable<Double> averages, Collector<Tuple2<String, Double>> out) { Double average = averages.iterator().next(); out.collect(new Tuple2 <>(key, average)); } }
上例组合 AggregateFunction 与
ProcessWindowFunction,计算平均值并与窗口对应的 key 一同输出。
Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window
function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认
trigger 无法满足需求,可以在 trigger(...) 调用中指定自定义的
Trigger 接口提供了五个方法来响应不同的事件:
onElement(): 该方法在每个元素被加入窗口时调用;
onEventTime(): 该方法在注册的 event-time timer 触发时调用;
onProcessingTime(): 该方法在注册的 processing-time timer
onMerge(): 该方法与有状态的 trigger 相关。该方法会在两个窗口合并时,
将窗口对应 trigger 的状态进行合并,比如使用会话窗口时;
Flink 的窗口模型允许在 WindowAssigner 和 Trigger 之外指定可选的
Evictor。 通过 evictor(...) 方法传入 Evictor。 Evictor 可以在 trigger
Allowed Lateness
在使用 event-time 窗口时,数据可能会迟到,即 Flink 用来追踪
event-time 进展的 watermark 已经越过了窗口结束的 timestamp
后,数据才到达。默认情况下,watermark 一旦越过窗口结束的
timestamp,迟到的数据就会被直接丢弃。 但是 Flink 允许指定窗口算子最大的
allowed lateness。 Allowed lateness
定义了一个元素可以在迟到多长时间的情况下不被丢弃,这个参数默认是 0。 在
watermark 超过窗口末端、到达窗口末端加上 allowed lateness
之前的这段时间内到达的元素, 依旧会被加入窗口。取决于窗口的
为了实现这个功能,Flink 会将窗口状态保存到 allowed lateness
超时才会将窗口及其状态删除。默认情况下,allowed lateness 被设为 0。即
watermark 之后到达的元素会被丢弃。
使用 GlobalWindows 时,没有数据会被视作迟到,因为全局窗口的结束
timestamp 是 Long.MAX_VALUE。
自定义一个数据源,每秒为 10 个传感器生成随机数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class MySensorSource implements SourceFunction <SensorReading> { private boolean running = true ; private HashMap<String, Double> sensorTempMap = new HashMap <>(); public MySensorSource () { Random random = new Random (); for (int i = 0 ; i < 10 ; i++) { sensorTempMap.put("sensor_" + (i + 1 ), 60 + random.nextGaussian() * 20 ); } } @Override public void run (SourceContext<SensorReading> sourceContext) throws Exception { Random random = new Random (); while (running) { sensorTempMap.entrySet().stream().forEach(entry -> { String sensorId = entry.getKey(); Double newTemp = entry.getValue() + random.nextGaussian(); sensorTempMap.put(sensorId, newTemp); sourceContext.collect(new SensorReading (sensorId, System.currentTimeMillis(),newTemp)); }); TimeUnit.SECONDS.sleep(1 ); } } @Override public void cancel () { running = false ; } }
设置事件时间语义,将 Watermark 间隔时间设置为 200 MS;
设置定定义 Source;
定义 WatermarkStrategy, Watermark
设置为单调自增模式,没有乱序,且设置 Timestamp 的提取方式;
定义分组且设置窗口为滚动事件窗口,窗口大小为 5S;
使用 ReduceFunction
使用 ReduceFunction + ProcessWindowFunction
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class FlinkWindowTrainingJob { public static void main (String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setAutoWatermarkInterval(200L ); env.setParallelism(1 ); DataStream<SensorReading> dataStream = env.addSource(new MySensorSource ()); WatermarkStrategy<SensorReading> watermarkStrategy = WatermarkStrategy.<SensorReading>forMonotonousTimestamps() .withTimestampAssigner((event, timestamp) -> event.getTimestamp()); final SingleOutputStreamOperator<SensorReading> eventDataStream = dataStream.assignTimestampsAndWatermarks(watermarkStrategy); WindowedStream<SensorReading, String, TimeWindow> windowStream = eventDataStream.keyBy(sensorReading -> sensorReading.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(5 ))); final SingleOutputStreamOperator<SensorReading> minTempStream = windowStream .reduce(new ReduceFunction <SensorReading>() { @Override public SensorReading reduce (SensorReading v1, SensorReading v2) throws Exception { final SensorReading sensorReading = v1.getTemperature().compareTo(v2.getTemperature()) > 0 ? v2 : v1; return sensorReading; } }); minTempStream.print("minTemp-reduce" ); SingleOutputStreamOperator<Tuple3<String, Long, SensorReading>> reduceProcessWindow = windowStream.reduce(new ReduceFunction <SensorReading>() { @Override public SensorReading reduce (SensorReading v1, SensorReading v2) throws Exception { final SensorReading sensorReading = v1.getTemperature().compareTo(v2.getTemperature()) > 0 ? v2 : v1; return sensorReading; } }, new MyProcessWindowFunction ()); reduceProcessWindow.print("minTemp-reduce-process" ); env.execute("Flink Window Training" ); } private static class MyProcessWindowFunction extends ProcessWindowFunction <SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> { @Override public void process (String key, Context context, Iterable<SensorReading> minReadings, Collector<Tuple3<String, Long, SensorReading>> out) { SensorReading min = minReadings.iterator().next(); out.collect(new Tuple3 <String, Long, SensorReading>(key, context.window().getStart(), min)); } } }
[windonStart, windowStart + size)
, 它的取值包括
, 但不包括
windowEnd(windowStart + size)
. 只要确定了
, 便可确定窗口时间范围。
以 TumblingEventTimeWindows
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @PublicEvolving public class TumblingEventTimeWindows extends WindowAssigner <Object, TimeWindow> { private static final long serialVersionUID = 1L ; private final long size; private final long globalOffset; private Long staggerOffset = null ; private final WindowStagger windowStagger; protected TumblingEventTimeWindows (long size, long offset, WindowStagger windowStagger) { if (Math.abs(offset) >= size) { throw new IllegalArgumentException ( "TumblingEventTimeWindows parameters must satisfy abs(offset) < size" ); } this .size = size; this .globalOffset = offset; this .windowStagger = windowStagger; } @Override public Collection<TimeWindow> assignWindows ( Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { if (staggerOffset == null ) { staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size); } long start = TimeWindow.getWindowStartWithOffset( timestamp, (globalOffset + staggerOffset) % size, size); return Collections.singletonList(new TimeWindow (start, start + size)); } else { throw new RuntimeException ( "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?" ); } } public static TumblingEventTimeWindows of (Time size) { return new TumblingEventTimeWindows (size.toMilliseconds(), 0 , WindowStagger.ALIGNED); } ... }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Override public Collection<TimeWindow> assignWindows ( Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { if (staggerOffset == null ) { staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size); } long start = TimeWindow.getWindowStartWithOffset( timestamp, (globalOffset + staggerOffset) % size, size); return Collections.singletonList(new TimeWindow (start, start + size)); } else { throw new RuntimeException ( "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?" ); } } public static long getWindowStartWithOffset (long timestamp, long offset, long windowSize) { final long remainder = (timestamp - offset) % windowSize; if (remainder < 0 ) { return timestamp - (remainder + windowSize); } else { return timestamp - remainder; } }
一句话总结 windonStart
: 最靠近小于
且为 windowSize 整数倍的数值。计算逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 windonStart = timestamp - remainder; windonStart = timestamp - (remainder + windowSize); remainder = (timestamp - offset) % windowSize; offset = (globalOffset + staggerOffset) % size;
以 TumblingEventTimeWindows.of(Time.seconds(5))
1 2 3 4 5 6 globalOffset = 0 ; staggerOffset = 0 ; windowSize = 5 ; windonStart = timestamp - (timestamp % 5 )
即 windonStart
为最靠近小于 timestamp
5 整数倍的数值。
Flink 窗口