Reading from Files in Flink


1. readFile

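  • Read the directory only once (FileProcessingMode.PROCESS_ONCE): the source scans the path, processes what it finds, and then exits.
  • Monitor the directory continuously (FileProcessingMode.PROCESS_CONTINUOUSLY): whenever a file's modification time changes, the whole file is read again, whether or not its content actually changed.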
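import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI (needs flink-runtime-web on the classpath)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // When running on a YARN cluster, this must be an HDFS path (a path every TaskManager can reach)
        String path = "hdfs:///tmp/flink";
        // PROCESS_CONTINUOUSLY keeps monitoring the given file or directory and checks for
        // changes every 5 seconds (the interval argument, 5000 ms).
        // If a file has changed, that file is read again from the beginning.
        DataStreamSource<String> lines = env.readFile(
                new TextInputFormat(new Path(path)),
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                5000
        );
        // The directory monitor itself runs with parallelism 1; the file readers behind
        // this stream run with the default parallelism, so this prints a value > 1 locally
        System.out.println("parallelism: " + lines.getParallelism());
        lines.print();
        env.execute();
    }
}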
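The re-read rule in the second bullet can be illustrated without Flink. The class below is a simplified, hypothetical model (`ModTimeWatcher` and `poll` are names made up for this sketch, not Flink API), assuming the monitor remembers only the newest modification time it has seen and re-reads in full any file whose modification time is newer than that. It never looks at file contents, which is why merely touching a file is enough to trigger a full re-read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of PROCESS_CONTINUOUSLY change detection (not Flink code).
// Assumption: the monitor keeps a single "newest modification time seen" value and
// re-reads, in full, every file whose modification time is newer than that value.
final class ModTimeWatcher {
    private final Path dir;
    private long maxSeenModTime = Long.MIN_VALUE; // newest mtime seen in earlier polls

    ModTimeWatcher(Path dir) {
        this.dir = dir;
    }

    // One polling round: returns the files that would be read again from the start.
    List<Path> poll() {
        try (Stream<Path> entries = Files.list(dir)) {
            List<Path> files = entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
            List<Path> toRead = new ArrayList<>();
            long newMax = maxSeenModTime;
            for (Path file : files) {
                long modTime = Files.getLastModifiedTime(file).toMillis();
                // Only the timestamp is compared; file contents are never inspected.
                if (modTime > maxSeenModTime) {
                    toRead.add(file);
                    newMax = Math.max(newMax, modTime);
                }
            }
            maxSeenModTime = newMax;
            return toRead;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `poll()` once per interval plays the role of the 5000 ms argument passed to `readFile` above. Because a changed file is handed on again in its entirety, lines that were already emitted show up a second time downstream.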

2. readTextFile

Reads the path only once and exits when it is done, producing a bounded (finite) stream.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI (needs flink-runtime-web on the classpath)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // When running on a YARN cluster, this must be an HDFS path (a path every TaskManager can reach)
        String path = "hdfs:///tmp/flink";
        // Reads every file under the path once; the job finishes after the last line
        DataStreamSource<String> lines = env.readTextFile(path);
        // The file readers run with the default parallelism, so this prints a value > 1 locally
        System.out.println("parallelism: " + lines.getParallelism());
        lines.print();
        env.execute();
    }
}
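For contrast with continuous monitoring: in the Flink 1.x source, `readTextFile(path)` wraps a `TextInputFormat` and delegates to `readFile` with `FileProcessingMode.PROCESS_ONCE`, so the path is scanned a single time. The stdlib-only class below is a hypothetical model of that bounded behavior (`ReadOnce` and `readAllLinesOnce` are illustrative names, not Flink API): it lists the directory once, returns every line of every file, and never scans again, so files added or modified afterwards are not seen.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of PROCESS_ONCE / readTextFile semantics (not Flink code):
// one directory listing, every line of every file, then the "stream" ends.
final class ReadOnce {
    static List<String> readAllLinesOnce(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            // Single scan: the set of files is fixed at this moment.
            List<Path> files = entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
            List<String> out = new ArrayList<>();
            for (Path file : files) {
                out.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
            }
            // Returning is the analogue of the bounded source finishing.
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because nothing is polled after the method returns, this matches the "read once, then exit" behavior described above rather than the re-reading of `PROCESS_CONTINUOUSLY`.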