Flink读取文件的几种方式
1.从文本文件中读取数据
我们尝试读取一份用户访问网址的数据:
用户名 | 访问url | 时间戳 |
---|---|---|
asdzxc | www.baidu.com | 1607136604 |
asdzxv | www.taobao.com | 1607136610 |
asdzxb | www.leetcode.com | 1607136645 |
asdzxn | www.jd.com | 1607136668 |
asdzxm | www.aliyun.com | 1607136699 |
asdzxp | www.tencent.com | 1607136670 |
asdzxp | www.tencent.com | 1607136671 |
1.测试从文本读取数据
文本读取则是有限流,会直接读取完毕。这里我们读取一个用户访问一个网址信息
visitor_id string,url string,timestamp long
这也是常用的一种方式来做离线分析,一般上报的日志都会切割为日志,我们可以直接读取日志来进行分析
DataStream<String> textStream = env.readTextFile("D:\\develop\\projects\\flink-study" +
"-project\\src\\resources\\readfile");
DataStream<VisitorLog> logs = textStream.map(new MapFunction<String, VisitorLog>() {
@Override
public VisitorLog map(String value) throws Exception {
String[] strings = value.split(",");
return new VisitorLog(strings[0],strings[1],Long.parseLong(strings[2]));
}
});
2.从容器中读取数据
这种方式我们一般不适用,因为在大数据场景下,一般也不会将大数据文件放在容器中,因为容器里面就是内存里面,大数据文件动辄j上T,怎么可能放在容器里,所以这种方式也仅仅是测试逻辑罢了。
/**
* 2.测试从容器中读取数据
* asdzxc,www.baidu.com,1607136604
* asdzxv,www.taobao.com,1607136610
* asdzxb,www.leetcode.com,1607136645
* asdzxn,www.jd.com,1607136668
* asdzxm,www.aliyun.com,1607136699
* asdzxp,www.tencent.com,1607136670
*/
List<String> lists = new ArrayList<>();
lists.add("asdzxc,www.baidu.com,1607136604");
lists.add("asdzxv,www.taobao.com,1607136610");
lists.add("asdzxb,www.leetcode.com,1607136645");
lists.add("asdzxm,www.aliyun.com,1607136699");
DataStream<String> collectionStream = env.fromCollection(lists);
3.从流处理组件中读取数据
这种方式是目前企业一般使用的,采集—解耦存储组件—实时处理平台。比如flume—kafka—spark|flink|storm。或者是阿里的sls—datahub—blink—datahub—bi|mysql|adb
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","pc:9092");
properties.setProperty("group.id","log1");
DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer011<String>("visitor_log",
new SimpleStringSchema(),
properties));
4.自定义源读取数据
优秀如flink组件,也不是适配万能源的。毕竟通用的源就那么几个,不可能官方都匹配了。所以遇到一些企业的小范围使用组件,我们就需要使用自定源来实现读取数据。我们只要实现SourceFunction即可实现。
package com.keino.apiTest;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
//这里实现接口的时候,需要定义产生的数据类型
public class MySource implements SourceFunction<String> {
private static boolean run = true;
/**
* run方法可以源源不断的产生数据
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext ctx) throws Exception {
while(run){
ctx.collect("hello");
}
}
/**
* 该方法用来停止数据的生成
*/
@Override
public void cancel() {
this.run = false;
}
}
/**
* 4.自定义source
*/
DataStream<String> mySourceStream = env.addSource(new MySource());
mySourceStream.print("user define test").setParallelism(1);
env.execute();