已复制
全屏展示
复制代码

Flink 从变量中读取数据


· 2 min read

在学习 Flink ,或者验证一些 Flink 的功能的时候,可能需要简单的数据准备,最好的方式就是从变量中读取数据(内存读取数据)。

一. fromCollection

package com.yuchaoshui.flink;

import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // 传入基本类型
        DataStreamSource<Integer> nums1 = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0));
        nums1.print("nums1");

        // 传入自定义类型
        DataStreamSource<Person> nums2 = env.fromCollection(
                Arrays.asList(
                        new Person("zhang", 30),
                        new Person("li", 40),
                        new Person("song", 40))
        );
        nums2.print("num2");

        // 传入自定义类型,同时指定类型
        DataStreamSource<Person> nums3 = env.fromCollection(
                Arrays.asList(
                        new Person("zhang", 30),
                        new Person("li", 40),
                        new Person("song", 40)
                ),
                TypeInformation.of(new TypeHint<Person>() {
                })
        );
        nums3.print("num3");

        env.execute();
    }

    @Data
    public static class Person {
        String name;
        Integer age;

        public Person(String name_, Integer age_) {
            name = name_;
            age = age_;
        }
    }
}

二. fromElements


// 基本类型
DataStreamSource<Integer> nums = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 0);


// 自定义类型
DataStreamSource<Person> nums = env.fromElements(
    new Person("zhang", 30),
    new Person("li", 40),
    new Person("song", 40)
);

// 自定义类型,同时传入类型
DataStreamSource<Person> nums = env.fromElements(
    Person.class,
    new Person("zhang", 30),
    new Person("li", 40),
    new Person("song", 40)
);

三. fromParallelCollection

多并行,根据线程数决定,或者手动指定


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1 到 20 之间的数,包含 1 也同时包含 20
DataStreamSource<Long> nums = env.fromParallelCollection(
    new NumberSequenceIterator(1L, 20L), Long.class
).setParallelism(3);

四. generateSequence

多并行,根据线程数决定,或者手动指定


// 1 到 100 之间的数,包含 1 也同时包含 100
DataStreamSource<Long> nums = env.generateSequence(1, 100).setParallelism(2);
🔗

文章推荐