已复制
全屏展示
复制代码

flink window窗口概念与使用总结


· 10 min read

一. 概要介绍

1.1 窗口总结

针对计数窗口来说,主要使用CountWindowAll、CountWindow,直接传入一个window的数据条数据。

  • CountWindowAll(无分组),不调用 keyBy,所有数据都在一个subtask的同一个组里面
  • CountWindow(有分组),调用 keyBy,可以在不同subtask里面的不同分组里面

针对时间窗口来说,主要使用windowAll、window,需要传入windowAssinger才能使用

  • windowAll(无分组),不调用 keyBy,所有数据都在一个subtask的同一个组里面
  • window(有分组),调用 keyBy,可以在不同subtask里面的不同分组里面

1.2 Window Assinger

当调用window或windowAll方法时,所传入的参数就是Window Assigner(窗口分配器),其作用是决定划分什么样类型的窗口,即以何种条件划分窗口,输入的数据以何种方式分配到窗口内,窗口如何触发等等。

1.3 设置时间类型

  • Event Time 数据产生时的时间,时间从数据上的时间字段获取。
  • Ingestion Time 数据进入source的时间。
  • Processing Time 在Flink算子处理时的时间,Flink默认使用的是Processing Time

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//设置EventTime作为时间标准
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//设置IngestionTime作为时间标准
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

//设置ProcessingTime作为时间标准
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

二. 计数窗口

又叫做GlobalWindow,按照指定的数据条数生成一个Window,与时间无关。根据参数数量,分为滚动窗、滑动窗口,根据是否分组,分为CountWindowAll(不需要分组)、CountWindow(需要分组)

2.1 滚动窗口CountWindowAll

根据数据条数来切分window,并且只会有一个分组,并行度一定为1,将所有数据放在一个分区中的一个subtask里面计算。比如mapped.keyBy(Integer::intValue).countWindowAll(5),尽管使用 keyBy 分组了,但是实际上还是在一个组里面。

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Integer> mapped = lines.map(Integer::parseInt);

        // 求和:所有数据在一个subtask里面计算, 并行度为1
        mapped.countWindowAll(5).process(new ProcessAllWindowFunction<Integer, Integer, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Integer> elements, Collector<Integer> out)
                    throws Exception {

                int sum = 0;
                for (Integer element : elements) {
                    sum += element;
                }
                out.collect(sum);

            }
        }).print();


        env.execute();
    }
}

2.2 滚动窗口CountWindow

CountWindowAll是不能使用分组,而CountWindow则必须使用分组,在每个分组内各自统计,当一个分组满足了以后触发自己分组的计算,没有满足条件的分组不会触发计算。一个subtask里面可以有多个分组,当一个组满足条件只处理自己这个组,这个subtask内的其他组不会有触发。

  • 一次处理所有数据
public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(" ");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        // 在每个分组内各自统计,当一个分组满足了以后触发自己分组计算,没有满足条件的分组不会触发
        SingleOutputStreamOperator<Object> process = wordCount.keyBy(f -> f.f0).countWindow(5).process(new ProcessWindowFunction<Tuple2<String, Integer>, Object, String, GlobalWindow>() {
            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
                int sum = 0;
                for (Tuple2<String, Integer> element : elements) {
                    sum += element.f1;
                }
                out.collect(key + ":" + sum);
            }
        });

        // 并行度为默认并行度,我的机器是8核心,所以是8
        System.out.println("process.getParallelism() = " + process.getParallelism());

        process.print();

        env.execute();
    }
}
  • 增量聚合逐条处理
public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(" ");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        // 在每个分组内各自统计,当一个分组满足了以后触发自己分组计算,没有满足条件的分组不会触发
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = wordCount.keyBy(f -> f.f0).countWindow(5).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                value1.f1 = value1.f1 + value2.f1;
                return value1;
            }
        });

        // 并行度为默认并行度,我的机器是8核心,所以是8
        System.out.println("reduce.getParallelism() = " + reduce.getParallelism());

        reduce.print();

        env.execute();
    }
}

2.3 滑动窗口countWindowAll

在滚动窗口的基础上,加上第二个参数,就是滑动窗口,mapped.keyBy(0).countWindow(10, 5),其他和滚动窗口的使用方法相同。

2.4 滑动窗口countWindow

三. 时间窗口

即TimeWindow,按照时间生成Window,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window)。本小节内容都是使用 ProcessingTime。

3.1 滚动窗口

