本地执行Flink demo
- 开发工具IDEA
- maven配置
<!-- Build-wide properties: source/report encodings and the single Flink version
     that every dependency below references through ${flink.version}. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.4.2</flink.version>
</properties>
<!-- Flink artifacts used by the demo. NOTE(review): the _2.11 suffix on the
     artifact ids presumably selects the Scala 2.11 build; confirm before changing it. -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): no wikiedits class is imported by the Java code shown in this
     file, so this connector looks optional for the socket word-count demo. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
demo与官网的quick-start一样,只是改用local模式启动。这里的代码参考官网的java示例,启动参数为 --port 9999
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * Streaming word count over text lines read from a socket on {@code localhost}.
 *
 * <p>Each incoming line is split into words, the words are keyed by their text and
 * counted over a 5-second window that slides every second, and the per-window
 * counts are printed to standard output.
 *
 * <p>Run with {@code --port <port>}; the port must already be served by some
 * process (for example a small socket server) before this job starts.
 */
public class SocketWindowWordCount {

    /**
     * Entry point: parses {@code --port}, builds the pipeline and runs it.
     *
     * @param args command-line arguments; must contain {@code --port <port>}
     * @throws Exception if the Flink job fails while executing
     */
    public static void main(String[] args) throws Exception {
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            // Missing or malformed --port: print usage instead of a stack trace.
            System.err.println(
                    "No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // Local mode: the environment is created explicitly as a local one so the
        // demo can be started straight from the IDE. The quick-start variant is
        // StreamExecutionEnvironment.getExecutionEnvironment().
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Read newline-delimited text from the socket.
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out)
                            throws Exception {
                        for (String word : value.split("\\s+")) {
                            // Blank lines and leading whitespace produce an empty
                            // token; it is not a word, so do not count it.
                            if (word.isEmpty()) {
                                continue;
                            }
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b)
                            throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // Print from a single task so the output is not interleaved.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    /**
     * A word together with the number of times it was seen.
     *
     * <p>The public fields and the public no-argument constructor are kept on
     * purpose: the pipeline addresses the key by field name ({@code keyBy("word")}),
     * which relies on Flink treating this class as a POJO.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        /** No-argument constructor, kept so the class can be instantiated reflectively. */
        public WordWithCount() {
        }

        /**
         * @param word the word text
         * @param count how many occurrences this record represents
         */
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
模拟server发送消息。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * Simulates a server that sends messages: accepts one client connection and
 * forwards every line typed on standard input to that client.
 *
 * <p>Forwarding stops after the line {@code end} has been sent, or when standard
 * input reaches end-of-file.
 */
public class ServerDemo {

    /** Port listened on when none is given on the command line. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Starts the server, waits for a single client and relays standard input to it.
     *
     * @param args optional; {@code args[0]} is the port to listen on (default 9999)
     * @throws IOException if the socket cannot be opened or an I/O error occurs
     * @throws NumberFormatException if {@code args[0]} is not a valid integer
     */
    public static void main(String[] args) throws IOException {
        final int port =
                (args != null && args.length > 0) ? Integer.parseInt(args[0]) : DEFAULT_PORT;

        // Show the local host address; this is informational only, so a failed
        // lookup must not stop the demo.
        try {
            System.out.println(InetAddress.getLocalHost());
        } catch (IOException e) {
            System.err.println("Could not resolve local host: " + e.getMessage());
        }

        // Standard input is deliberately not closed here; it belongs to the process.
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

        // try-with-resources closes the writer, the client socket and the server
        // socket even when an exception is thrown mid-loop.
        try (ServerSocket server = new ServerSocket(port);
                Socket client = server.accept();
                PrintWriter out = new PrintWriter(client.getOutputStream())) {
            String str;
            // readLine() returns null at end-of-file; stop instead of sending "null".
            while ((str = in.readLine()) != null) {
                System.out.println(str);
                out.println(str);
                out.flush();
                if ("end".equals(str)) {
                    break;
                }
            }
        }
    }
}