Reading from files in Flink
1. readFile
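- FileProcessingMode.PROCESS_ONCE: the path is scanned once, its current contents are read, and the source then finishes.
- FileProcessingMode.PROCESS_CONTINUOUSLY: the path is monitored continuously. Whenever a file's modification time changes, the whole file is read again, whether or not its content actually changed, so records that were already emitted are emitted a second time.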
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
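public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI enabled, convenient for testing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // On a YARN cluster, use a path that every TaskManager can read, such as HDFS
        String path = "hdfs:///tmp/flink";
        // PROCESS_CONTINUOUSLY keeps monitoring the given file or directory and
        // checks for changes every 5 seconds (the interval argument is in milliseconds).
        // If a file has changed, it is read again in full.
        DataStreamSource<String> lines = env.readFile(
                new TextInputFormat(null), // readFile sets the path on the input format
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                5000
        );
        System.out.println("parallelism: " + lines.getParallelism()); // the file reader runs in parallel
        lines.print();
        env.execute();
    }
}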
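The re-read behavior of continuous monitoring can be illustrated without a Flink cluster. The following is a minimal plain-Java sketch, not Flink's implementation: the class `ModTimeMonitorSketch` and its `scan` method are hypothetical names, and tracking one modification time per file is an assumption made for simplicity (Flink's monitoring source may track this differently). It only shows why changing a file's modification time, even with identical content, causes every line to be emitted again.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of modification-time-based monitoring (hypothetical, not Flink code). */
final class ModTimeMonitorSketch {
    // Last modification time (epoch millis) observed for each file.
    private final Map<Path, Long> lastSeen = new HashMap<>();

    /** One scan: returns all lines of every file that is new or whose mtime changed. */
    List<String> scan(Path dir) throws IOException {
        List<String> emitted = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) {
                    continue;
                }
                long modified = Files.getLastModifiedTime(file).toMillis();
                Long previous = lastSeen.put(file, modified);
                if (previous == null || previous.longValue() != modified) {
                    // The whole file is read again; content is never compared.
                    emitted.addAll(Files.readAllLines(file));
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("monitor-sketch");
        Path file = dir.resolve("words.txt");
        Files.write(file, Arrays.asList("hello", "flink"));
        Files.setLastModifiedTime(file, FileTime.fromMillis(1_000_000L));

        ModTimeMonitorSketch monitor = new ModTimeMonitorSketch();
        System.out.println(monitor.scan(dir)); // first scan: [hello, flink]
        System.out.println(monitor.scan(dir)); // unchanged mtime: []

        // Same content, new modification time: the file is emitted again in full.
        Files.setLastModifiedTime(file, FileTime.fromMillis(2_000_000L));
        System.out.println(monitor.scan(dir)); // [hello, flink]
    }
}
```

In a real job this means that appending to a monitored file re-emits its earlier lines too, so downstream logic has to tolerate duplicates, or new data should arrive as new files rather than as edits to existing ones.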
2. readTextFile
Reads the path once and finishes when everything has been read, so the result is a bounded stream.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI enabled, convenient for testing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // On a YARN cluster, use a path that every TaskManager can read, such as HDFS
        String path = "hdfs:///tmp/flink";
        DataStreamSource<String> lines = env.readTextFile(path);
        System.out.println("parallelism: " + lines.getParallelism()); // the file reader runs in parallel
        lines.print();
        env.execute();
    }
}