flink 提供的ParameterTool 获取参数
代码
package com.it.flink.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Streaming word count that demonstrates how to read program arguments
 * ({@code --host <name> --port <number>}) with Flink's {@link ParameterTool}.
 *
 * <p>Any argument that is missing falls back to {@link #DEFAULT_HOST} /
 * {@link #DEFAULT_PORT}; if the arguments cannot be parsed at all, both defaults are used.
 */
public class StreamWordCountGetParam {

    /** Socket host used when {@code --host} is not supplied. */
    private static final String DEFAULT_HOST = "t1";

    /** Socket port used when {@code --port} is not supplied or is not a valid integer. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Reads lines from a text socket, splits them into words and prints a running count per word.
     *
     * @param args program arguments; recognised keys are {@code host} and {@code port}
     * @throws Exception if the Flink job fails; start-up errors are deliberately not caught here
     */
    public static void main(String[] args) throws Exception {
        String host = DEFAULT_HOST;
        int port = DEFAULT_PORT;
        boolean usedDefaults;
        try {
            // Read the arguments with Flink's ParameterTool.
            ParameterTool tool = ParameterTool.fromArgs(args);
            host = tool.get("host", DEFAULT_HOST);
            // The key must be "port"; a misspelled key is never found, which would
            // silently force the defaults on every run.
            port = tool.getInt("port", DEFAULT_PORT);
            usedDefaults = !tool.has("host") || !tool.has("port");
        } catch (IllegalArgumentException e) {
            // Covers malformed keys (thrown by fromArgs) and a non-numeric port
            // (NumberFormatException is an IllegalArgumentException).
            System.err.println("Invalid program arguments: " + e.getMessage());
            host = DEFAULT_HOST;
            port = DEFAULT_PORT;
            usedDefaults = true;
        }
        if (usedDefaults) {
            System.out.println("未设置参数使用默认值");
        }

        // Create the execution environment of the Flink streaming program.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: one String element per line received from the socket.
        DataStreamSource<String> lines = env.socketTextStream(host, port);

        // Transformation: split every line and emit a (word, 1) tuple per word.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
                lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        for (String word : line.split(" ")) {
                            // Blank lines and repeated spaces yield empty tokens; they are not words.
                            if (word.isEmpty()) {
                                continue;
                            }
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                });

        // Group by the first tuple field (the word) and sum the second field (the count).
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = wordAndOne.keyBy(0).sum(1);

        // Sink: a sink is mandatory; here the running counts are printed.
        counts.print();

        // Start the job under a job name. Start-up exceptions must propagate, so they are not caught.
        env.execute("StreamWordCount");

        /*
         * Sample output:
         * 7> (flink,1)
         * 7> (flink,2)
         * 7> (flink,3)
         */
    }
}
在IDEA中如何传递参数
看源码
Keys have to start with '-' or '--'
if (args[i].startsWith("--")) {
key = args[i].substring(2);
} else if (args[i].startsWith("-")) {
/**
 * Builds a {@link ParameterTool} from command-line style arguments.
 *
 * <p>The array is read as a sequence of keys, each optionally followed by one value. Every key
 * must be prefixed with {@code --} or {@code -}. A key that is the last element, or that is
 * directly followed by another prefixed key, is stored with {@code NO_VALUE_KEY}. An element
 * accepted by {@code NumberUtils.isNumber} is always taken as a value, even when it starts
 * with {@code -}.
 *
 * <p><strong>Example arguments:</strong>
 * --key1 value1 --key2 value2 -key3 value3
 *
 * @param args Input array arguments
 * @return A {@link ParameterTool}
 * @throws IllegalArgumentException if an element in key position has no {@code -}/{@code --}
 *     prefix, or consists of the prefix only
 */
public static ParameterTool fromArgs(String[] args) {
    final Map<String, String> parsed = new HashMap<>(args.length / 2);

    int pos = 0;
    while (pos < args.length) {
        final String token = args[pos];

        // Work out how many prefix characters to strip from the key.
        final int prefixLength;
        if (token.startsWith("--")) {
            prefixLength = 2;
        } else if (token.startsWith("-")) {
            prefixLength = 1;
        } else {
            throw new IllegalArgumentException(
                String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
                    Arrays.toString(args), token));
        }

        final String name = token.substring(prefixLength);
        if (name.isEmpty()) {
            throw new IllegalArgumentException(
                "The input " + Arrays.toString(args) + " contains an empty argument");
        }

        // Step to the element that may hold the value for this key.
        pos++;

        // The next element is a value when it is numeric (this admits negative numbers)
        // or when it does not look like another key.
        if (pos < args.length
                && (NumberUtils.isNumber(args[pos]) || !args[pos].startsWith("-"))) {
            parsed.put(name, args[pos]);
            pos++;
        } else {
            // Either the input is exhausted or the next element is a parameter name.
            parsed.put(name, NO_VALUE_KEY);
        }
    }

    return fromMap(parsed);
}