An overview of eight Flink process functions, with detailed use-case examples:
1. ProcessFunction
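Overview:
ProcessFunction is the most basic process function in Flink and handles the elements of a DataStream one at a time. When it is applied to a non-keyed stream, as here, it cannot use keyed state or register timers, so it suits simple per-element processing of a single stream.
Use case:
Use ProcessFunction when you need a basic transformation or filter on every element of a stream.
Example: add a prefix to every message.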
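```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Prepends "Prefix: " to every incoming message
public class AddPrefixProcessFunction extends ProcessFunction<String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect("Prefix: " + value);
    }
}

// Usage
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputStream = env.fromElements("message1", "message2");
DataStream<String> processedStream = inputStream.process(new AddPrefixProcessFunction());
```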
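Conceptually, `processElement` is called once per element and may emit zero, one, or several results through the `Collector`, which is what lets a single process function both filter and transform. The plain-Java sketch below imitates that contract without Flink: `ElementProcessor`, `ProcessFunctionSketch`, and `runAll` are hypothetical names standing in for the function and the operator that drives it, and the rule "drop empty messages" is an illustrative assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a process function: one input, zero or more outputs
interface ElementProcessor {
    void processElement(String value, Consumer<String> out);
}

final class ProcessFunctionSketch {
    // Drops empty messages (emits nothing) and prefixes the rest (emits one result)
    static final ElementProcessor PREFIX_NON_EMPTY = (value, out) -> {
        if (!value.isEmpty()) {
            out.accept("Prefix: " + value);
        }
    };

    // Plays the role of the operator: feeds every element in order and gathers the outputs
    static List<String> runAll(List<String> input, ElementProcessor fn) {
        List<String> results = new ArrayList<>();
        for (String value : input) {
            fn.processElement(value, results::add);
        }
        return results;
    }
}
```

For input `["message1", "", "message2"]`, `runAll` returns `["Prefix: message1", "Prefix: message2"]`: the empty message produces no output at all.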
2. KeyedProcessFunction
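Overview:
KeyedProcessFunction processes a keyed (keyBy-partitioned) stream and can access keyed state and register timers. It suits scenarios that need per-key state management and time-triggered actions.
Use case:
Count each user's clicks per minute and emit the result once a minute.
Example: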
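```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts clicks per user (the key) and reports the count at the end of each processing-time minute
public class UserClickCountProcessFunction extends KeyedProcessFunction<String, String, String> {
    private static final long ONE_MINUTE = 60_000L;
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("clickCount", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = countState.value();  // null for the first click of this key in the current minute
        countState.update(current == null ? 1L : current + 1);
        // Register a timer for the next full minute. Flink keeps only one timer per key and timestamp,
        // so registering it again for every click in the same minute does not duplicate the output.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer(now - now % ONE_MINUTE + ONE_MINUTE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long count = countState.value();
        if (count != null) {
            out.collect("User: " + ctx.getCurrentKey() + " Clicks: " + count);
        }
        countState.clear();  // start a fresh count for the next minute
    }
}

// Usage (env is a StreamExecutionEnvironment). Processing-time timers may not fire before a
// bounded source such as fromElements finishes, so in practice this targets an unbounded source.
DataStream<String> clicks = env.fromElements("user1", "user1", "user2");
KeyedStream<String, String> keyedStream = clicks.keyBy(value -> value);
DataStream<String> resultStream = keyedStream.process(new UserClickCountProcessFunction());
```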
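To see why the timer is aligned to minute boundaries, here is a plain-Java model of the same counting logic with no Flink involved. `MinuteClickCounter`, `onClick`, and `advanceTo` are hypothetical names; the map of counts plays the role of keyed `ValueState`, and the sorted timer queue, which keeps at most one entry per key and timestamp, is a simplified model of Flink's per-key, per-timestamp timer deduplication.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical single-threaded model of a keyed counter with minute-aligned timers
final class MinuteClickCounter {
    static final long ONE_MINUTE = 60_000L;

    private final Map<String, Long> countState = new HashMap<>();            // models keyed state
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();   // timestamp -> keys
    private final List<String> output = new ArrayList<>();

    // Like processElement: count the click and register the end-of-minute timer
    void onClick(String user, long now) {
        countState.merge(user, 1L, Long::sum);
        long fireAt = now - now % ONE_MINUTE + ONE_MINUTE;
        // A set per timestamp makes a repeated (key, timestamp) registration a no-op
        timers.computeIfAbsent(fireAt, t -> new TreeSet<>()).add(user);
    }

    // Like time passing: fire every timer whose timestamp is <= now, in time order
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, TreeSet<String>> due = timers.pollFirstEntry();
            for (String user : due.getValue()) {
                Long count = countState.remove(user);  // read, then clear, as onTimer does
                if (count != null) {
                    output.add("User: " + user + " Clicks: " + count);
                }
            }
        }
    }

    List<String> output() {
        return output;
    }
}
```

With clicks from `user1` at 1,000 ms and 2,000 ms and from `user2` at 3,000 ms, nothing is emitted until time reaches 60,000 ms; then each user is reported exactly once (`user1` with 2 clicks, `user2` with 1). Had each click registered its own `now + 60000` timer instead, `user1` would get two timers and a second, empty report.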
3. ProcessWindowFunction
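Overview:
ProcessWindowFunction receives all elements of a window at once, together with a context object that exposes window metadata such as the window bounds and the current watermark. It suits complex, window-level processing. Because it buffers every element of the window until the window fires, simple aggregates such as counts are often computed more cheaply by combining it with a ReduceFunction or AggregateFunction.
Use case:
Count each user's total clicks within a given time window.
Example: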
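```java
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Counts the clicks of one user (the key) in one window
public class WindowClickCountProcessFunction extends ProcessWindowFunction<String, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<String> elements, Collector<String> out) {
        long count = StreamSupport.stream(elements.spliterator(), false).count();
        out.collect("User: " + key + " Window: " + context.window() + " Clicks: " + count);
    }
}

// Usage: event-time windows need timestamps and watermarks. The sample strings carry none,
// so for illustration each record is stamped with the wall-clock time at ingestion.
DataStream<String> clicks = env.fromElements("user1", "user1", "user2")
        .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner((event, previousTimestamp) -> System.currentTimeMillis()));
DataStream<String> windowedStream = clicks
        .keyBy(value -> value)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .process(new WindowClickCountProcessFunction());
```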
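Which window an element lands in is plain arithmetic on its timestamp. The sketch below (no Flink; `Click`, `TumblingWindowCount`, `windowStart`, and `countPerUserAndWindow` are hypothetical names) assigns each click to a one-minute tumbling window `[start, start + size)` and counts per user and window, which is the result the `ProcessWindowFunction` above emits once per window. It assumes non-negative timestamps and no window offset.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical timestamped click
final class Click {
    final String user;
    final long timestamp;

    Click(String user, long timestamp) {
        this.user = user;
        this.timestamp = timestamp;
    }
}

final class TumblingWindowCount {
    // Start of the tumbling window [start, start + size) containing ts (ts >= 0, zero offset)
    static long windowStart(long ts, long size) {
        return ts - ts % size;
    }

    // Counts clicks per "user@windowStart"
    static Map<String, Long> countPerUserAndWindow(List<Click> clicks, long size) {
        Map<String, Long> counts = new TreeMap<>();
        for (Click c : clicks) {
            long start = windowStart(c.timestamp, size);
            counts.merge(c.user + "@" + start, 1L, Long::sum);
        }
        return counts;
    }
}
```

With a 60,000 ms window, a click at 59,999 ms still belongs to the window starting at 0, while a click at exactly 60,000 ms opens the next window, because the window end is exclusive.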
4. ProcessAllWindowFunction
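Overview:
ProcessAllWindowFunction is the counterpart of ProcessWindowFunction for non-keyed streams (windowAll): each window holds elements of the whole stream rather than of a single key, which makes it suitable for global computations. Non-keyed windows are evaluated by a single task (parallelism 1), so they do not scale out.
Use case:
Compute global click totals, for example the total number of clicks per hour.
Example: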
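```java
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Counts all clicks in one window. The count is a local variable: a field shared across
// process() calls would turn it into a running total that is neither per-window nor checkpointed.
public class GlobalClickCountProcessAllWindowFunction extends ProcessAllWindowFunction<String, String, TimeWindow> {
    @Override
    public void process(Context context, Iterable<String> elements, Collector<String> out) {
        long count = StreamSupport.stream(elements.spliterator(), false).count();
        out.collect("Total Clicks in Window " + context.window() + ": " + count);
    }
}

// Usage: event-time windows need timestamps; the sample data has none, so for illustration
// each record is stamped with the wall-clock time at ingestion.
DataStream<String> clicks = env.fromElements("click1", "click2", "click3")
        .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner((event, previousTimestamp) -> System.currentTimeMillis()));
DataStream<String> globalWindowedStream = clicks
        .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))  // runs with parallelism 1
        .process(new GlobalClickCountProcessAllWindowFunction());
```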
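The difference between a per-window count and a total kept in a field is easy to see without Flink. In this plain-Java sketch (hypothetical `HourlyTotals` class; timestamps in milliseconds, assumed non-negative), `perWindow` counts each one-hour tumbling window independently, while `withSharedField` mimics what a `totalClicks` field that outlives each `process()` call would report, assuming windows fire in time order.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HourlyTotals {
    static final long ONE_HOUR = 3_600_000L;

    // Per-window count: each window starts from zero, like a local variable inside process()
    static Map<Long, Long> perWindow(List<Long> timestamps) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(ts - ts % ONE_HOUR, 1L, Long::sum);
        }
        return counts;
    }

    // Field shared across process() calls: each window reports the running total so far
    static Map<Long, Long> withSharedField(List<Long> timestamps) {
        long totalClicks = 0;
        Map<Long, Long> emitted = new TreeMap<>();
        for (Map.Entry<Long, Long> window : perWindow(timestamps).entrySet()) {  // time order
            totalClicks += window.getValue();
            emitted.put(window.getKey(), totalClicks);
        }
        return emitted;
    }
}
```

For two clicks in the first hour and one in the second, `perWindow` reports 2 and 1, whereas the shared field reports 2 and then 3 for the second hour.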
5. CoProcessFunction
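Overview:
CoProcessFunction processes two connected streams: processElement1 handles elements of the first stream and processElement2 elements of the second. Both methods can share state, which makes it possible to merge the two streams. Keyed state is only available when the connected streams are keyed.
Use case:
Merge a user click stream with a user registration stream to obtain each user's registration and click information.
Example: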
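```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Input 1: clicks (user ID); input 2: registrations (user ID).
// Both inputs are keyed by user ID, so the state below is scoped to a single user.
public class UserStatsCoProcessFunction extends CoProcessFunction<String, String, String> {
    private transient ValueState<Boolean> registeredState;

    @Override
    public void open(Configuration parameters) {
        registeredState = getRuntimeContext().getState(new ValueStateDescriptor<>("registered", Boolean.class));
    }

    @Override
    public void processElement1(String click, Context ctx, Collector<String> out) throws Exception {
        // Only clicks of already registered users are reported. The order across the two inputs
        // is not guaranteed, so a click that arrives before its registration is dropped here.
        if (Boolean.TRUE.equals(registeredState.value())) {
            out.collect("User: " + click + " Clicked");
        }
    }

    @Override
    public void processElement2(String registration, Context ctx, Collector<String> out) throws Exception {
        registeredState.update(true);
        out.collect("User: " + registration + " Registered");
    }
}

// Usage: keyed state requires keyed streams, so both inputs are keyed by user ID
DataStream<String> clicks = env.fromElements("user1", "user2");
DataStream<String> registrations = env.fromElements("user1", "user3");
DataStream<String> resultStream = clicks.connect(registrations)
        .keyBy(click -> click, registration -> registration)
        .process(new UserStatsCoProcessFunction());
```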
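Because Flink does not order elements across the two inputs of a connected stream, a click can reach the function before the matching registration and is then dropped. A common remedy is to buffer such clicks until the registration arrives. The plain-Java model below (no Flink; `ClickRegistrationMerger` and its methods are hypothetical, and the set and map stand in for per-key state) sketches that buffering variant. In a real job the buffer would live in keyed state and would usually need a timer or state TTL to keep it bounded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the connected click/registration streams, keyed by user ID
final class ClickRegistrationMerger {
    private final Set<String> registered = new HashSet<>();              // per-key "registered" flag
    private final Map<String, Integer> pendingClicks = new HashMap<>();  // clicks seen before registration
    private final List<String> output = new ArrayList<>();

    // Input 1: a click. Instead of dropping early clicks, this variant buffers them.
    void onClick(String user) {
        if (registered.contains(user)) {
            output.add("User: " + user + " Clicked");
        } else {
            pendingClicks.merge(user, 1, Integer::sum);
        }
    }

    // Input 2: a registration, which also releases any buffered clicks of that user
    void onRegistration(String user) {
        registered.add(user);
        output.add("User: " + user + " Registered");
        int buffered = pendingClicks.getOrDefault(user, 0);
        pendingClicks.remove(user);
        for (int i = 0; i < buffered; i++) {
            output.add("User: " + user + " Clicked");
        }
    }

    List<String> output() {
        return output;
    }
}
```

If `user1` clicks before registering, the click is emitted right after the registration instead of being lost; `user2`, who never registers, stays buffered.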
6. ProcessJoinFunction
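Overview:
ProcessJoinFunction is the function applied to the matches of an interval join: two keyed streams are joined on their key, and each element of the left stream is paired with every element of the right stream whose timestamp falls within a given interval relative to the left element's timestamp. Interval joins currently work only in event time.
Use case:
Join a user click stream with a user information stream within a time interval to obtain each click event together with the user's information.
Example: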
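```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Called once for every matching (click, userInfo) pair that shares a key
public class ClickUserJoinFunction extends ProcessJoinFunction<String, String, String> {
    @Override
    public void processElement(String click, String userInfo, Context ctx, Collector<String> out) {
        out.collect("Click: " + click + " User Info: " + userInfo);
    }
}

// Usage: both sides must be keyed and carry event-time timestamps
// (for illustration, records are stamped with the wall-clock time at ingestion)
WatermarkStrategy<String> wallClock = WatermarkStrategy.<String>forMonotonousTimestamps()
        .withTimestampAssigner((event, previousTimestamp) -> System.currentTimeMillis());
KeyedStream<String, String> clicks = env.fromElements("user1", "user2")       // a click carries the user ID
        .assignTimestampsAndWatermarks(wallClock)
        .keyBy(click -> click);
KeyedStream<String, String> userInfos = env.fromElements("user1_info", "user2_info")
        .assignTimestampsAndWatermarks(wallClock)
        .keyBy(info -> info.split("_")[0]);                                    // "user1_info" -> "user1"
// Matches user info whose timestamp lies in [click - 5 min, click + 10 min] (bounds inclusive)
DataStream<String> joinedStream = clicks.intervalJoin(userInfos)
        .between(Time.minutes(-5), Time.minutes(10))
        .process(new ClickUserJoinFunction());
```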
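The interval condition is easiest to check on concrete numbers. This plain-Java nested-loop sketch (no Flink; `TimedEvent` and `IntervalJoinSketch.join` are hypothetical names) applies the same rule as `between(lower, upper)` with Flink's default inclusive bounds: a right element matches a left element with the same key when its timestamp lies in `[left + lower, left + upper]`. Flink itself evaluates the join incrementally, buffering elements in state, rather than comparing all pairs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical keyed, timestamped event
final class TimedEvent {
    final String key;
    final long timestamp;
    final String payload;

    TimedEvent(String key, long timestamp, String payload) {
        this.key = key;
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

final class IntervalJoinSketch {
    // Pairs left and right events with equal keys when
    // left.timestamp + lower <= right.timestamp <= left.timestamp + upper
    static List<String> join(List<TimedEvent> left, List<TimedEvent> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (TimedEvent l : left) {
            for (TimedEvent r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inRange = r.timestamp >= l.timestamp + lower && r.timestamp <= l.timestamp + upper;
                if (sameKey && inRange) {
                    out.add(l.payload + " + " + r.payload);
                }
            }
        }
        return out;
    }
}
```

With clicks at minute 10 and bounds of -5 and +10 minutes, user info stamped at minute 6 joins, while info stamped at minute 21 falls outside `[5, 20]` and does not.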
7. BroadcastProcessFunction
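Overview:
BroadcastProcessFunction processes a non-keyed main stream connected to a broadcast stream. Every element of the broadcast stream is delivered to all parallel instances and is typically stored in broadcast state, which the main-stream side can read but not modify. It suits scenarios in which the elements of one stream must be made available to every processing node.
Use case:
Connect the main stream with a broadcast stream (for example configuration data) and apply the broadcast configuration to the main stream.
Example: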
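```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ConfigBroadcastProcessFunction extends BroadcastProcessFunction<String, String, String> {
    // Broadcast state is declared with a MapStateDescriptor and accessed through the context,
    // not through getRuntimeContext()
    public static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("configState", String.class, String.class);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Read-only on the main-stream side; null until the first config element has arrived
        ReadOnlyBroadcastState<String, String> configState = ctx.getBroadcastState(CONFIG_DESCRIPTOR);
        out.collect("Value: " + value + " Config: " + configState.get("configKey"));
    }

    @Override
    public void processBroadcastElement(String config, Context ctx, Collector<String> out) throws Exception {
        BroadcastState<String, String> configState = ctx.getBroadcastState(CONFIG_DESCRIPTOR);
        configState.put("configKey", config);
    }
}

// Usage: broadcast(descriptor) turns the config stream into a BroadcastStream
DataStream<String> mainStream = env.fromElements("data1", "data2");
BroadcastStream<String> configStream = env.fromElements("config1")
        .broadcast(ConfigBroadcastProcessFunction.CONFIG_DESCRIPTOR);
DataStream<String> processedStream = mainStream
        .connect(configStream)
        .process(new ConfigBroadcastProcessFunction());
```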
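The key property of broadcast state is that every parallel instance receives every broadcast element and applies it identically, while each main-stream element is handled by just one instance; this is why Flink expects the broadcast-side update logic to be deterministic. The plain-Java model below (no Flink; `BroadcastSketch` is hypothetical, and round-robin routing of the main stream is an assumption for illustration) shows this, including the `null` config seen before the first broadcast element arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a main stream spread over several parallel instances,
// each holding its own copy of the broadcast config state
final class BroadcastSketch {
    private final List<Map<String, String>> instanceStates = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private int nextInstance = 0;

    BroadcastSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instanceStates.add(new HashMap<>());
        }
    }

    // Broadcast side: every instance applies the same update, so all copies stay identical
    void onConfig(String config) {
        for (Map<String, String> state : instanceStates) {
            state.put("configKey", config);
        }
    }

    // Main side: each element goes to one instance (round-robin here) and only reads its copy
    void onData(String value) {
        int instance = nextInstance;
        nextInstance = (nextInstance + 1) % instanceStates.size();
        String config = instanceStates.get(instance).get("configKey");
        output.add("instance " + instance + " Value: " + value + " Config: " + config);
    }

    List<String> output() {
        return output;
    }
}
```

With two instances, an element processed before any config arrives sees `null`; after one `onConfig("config1")` call, elements on either instance see `config1`.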
8. KeyedBroadcastProcessFunction
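Overview:
KeyedBroadcastProcessFunction processes a keyed main stream connected to a broadcast stream. In addition to the broadcast state, the main-stream side can access the current key, keyed state, and timers. It suits scenarios that combine per-key processing with data broadcast to all instances.
Use case:
Connect a keyed main stream with a broadcast stream (for example dynamic rules) and apply the rules to each key's data.
Example: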
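```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedRuleBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, String, String, String> {
    // Broadcast state holding the current rule; shared by all keys and all parallel instances
    public static final MapStateDescriptor<String, String> RULE_DESCRIPTOR =
            new MapStateDescriptor<>("ruleState", String.class, String.class);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        String rule = ctx.getBroadcastState(RULE_DESCRIPTOR).get("ruleKey");  // null until a rule arrives
        out.collect("Key: " + ctx.getCurrentKey() + " Value: " + value + " Rule: " + rule);
    }

    @Override
    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(RULE_DESCRIPTOR).put("ruleKey", rule);
    }
}

// Usage: the main stream must be a KeyedStream; the rule stream is broadcast with the descriptor
KeyedStream<String, String> keyedStream = env.fromElements("data1", "data2").keyBy(value -> value);
BroadcastStream<String> ruleStream = env.fromElements("rule1")
        .broadcast(KeyedRuleBroadcastProcessFunction.RULE_DESCRIPTOR);
DataStream<String> resultStream = keyedStream
        .connect(ruleStream)
        .process(new KeyedRuleBroadcastProcessFunction());
```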
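What the keyed variant adds is the combination of per-key state with a rule shared by all keys. The plain-Java model below (no Flink; `KeyedRuleSketch` is hypothetical, and treating the rule as a count threshold is an assumption for illustration) keeps one counter per key and evaluates it against whichever rule was broadcast most recently, so a rule change takes effect for every key from its next element on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a keyed main stream evaluated against a broadcast rule.
// The rule is assumed to be an alert threshold on the per-key element count.
final class KeyedRuleSketch {
    private final Map<String, Integer> countPerKey = new HashMap<>();  // stands in for keyed state
    private Integer threshold = null;                                  // stands in for broadcast state
    private final List<String> alerts = new ArrayList<>();

    // Broadcast side: replaces the rule for all keys at once
    void onRule(int newThreshold) {
        threshold = newThreshold;
    }

    // Keyed side: updates this key's count and checks it against the current rule
    void onData(String key) {
        int count = countPerKey.merge(key, 1, Integer::sum);
        if (threshold != null && count >= threshold) {
            alerts.add("Key: " + key + " count " + count + " reached threshold " + threshold);
        }
    }

    List<String> alerts() {
        return alerts;
    }
}
```

Elements that arrive before any rule are counted but never alert; once a rule exists, each key's existing count is checked against it on that key's next element.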
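These examples show how the different Flink process functions address a range of stream-processing requirements. Choose the process function that matches your data-processing needs to implement the corresponding business logic.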