How Flink Implements Dual-Stream Joins
1. join()
join() is an inner join. The operator provides "window join" semantics: the two streams are joined on the specified key fields within a (tumbling/sliding/session) window, and both processing time and event time are supported. A left join can be implemented with coGroup (see section 2). The example below uses processing time; a short event-time sketch follows it.
package com.yuchaoshui.flink;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/* source1 data
1,wang
3,zhang
0,li
4,zhao
9,wu
*/
/* source2 data
1,899
3,799
0,699
4,599
9,499
*/
public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source1 = env.socketTextStream("localhost", 8888);
DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, String>> dataStream1 =
source1.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
String userId = split[0];
String username = split[1];
return Tuple2.of(userId, username);
}
});
SingleOutputStreamOperator<Tuple2<String, String>> dataStream2 =
source2.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
String userId = split[0];
String totalScore = split[1];
return Tuple2.of(userId, totalScore);
}
});
// inner join
dataStream1
.join(dataStream2)
.where(field1 -> field1.f0)
.equalTo(field1 -> field1.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new JoinFunction<Tuple2<String, String>,
Tuple2<String, String>,
Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> join(Tuple2<String, String> first,
Tuple2<String, String> second)
throws Exception {
return Tuple3.of(first.f0, first.f1, second.f1);
}
})
.print();
env.execute();
}
}
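The example above joins on processing-time windows. For the event-time variant mentioned at the start of this section, only two things change: both inputs need timestamps and watermarks assigned, and the window assigner becomes TumblingEventTimeWindows (an extra import alongside the ones above). The following is a minimal sketch, assuming the socket input carries an additional third field with the event timestamp in milliseconds (e.g. "1,wang,1639289005000") and that env, source1 and source2 are created exactly as in the example above:
// Sketch only: assumes a third comma-separated field with the event timestamp in milliseconds.
// On Flink versions before 1.12 you may also need:
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<Tuple2<String, String>> eventStream1 = source1
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
@Override
public long extractTimestamp(String element) {
// third field is the event timestamp in milliseconds
return Long.parseLong(element.split(",")[2]);
}
})
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
return Tuple2.of(split[0], split[1]); // (userId, username)
}
});
SingleOutputStreamOperator<Tuple2<String, String>> eventStream2 = source2
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
@Override
public long extractTimestamp(String element) {
return Long.parseLong(element.split(",")[2]);
}
})
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
return Tuple2.of(split[0], split[1]); // (userId, totalScore)
}
});
eventStream1
.join(eventStream2)
.where(t -> t.f0)
.equalTo(t -> t.f0)
// event-time tumbling window instead of the processing-time window above
.window(TumblingEventTimeWindows.of(Time.seconds(20)))
.apply(new JoinFunction<Tuple2<String, String>,
Tuple2<String, String>,
Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> join(Tuple2<String, String> first,
Tuple2<String, String> second) {
return Tuple3.of(first.f0, first.f1, second.f1);
}
})
.print();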
2. coGroup()
With coGroup, the raw elements on both sides are handed to the user function whether or not they can be matched. The example below uses coGroup to implement a left join.
package com.yuchaoshui.flink;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/* source1 data
1,wang
3,zhang
0,li
4,zhao
9,wu
*/
/* source2 data
1,899
1,899
3,799
*/
public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source1 = env.socketTextStream("localhost", 8888);
DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);
// source1
SingleOutputStreamOperator<Tuple2<String, String>> dataStream1 =
source1.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
String userId = split[0];
String username = split[1];
return Tuple2.of(userId, username);
}
});
// source2
SingleOutputStreamOperator<Tuple2<String, String>> dataStream2 =
source2.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
String userId = split[0];
String totalScore = split[1];
return Tuple2.of(userId, totalScore);
}
});
// left join
dataStream1
.coGroup(dataStream2)
.where(field -> field.f0)
.equalTo(field -> field.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new CoGroupFunction<Tuple2<String, String>,
Tuple2<String, String>,
Tuple3<String, String, String>>() {
@Override
public void coGroup(Iterable<Tuple2<String, String>> first,
Iterable<Tuple2<String, String>> second,
Collector<Tuple3<String, String, String>> out)
throws Exception {
for (Tuple2<String, String> firstElement : first) {
// reset per left-side record: a left join must emit every left record
boolean matched = false;
for (Tuple2<String, String> secondElement : second) {
// entering this loop means a match was found;
// one left record may match several right records, hence the loop
matched = true;
out.collect(
Tuple3.of(firstElement.f0,
firstElement.f1,
secondElement.f1)
);
}
// left join: if no right-side record matched, emit null in its place
if (!matched) {
out.collect(Tuple3.of(firstElement.f0, firstElement.f1, null));
}
}
}
})
.print();
env.execute();
}
}
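The same pattern extends to a full outer join: in addition to padding unmatched left records with null, also emit right records that found no left-side partner. A minimal sketch of how the CoGroupFunction passed to apply() might change (the rest of the pipeline stays identical; null on either side marks the missing half):
new CoGroupFunction<Tuple2<String, String>,
Tuple2<String, String>,
Tuple3<String, String, String>>() {
@Override
public void coGroup(Iterable<Tuple2<String, String>> first,
Iterable<Tuple2<String, String>> second,
Collector<Tuple3<String, String, String>> out)
throws Exception {
boolean leftHasData = false;
for (Tuple2<String, String> firstElement : first) {
leftHasData = true;
boolean matched = false;
for (Tuple2<String, String> secondElement : second) {
matched = true;
out.collect(Tuple3.of(firstElement.f0, firstElement.f1, secondElement.f1));
}
if (!matched) {
// left record with no right-side partner
out.collect(Tuple3.of(firstElement.f0, firstElement.f1, null));
}
}
if (!leftHasData) {
// right records with no left-side partner
for (Tuple2<String, String> secondElement : second) {
out.collect(Tuple3.of(secondElement.f0, null, secondElement.f1));
}
}
}
}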
3. intervalJoin()
join() and coGroup() both join on windows, but the two streams do not always progress in step, so Flink also provides interval join semantics.
- An interval join is also an inner join, although no window needs to be defined.
- The user must specify the lower and upper bounds of the offset interval.
- Only event time is supported.
- Unlike a window join, an interval join is an operation on two KeyedStreams. The lower and upper bounds of the offset interval are set with between(); to make either bound exclusive, call upperBoundExclusive()/lowerBoundExclusive().
- Records are matched on the specified key and on a time interval of the right stream relative to the left stream: taking the left stream as the reference, a right-stream record may be earlier or later than the left record by up to the configured bounds, as spelled out below.
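Concretely, with between(lowerBound, upperBound) two records with the same key join when left.timestamp + lowerBound <= right.timestamp <= left.timestamp + upperBound. For the sample data in the example below and bounds of -10/+10 seconds, only key 3 satisfies this: its left timestamp is 1639289015000 and its right timestamp 1639289016000, one second apart. For key 1 the right record arrives 11 seconds after the left one, and for keys 0, 4 and 9 the gap is 30 seconds or more, so none of them produce output.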
package com.yuchaoshui.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/* source1 data
1,wang,1639289005000
3,zhang,1639289015000
0,li,1639289025000
4,zhao,1639289026000
9,wu,1639289055000
*/
/* source2 data
1,899,1639289016000
3,799,1639289016000
0,699,1639289055000
4,599,1639289075000
9,499,1639289085000
*/
// The final output is only (3,zhang,799); the other records fall outside the time range.
public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source1 = env.socketTextStream("localhost", 8888);
DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);
// source1
SingleOutputStreamOperator<Tuple2<String, String>> dataStream1 =
source1
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
@Override
public long extractTimestamp(String element) {
return Long.parseLong(element.split(",")[2]);
}
}
)
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
String userId = split[0];
String username = split[1];
return Tuple2.of(userId, username);
}
});
// source2
SingleOutputStreamOperator<Tuple2<String, String>> dataStream2 =
source2
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
@Override
public long extractTimestamp(String element) {
return Long.parseLong(element.split(",")[2]);
}
}
)
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
String[] split = value.split(",");
String userId = split[0];
String totalScore = split[1];
return Tuple2.of(userId, totalScore);
}
});
// interval join
dataStream1
.keyBy(f -> f.f0)
.intervalJoin(dataStream2.keyBy(f -> f.f0))
.between(Time.seconds(-10), Time.seconds(10))
// .upperBoundExclusive()
// .lowerBoundExclusive()
.process(new ProcessJoinFunction<Tuple2<String, String>,
Tuple2<String, String>,
Tuple3<String, String, String>>() {
@Override
public void processElement(Tuple2<String, String> left,
Tuple2<String, String> right,
Context ctx,
Collector<Tuple3<String, String, String>> out)
throws Exception {
out.collect(Tuple3.of(left.f0, left.f1, right.f1));
}
})
.print();
env.execute();
}
}
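BoundedOutOfOrdernessTimestampExtractor still works here but is deprecated in newer Flink releases. On Flink 1.12+ (an assumption about the version in use), the same timestamp and watermark assignment can be expressed with the WatermarkStrategy API, roughly as follows:
// Sketch of the equivalent assignment with the newer API (assumes Flink 1.12+).
// Extra imports: org.apache.flink.api.common.eventtime.WatermarkStrategy, java.time.Duration
source1.assignTimestampsAndWatermarks(
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((element, recordTimestamp) ->
// third field is the event timestamp in milliseconds
Long.parseLong(element.split(",")[2])));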