Flink 从变量中读取数据
在学习 Flink 或验证 Flink 的某些功能时,往往需要准备一些简单的测试数据,最方便的方式就是直接从变量中读取数据(即从内存中读取数据)。
一. fromCollection
package com.yuchaoshui.flink;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
 * Demonstrates creating Flink data streams from in-memory collections with
 * {@code StreamExecutionEnvironment.fromCollection}.
 */
public class StreamingWordCount {

    /**
     * Builds three collection-backed sources, prints each of them with its own
     * prefix, and executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Basic (boxed) element type.
        DataStreamSource<Integer> nums1 =
                env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0));
        nums1.print("nums1");

        // Custom element type; the type information is left to Flink to extract.
        DataStreamSource<Person> nums2 = env.fromCollection(
                Arrays.asList(
                        new Person("zhang", 30),
                        new Person("li", 40),
                        new Person("song", 40)));
        nums2.print("nums2");

        // Custom element type with the type information passed explicitly.
        DataStreamSource<Person> nums3 = env.fromCollection(
                Arrays.asList(
                        new Person("zhang", 30),
                        new Person("li", 40),
                        new Person("song", 40)),
                TypeInformation.of(new TypeHint<Person>() {
                }));
        nums3.print("nums3");

        env.execute();
    }

    /**
     * Simple person record used as a custom stream element type.
     * Accessors, equals/hashCode and toString are generated by Lombok's {@code @Data}.
     */
    @Data
    public static class Person {
        String name;
        Integer age;

        /**
         * Public no-argument constructor. Flink's POJO rules require one; without it
         * the class is handled as a generic type instead of a POJO.
         */
        public Person() {
        }

        /**
         * Creates a person with the given name and age.
         *
         * @param name the person's name
         * @param age  the person's age
         */
        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }
}
二. fromElements
// Basic (boxed) element type.
DataStreamSource<Integer> intNums = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 0);

// Custom element type; the element class is left to Flink to determine.
DataStreamSource<Person> persons = env.fromElements(
        new Person("zhang", 30),
        new Person("li", 40),
        new Person("song", 40)
);

// Custom element type with the element class passed explicitly.
// Each example uses its own variable name so the three can coexist in one scope.
DataStreamSource<Person> typedPersons = env.fromElements(
        Person.class,
        new Person("zhang", 30),
        new Person("li", 40),
        new Person("song", 40)
);
三. fromParallelCollection
该数据源支持多并行度:并行度默认根据可用线程数决定,也可以通过 setParallelism 手动指定。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Splittable iterator over the numbers 1 through 20, both ends inclusive.
NumberSequenceIterator oneToTwenty = new NumberSequenceIterator(1L, 20L);

// Read the sequence with an explicitly chosen parallelism of 3.
DataStreamSource<Long> nums = env
        .fromParallelCollection(oneToTwenty, Long.class)
        .setParallelism(3);
四. generateSequence
该数据源同样支持多并行度:并行度默认根据可用线程数决定,也可以通过 setParallelism 手动指定。
// Emits the numbers 1 through 100, both ends inclusive, with a parallelism of 2.
// NOTE(review): newer Flink releases appear to deprecate generateSequence in favor of
// fromSequence — confirm against the Flink version this project targets.
final long first = 1;
final long last = 100;
DataStreamSource<Long> nums = env.generateSequence(first, last).setParallelism(2);