DataStream API
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Word count over a stream of text lines, using the DataStream API.
 *
 * <p>Despite the class name, the active source ({@code socketTextStream}) reads lines from a TCP
 * socket and is unbounded. The commented-out {@code readTextFile} source is the bounded
 * alternative. To feed the socket source locally, start a server first, e.g. {@code nc -lk 7777}.
 *
 * <p>Each word becomes {@code (word, 1L)}. Records are keyed by the word, and a rolling per-word
 * sum is printed as records arrive.
 */
public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bounded source: read a text file from the classpath.
        // DataStreamSource<String> lineStreamSource = env.readTextFile(BatchWordCount.class.getClassLoader().getResource("word.txt").getPath());
        // Unbounded ("real") stream: lines from a local socket (start one with `nc -lk 7777`).
        DataStreamSource<String> lineStreamSource = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndTuple = lineStreamSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    // Split on runs of whitespace and skip empty tokens. Repeated spaces, tabs,
                    // leading blanks and empty lines then no longer emit a bogus "" word.
                    for (String word : line.split("\\s+")) {
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // Declare the output type explicitly. Flink generally cannot recover Tuple2's
                // generic parameters from a Java lambda because of type erasure.
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // Key by the word (field f0) and keep a rolling sum of the counts (field 1).
        wordAndTuple.keyBy(data -> data.f0).sum(1).print();
        env.execute();
    }
}
3> (hello,1)    前缀 3 表示输出这一行的 print 并行子任务编号（本地运行时每个并行子任务由一个线程执行，运行在一个任务槽 slot 中）
6> (word,1)
3> (hello,2)
7> (flink,1)
DataSet API
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Batch word count over the classpath resource {@code word.txt}, using the DataSet API.
 *
 * <p>The job proceeds in four steps:
 * <ol>
 *   <li>Each line is split into words.
 *   <li>Each word becomes {@code (word, 1L)}.
 *   <li>The tuples are grouped by the word (field 0).
 *   <li>The counts (field 1) are summed, and {@code print()} writes the final counts to stdout.
 * </ol>
 * No explicit {@code env.execute()} call is made here.
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> lineDataSource = env.readTextFile(resourcePath("word.txt"));
        FlatMapOperator<String, Tuple2<String, Long>> wordAndTuple = lineDataSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    // Split on runs of whitespace and skip empty tokens. Repeated spaces, tabs,
                    // leading blanks and empty lines then do not produce a bogus "" word.
                    for (String key : line.split("\\s+")) {
                        if (!key.isEmpty()) {
                            out.collect(Tuple2.of(key, 1L));
                        }
                    }
                })
                // Declare the output type explicitly. Flink generally cannot recover Tuple2's
                // generic parameters from a Java lambda because of type erasure.
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        UnsortedGrouping<Tuple2<String, Long>> wordAndgroup = wordAndTuple.groupBy(0);
        AggregateOperator<Tuple2<String, Long>> sum = wordAndgroup.sum(1);
        sum.print();
    }

    /**
     * Resolves a classpath resource to a file-system path string.
     *
     * <p>{@code URL.getPath()} returns the percent-encoded form, which is not a valid file path.
     * For example, a space becomes {@code %20}, and non-ASCII directory names are escaped the same
     * way. Converting through {@code toURI()} and {@code Paths.get} decodes it.
     *
     * @param name resource name relative to the classpath root
     * @return decoded absolute path of the resource
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws java.net.URISyntaxException if the resource URL cannot be converted to a URI
     */
    private static String resourcePath(String name) throws java.net.URISyntaxException {
        java.net.URL resource = BatchWordCount.class.getClassLoader().getResource(name);
        if (resource == null) {
            // Fail fast with a descriptive message instead of an anonymous NullPointerException.
            throw new IllegalStateException("Classpath resource not found: " + name);
        }
        return java.nio.file.Paths.get(resource.toURI()).toString();
    }
}