1 预定义Source
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class SourceExamples {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 基于可变参数
DataStream<String> ds1 = env.fromElements("hadoop,spark,flink", "hadoop spark hdfs", "flink");
// 基于集合
DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop,spark,flink", "hadoop spark hdfs", "flink"));
// 基于文件
DataStream<String> ds3 = env.readTextFile("SourceExamples.java");
// 基于文件-HDFS
DataStream<String> ds4 = env.readTextFile("hdfs://localhost:9000/user/hadoop/flink/wordcount/output.txt");
// 基于Socket
DataStream<String> ds5 = env.socketTextStream("localhost", 9999);
ds1.print();
ds2.print();
ds3.print();
ds4.print();
ds5.print();
env.execute();
}
}
2 自定义Source
2.1 自定义类为Source
Source 类继承 RichParallelSourceFunction 并重写 run 方法
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
public class SourceExamplesUser {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStream<User> userDS = env.addSource(new UserSource());
userDS.print();
env.execute();
}
}
class User {
private int id;
private int gender;
public User(int id, int gender) {
this.id = id;
this.gender = gender;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", gender=" + gender +
'}';
}
}
class UserSource extends RichParallelSourceFunction<User> {
private Boolean flag = true;
@Override
public void run(SourceContext<User> out) throws Exception {
Random random = new Random();
while (flag){
Thread.sleep(1000);
int id = random.nextInt(100);
int gender = random.nextInt(2);
out.collect(new User(id,gender));
}
}
@Override
public void cancel() {
flag = false;
}
}