已复制
全屏展示
复制代码

Flink 自定义 aggregate 总结

· 3 min read

reduce和aggregate都是增量聚合,它们的区别如下

  • AggregateFunction 输入输出元素类型可以不相同。
  • ReduceFunction 输入输出元素类型相同

AggregateFunction它有三个参数:输入类型(IN),累加器类型(ACC)和输出类型(OUT),需要实现4个方法,所有的方法都是在同一个组里面:

  • ACC createAccumulator();   创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化
  • ACC add(IN value, ACC accumulator);  对于数据的每条数据,和迭代数据的聚合的具体实现
  • ACC merge(ACC a, ACC b);  合并两个累加器,返回一个具有合并状态的累加器,注意:AggregateFunction中的merge方法仅SessionWindow会调用该方法,如果time window是不会调用的,merge方法即使返回null也是可以的。参考:https://www.zhihu.com/question/346639699

官网中的描述大概的意思是:因为会话窗口没有固定的起始时间和结束时间,他们被运算不同于滚动窗口和滑动窗口。本质上,会话窗口会为每一批相邻两条数据没有大于指定间隔时间的数据merge到以一起。为了数据能够被merge,会话窗口需要一个merge的触发器和一个可以merge的WindowFunction,比如ReduceFunction、AggregateFunction或者ProcessWindowFunction,需要注意的是FoldFunction不能merge!

  • OUT getResult(ACC accumulator);   从累加器获取聚合的结果

看下面的完整示例:

package com.yuchaoshui.flink;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> nums = env.socketTextStream("localhost", 9090);

        nums.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(Tuple2.of(Integer.parseInt(split[0]), Integer.parseInt(split[1])));
            }
        }).keyBy(f -> f.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .aggregate( // 将相同 key 的数据放在同一组,将value用 -> 连接起来

                        // aggregate 第一个参数:指定每来一条数据怎么聚合
                        new AggregateFunction<Tuple2<Integer, Integer>, String, String>() {
                            @Override
                            public String createAccumulator() {
                                return "";
                            }

                            @Override
                            public String add(Tuple2<Integer, Integer> value, String accumulator) {
                                accumulator = accumulator + "->" + value.f1;
                                return accumulator;
                            }

                            @Override
                            public String getResult(String accumulator) {
                                return accumulator;
                            }

                            @Override
                            public String merge(String a, String b) {
                                return a+b;
                            }
                        },

                        // aggregate 第二个参数:处理按照 key 分组以后的每组的数据。
                        new ProcessWindowFunction<String, String, Integer, TimeWindow>() {
                            @Override
                            public void process(Integer key, Context context, Iterable<String> in,
                                                Collector<String> out) throws Exception {
                                out.collect("---" + key + ":" + in.iterator().next());
                            }
                        }).print().setParallelism(1);

        env.execute();
    }
}

  • 参考资料

https://blog.csdn.net/lujisen/article/details/105549406

🔗

文章推荐