已复制
全屏展示
复制代码

flink 如何自定义 source 数据源


· 2 min read

Apache Flink 提供了自定义 Source 的能力,使用户可以根据自己的需求实现数据源的逻辑。自定义Source是通过实现 Flink 的 SourceFunction接口来实现的。

一. 单并行度DataSource

单并行度source实现SourceFunction,并且单并行度source不允许使用setParallelism方式设置并行度。

创建MySource

package com.yuchaoshui.flink;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySource implements SourceFunction<String> {

    private boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // 如果 run 方法不会退出,那么就是一个无限的数据流
        int i = 0;
        while (running) {
            Thread.sleep(1000);
            ctx.collect("data:" + i);
            i++;
        }
    }

    @Override
    public void cancel() {
        // 在web界面取消执行该任务时,会调用该方法,停止任务
        running = false;
    }
}

使用MySource

package com.yuchaoshui.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        c.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(c);

        DataStreamSource<String> lines = env.addSource(new MySource(), MySource.class.getName());

        // 并行度为1
        System.out.println("parallelism: " + lines.getParallelism());

        // 并行度不为1,使用 rebalance 的方式将数据轮询分发到不同的 subtask
        lines.print();

        env.execute();
    }
}

二. 多并行度DataSource

创建 MyParallelSource

package com.yuchaoshui.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class MyParallelSource extends RichParallelSourceFunction<String> {

    public static Boolean running = true;

    public MyParallelSource() {
        super();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();

        // 根据不同的 subtask index 生成不同的数据,由代码编写者自己决定
        switch (indexOfThisSubtask % 3) {
            case 0:
                getNumByIndexOfSubtask(ctx, 0);
                break;
            case 1:
                getNumByIndexOfSubtask(ctx, 1);
                break;
            case 2:
                getNumByIndexOfSubtask(ctx, 2);
                break;
        }
    }

    public void getNumByIndexOfSubtask(SourceContext<String> ctx, int group) throws Exception {
        int i = 0;
        while (running) {
            Thread.sleep(1000);
            ctx.collect(group + ":" + i);
            i++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}

使用MyParallelSource

package com.yuchaoshui.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        c.setInteger("rest.port", 8181);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(c);

        DataStreamSource<String> lines = env.addSource(new MyParallelSource(), MyParallelSource.class.getName());

        // 并行度不为1,可以设置并行度大小
        System.out.println("parallelism: " + lines.getParallelism());

        // 并行度不为1,使用 rebalance 的方式将数据轮询分发到不同的 subtask
        lines.print();

        env.execute();
    }
}

ParallelSource方法调用顺序

  • 调用 MySource 构造方法
  • 调用 open 方法,调用一次
  • 调用 run 方法
  • 调用 cancel 方法,webui点击cancel时会调用
  • 调用 close 方法,释放资源
🔗

文章推荐