An overview of eight Flink process functions, with detailed use cases and examples:

1. ProcessFunction

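Overview
ProcessFunction is Flink's basic low-level process function for handling the elements of a DataStream one at a time. Applied to a non-keyed stream, it cannot use keyed state or timers (those require a keyed stream; see KeyedProcessFunction), which makes it a good fit for simple per-element processing of a single stream.

Use case
Use ProcessFunction when each element of a stream needs a basic transformation or filtering step.

Example: add a prefix to every message.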

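import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class AddPrefixProcessFunction extends ProcessFunction<String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Prepend a fixed prefix to every element.
        out.collect("Prefix: " + value);
    }
}

// Usage example (env is a StreamExecutionEnvironment)
DataStream<String> inputStream = env.fromElements("message1", "message2");
DataStream<String> processedStream = inputStream.process(new AddPrefixProcessFunction());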

2. KeyedProcessFunction

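Overview
KeyedProcessFunction processes a stream that has been partitioned by key. It can access keyed state and register timers, so it suits scenarios that need per-key state management and time-based actions.

Use case
Count each user's clicks per minute and emit the result once a minute.

Example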

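import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UserClickCountProcessFunction extends KeyedProcessFunction<String, String, String> {
    private static final long ONE_MINUTE_MS = 60_000L;
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // No default value: value() returns null until the first update for a key.
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("clickCount", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long currentCount = countState.value();
        countState.update(currentCount == null ? 1L : currentCount + 1);
        // Round up to the next minute boundary. Timers are deduplicated per key and
        // timestamp, so all clicks within the same minute share a single timer.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer((now / ONE_MINUTE_MS + 1) * ONE_MINUTE_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long count = countState.value();
        if (count != null) {
            out.collect("User: " + ctx.getCurrentKey() + " Clicks: " + count);
            countState.clear();
        }
    }
}

// Usage example. A bounded demo source may finish before the processing-time
// timer fires; use an unbounded source to observe the per-minute output.
DataStream<String> clicks = env.fromElements("user1", "user1", "user2");
KeyedStream<String, String> keyedStream = clicks.keyBy(value -> value);
DataStream<String> resultStream = keyedStream.process(new UserClickCountProcessFunction());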
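
The per-minute output above depends on when the timer is registered. Flink's timer service keeps at most one timer per key and timestamp, so if every click in a given minute registers the same timestamp, onTimer is called once for that minute. The following plain-Java sketch shows one way to round a processing-time value up to the next minute boundary. The class and method names (TimerAlignment, nextMinuteBoundary) are hypothetical helpers for illustration and are not part of Flink; non-negative epoch milliseconds are assumed.

```java
final class TimerAlignment {
    private static final long ONE_MINUTE_MS = 60_000L;

    private TimerAlignment() {}

    /** Returns the first minute boundary strictly after nowMillis (assumes nowMillis >= 0). */
    static long nextMinuteBoundary(long nowMillis) {
        return (nowMillis / ONE_MINUTE_MS + 1) * ONE_MINUTE_MS;
    }

    public static void main(String[] args) {
        // Two clicks inside the same minute map to the same timer timestamp.
        System.out.println(nextMinuteBoundary(61_000L));   // 120000
        System.out.println(nextMinuteBoundary(119_999L));  // 120000
        // A click exactly on a boundary belongs to the next minute.
        System.out.println(nextMinuteBoundary(120_000L));  // 180000
    }
}
```

Inside processElement, the result of such a helper would be passed to registerProcessingTimeTimer in place of "current time + 60000".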

3. ProcessWindowFunction

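Overview
ProcessWindowFunction processes the elements of a keyed window and can access the window's context, such as the window's start and end time. It receives all elements of the window at once, so it suits scenarios that need more complex window-level processing.

Use case
Count each user's total clicks within a given time window.

Example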

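import java.util.stream.StreamSupport;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowClickCountProcessFunction extends ProcessWindowFunction<String, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<String> elements, Collector<String> out) {
        long count = StreamSupport.stream(elements.spliterator(), false).count();
        out.collect("User: " + key + " Window: " + context.window() + " Clicks: " + count);
    }
}

// Usage example. Event-time windows need timestamps and watermarks; wall-clock time is used here only for the demo.
DataStream<String> clicks = env.fromElements("user1", "user1", "user2")
    .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps()
        .withTimestampAssigner((event, ts) -> System.currentTimeMillis()));
DataStream<String> windowedStream = clicks
    .keyBy(value -> value)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new WindowClickCountProcessFunction());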
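
To make the phrase "time window" concrete: a tumbling window covers a half-open interval [start, end) aligned to the epoch, and each element lands in exactly one window according to its timestamp. The plain-Java sketch below reproduces that arithmetic for the zero-offset case with non-negative timestamps. TumblingWindowMath and its methods are hypothetical names for illustration, not Flink classes.

```java
final class TumblingWindowMath {
    private TumblingWindowMath() {}

    /** Start (inclusive) of the tumbling window containing timestampMillis; assumes timestampMillis >= 0 and zero offset. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** End (exclusive) of the tumbling window containing timestampMillis. */
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // An event at 125 s falls into the window [120 s, 180 s).
        System.out.println(windowStart(125_000L, oneMinute)); // 120000
        System.out.println(windowEnd(125_000L, oneMinute));   // 180000
        // An event exactly at 180 s already belongs to the next window.
        System.out.println(windowStart(180_000L, oneMinute)); // 180000
    }
}
```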

4. ProcessAllWindowFunction

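Overview
ProcessAllWindowFunction processes windows over a non-keyed stream: each window contains every element of the stream that falls into it, so the function can operate on global data. Because the stream is not partitioned, the window operator runs with a parallelism of 1.

Use case
Compute the global click total, for example the total number of clicks per hour.

Example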

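import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class GlobalClickCountProcessAllWindowFunction extends ProcessAllWindowFunction<String, String, TimeWindow> {
    @Override
    public void process(Context context, Iterable<String> elements, Collector<String> out) {
        // Count only this window's elements. A plain instance field would keep
        // growing across windows and would not be covered by checkpoints.
        long count = StreamSupport.stream(elements.spliterator(), false).count();
        out.collect("Total Clicks in Window " + context.window() + ": " + count);
    }
}

// Usage example. The input must carry event-time timestamps and watermarks
// (assignTimestampsAndWatermarks) for the event-time window to fire.
DataStream<String> clicks = env.fromElements("click1", "click2", "click3");
DataStream<String> globalWindowedStream = clicks
    .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
    .process(new GlobalClickCountProcessAllWindowFunction());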

5. CoProcessFunction

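Overview
CoProcessFunction processes two connected streams. It receives the elements of each input through a separate callback (processElement1 and processElement2), which allows data from the two streams to be combined. Sharing keyed state between the two callbacks requires the connected streams to be keyed.

Use case
Combine a user click stream with a user registration stream to obtain each user's registration and click information.

Example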

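import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class UserStatsCoProcessFunction extends CoProcessFunction<String, String, String> {
    // Keyed state: one flag per user ID, shared by both callbacks because
    // both inputs are keyed by user ID.
    private transient ValueState<Boolean> registeredState;

    @Override
    public void open(Configuration parameters) {
        registeredState = getRuntimeContext().getState(new ValueStateDescriptor<>("registered", Boolean.class));
    }

    @Override
    public void processElement1(String click, Context ctx, Collector<String> out) throws Exception {
        // Clicks that arrive before the registration are dropped in this simple example;
        // the arrival order across the two inputs is not guaranteed.
        if (Boolean.TRUE.equals(registeredState.value())) {
            out.collect("User: " + click + " Clicked");
        }
    }

    @Override
    public void processElement2(String registration, Context ctx, Collector<String> out) throws Exception {
        registeredState.update(true);
        out.collect("User: " + registration + " Registered");
    }
}

// Usage example: key both inputs by user ID so that keyed state is available.
DataStream<String> clicks = env.fromElements("user1", "user2");
DataStream<String> registrations = env.fromElements("user1", "user3");
DataStream<String> resultStream = clicks
    .connect(registrations)
    .keyBy(click -> click, registration -> registration)
    .process(new UserStatsCoProcessFunction());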

6. ProcessJoinFunction

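Overview
ProcessJoinFunction processes the pairs produced by an interval join of two keyed streams. It is called once for each pair of elements that share a key and whose event-time timestamps satisfy the configured time bounds.

Use case
Join a user click stream with a user information stream over a time interval to obtain each click event together with the matching user information.

Example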

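import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ClickUserJoinFunction extends ProcessJoinFunction<String, String, String> {
    @Override
    public void processElement(String click, String userInfo, Context ctx, Collector<String> out) {
        // Called once per matched (click, userInfo) pair.
        out.collect("Click: " + click + " User Info: " + userInfo);
    }
}

// Usage example. Both streams must be keyed and carry event-time timestamps and
// watermarks. The "userId,payload" record format is an assumption made for this demo.
DataStream<String> clicks = env.fromElements("user1,click1", "user2,click2");
DataStream<String> userInfos = env.fromElements("user1,info", "user2,info");
DataStream<String> joinedStream = clicks
    .keyBy(value -> value.split(",")[0])
    .intervalJoin(userInfos.keyBy(value -> value.split(",")[0]))
    // Matches user-info events timestamped 5 to 10 minutes after the click.
    .between(Time.minutes(5), Time.minutes(10))
    .process(new ClickUserJoinFunction());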
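
The bounds passed to between(lower, upper) are relative to the timestamp of the left-hand element: a right-hand element with the same key matches when its timestamp lies in [left + lower, left + upper], both ends inclusive by default. The plain-Java sketch below spells out that condition. IntervalJoinCondition and matches are hypothetical names for illustration only and do not exist in Flink.

```java
final class IntervalJoinCondition {
    private IntervalJoinCondition() {}

    /** True when rightTs lies in [leftTs + lowerBoundMs, leftTs + upperBoundMs], both ends inclusive. */
    static boolean matches(long leftTs, long rightTs, long lowerBoundMs, long upperBoundMs) {
        return rightTs >= leftTs + lowerBoundMs && rightTs <= leftTs + upperBoundMs;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long lower = 5 * minute;   // corresponds to Time.minutes(5)
        long upper = 10 * minute;  // corresponds to Time.minutes(10)
        long clickTs = 0L;

        System.out.println(matches(clickTs, 7 * minute, lower, upper));      // true
        System.out.println(matches(clickTs, 2 * minute, lower, upper));      // false
        System.out.println(matches(clickTs, 10 * minute, lower, upper));     // true
        System.out.println(matches(clickTs, 10 * minute + 1, lower, upper)); // false
    }
}
```

With positive bounds as in the example, only user information that arrives after the click can match; a negative lower bound would also admit earlier records.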

7. BroadcastProcessFunction

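Overview
BroadcastProcessFunction processes a non-keyed main stream connected with a broadcast stream. It fits scenarios in which the elements of one stream must be broadcast to all parallel instances; the broadcast elements are typically stored in broadcast state, which the main-stream side can read.

Use case
Combine a main stream with a broadcast stream (for example, configuration data) and apply the configuration carried by the broadcast stream.

Example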

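import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ConfigBroadcastProcessFunction extends BroadcastProcessFunction<String, String, String> {
    // Shared descriptor: used both to create the broadcast stream and to access its state.
    public static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
        new MapStateDescriptor<>("configState", String.class, String.class);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Read-only view of the broadcast state; null until a config element has arrived.
        String config = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("configKey");
        out.collect("Value: " + value + " Config: " + config);
    }

    @Override
    public void processBroadcastElement(String config, Context ctx, Collector<String> out) throws Exception {
        // Only the broadcast side may modify the broadcast state.
        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put("configKey", config);
    }
}

// Usage example
DataStream<String> mainStream = env.fromElements("data1", "data2");
BroadcastStream<String> configStream = env.fromElements("config1")
    .broadcast(ConfigBroadcastProcessFunction.CONFIG_DESCRIPTOR);
DataStream<String> processedStream = mainStream
    .connect(configStream)
    .process(new ConfigBroadcastProcessFunction());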

8. KeyedBroadcastProcessFunction

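Overview
KeyedBroadcastProcessFunction processes a key-partitioned main stream connected with a broadcast stream. It fits scenarios that need per-key processing combined with data from a broadcast stream.

Use case
Combine a key-partitioned main stream with a broadcast stream (for example, dynamic rules) and apply the rules to the elements of each key.

Example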

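import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedRuleBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, String, String, String> {
    // Shared descriptor: used both to create the broadcast stream and to access its state.
    public static final MapStateDescriptor<String, String> RULE_DESCRIPTOR =
        new MapStateDescriptor<>("ruleState", String.class, String.class);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Read-only view of the broadcast state; null until a rule has arrived.
        String rule = ctx.getBroadcastState(RULE_DESCRIPTOR).get("ruleKey");
        out.collect("Key: " + ctx.getCurrentKey() + " Value: " + value + " Rule: " + rule);
    }

    @Override
    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
        // Only the broadcast side may modify the broadcast state.
        ctx.getBroadcastState(RULE_DESCRIPTOR).put("ruleKey", rule);
    }
}

// Usage example
KeyedStream<String, String> keyedStream = env.fromElements("data1", "data2").keyBy(value -> value);
BroadcastStream<String> ruleStream = env.fromElements("rule1")
    .broadcast(KeyedRuleBroadcastProcessFunction.RULE_DESCRIPTOR);
DataStream<String> resultStream = keyedStream
    .connect(ruleStream)
    .process(new KeyedRuleBroadcastProcessFunction());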

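These examples show how different Flink process functions address a range of stream-processing needs. Choose the one that fits your data-processing requirements to implement the corresponding business logic.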