reduce和aggregate都是增量聚合,它们的区别如下
- AggregateFunction 输入输出元素类型可以不相同。
- ReduceFunction 输入输出元素类型相同
AggregateFunction它有三个参数:输入类型(IN),累加器类型(ACC)和输出类型(OUT),需要实现4个方法,所有的方法都是在同一个组里面:
- ACC createAccumulator(); 创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化
- ACC add(IN value, ACC accumulator); 对于数据的每条数据,和迭代数据的聚合的具体实现
- ACC merge(ACC a, ACC b); 合并两个累加器,返回一个具有合并状态的累加器,注意:AggregateFunction中的merge方法仅SessionWindow会调用该方法,如果time window是不会调用的,merge方法即使返回null也是可以的。参考:https://www.zhihu.com/question/346639699
官网中的描述大概的意思是:因为会话窗口没有固定的起始时间和结束时间,他们被运算不同于滚动窗口和滑动窗口。本质上,会话窗口会为每一批相邻两条数据没有大于指定间隔时间的数据merge到以一起。为了数据能够被merge,会话窗口需要一个merge的触发器和一个可以merge的WindowFunction,比如ReduceFunction、AggregateFunction或者ProcessWindowFunction,需要注意的是FoldFunction不能merge!
- OUT getResult(ACC accumulator); 从累加器获取聚合的结果
看下面的完整示例:
package com.yuchaoshui.flink;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> nums = env.socketTextStream("localhost", 9090);
nums.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
String[] split = value.split(",");
out.collect(Tuple2.of(Integer.parseInt(split[0]), Integer.parseInt(split[1])));
}
}).keyBy(f -> f.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.aggregate( // 将相同 key 的数据放在同一组,将value用 -> 连接起来
// aggregate 第一个参数:指定每来一条数据怎么聚合
new AggregateFunction<Tuple2<Integer, Integer>, String, String>() {
@Override
public String createAccumulator() {
return "";
}
@Override
public String add(Tuple2<Integer, Integer> value, String accumulator) {
accumulator = accumulator + "->" + value.f1;
return accumulator;
}
@Override
public String getResult(String accumulator) {
return accumulator;
}
@Override
public String merge(String a, String b) {
return a+b;
}
},
// aggregate 第二个参数:处理按照 key 分组以后的每组的数据。
new ProcessWindowFunction<String, String, Integer, TimeWindow>() {
@Override
public void process(Integer key, Context context, Iterable<String> in,
Collector<String> out) throws Exception {
out.collect("---" + key + ":" + in.iterator().next());
}
}).print().setParallelism(1);
env.execute();
}
}
- 参考资料