
How Flink Implements Dual-Stream Joins


5 min read

1. join()

join() is an inner join. The operator provides "window join" semantics: it performs an inner join on a specified key within a (tumbling/sliding/session) window, and it supports both processing time and event time. A left join can be implemented with coGroup instead.

package com.yuchaoshui.flink;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/* source1 data
1,wang
3,zhang
0,li
4,zhao
9,wu
*/

/* source2 data
1,899
3,799
0,699
4,599
9,499
*/

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> source1 = env.socketTextStream("localhost", 8888);
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, String>> dataStream1 =
                source1.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        String[] split = value.split(",");
                        String userId = split[0];
                        String username = split[1];
                        return Tuple2.of(userId, username);
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, String>> dataStream2 =
                source2.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        String[] split = value.split(",");
                        String userId = split[0];
                        String totalScore = split[1];
                        return Tuple2.of(userId, totalScore);
                    }
                });

        // inner join
        dataStream1
                .join(dataStream2)
                .where(leftRecord -> leftRecord.f0)     // key of the first stream
                .equalTo(rightRecord -> rightRecord.f0) // key of the second stream
                .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                .apply(new JoinFunction<Tuple2<String, String>,
                        Tuple2<String, String>,
                        Tuple3<String, String, String>>() {
                    @Override
                    public Tuple3<String, String, String> join(Tuple2<String, String> first,
                                                               Tuple2<String, String> second)
                            throws Exception {
                        return Tuple3.of(first.f0, first.f1, second.f1);
                    }
                })
                .print();

        env.execute();
    }
}
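
Assuming both sources are fed by hand (for example with nc -lk 8888 and nc -lk 9999) and all of the sample records arrive within the same 20-second window, the job should print one joined tuple per matching key, along the lines of:

(1,wang,899)
(3,zhang,799)
(0,li,699)
(4,zhao,599)
(9,wu,499)

Records whose key appears on only one side within a window are dropped, which is exactly inner-join semantics.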

2. coGroup()

With coGroup, the raw records of both sides are handed to the user function whether or not a match is found. The example below uses coGroup to implement left join semantics.

package com.yuchaoshui.flink;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


/* source1 data
1,wang
3,zhang
0,li
4,zhao
9,wu
*/

/* source2 data
1,899
1,899
3,799
*/

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> source1 = env.socketTextStream("localhost", 8888);
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);

        // source1
        SingleOutputStreamOperator<Tuple2<String, String>> dataStream1 =
                source1.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        String[] split = value.split(",");
                        String userId = split[0];
                        String username = split[1];
                        return Tuple2.of(userId, username);
                    }
                });

        // source2
        SingleOutputStreamOperator<Tuple2<String, String>> dataStream2 =
                source2.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        String[] split = value.split(",");
                        String userId = split[0];
                        String totalScore = split[1];
                        return Tuple2.of(userId, totalScore);
                    }
                });

        // left join
        dataStream1
                .coGroup(dataStream2)
                .where(field -> field.f0)
                .equalTo(field -> field.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                .apply(new CoGroupFunction<Tuple2<String, String>,
                        Tuple2<String, String>,
                        Tuple3<String, String, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, String>> first,
                                        Iterable<Tuple2<String, String>> second,
                                        Collector<Tuple3<String, String, String>> out)
                            throws Exception {
                        for (Tuple2<String, String> firstElement : first) {
                            boolean matched = false;
                            // One left-side record may match several right-side
                            // records, hence the inner loop.
                            for (Tuple2<String, String> secondElement : second) {
                                // Reaching this loop body means a match was found.
                                matched = true;
                                out.collect(
                                        Tuple3.of(firstElement.f0,
                                                firstElement.f1,
                                                secondElement.f1)
                                );
                            }
                            // left join: if nothing on the right side matched,
                            // emit null in place of the right-side value.
                            if (!matched) {
                                out.collect(Tuple3.of(firstElement.f0, firstElement.f1, null));
                            }
                        }
                    }
                })
                .print();

        env.execute();
    }
}
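
By symmetry the same pattern extends to a full outer join: track whether the left side of the group was empty, and also emit right-side records that found no match, with null on the left. A minimal sketch of an alternative coGroup body (same tuple types as in the example above):

                    @Override
                    public void coGroup(Iterable<Tuple2<String, String>> first,
                                        Iterable<Tuple2<String, String>> second,
                                        Collector<Tuple3<String, String, String>> out) {
                        boolean leftEmpty = true;
                        for (Tuple2<String, String> f : first) {
                            leftEmpty = false;
                            boolean matched = false;
                            for (Tuple2<String, String> s : second) {
                                matched = true;
                                out.collect(Tuple3.of(f.f0, f.f1, s.f1));
                            }
                            if (!matched) {
                                // left side only
                                out.collect(Tuple3.of(f.f0, f.f1, null));
                            }
                        }
                        if (leftEmpty) {
                            // right side only: the key comes from the right record
                            for (Tuple2<String, String> s : second) {
                                out.collect(Tuple3.of(s.f0, null, s.f1));
                            }
                        }
                    }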

3. intervalJoin()

Both join() and coGroup() join streams on windows. In some cases, however, the two streams do not advance at the same pace, so Flink also provides interval join semantics.

  • interval join is also an inner join, although it does not require a window.
  • The user must specify the lower and upper bounds of the time interval.
  • Only event time is supported.
  • Unlike window join, interval join is an operation on two KeyedStreams; the interval bounds are given via between(), and either bound can be made exclusive by calling upperBoundExclusive()/lowerBoundExclusive().
  • Records are matched on the specified key within a time interval of the right stream relative to the left stream: the left stream is the reference, and the right stream's timestamps may lie within a bounded range before or after it (see the condition spelled out after this list).
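
Concretely, given the bounds passed to between(), a right-stream element joins a left-stream element with the same key when

    leftTs + lowerBound <= rightTs <= leftTs + upperBound

so with between(Time.seconds(-10), Time.seconds(10)), a left record with timestamp t matches right records whose timestamps fall in [t - 10s, t + 10s].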

package com.yuchaoshui.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/* source1 data
1,wang,1639289005000
3,zhang,1639289015000
0,li,1639289025000
4,zhao,1639289026000
9,wu,1639289055000
*/

/* source2 data
1,899,1639289016000
3,799,1639289016000
0,699,1639289055000
4,599,1639289075000
9,499,1639289085000
*/

// The final output is only (3,zhang,799); for all other keys the timestamps fall outside the allowed range.

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> source1 = env.socketTextStream("localhost", 8888);
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);

        // source1
        SingleOutputStreamOperator<Tuple2<String, String>> dataStream1 =
                source1
                        .assignTimestampsAndWatermarks(
                                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                                    @Override
                                    public long extractTimestamp(String element) {
                                        return Long.parseLong(element.split(",")[2]);
                                    }
                                }
                        )
                        .map(new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String value) throws Exception {
                                String[] split = value.split(",");
                                String userId = split[0];
                                String username = split[1];
                                return Tuple2.of(userId, username);
                            }
                        });

        // source2
        SingleOutputStreamOperator<Tuple2<String, String>> dataStream2 =
                source2
                        .assignTimestampsAndWatermarks(
                                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                                    @Override
                                    public long extractTimestamp(String element) {
                                        return Long.parseLong(element.split(",")[2]);
                                    }
                                }
                        )
                        .map(new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String value) throws Exception {
                                String[] split = value.split(",");
                                String userId = split[0];
                                String totalScore = split[1];
                                return Tuple2.of(userId, totalScore);
                            }
                        });

        // interval join
        dataStream1
                .keyBy(f -> f.f0)
                .intervalJoin(dataStream2.keyBy(f -> f.f0))
                .between(Time.seconds(-10), Time.seconds(10))
                // .upperBoundExclusive()
                // .lowerBoundExclusive()
                .process(new ProcessJoinFunction<Tuple2<String, String>,
                        Tuple2<String, String>,
                        Tuple3<String, String, String>>() {
                    @Override
                    public void processElement(Tuple2<String, String> left,
                                               Tuple2<String, String> right,
                                               Context ctx,
                                               Collector<Tuple3<String, String, String>> out)
                            throws Exception {
                        out.collect(Tuple3.of(left.f0, left.f1, right.f1));
                    }
                })
                .print();
        env.execute();
    }
}
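
For reference, BoundedOutOfOrdernessTimestampExtractor is the legacy watermark API; on Flink 1.12 and later the same assignment can be written with WatermarkStrategy. A sketch, assuming the same comma-separated input format:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// equivalent watermark assignment with the newer WatermarkStrategy API
source1.assignTimestampsAndWatermarks(
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(
                        // the event timestamp is the third comma-separated field
                        (element, recordTs) -> Long.parseLong(element.split(",")[2])));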