滚动窗口将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠,即一条数据只会出现在一个窗口中,窗口的大小根据时间来定,比如1分钟一个窗口。

3.2 滚动窗口-不分组计算

不分组计算使用 windowAll(),只要不分组,并行度就一定为1,所有数据都会在一个subtask里面计算,并行度为1。可以使用 reduce进行增量逐条计算,可以使用process一次计算所有数据,示例同上。

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wc = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(" ");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        // 将所有的值相加
        SingleOutputStreamOperator<Object> process = wc.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> element : elements) {
                            sum += element.f1;
                        }
                        out.collect(sum);
                    }
                });

        // 并行度为1
        System.out.println("process.getParallelism() = " + process.getParallelism());

        process.print();

        env.execute();
    }
}

3.3 滚动窗口-有分组计算

有分组计算使用 window(),按照默认的并行度计算。

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wc = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(" ");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        // 分组计算
        SingleOutputStreamOperator<Object> process = wc
            .keyBy(f -> f.f0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<Tuple2<String, Integer>, Object, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
                int sum = 0;
                for (Tuple2<String, Integer> element : elements) {
                    sum += element.f1;
                }
                out.collect(key + ":" + sum);
            }
        });

        // 并行度为8
        System.out.println("process.getParallelism() = " + process.getParallelism());

        process.print();

        env.execute();
    }
}

3.4 滑动窗口

滑动窗口由固定的窗口⻓度和滑动步长组成。如果滑动步长等于窗口长度,此时就变成了滚动窗口了。如果滑动步长大于窗口长度,此时就会有数据丢失了。

3.5 滑动窗口-不分组计算

不分组计算使用 windowAll()

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wc = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        // 传入两个参数(窗口长度  滑动步长)
        wc.windowAll(
            SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))
        ).sum(1).print();

        env.execute();
    }
}

3.6 滑动窗口-有分组计算

有分组计算使用 window()


// 传入两个参数(窗口长度  滑动步长)
wc.keyBy(f -> f.f0).window(
    SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))
).sum(1).print();

3.7 会话窗口

由一系列事件组合一个指定时间⻓度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

3.8 会话窗口-不分组计算

不分组计算使用 windowAll()

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wc = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        // 传入一个参数:窗口超时时间(超时后进入下一个窗口)
        wc.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print();

        env.execute();
    }
}

3.9 会话窗口-有分组计算

有分组计算使用 window()


// 传入一个参数:窗口超时时间(超时后进入下一个窗口)
wc.keyBy(f ->f.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print();

3.10 会话窗口-动态时间会话

有分组计算使用 window()


// 传入一个动态参数,根据元素的字段动态计算session时间
wc.keyBy(f ->f.f0).window(
    ProcessingTimeSessionWindows.withDynamicGap( element -> {
        return element.f1 * 10; 
    })
).sum(1).print();

四. 窗口函数

在对数据进行分配窗口以后,还需要指定窗口函数,用于对窗口内的数据计算,窗口函数分为两类:增量聚合函数(ReduceFunction、AggregateFunction)、全窗口聚合函数(ProcessWindowFunction、WindowFunction)。

4.1 增量聚合函数

表示来一条数据计算一条数据,主要有ReduceFunction、AggregateFunction。比如计数窗口中的滚动窗口示例:

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Integer> mapped = lines.map(Integer::parseInt);

        // 自定义一个sum:增量聚合,来一条,处理一条
        SingleOutputStreamOperator<Integer> reduce = mapped.countWindowAll(5).reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer integer, Integer t1) throws Exception {
                return integer + t1;
            }
        });

        // 并行度为 1
        System.out.println("reduce.getParallelism() = " + reduce.getParallelism());

        // 并行度恢复默认,分发到下游
        reduce.print();

        env.execute();
    }
}

4.2 全窗聚合函数

一次性计算整个窗口的数据,主要由ProcessWindowFunction、WindowFunction。比如计数窗口中的滚动窗口示例:

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
        SingleOutputStreamOperator<Integer> mapped = lines.map(Integer::parseInt);

        // 求和:所有数据在一个subtask里面计算, 并行度为1
        mapped.countWindowAll(5).process(new ProcessAllWindowFunction<Integer, Integer, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Integer> elements, Collector<Integer> out)
                    throws Exception {

                int sum = 0;
                for (Integer element : elements) {
                    sum += element;
                }
                out.collect(sum);

            }
        }).print();


        env.execute();
    }
}
🔗

文章推荐