一:Flink
简单来说,Flink 是一个流式计算框架。选用 Flink,主要是因为阿里在原有 Flink 的基础上改造出了更适合自身业务的 Blink;受阿里的影响,我们也选择了 Flink。一路踩坑过来,现在稍微有点小收获,分享一下。Flink 的特点是对 SQL 流的良好支持,以及"一切以流为基础、批处理是特殊的流"的设计理念。对 Flink 感兴趣的可以自行搜索更详细的介绍。
二:代码。从简单的开始——从 DataSet 开始。
最简单的一个demo 计算本地单词的数量
package com.yjp.flink.demo; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.tuple.Tuple2; /** * 简单demo 计算本地发射单词的数量 * * @Author : WenBao * Date : 16:25 2018/3/1 */ public class WordCount { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //fromElements发射数据 DataSet<String> text = env.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles," ); DataSet<Tuple2<String, Integer>> counts = text.flatMap(new LineSplitter()).groupBy(0) .aggregate(Aggregations.SUM, 1); counts.print(); } }
package com.yjp.flink.demo; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2<String, Integer>(token, 1)); } } } }
2.从文件中读出流,将流中的Tuple转换为对象,然后存入文件中
package com.yjp.flink.demo2; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.core.fs.FileSystem; /** * 第二个demo 可以从文件中读取然后计算 将计算结果写入文件中 * * @Author : WenBao * Date : 16:25 2018/3/1 */ public class WordCountPojo { public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); //建立执行环境 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //使参数在Web接口中可用 env.getConfig().setGlobalJobParameters(params); //输入 DataSet<String> text; if (params.has("input")) { //从给定输入路径读取文本文件 text = env.readTextFile(params.get("input")); } else { // 获取默认的数据 System.out.println("Executing WordCount example with default input data set."); System.out.println("Use --input to specify file input."); text = WordCountData.getDefaultTextLineDataSet(env); } DataSet<Word> counts = text.flatMap(new Tokenizer()).groupBy("word") .reduce( new ReduceFunction<Word>() { public Word reduce(Word value1, Word value2) throws Exception { return new Word(value1.getWord(), value1.getFrequency() + value2.getFrequency()); } }); if (params.has("output")) { //FileSystem.WriteMode.OVERWRITE 如果文件存在会删除文件新建 counts.writeAsText(params.get("output"), FileSystem.WriteMode.OVERWRITE); env.execute("WordCount-Pojo Example"); } else { System.out.println("Printing result to stdout. Use --output to " + "specify output path."); counts.print(); } } }
package com.yjp.flink.demo2;

/**
 * Mutable POJO holding a word and its occurrence count.
 * A public no-arg constructor plus getters/setters keeps it a valid Flink POJO type.
 */
public class Word {

    private String word;
    private int frequency;

    /** Required no-arg constructor for POJO serialization. */
    public Word() {
    }

    public Word(String word, int frequency) {
        this.word = word;
        this.frequency = frequency;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getFrequency() {
        return frequency;
    }

    public void setFrequency(int frequency) {
        this.frequency = frequency;
    }

    @Override
    public String toString() {
        return "Word{word='" + word + "', frequency=" + frequency + "}";
    }
}
package com.yjp.flink.demo2;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Splits each input line on non-word characters and emits a Word(token, 1)
 * for every non-empty lowercase token.
 */
public class Tokenizer implements FlatMapFunction<String, Word> {

    @Override
    public void flatMap(String value, Collector<Word> collector) throws Exception {
        for (String token : value.toLowerCase().split("\\W+")) {
            // Guard clause: \W+ can produce an empty leading token.
            if (token.isEmpty()) {
                continue;
            }
            collector.collect(new Word(token, 1));
        }
    }
}
package com.yjp.flink.demo2;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Built-in sample input for the WordCount demos.
 */
public class WordCountData {

    public static final String[] WORDS = new String[]{
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,",
            "And by opposing end them?--To die,--to sleep,--",
    };

    // Static utility holder: prevent instantiation.
    private WordCountData() {
    }

