flink中将数据赞一赞再执行
数据分好组了(KeyBy),是组内计数,我们直接countWindow(5)即可
数据未未分组所以全局计数.countWindowAll(5)
例如
数据分好组的代码:
public class KeyBycountWindows {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//如果是划分窗口,如果调用keyBy分组(Keyed Stream),调用window
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = wordAndOne.keyBy(1);
//传入窗口分配器(划分器),传入具体划分窗口规则
//CountWindw:按照条数划分窗口
WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> noall = keyedStream.countWindow(5);
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = noall.sum(1);
summed.print();
env.execute();
}
}
在端口依次输入:
hive
hive
hive
hive
hive
代码实现:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
3> (hive,5)
数据没有分好组的代码:
public class countWindowsAll {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//如果是划分窗口,如果没有调用keyBy分组(Non-Keyed Stream),调用windowAll
SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
//传入窗口分配器(划分器),传入具体划分窗口规则
//CountWindw:按照条数划分窗口
AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(5);
SingleOutputStreamOperator<Integer> result = window.sum(0);
result.print();
env.execute();
}
}
在端口依次输入
1
1
1
1
1
代码实现:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2> 5