一. 广播变量介绍
广播变量用于批计算。Flink支持广播变量,可以将数据广播到TaskManager上,数据存储在内存中,这样可以减缓大量的shuffle操作,使用广播变量同时也可以节省内存。
使用广播变量应该注意:
- 某个算子使用广播变量后,该算子内部的任何函数都可以使用这个广播变量。
- 不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。
使用方法:
- 在需要使用广播变量的算子后面使用 withBroadcastSet 创建广播。
- 在算子的内部函数使用 getRuntimeContext.getBroadcastVariable 获取变量。
- 广播变量只能使用 RichFunction 函数才能用,因为需要获取 context。
二. 广播变量示例
下面是一个简单的 Flink 广播变量的示例:
package com.yuchaoshui.flink;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Main {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 准备广播变量的数据(量小),一个TaskManager只会存一份
DataSource<HashMap<String, String>> broadcastData =
env.fromCollection(Collections.singletonList(new HashMap<String, String>() {{
put("0", "zhao");
put("1", "qian");
put("2", "sun");
put("3", "li");
put("4", "zhou");
put("5", "wu");
put("6", "zheng");
put("7", "wang");
}}));
// 正常的批数据(量大)
DataSource<Tuple2<String, String>> normalSource = env.fromCollection(
Arrays.asList(
new Tuple2<String, String>("3", "999a"),
new Tuple2<String, String>("4", "999b"),
new Tuple2<String, String>("5", "999c"),
new Tuple2<String, String>("6", "999d"),
new Tuple2<String, String>("7", "999e"),
new Tuple2<String, String>("10", "999f"),
new Tuple2<String, String>("20", "999g")
)
);
normalSource.map(new RichMapFunction<Tuple2<String, String>, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> map(Tuple2<String, String> value) throws Exception {
List<Map<String, String>> broadcastData =
getRuntimeContext().getBroadcastVariable("CustomBroadcastData");
return Tuple3.of(value.f0, broadcastData.get(0).get(value.f0), value.f1);
}
}).withBroadcastSet(broadcastData, "CustomBroadcastData").print();
}
}
/* 输出
(3,li,999a)
(4,zhou,999b)
(5,wu,999c)
(6,zheng,999d)
(7,wang,999e)
(10,null,999f)
(20,null,999g)
*/