已复制
全屏展示
复制代码

Flink实战最常用算子合集详解


· 3 min read

map

DataStreamSource<Integer> nums = env.fromElements(1,2,3,4,5,6,7,8,9);
nums.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return value * 10;
    }
}).print();

flatmap

DataStreamSource<Integer> nums = env.fromElements(1,2,3,4,5,6,7,8,9);
nums.flatMap(new FlatMapFunction<Integer, Integer>() {
    @Override
    public void flatMap(Integer value, Collector<Integer> out) throws Exception {
        if (value % 2 == 0) {
            out.collect(value);
        }
    }
}).print();

filter

DataStreamSource<Integer> nums = env.fromElements(1,2,3,4,5,6,7,8,9);
nums.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value % 2 != 0;
    }
}).print();

keyBy

keyBy不是算子,只是一个分区的方法。

将数据流按照key分成不相交的流,由 DataStream 转换为 KeyedStream。很多的算子都必须要分组以后才可以继续计算,比如:window、process等。

dataStream.keyBy() 参数可以有三种形式:

  • 如果dataStream是Tuple类型,可以直接传入数字
  • 如果dataStream是对象,可以直接传入属性的字符串名称
  • 任何时候都可以传入 KeySelector 对象

sum()

参数:位置数字、属性名称字符串

min()

参数:位置数字、属性名称字符串

max()

参数:位置数字、属性名称字符串

minBy()

参数:位置数字、属性名称字符串

maxBy()

参数:位置数字、属性名称字符串

max 和 maxBy 的区别:

  • max 只取给定位置的最大值,该元素的其他字段不会更新。
  • maxBy 也是取给定字段的最大值,但是该元素的其他字段也会更新。

下面是是一个示例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> nums = env.fromElements(
    "a,6,name1", "a,5,name2", "b,3,name3", "b,4,name4", "b,5,name5"
);
nums.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, String>>() {
    @Override
    public void flatMap(String value, Collector<Tuple3<String, Integer, String>> out) throws Exception {
        String[] split = value.split(",");
        out.collect(Tuple3.of(split[0],
                              Integer.parseInt(split[1]),
                              split[2]
                             ));
    }
}).keyBy(0).max(1).print();
// }).key(0).max(1).print();

env.execute("x");
  • 使用 max 输出
(a,6,name1)
(a,6,name1)
(b,3,name3)
(b,4,name3)
(b,5,name3)
  • 使用 maxBy 输出

(a,6,name1)
(a,6,name1)
(b,3,name3)
(b,4,name4)
(b,5,name5)

reduce

一个分组数据流的聚合操作,合并当前元素和上次合并的结果。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> nums = env.fromElements(
    "a,6", "a,5", "b,3", "b,4", "b,5"
);
nums.map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        String[] split = value.split(",");
        return Tuple2.of(split[0], Integer.parseInt(split[1]));
    }
}).keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
    }
}).print();

env.execute("x");

split 与 select

注意:最新版本的 flink 已经取消了split和select,可以使用测流输出(side output)来实现。

  • Split :DataStream -> SplitStream 根据某些特征把一个DataStream拆分成两个或者多个DataStream。
  • Select :SplitStream -> DataStream 从一个SplitStream中获取一个或者多个DataStream。

🔗

文章推荐