1. ReduceFunction
ReduceFunction
用于在窗口内对数据进行聚合操作。它适用于对数据进行简单的聚合计算,如求和、最大值、最小值等。
- 用法: 用于合并窗口中的元素,通常进行如求和、计数、最大值等操作。
- 适用场景: 简单的累积计算。
1 2 3 4 5 6 7 8 9
| DataStream<Tuple2<String, Double>> result = gmvStream .keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .reduce(new ReduceFunction<Tuple2<String, Double>>() { @Override public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } });
|
2. AggregateFunction
AggregateFunction
是用于窗口计算的另一个选项,提供了更细粒度的控制。它允许你定义窗口的初始值、累积逻辑和最终聚合逻辑。
- 用法: 用于更复杂的聚合计算,例如统计信息的计算。
- 适用场景: 需要对窗口中的数据进行复杂的聚合计算时。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| DataStream<Tuple2<String, Double>> result = gmvStream .keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .aggregate(new AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> createAccumulator() { return Tuple2.of("", 0.0); }
@Override public Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) { return Tuple2.of(value.f0, accumulator.f1 + value.f1); }
@Override public Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) { return accumulator; }
@Override public Tuple2<String, Double> merge(Tuple2<String, Double> a, Tuple2<String, Double> b) { return Tuple2.of(a.f0, a.f1 + b.f1); } });
|
3. ProcessWindowFunction
ProcessWindowFunction
是一个功能强大的工具,用于实现复杂的窗口操作。它允许你在窗口触发时,访问窗口内的所有元素,并进行自定义处理。这对于需要访问整个窗口状态、进行复杂的逻辑处理、发出多个输出等情况非常有用。
- 用法: 用于实现复杂的窗口逻辑,可以访问整个窗口的内容,进行自定义的输出。
- 适用场景: 需要在窗口内访问全部数据,进行复杂的处理或自定义输出时。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;
DataStream<Tuple2<String, Double>> result = gmvStream .keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .process(new ProcessWindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<Tuple2<String, Double>> elements, Collector<Tuple2<String, Double>> out) { double sum = 0.0; for (Tuple2<String, Double> element : elements) { sum += element.f1; } out.collect(Tuple2.of(key, sum)); } });
|
4. WindowFunction
WindowFunction
是另一种用于窗口计算的方式,提供了比 ReduceFunction
更灵活的方式来处理窗口数据。
- 用法: 与
ProcessWindowFunction
类似,但通常用于简单的窗口转换,而不需要复杂的逻辑。
- 适用场景: 需要在窗口内对每个元素进行处理并输出时。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;
DataStream<Tuple2<String, Double>> result = gmvStream .keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .apply(new WindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, String, TimeWindow>() { @Override public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Double>> input, Collector<Tuple2<String, Double>> out) { double sum = 0.0; for (Tuple2<String, Double> element : input) { sum += element.f1; } out.collect(Tuple2.of(key, sum)); } });
|
总结
ReduceFunction
: 用于简单的聚合计算,例如求和、计数、最大值。
AggregateFunction
: 用于更复杂的聚合计算,需要自定义累积逻辑。
ProcessWindowFunction
: 用于复杂的窗口操作,允许访问整个窗口的数据,并进行自定义处理和输出。
WindowFunction
: 适用于简单的窗口操作,通常用于对窗口内每个元素进行处理。