1. Introduction to Broadcast Streams
Broadcast streams are used in stream processing.
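Use cases:
- When two streams need to be joined and one of them is small and changes infrequently, the smaller stream can be used as a broadcast stream. Every subtask of the other stream then holds a full copy of the smaller stream's data.
- Combined with a MapStateDescriptor, the broadcast data can be read much like a variable.
- The broadcast values can also be updated at runtime.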
Note:
- Broadcast streams can only be used in stream processing (StreamExecutionEnvironment); broadcast variables can only be used in batch processing (ExecutionEnvironment).
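The use case described above can be pictured with a small plain-Java model. This is only a conceptual sketch and uses no Flink classes: the names BroadcastSketch, Subtask, onBroadcast, onElement and broadcast are hypothetical, and the name "chen" is made-up sample data. It shows each parallel subtask keeping its own copy of the small data set, resolving main-stream records locally, and picking up later updates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only: plain Java, no Flink classes involved.
public class BroadcastSketch {

    // One parallel instance of the operator that processes the main stream.
    static class Subtask {
        // Each subtask keeps its own full copy of the small data set.
        final Map<String, String> localCopy = new HashMap<>();

        // Called for every element of the small (broadcast) stream.
        void onBroadcast(String key, String name) {
            localCopy.put(key, name);
        }

        // Called for every main-stream element, formatted as "key,value".
        String onElement(String line) {
            String[] parts = line.split(",");
            // An unknown key yields null, which prints as "null".
            return "(" + parts[0] + "," + localCopy.get(parts[0]) + "," + parts[1] + ")";
        }
    }

    // Delivers one broadcast element to every subtask.
    static void broadcast(List<Subtask> subtasks, String key, String name) {
        for (Subtask subtask : subtasks) {
            subtask.onBroadcast(key, name);
        }
    }

    public static void main(String[] args) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            subtasks.add(new Subtask());
        }

        broadcast(subtasks, "2", "sun");
        // Whichever subtask receives the record can resolve key 2 locally.
        System.out.println(subtasks.get(0).onElement("2,90")); // (2,sun,90)
        System.out.println(subtasks.get(2).onElement("2,90")); // (2,sun,90)
        System.out.println(subtasks.get(1).onElement("9,80")); // (9,null,80)

        // A later broadcast element updates every copy ("chen" is made up).
        broadcast(subtasks, "9", "chen");
        System.out.println(subtasks.get(1).onElement("9,80")); // (9,chen,80)
    }
}
```

In Flink itself the per-subtask copy is managed as broadcast state rather than a plain HashMap; the model only illustrates why no shuffle of the main stream is needed for the lookup.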
2. Using Broadcast Streams
Example:
package com.yuchaoshui.flink;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.HashMap;

/* Test data for source2 (sent to the socket on localhost:9999)
2,90
9,80
4,999
3,88
4,890
*/
public class Main {
    public static void main(String[] args) throws Exception {
        final String className = Thread.currentThread().getStackTrace()[1].getClassName();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Define the broadcast stream: a single element holding the id -> name map
        DataStreamSource<HashMap<String, String>> source1 =
                env.fromCollection(Collections.singletonList(new HashMap<String, String>() {{
                    put("0", "zhao");
                    put("1", "qian");
                    put("2", "sun");
                    put("3", "li");
                    put("4", "zhou");
                    put("5", "wu");
                    put("6", "zheng");
                    put("7", "wang");
                }}));
        MapStateDescriptor<String, String> broadcastDescriptor = new MapStateDescriptor<String, String>
                ("customBroadcastState", Types.STRING, Types.STRING);
        BroadcastStream<HashMap<String, String>> broadcastStream = source1.broadcast(broadcastDescriptor);

        // Define the regular data stream: lines of the form "id,value"
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple3<String, String, String>> process = source2
                .connect(broadcastStream)
                .process(new BroadcastProcessFunction<
                        String,
                        HashMap<String, String>,
                        Tuple3<String, String, String>>() {

                    // Processing logic for the regular data stream;
                    // the broadcast state can be read here (read-only)
                    @Override
                    public void processElement(String value,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple3<String, String, String>> out)
                            throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState =
                                ctx.getBroadcastState(broadcastDescriptor);
                        String[] split = value.split(",");
                        // get() returns null when the id is not in the broadcast state
                        out.collect(
                                Tuple3.of(split[0],
                                        broadcastState.get(split[0]),
                                        split[1]
                                )
                        );
                    }

                    // Updates the broadcast state whenever a broadcast element arrives
                    @Override
                    public void processBroadcastElement(HashMap<String, String> value,
                                                        Context ctx,
                                                        Collector<Tuple3<String, String, String>> out)
                            throws Exception {
                        BroadcastState<String, String> broadcastState =
                                ctx.getBroadcastState(broadcastDescriptor);
                        broadcastState.putAll(value);
                    }
                });

        process.print();
        env.execute(className);
    }
}
/* Output (the N> prefix is the index of the printing subtask; line order may vary between runs)
2> (2,sun,90)
3> (9,null,80)
5> (3,li,88)
4> (4,zhou,999)
6> (4,zhou,890)
*/