    /**
     * Emits the sample lines as a DataSet in the given environment.
     *
     * @param env the batch execution environment used to create the source
     * @return a DataSet with one element per sample line
     */
    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}
3.将两个数据流中公共的数据提取出来
package com.yjp.flink.demo3; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import java.util.List; /** * 第三个demo 将两个数据流中公共的数据提取出来 * * @Author : WenBao * Date : 16:26 2018/3/1 */ public class CollectionExecutionExample { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); User[] usersArray = {new User(1, "Peter"), new User(2, "John"), new User(3, "Bill")}; EMail[] emailsArray = { new EMail(1, "Re: Meeting", "How about 1pm?"), new EMail(1, "Re: Meeting", "Sorry, I'm not availble"), new EMail(4, "Re: Re: Project proposal", "Give me a few more days to think about it.")}; //将User类型的数据发射 DataSet<User> userDataSet = env.fromElements(usersArray); //将EMail类型的数据发射 DataSet<EMail> eMailDataSet = env.fromElements(emailsArray); //将两个数据流中等同的数据保留 符合条件的保留不符合条件则抛弃 DataSet<Tuple2<User, EMail>> joined = userDataSet.join(eMailDataSet) .where("userIdentifier").equalTo("userId"); //将符合条件的数据发射 List<Tuple2<User, EMail>> result = joined.collect(); for (Tuple2<User, EMail> t : result) { System.err.println("Result = " + t); } } }
package com.yjp.flink.demo3;

/**
 * Plain data holder for an e-mail; public fields keep it a valid Flink POJO type.
 */
public class EMail {

    public int userId;
    public String subject;
    public String body;

    /** Required no-arg constructor for POJO serialization. */
    public EMail() {
    }

    public EMail(int userId, String subject, String body) {
        this.userId = userId;
        this.subject = subject;
        this.body = body;
    }

    @Override  // was missing; guards against accidental overloads
    public String toString() {
        return "eMail{userId=" + userId + " subject=" + subject + " body=" + body + "}";
    }
}
package com.yjp.flink.demo3;

/**
 * Plain data holder for a user; public fields keep it a valid Flink POJO type.
 */
public class User {

    public int userIdentifier;
    public String name;

    /** Required no-arg constructor for POJO serialization. */
    public User() {
    }

    public User(int userIdentifier, String name) {
        this.userIdentifier = userIdentifier;
        this.name = name;
    }

    @Override  // was missing
    public String toString() {
        // Bug fix: the prefix previously said "Student" — a copy-paste error
        // from another demo; this class is User.
        return "User{userIdentifier=" + userIdentifier + " name=" + name + "}";
    }
}
4.通过对象获得最大或者最小值 也可以通过Tuple
package com.yjp.flink.demo4;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Fourth demo: keys a stream of Students by age and emits the running
 * minimum of the height field for each key.
 */
public class Demo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Student> dataStream = env.fromElements(
                new Student(16, "zhangasn", 194.5),
                new Student(17, "zhangasn", 184.5),
                new Student(18, "zhangasn", 174.5),
                new Student(16, "lisi", 194.5),
                new Student(17, "lisi", 184.5),
                new Student(18, "lisi", 174.5)
        );

        // NOTE(review): min("height") only guarantees the "height" field of the
        // emitted element; the other fields come from an arbitrary element of
        // the group. Use minBy("height") to keep the full record of the minimum.
        DataStream<Student> dataStream1 = dataStream.keyBy("age").min("height");
        dataStream1.print();

        // Bug fix: a DataStream job only runs when execute() is called —
        // unlike the batch API, print() alone does not trigger execution.
        env.execute("Keyed min height demo");
    }
}
package com.yjp.flink.demo4;

/**
 * Mutable POJO for the streaming demos: age, name and height.
 * A public no-arg constructor plus getters/setters keeps it a valid Flink POJO type.
 */
public class Student {

    private int age;
    private String name;
    private double height;

    /** Required no-arg constructor for POJO serialization. */
    public Student() {
    }

    public Student(int age, String name, double height) {
        this.age = age;
        this.name = name;
        this.height = height;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public double getHeight() {
        return height;
    }

    public void setHeight(double height) {
        this.height = height;
    }

    @Override
    public String toString() {
        return "Student{age=" + age + ", name='" + name + "', height=" + height + "}";
    }
}
5.通过流来分组和时间窗口
扫描二维码关注公众号,回复:
659891 查看本文章
package com.yjp.flink.demo5; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; /** * 每5秒钟计算一次单词的数量 * * @Author : WenBao * Date : 14:47 2018/3/2 */ public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("local", 9999) //输入一个元素,生成零个、一个或者多个元素 .flatMap(new LineSplitter()) //以第一个元素分组 .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); //输入一个元素,生成另一个元素,元素类型不变 dataStream.map(new MapFunction<Tuple2<String, Integer>, Object>() { @Override public Object map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return stringIntegerTuple2.f1 * 2; } }); dataStream.print(); //一旦完成程序,用户需要启动程序执行,可以直接调用 StreamExecutionEnviroment 的 execute() env.execute("Window WordCount"); } }
package com.yjp.flink.demo5; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2<String, Integer>(token, 1)); } } } }
上面是一些简单的Flink API的操作
努力吧,皮卡丘