This article gives a brief introduction to Flink's ProcessFunction.
Overview
ProcessFunction is Flink's low-level stream processing operation. It gives access to the following:
events (the stream elements);
state on keyed streams;
event-time and processing-time timers (keyed streams only).
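ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It is invoked once for each event received from the input stream and processes that event.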
Keyed state can be defined and accessed through RuntimeContext.
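Timers let an application react to the progress of event time or processing time. A timer is registered or deleted through TimerService, with a timestamp that identifies the point in time at which it should fire. When event time or processing time reaches that timestamp, the onTimer callback is invoked, and the handling logic goes there. Timers are scoped to a key: each key has its own set of timers. Note that under event-time semantics a timer fires only when the watermark advances to or beyond its timestamp. If no further events arrive, the watermark may stop advancing and the timer may never fire.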
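The timer behaviour described above can be modelled in a few lines of plain Java. This is only a sketch of the semantics, not Flink's TimerService: the class name, the map-of-sorted-sets structure, and the "key@timestamp" output format are all assumptions made for illustration. It shows that timers are kept per key, that registering the same timestamp twice for a key leaves a single timer, and that nothing fires until the watermark reaches the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java model of per-key event-time timers (illustrative, not Flink code). */
class EventTimeTimerSketch {
    // key -> sorted, de-duplicated timer timestamps (hypothetical structure)
    private final Map<String, TreeSet<Long>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    void registerEventTimeTimer(String key, long timestamp) {
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    void deleteEventTimeTimer(String key, long timestamp) {
        TreeSet<Long> perKey = timers.get(key);
        if (perKey != null) {
            perKey.remove(timestamp);
        }
    }

    /** Advances the watermark and returns "key@timestamp" for every timer that fires. */
    List<String> advanceWatermark(long newWatermark) {
        List<String> fired = new ArrayList<>();
        if (newWatermark <= watermark) {
            return fired; // the watermark never moves backwards
        }
        watermark = newWatermark;
        for (Map.Entry<String, TreeSet<Long>> entry : timers.entrySet()) {
            TreeSet<Long> perKey = entry.getValue();
            // Fire every timer of this key whose timestamp the watermark has reached.
            while (!perKey.isEmpty() && perKey.first() <= watermark) {
                fired.add(entry.getKey() + "@" + perKey.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        service.registerEventTimeTimer("user-a", 61_000L);
        service.registerEventTimeTimer("user-a", 61_000L); // duplicate, kept once
        service.registerEventTimeTimer("user-b", 90_000L);

        System.out.println(service.advanceWatermark(60_999L)); // []
        System.out.println(service.advanceWatermark(61_000L)); // [user-a@61000]
        // The watermark stalls here, so the timer of user-b never fires.
        System.out.println(service.advanceWatermark(61_000L)); // []
    }
}
```

The last call illustrates the caveat about event time: as long as the watermark stays at 61000, the timer registered for user-b at 90000 remains pending.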
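Code example
The following example uses KeyedProcessFunction to emulate a simple session window. It counts the requests in one user session: if no request arrives for a user for 60 seconds, the session is considered finished and the count is emitted. It consists of the following parts: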
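UserRequest represents a user request and has three fields: user id, operation type, and timestamp;
CountWithTimestamp is the state object and has three fields: user id, request count, and timestamp of the last access;
The stream is keyed by user id (keyBy), and each key holds its own CountWithTimestamp state;
KeyedProcessFunction increments the count and updates the last-access timestamp;
If a key receives no request for 60 seconds, its session is over and the count is emitted.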
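The two data classes are not shown in the article, so the following is a hypothetical sketch of what they might look like, based only on the three fields listed for each and on the accessors the job calls (getUserId, getTimestamp, getCount, setCount, getLastModified, setLastModified, setUserId). The field name action, the long type of the counter, and the parse helper are assumptions. The real classes live in the linked project.

```java
/** Hypothetical sketch of the data model; the real classes are in the linked project. */
class SessionModelSketch {

    /** One request: user id, operation type, event timestamp (assumed to be epoch seconds). */
    static final class UserRequest {
        private final String userId;
        private final String action; // field name is an assumption
        private final long timestamp;

        UserRequest(String userId, String action, long timestamp) {
            this.userId = userId;
            this.action = action;
            this.timestamp = timestamp;
        }

        String getUserId() { return userId; }
        String getAction() { return action; }
        long getTimestamp() { return timestamp; }
    }

    /** Per-key state: user id, request count, timestamp of the last request in milliseconds. */
    static final class CountWithTimestamp {
        private String userId;
        private long count;
        private long lastModified;

        String getUserId() { return userId; }
        void setUserId(String userId) { this.userId = userId; }
        long getCount() { return count; }
        void setCount(long count) { this.count = count; }
        long getLastModified() { return lastModified; }
        void setLastModified(long lastModified) { this.lastModified = lastModified; }
    }

    /** Parses one "userId,action,timestamp" line (hypothetical helper). */
    static UserRequest parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("expected 3 comma-separated fields: " + line);
        }
        return new UserRequest(fields[0].trim(), fields[1].trim(), Long.parseLong(fields[2].trim()));
    }

    public static void main(String[] args) {
        UserRequest request = parse("user-1,click,1700000000");

        // First request of a session: create the state, count it, remember when it happened.
        CountWithTimestamp state = new CountWithTimestamp();
        state.setUserId(request.getUserId());
        state.setCount(state.getCount() + 1);
        state.setLastModified(request.getTimestamp() * 1000); // seconds -> milliseconds

        System.out.println(state.getUserId() + " count=" + state.getCount()
                + " lastModified=" + state.getLastModified());
        // prints: user-1 count=1 lastModified=1700000000000
    }
}
```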
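import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// UserRequest and CountWithTimestamp are the project's own classes (see the project link).
public class ProcessFunctionJob {

    /** Session gap: a key with no request for this long is considered finished. */
    private static final long SESSION_GAP_MS = 60000L;

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Emit watermarks every 200 ms.
        env.getConfig().setAutoWatermarkInterval(200L);

        // Read "userId,operation,timestamp" lines from a socket given by --host and --port.
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        final DataStreamSource<String> dataStreamSource = env.socketTextStream(host, port);

        final SingleOutputStreamOperator<UserRequest> dataStream = dataStreamSource.map(line -> {
            String[] fields = line.split(",");
            return new UserRequest(fields[0], fields[1], Long.parseLong(fields[2]));
        });

        // Event time comes from the request itself; the input timestamp is taken
        // to be in seconds and is converted to milliseconds.
        final WatermarkStrategy<UserRequest> watermarkStrategy = WatermarkStrategy
                .<UserRequest>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp() * 1000);
        final SingleOutputStreamOperator<UserRequest> eventDataStream =
                dataStream.assignTimestampsAndWatermarks(watermarkStrategy);

        // Key by user id so that state and timers are scoped to one user.
        final SingleOutputStreamOperator<CountWithTimestamp> processStream = eventDataStream
                .keyBy(userRequest -> userRequest.getUserId())
                .process(new RequestCountFunction());

        processStream.print("request-count");
        env.execute("Flink Process Function Job");
    }

    private static class RequestCountFunction
            extends KeyedProcessFunction<String, UserRequest, CountWithTimestamp> {

        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("mystate", CountWithTimestamp.class));
        }

        @Override
        public void processElement(UserRequest userRequest, Context context,
                                   Collector<CountWithTimestamp> collector) throws Exception {
            // First request of a session for this key: create the state.
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.setUserId(userRequest.getUserId());
            }

            // Count the request and remember its event time.
            current.setCount(current.getCount() + 1);
            current.setLastModified(context.timestamp());
            state.update(current);

            // Schedule a check one session gap after this request.
            context.timerService().registerEventTimeTimer(current.getLastModified() + SESSION_GAP_MS);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                            Collector<CountWithTimestamp> out) throws Exception {
            CountWithTimestamp result = state.value();
            // Timers registered by earlier requests fire too; only a timer that is at
            // least one gap after the last request means the session is over.
            if (result != null && timestamp >= result.getLastModified() + SESSION_GAP_MS) {
                out.collect(result);
                // Clear the state so that the next session of this key starts from zero.
                state.clear();
            }
        }
    }
}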
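To see how the counting, the timers, and the watermark interact, the job's logic can be replayed in plain Java without a Flink cluster. This is a simplified sketch under several assumptions: timestamps are already in milliseconds, the watermark is advanced immediately after every event to the largest timestamp seen minus 1 ms (modelling a bounded-out-of-orderness strategy with zero delay, whereas the real job emits watermarks periodically), the state is cleared after a session is emitted, and all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java replay of the session-count logic (illustrative, not Flink code). */
class SessionCountSimulation {
    static final long GAP_MS = 60_000L;

    // Per-key state: index 0 = request count, index 1 = last-modified timestamp.
    private final Map<String, long[]> state = new HashMap<>();
    // Per-key event-time timers, sorted and de-duplicated.
    private final Map<String, TreeSet<Long>> timers = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private long maxTimestamp = Long.MIN_VALUE;

    /** Models processElement followed by a watermark update. */
    void onEvent(String userId, long timestampMs) {
        long[] current = state.computeIfAbsent(userId, k -> new long[2]);
        current[0] += 1;
        current[1] = timestampMs;
        timers.computeIfAbsent(userId, k -> new TreeSet<>()).add(timestampMs + GAP_MS);

        maxTimestamp = Math.max(maxTimestamp, timestampMs);
        fireTimers(maxTimestamp - 1); // assumed watermark: largest timestamp minus 1 ms
    }

    private void fireTimers(long watermark) {
        for (Map.Entry<String, TreeSet<Long>> entry : timers.entrySet()) {
            TreeSet<Long> perKey = entry.getValue();
            while (!perKey.isEmpty() && perKey.first() <= watermark) {
                onTimer(entry.getKey(), perKey.pollFirst());
            }
        }
    }

    /** Models onTimer: emit only if no request arrived during the last gap. */
    private void onTimer(String userId, long timerTimestamp) {
        long[] current = state.get(userId);
        if (current != null && timerTimestamp >= current[1] + GAP_MS) {
            output.add(userId + ":" + current[0]);
            state.remove(userId); // the next session of this key starts from zero
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        SessionCountSimulation sim = new SessionCountSimulation();
        sim.onEvent("alice", 0L);
        sim.onEvent("alice", 30_000L);
        sim.onEvent("bob", 60_001L); // alice's first timer fires, but her session is still open
        sim.onEvent("bob", 90_001L); // watermark reaches 90000: alice's session closes
        System.out.println(sim.output()); // [alice:2]
    }
}
```

In this run bob's session is never emitted: no later event arrives to push the watermark past his timers, which is the event-time caveat in practice.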
Project code: https://github.com/noahsarkzhang-ts/flink-lab/tree/main/flink-processfunction-training
References:
1. Flink Process Function