1. ReduceFunction

ReduceFunction 用于在窗口内对数据进行聚合操作。它适用于对数据进行简单的聚合计算,如求和、最大值、最小值等。

  • 用法: 用于合并窗口中的元素,通常进行如求和、计数、最大值等操作。
  • 适用场景: 简单的累积计算。
1
2
3
4
5
6
7
8
9
DataStream<Tuple2<String, Double>> result = gmvStream
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
});

2. AggregateFunction

AggregateFunction 是用于窗口计算的另一个选项,提供了更细粒度的控制。它允许你定义窗口的初始值、累积逻辑和最终聚合逻辑。

  • 用法: 用于更复杂的聚合计算,例如统计信息的计算。
  • 适用场景: 需要对窗口中的数据进行复杂的聚合计算时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
DataStream<Tuple2<String, Double>> result = gmvStream
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.aggregate(new AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> createAccumulator() {
return Tuple2.of("", 0.0);
}

@Override
public Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) {
return Tuple2.of(value.f0, accumulator.f1 + value.f1);
}

@Override
public Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {
return accumulator;
}

@Override
public Tuple2<String, Double> merge(Tuple2<String, Double> a, Tuple2<String, Double> b) {
return Tuple2.of(a.f0, a.f1 + b.f1);
}
});

3. ProcessWindowFunction

ProcessWindowFunction 是一个功能强大的工具,用于实现复杂的窗口操作。它允许你在窗口触发时,访问窗口内的所有元素,并进行自定义处理。这对于需要访问整个窗口状态、进行复杂的逻辑处理、发出多个输出等情况非常有用。

  • 用法: 用于实现复杂的窗口逻辑,可以访问整个窗口的内容,进行自定义的输出。
  • 适用场景: 需要在窗口内访问全部数据,进行复杂的处理或自定义输出时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Tuple2<String, Double>> result = gmvStream
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.process(new ProcessWindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Double>> elements, Collector<Tuple2<String, Double>> out) {
double sum = 0.0;
for (Tuple2<String, Double> element : elements) {
sum += element.f1;
}
out.collect(Tuple2.of(key, sum));
}
});

4. WindowFunction

WindowFunction 是另一种用于窗口计算的方式,提供了比 ReduceFunction 更灵活的方式来处理窗口数据。

  • 用法: 与 ProcessWindowFunction 类似,但通常用于简单的窗口转换,而不需要复杂的逻辑。
  • 适用场景: 需要在窗口内对每个元素进行处理并输出时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Tuple2<String, Double>> result = gmvStream
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.apply(new WindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Double>> input, Collector<Tuple2<String, Double>> out) {
double sum = 0.0;
for (Tuple2<String, Double> element : input) {
sum += element.f1;
}
out.collect(Tuple2.of(key, sum));
}
});

总结

  • ReduceFunction: 用于简单的聚合计算,例如求和、计数、最大值。
  • AggregateFunction: 用于更复杂的聚合计算,需要自定义累积逻辑。
  • ProcessWindowFunction: 用于复杂的窗口操作,允许访问整个窗口的数据,并进行自定义处理和输出。
  • WindowFunction: 适用于简单的窗口操作,通常用于对窗口内每个元素进行处理。