Flink实战最常用算子合集详解
map
DataStreamSource<Integer> nums = env.fromElements(1,2,3,4,5,6,7,8,9);
nums.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value * 10;
}
}).print();
flatmap
DataStreamSource<Integer> nums = env.fromElements(1,2,3,4,5,6,7,8,9);
nums.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
if (value % 2 == 0) {
out.collect(value);
}
}
}).print();
filter
DataStreamSource<Integer> nums = env.fromElements(1,2,3,4,5,6,7,8,9);
nums.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 != 0;
}
}).print();
keyBy
keyBy不是算子,只是一个分区的方法。
将数据流按照key分成不相交的流,由 DataStream 转换为 KeyedStream。很多的算子都必须要分组以后才可以继续计算,比如:window、process等。
dataStream.keyBy() 参数可以有三种形式:
- 如果dataStream是Tuple类型,可以直接传入数字
- 如果dataStream是对象,可以直接传入属性的字符串名称
- 任何时候都可以传入 KeySelector 对象
sum()
参数:位置数字、属性名称字符串
min()
参数:位置数字、属性名称字符串
max()
参数:位置数字、属性名称字符串
minBy()
参数:位置数字、属性名称字符串
maxBy()
参数:位置数字、属性名称字符串
max 和 maxBy 的区别:
- max 只取给定位置的最大值,该元素的其他字段不会更新。
- maxBy 也是取给定字段的最大值,但是该元素的其他字段也会更新。
下面是是一个示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> nums = env.fromElements(
"a,6,name1", "a,5,name2", "b,3,name3", "b,4,name4", "b,5,name5"
);
nums.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, String>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, Integer, String>> out) throws Exception {
String[] split = value.split(",");
out.collect(Tuple3.of(split[0],
Integer.parseInt(split[1]),
split[2]
));
}
}).keyBy(0).max(1).print();
// }).key(0).max(1).print();
env.execute("x");
- 使用 max 输出
(a,6,name1)
(a,6,name1)
(b,3,name3)
(b,4,name3)
(b,5,name3)
- 使用 maxBy 输出
(a,6,name1)
(a,6,name1)
(b,3,name3)
(b,4,name4)
(b,5,name5)
reduce
一个分组数据流的聚合操作,合并当前元素和上次合并的结果。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> nums = env.fromElements(
"a,6", "a,5", "b,3", "b,4", "b,5"
);
nums.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] split = value.split(",");
return Tuple2.of(split[0], Integer.parseInt(split[1]));
}
}).keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();
env.execute("x");
split 与 select
注意:最新版本的 flink 已经取消了split和select,可以使用测流输出(side output)来实现。
- Split :DataStream -> SplitStream 根据某些特征把一个DataStream拆分成两个或者多个DataStream。
- Select :SplitStream -> DataStream 从一个SplitStream中获取一个或者多个DataStream。