flink window窗口概念与使用总结
一. 概要介绍
1.1 窗口总结
针对计数窗口来说,主要使用CountWindowAll、CountWindow,直接传入一个window的数据条数据。
- CountWindowAll(无分组),不调用 keyBy,所有数据都在一个subtask的同一个组里面
- CountWindow(有分组),调用 keyBy,可以在不同subtask里面的不同分组里面
针对时间窗口来说,主要使用windowAll、window,需要传入windowAssinger才能使用
- windowAll(无分组),不调用 keyBy,所有数据都在一个subtask的同一个组里面
- window(有分组),调用 keyBy,可以在不同subtask里面的不同分组里面
1.2 Window Assinger
当调用window或windowAll方法时,所传入的参数就是Window Assigner(窗口分配器),其作用是决定划分什么样类型的窗口,即以何种条件划分窗口,输入的数据以何种方式分配到窗口内,窗口如何触发等等。
1.3 设置时间类型
- Event Time 数据产生时的时间,时间从数据上的时间字段获取。
- Ingestion Time 数据进入source的时间。
- Processing Time 在Flink算子处理时的时间,Flink默认使用的是Processing Time
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置EventTime作为时间标准
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置IngestionTime作为时间标准
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//设置ProcessingTime作为时间标准
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
二. 计数窗口
又叫做GlobalWindow,按照指定的数据条数生成一个Window,与时间无关。根据参数数量,分为滚动窗、滑动窗口,根据是否分组,分为CountWindowAll(不需要分组)、CountWindow(需要分组)
2.1 滚动窗口CountWindowAll
根据数据条数来切分window,并且只会有一个分组,并行度一定为1,将所有数据放在一个分区中的一个subtask里面计算。比如mapped.keyBy(Integer::intValue).countWindowAll(5),尽管使用 keyBy 分组了,但是实际上还是在一个组里面。
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Integer> mapped = lines.map(Integer::parseInt);
// 求和:所有数据在一个subtask里面计算, 并行度为1
mapped.countWindowAll(5).process(new ProcessAllWindowFunction<Integer, Integer, GlobalWindow>() {
@Override
public void process(Context context, Iterable<Integer> elements, Collector<Integer> out)
throws Exception {
int sum = 0;
for (Integer element : elements) {
sum += element;
}
out.collect(sum);
}
}).print();
env.execute();
}
}
2.2 滚动窗口CountWindow
CountWindowAll是不能使用分组,而CountWindow则必须使用分组,在每个分组内各自统计,当一个分组满足了以后触发自己分组的计算,没有满足条件的分组不会触发计算。一个subtask里面可以有多个分组,当一个组满足条件只处理自己这个组,这个subtask内的其他组不会有触发。
- 一次处理所有数据
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] fields = s.split(" ");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
// 在每个分组内各自统计,当一个分组满足了以后触发自己分组计算,没有满足条件的分组不会触发
SingleOutputStreamOperator<Object> process = wordCount.keyBy(f -> f.f0).countWindow(5).process(new ProcessWindowFunction<Tuple2<String, Integer>, Object, String, GlobalWindow>() {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> element : elements) {
sum += element.f1;
}
out.collect(key + ":" + sum);
}
});
// 并行度为默认并行度,我的机器是8核心,所以是8
System.out.println("process.getParallelism() = " + process.getParallelism());
process.print();
env.execute();
}
}
- 增量聚合逐条处理
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] fields = s.split(" ");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
// 在每个分组内各自统计,当一个分组满足了以后触发自己分组计算,没有满足条件的分组不会触发
SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = wordCount.keyBy(f -> f.f0).countWindow(5).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
value1.f1 = value1.f1 + value2.f1;
return value1;
}
});
// 并行度为默认并行度,我的机器是8核心,所以是8
System.out.println("reduce.getParallelism() = " + reduce.getParallelism());
reduce.print();
env.execute();
}
}
2.3 滑动窗口countWindowAll
在滚动窗口的基础上,加上第二个参数,就是滑动窗口,mapped.keyBy(0).countWindow(10, 5),其他和滚动窗口的使用方法相同。
略
2.4 滑动窗口countWindow
略
三. 时间窗口
即TimeWindow,按照时间生成Window,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window)。本小节内容都是使用 ProcessingTime。
3.1 滚动窗口
滚动窗口将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠,即一条数据只会出现在一个窗口中,窗口的大小根据时间来定,比如1分钟一个窗口。
3.2 滚动窗口-不分组计算
不分组计算使用 windowAll(),只要不分组,并行度就一定为1,所有数据都会在一个subtask里面计算,并行度为1。可以使用 reduce进行增量逐条计算,可以使用process一次计算所有数据,示例同上。
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Tuple2<String, Integer>> wc = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] fields = s.split(" ");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
// 将所有的值相加
SingleOutputStreamOperator<Object> process = wc.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>() {
@Override
public void process(Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> element : elements) {
sum += element.f1;
}
out.collect(sum);
}
});
// 并行度为1
System.out.println("process.getParallelism() = " + process.getParallelism());
process.print();
env.execute();
}
}
3.3 滚动窗口-有分组计算
有分组计算使用 window(),按照默认的并行度计算。
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Tuple2<String, Integer>> wc = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] fields = s.split(" ");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
// 分组计算
SingleOutputStreamOperator<Object> process = wc
.keyBy(f -> f.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Object, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> element : elements) {
sum += element.f1;
}
out.collect(key + ":" + sum);
}
});
// 并行度为8
System.out.println("process.getParallelism() = " + process.getParallelism());
process.print();
env.execute();
}
}
3.4 滑动窗口
滑动窗口由固定的窗口⻓度和滑动步长组成。如果滑动步长等于窗口长度,此时就变成了滚动窗口了。如果滑动步长大于窗口长度,此时就会有数据丢失了。
3.5 滑动窗口-不分组计算
不分组计算使用 windowAll()
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Tuple2<String, Integer>> wc = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] fields = s.split(",");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
// 传入两个参数(窗口长度 滑动步长)
wc.windowAll(
SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))
).sum(1).print();
env.execute();
}
}
3.6 滑动窗口-有分组计算
有分组计算使用 window()
// 传入两个参数(窗口长度 滑动步长)
wc.keyBy(f -> f.f0).window(
SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))
).sum(1).print();
3.7 会话窗口
由一系列事件组合一个指定时间⻓度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
3.8 会话窗口-不分组计算
不分组计算使用 windowAll()
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Tuple2<String, Integer>> wc = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] fields = s.split(",");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
// 传入一个参数:窗口超时时间(超时后进入下一个窗口)
wc.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print();
env.execute();
}
}
3.9 会话窗口-有分组计算
有分组计算使用 window()
// 传入一个参数:窗口超时时间(超时后进入下一个窗口)
wc.keyBy(f ->f.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print();
3.10 会话窗口-动态时间会话
有分组计算使用 window()
// 传入一个动态参数,根据元素的字段动态计算session时间
wc.keyBy(f ->f.f0).window(
ProcessingTimeSessionWindows.withDynamicGap( element -> {
return element.f1 * 10;
})
).sum(1).print();
四. 窗口函数
在对数据进行分配窗口以后,还需要指定窗口函数,用于对窗口内的数据计算,窗口函数分为两类:增量聚合函数(ReduceFunction、AggregateFunction)、全窗口聚合函数(ProcessWindowFunction、WindowFunction)。
4.1 增量聚合函数
表示来一条数据计算一条数据,主要有ReduceFunction、AggregateFunction。比如计数窗口中的滚动窗口示例:
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Integer> mapped = lines.map(Integer::parseInt);
// 自定义一个sum:增量聚合,来一条,处理一条
SingleOutputStreamOperator<Integer> reduce = mapped.countWindowAll(5).reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer integer, Integer t1) throws Exception {
return integer + t1;
}
});
// 并行度为 1
System.out.println("reduce.getParallelism() = " + reduce.getParallelism());
// 并行度恢复默认,分发到下游
reduce.print();
env.execute();
}
}
4.2 全窗聚合函数
一次性计算整个窗口的数据,主要由ProcessWindowFunction、WindowFunction。比如计数窗口中的滚动窗口示例:
public class Main {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
DataStreamSource<String> lines = env.socketTextStream("localhost", 9090);
SingleOutputStreamOperator<Integer> mapped = lines.map(Integer::parseInt);
// 求和:所有数据在一个subtask里面计算, 并行度为1
mapped.countWindowAll(5).process(new ProcessAllWindowFunction<Integer, Integer, GlobalWindow>() {
@Override
public void process(Context context, Iterable<Integer> elements, Collector<Integer> out)
throws Exception {
int sum = 0;
for (Integer element : elements) {
sum += element;
}
out.collect(sum);
}
}).print();
env.execute();
}
}