
Flink Broadcast Streams (Stream Processing)


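1. Introduction to Broadcast Streams

Broadcast streams are a feature of stream processing.

Use cases:

  • When two streams need to be joined and one of them is small and changes infrequently, the smaller stream can be used as a broadcast stream. Every subtask of the other stream then holds a full copy of the smaller stream's data.
  • Combined with a MapStateDescriptor, the broadcast data can be used much like a variable.
  • The broadcast values can also be updated in real time.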
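
To make the points above concrete, here is a minimal plain-Java model of the idea. It does not use Flink at all: `BroadcastModelSketch`, `Subtask`, `broadcast`, and `lookupOn` are hypothetical names invented for this illustration. It only shows that every parallel subtask keeps its own copy of the small data set and that a later broadcast element updates every copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of broadcast semantics; this is NOT Flink code.
final class BroadcastModelSketch {

    // Hypothetical stand-in for one parallel subtask of the large stream.
    static final class Subtask {
        private final Map<String, String> localCopy = new HashMap<>();

        // Called for every element of the small (broadcast) stream.
        void onBroadcast(String key, String value) {
            localCopy.put(key, value);
        }

        // Called for elements of the large stream handled by this subtask.
        String lookup(String key) {
            return localCopy.get(key);
        }
    }

    private final List<Subtask> subtasks = new ArrayList<>();

    BroadcastModelSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
    }

    // Broadcasting delivers the same element to every subtask.
    void broadcast(String key, String value) {
        for (Subtask subtask : subtasks) {
            subtask.onBroadcast(key, value);
        }
    }

    // Looks up a key in the local copy of one specific subtask.
    String lookupOn(int subtaskIndex, String key) {
        return subtasks.get(subtaskIndex).lookup(key);
    }

    public static void main(String[] args) {
        BroadcastModelSketch model = new BroadcastModelSketch(3);
        model.broadcast("2", "sun");
        System.out.println(model.lookupOn(0, "2"));
        System.out.println(model.lookupOn(2, "2"));
        model.broadcast("2", "SUN"); // a later update reaches every copy
        System.out.println(model.lookupOn(1, "2"));
    }
}
```

In a real job the per-subtask copy is the broadcast state registered through the MapStateDescriptor, and Flink takes care of delivering each broadcast element to all subtasks.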

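Note:

  • Broadcast streams can only be used in stream processing, in a StreamExecutionEnvironment; broadcast variables can only be used in batch processing, in an ExecutionEnvironment.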

2. Using Broadcast Streams

Example:


package com.yuchaoshui.flink;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.HashMap;

/* Test data for source2 (one "id,score" record per line)
2,90
9,80
4,999
3,88
4,890
*/

public class Main {
    public static void main(String[] args) throws Exception {
        final String className = Thread.currentThread().getStackTrace()[1].getClassName();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Define the broadcast stream: a single element holding the id -> name map
        DataStreamSource<HashMap<String, String>> source1 =
                env.fromCollection(Collections.singletonList(new HashMap<String, String>() {{
                    put("0", "zhao");
                    put("1", "qian");
                    put("2", "sun");
                    put("3", "li");
                    put("4", "zhou");
                    put("5", "wu");
                    put("6", "zheng");
                    put("7", "wang");
                }}));
        
        MapStateDescriptor<String, String> broadcastDescriptor = new MapStateDescriptor<>(
                "customBroadcastState", Types.STRING, Types.STRING);
        BroadcastStream<HashMap<String, String>> broadcastStream = source1.broadcast(broadcastDescriptor);


        // Define the regular (non-broadcast) data stream
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple3<String, String, String>> process = source2
                .connect(broadcastStream)
                .process(new BroadcastProcessFunction<
                        String,
                        HashMap<String, String>,
                        Tuple3<String, String, String>>() {

                    // Processing logic for the regular data stream; the broadcast state is readable (read-only) here
                    @Override
                    public void processElement(String value,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple3<String, String, String>> out)
                            throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState =
                                ctx.getBroadcastState(broadcastDescriptor);
                        String[] split = value.split(",");
                        out.collect(
                                Tuple3.of(split[0],
                                        broadcastState.get(split[0]),
                                        split[1]
                                )
                        );
                    }

                    // Update the broadcast state with each element of the broadcast stream
                    @Override
                    public void processBroadcastElement(HashMap<String, String> value,
                                                        Context ctx,
                                                        Collector<Tuple3<String, String, String>> out)
                            throws Exception {
                        BroadcastState<String, String> broadcastState =
                                ctx.getBroadcastState(broadcastDescriptor);
                        broadcastState.putAll(value);
                    }
                });
        process.print();

        env.execute(className);
    }
}

/* Output
2> (2,sun,90)
3> (9,null,80)
5> (3,li,88)
4> (4,zhou,999)
6> (4,zhou,890)
*/
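
The output above contains `(9,null,80)` because key 9 has no entry in the broadcast state, and the example would also fail on a line that does not contain a comma. The following is a small sketch of a more defensive lookup, written in plain Java so it can be run without Flink. The class `BroadcastLookupSketch`, the method `enrich`, and the `UNKNOWN` default are assumptions made for this illustration and are not part of the original job.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Defensive version of the "split, look up, emit" step; not Flink code.
final class BroadcastLookupSketch {

    // Hypothetical placeholder used when an id has no broadcast entry.
    static final String UNKNOWN = "unknown";

    // Parses an "id,score" line and returns "(id,name,score)".
    // Returns Optional.empty() for null or malformed lines.
    static Optional<String> enrich(String line, Map<String, String> names) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return Optional.empty();
        }
        String id = parts[0].trim();
        String score = parts[1].trim();
        if (id.isEmpty() || score.isEmpty()) {
            return Optional.empty();
        }
        // Fall back to a default instead of emitting null for a missing id.
        String name = names.getOrDefault(id, UNKNOWN);
        return Optional.of("(" + id + "," + name + "," + score + ")");
    }

    public static void main(String[] args) {
        Map<String, String> names = new HashMap<>();
        names.put("2", "sun");
        names.put("4", "zhou");
        for (String line : new String[] {"2,90", "9,80", "broken"}) {
            System.out.println(line + " -> " + enrich(line, names).orElse("<skipped>"));
        }
    }
}
```

Inside `processElement`, the same checks could be applied before calling `out.collect`, with the map lookup replaced by `broadcastState.get(...)` and a null check.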