flink 如何自定义 source 数据源
Apache Flink 提供了自定义 Source 的能力,使用户可以根据自己的需求实现数据源的逻辑。自定义Source是通过实现 Flink 的 SourceFunction
接口来实现的。
一. 单并行度DataSource
单并行度source实现SourceFunction,并且单并行度source不允许使用setParallelism方式设置并行度。
创建MySource
package com.yuchaoshui.flink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class MySource implements SourceFunction<String> {
private boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
// 如果 run 方法不会退出,那么就是一个无限的数据流
int i = 0;
while (running) {
Thread.sleep(1000);
ctx.collect("data:" + i);
i++;
}
}
@Override
public void cancel() {
// 在web界面取消执行该任务时,会调用该方法,停止任务
running = false;
}
}
使用MySource
package com.yuchaoshui.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Main {
public static void main(String[] args) throws Exception {
Configuration c = new Configuration();
c.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(c);
DataStreamSource<String> lines = env.addSource(new MySource(), MySource.class.getName());
// 并行度为1
System.out.println("parallelism: " + lines.getParallelism());
// 并行度不为1,使用 rebalance 的方式将数据轮询分发到不同的 subtask
lines.print();
env.execute();
}
}
二. 多并行度DataSource
创建 MyParallelSource
package com.yuchaoshui.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
public class MyParallelSource extends RichParallelSourceFunction<String> {
public static Boolean running = true;
public MyParallelSource() {
super();
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
// 根据不同的 subtask index 生成不同的数据,由代码编写者自己决定
switch (indexOfThisSubtask % 3) {
case 0:
getNumByIndexOfSubtask(ctx, 0);
break;
case 1:
getNumByIndexOfSubtask(ctx, 1);
break;
case 2:
getNumByIndexOfSubtask(ctx, 2);
break;
}
}
public void getNumByIndexOfSubtask(SourceContext<String> ctx, int group) throws Exception {
int i = 0;
while (running) {
Thread.sleep(1000);
ctx.collect(group + ":" + i);
i++;
}
}
@Override
public void cancel() {
running = false;
}
@Override
public void close() throws Exception {
super.close();
}
}
使用MyParallelSource
package com.yuchaoshui.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Main {
public static void main(String[] args) throws Exception {
Configuration c = new Configuration();
c.setInteger("rest.port", 8181);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(c);
DataStreamSource<String> lines = env.addSource(new MyParallelSource(), MyParallelSource.class.getName());
// 并行度不为1,可以设置并行度大小
System.out.println("parallelism: " + lines.getParallelism());
// 并行度不为1,使用 rebalance 的方式将数据轮询分发到不同的 subtask
lines.print();
env.execute();
}
}
ParallelSource方法调用顺序
- 调用 MySource 构造方法
- 调用 open 方法,调用一次
- 调用 run 方法
- 调用 cancel 方法,webui点击cancel时会调用
- 调用 close 方法,释放资源