已复制
全屏展示
复制代码

Flink 如何使用广播变量(批处理)

· 2 min read

一. 广播变量介绍

广播变量用于批计算。Flink支持广播变量,可以将数据广播到TaskManager上,数据存储在内存中,这样可以减缓大量的shuffle操作,使用广播变量同时也可以节省内存。

Flink广播变量

使用广播变量应该注意:

  • 某个算子使用广播变量后,该算子内部的任何函数都可以使用这个广播变量。
  • 不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。

使用方法:

  • 在需要使用广播变量的算子后面使用 withBroadcastSet 创建广播。
  • 在算子的内部函数使用 getRuntimeContext.getBroadcastVariable 获取变量。
  • 广播变量只能使用 RichFunction 函数才能用,因为需要获取 context。

二. 广播变量示例

下面是一个简单的 Flink 广播变量的示例:

package com.yuchaoshui.flink;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class Main {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 准备广播变量的数据(量小),一个TaskManager只会存一份
        DataSource<HashMap<String, String>> broadcastData =
                env.fromCollection(Collections.singletonList(new HashMap<String, String>() {{
                    put("0", "zhao");
                    put("1", "qian");
                    put("2", "sun");
                    put("3", "li");
                    put("4", "zhou");
                    put("5", "wu");
                    put("6", "zheng");
                    put("7", "wang");
                }}));

        // 正常的批数据(量大)
        DataSource<Tuple2<String, String>> normalSource = env.fromCollection(
                Arrays.asList(
                        new Tuple2<String, String>("3", "999a"),
                        new Tuple2<String, String>("4", "999b"),
                        new Tuple2<String, String>("5", "999c"),
                        new Tuple2<String, String>("6", "999d"),
                        new Tuple2<String, String>("7", "999e"),
                        new Tuple2<String, String>("10", "999f"),
                        new Tuple2<String, String>("20", "999g")
                )
        );
        normalSource.map(new RichMapFunction<Tuple2<String, String>, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(Tuple2<String, String> value) throws Exception {
                List<Map<String, String>> broadcastData =
                        getRuntimeContext().getBroadcastVariable("CustomBroadcastData");

                return Tuple3.of(value.f0, broadcastData.get(0).get(value.f0), value.f1);
            }
        }).withBroadcastSet(broadcastData, "CustomBroadcastData").print();

    }
}



/* 输出
(3,li,999a)
(4,zhou,999b)
(5,wu,999c)
(6,zheng,999d)
(7,wang,999e)
(10,null,999f)
(20,null,999g)
*/
🔗

文章推荐