Flink版本为1.6,任务提交后的jobgraph为:
Maven 的pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>yzg.cmos</groupId>
<artifactId>HelloWolrdFlink</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.28</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-shade-plugin -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<mainClass>CallList.JustForTestFour</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Flink的主程序:
package CallList;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.*;
public class JustForTestFour {
//日志
private static Logger logger = LoggerFactory.getLogger(JustForTestFour.class);
public static void main(String[] args) throws Exception {
//设置statebackend
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//checkpoint配置
env.setStateBackend(new FsStateBackend("hdfs://172.19.72.52:8020/user/zx_yzg", false));
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Properties props = new Properties();
props.setProperty("bootstrap.servers", "172.19.72.54:9092,172.19.72.55:9092,172.19.72.56:9092");
props.setProperty("zookeeper.connect", "172.19.72.54:2181,172.19.72.55:2181,172.19.72.56:2181");
props.setProperty("group.id", "call-list-group");
logger.info("begin to add source!");
FlinkKafkaConsumer010<String> consumer =
new FlinkKafkaConsumer010<>("topic_list_data", new SimpleStringSchema(), props);
DataStream source =
env.addSource(consumer)
.filter(new FilterClass()) //呼入的话单
.map(new MapclassOne()) //抽取部分字段
.map(new MapclassTwo()) //得到等待时长和应答时常
.keyBy(0, 1, 2) //按照statis-date,prov_code,call_id分组
.window(SlidingProcessingTimeWindows.of(Time.minutes(60), Time.minutes(3))) //小时窗口
.aggregate(new AggFunction(), new ProcessWinFun()).setParallelism(3); //得到是否是一条话单,和接通时长,及窗口的元数据;
//将source 发到kafka
//首先将collector 转换为 string
DataStream source_tostring = source.map(new MapFunction<Tuple6<String, String, String, Long, Long, String>, String>() {
@Override
public String map(Tuple6<String, String, String, Long, Long, String> value) throws Exception {
return value.toString();
}
});
//将string 吐到kafka
source_tostring.addSink(new FlinkKafkaProducer010<>(
"172.19.72.54:9092,172.19.72.55:9092,172.19.72.56:9092",
"topic_list_data_result",
new SimpleStringSchema()
)).name("list-data-to-kafka")
.setParallelism(3);
//后续给予窗口汇聚
DataStream aggreate = source.map(new MapFunction<Tuple6<String, String, String, Long, Long, String>, Tuple5<String, String, Long, Long, String>>() {
@Override
public Tuple5<String, String, Long, Long, String> map(Tuple6<String, String, String, Long, Long, String> v1) throws Exception {
//如果是人工请求,且接通率时长设置<15秒,则请求成功;
final Long time_long = 15000L;
final Long get_connect_flag;
if ((v1.f3 == 1L) && (v1.f4 <= time_long)) {
get_connect_flag = 1L;
} else {
get_connect_flag = 0L;
}
return new Tuple5<>(v1.f0, v1.f1, v1.f3, get_connect_flag, v1.f5);
}
}).keyBy(0, 1, 4)
.reduce(new ReduceFunction<Tuple5<String, String, Long, Long, String>>() {
@Override
public Tuple5<String, String, Long, Long, String> reduce(Tuple5<String, String, Long, Long, String> v1, Tuple5<String, String, Long, Long, String> v2) throws Exception {
v1.f2 += v2.f2;
v1.f3 += v2.f3;
return v1;
}
});
// 将汇聚后的结果转为string
DataStream aggreate_string=aggreate.map(new MapFunction<Tuple5<String, String, Long, Long, String>, String>() {
@Override
public String map(Tuple5<String, String, Long, Long, String> value) throws Exception {
return value.toString();
}
});
//汇聚后的结果 吐到kafka
aggreate_string.addSink(new FlinkKafkaProducer010<>(
"172.19.72.54:9092,172.19.72.55:9092,172.19.72.56:9092",
"topic_list_aggreat_result",
new SimpleStringSchema()
)).name("list-aggreate-to-kafka")
.setParallelism(3);
DataStream result=aggreate.keyBy(0,1,4)
.process(new KeyedProFun()).setParallelism(3)
.map(new MapFunction<Tuple5<String, String, Long, Long, String>, String>() {
@Override
public String map(Tuple5<String, String, Long, Long, String> value) throws Exception {
return value.toString();
}
}).setParallelism(2);
logger.info("add sink over!");
//将string 吐到kafka
result.addSink(new FlinkKafkaProducer010<>(
"172.19.72.54:9092,172.19.72.55:9092,172.19.72.56:9092",
"topic_list_last_result",
new SimpleStringSchema()
)).name("result_of_all")
.setParallelism(3);
logger.info("add sink over!");
env.execute("list_data");
//结果写入到mysql中;
/* create database bill_list default character set utf8;
CREATE USER 'csap'@'%' IDENTIFIED BY '1q2w!Q@W';
GRANT ALL PRIVILEGES ON bill_list. * TO 'csap'@'%';
FLUSH PRIVILEGES;*/
}
//只计算呼入的话单
public static final class FilterClass implements FilterFunction<String> {
@Override
public boolean filter(String value) throws Exception {
int[] call_type_in = {0, 1, 2, 3, 4, 5, 13, 15, 20, 21, 22, 23, 32, 33, 46};
int call_type = Integer.parseInt(GetList.parse(value).getCall_type());
//int device_type=Integer.parseInt(GetList.parse(value).getDevice_type());
int index = Arrays.binarySearch(call_type_in, call_type);
if ("-1".equals(index)) {
//System.out.println("two");
return false;
} else {
//System.out.println("twotwo");
return true;
}
}
}
//重写mapfunction:抽取统计时间、省份、call_id、call_type、ack_begin、ack_end等字段;
public static final class MapclassOne implements MapFunction<String, Tuple8<String, String, String, String, Long, Long, Long, Long>> {
@Override
public Tuple8<String, String, String, String, Long, Long, Long, Long> map(String s) throws Exception {
//System.out.println("three");
return GetListAsTuple.GetListAsTupleForMap(s);
}
}
//得到每条数据的等待和应答时长;
public static final class MapclassTwo implements MapFunction<Tuple8<String, String, String, String, Long, Long, Long, Long>, Tuple6<String, String, String, String, Long, Long>> {
@Override
public Tuple6<String, String, String, String, Long, Long> map(Tuple8<String, String, String, String, Long, Long, Long, Long> value) throws Exception {
System.out.println(" statis_date: " + value.f0 + " " + " prov_code: " + value.f1 + " " + " call_id: " + value.f2 + " " + " device_type: " + value.f3 + " wait: " + (value.f5 - value.f4) + " ack: " + (value.f7 - value.f6));
return new Tuple6<>(value.f0, value.f1, value.f2, value.f3, value.f5 - value.f4, value.f7 - value.f6);
}
}
//窗口中的数据进行reduce操作,得到device_type队列;
public static final class AggFunction implements AggregateFunction<Tuple6<String, String, String, String, Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple6<String, String, String, String, Long, Long> v1, Tuple2<Long, Long> v2) {
//System.out.println("add:one");
if (v1.f3.equals("1") || v1.f3.equals("2")) {
//System.out.println("add:two");
v2.f0 = v2.f0 + 1L;
}
return new Tuple2<>(v2.f0, v2.f1 + v1.f4);
}
@Override
public Tuple2<Long, Long> getResult(Tuple2<Long, Long> v1) {
if (v1.f0 >= 1L) {
v1.f0 = 1L;
}
return v1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> v1, Tuple2<Long, Long> v2) {
return new Tuple2<>(v1.f0 + v2.f0, v1.f1 + v2.f1);
}
}
public static final class ProcessWinFun extends ProcessWindowFunction<Tuple2<Long, Long>, Tuple6<String, String, String, Long, Long, String>, Tuple, TimeWindow> {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<Long, Long>> iterable, Collector<Tuple6<String, String, String, Long, Long, String>> collector) throws Exception {
System.out.println("begin to processfun:~~~~~~~~~~~");
Tuple3<String, String, String> key = (Tuple3<String, String, String>) tuple;
Tuple2<Long, Long> aggreateResult = iterable.iterator().next();
collector.collect(new Tuple6<>(key.f0, key.f1, key.f2, aggreateResult.f0, aggreateResult.f1, format.format(context.window().getEnd())));
}
}
//只得到结果:得到窗口周期内人工请求量和15s内接通请求量
public static final class KeyedProFun extends KeyedProcessFunction<Tuple, Tuple5<String, String, Long, Long, String>, Tuple5<String, String, Long, Long, String>> {
private ListState<Tuple5<String, String, Long, Long, String>> itemState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
itemState = getRuntimeContext().getListState(new ListStateDescriptor("itemState",new TupleTypeInfo()));
new TypeHint<Tuple2<Long, String>>(){};
}
@Override
public void processElement(Tuple5<String, String, Long, Long, String> input,
Context context,
Collector<Tuple5<String, String, Long, Long, String>> collector) throws Exception {
// 每条数据都保存到状态中
itemState.add(input);
//context.timerService().currentProcessingTime();
//将数据中最后一个字段转为 整形的时间
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(input.f4);
long timestamp = date.getTime();
context.timerService().registerProcessingTimeTimer(timestamp+1);
}
@Override
public void onTimer(long timestamp,
OnTimerContext ctx,
Collector<Tuple5<String, String, Long, Long, String>> out) throws Exception {
List<Tuple5<String, String, Long, Long, String>> allItems = new ArrayList<>();
for (Tuple5<String, String, Long, Long, String> item : itemState.get()) {
allItems.add(item);
}
// 提前清除状态中的数据,释放空间
itemState.clear();
// 按照点击量从大到小排序
allItems.sort(new Comparator<Tuple5<String, String, Long, Long, String>>() {
@Override
public int compare(Tuple5<String, String, Long, Long, String> o1, Tuple5<String, String, Long, Long, String> o2) {
return (int) (o2.f2 - o1.f2);
}
});
Tuple5<String, String, Long, Long, String> result=allItems.get(0);
out.collect(result);
}
}
}
实体类:
package CallList;
import java.io.Serializable;
public class CallListTable implements Serializable {
String statis_date;
String prov_code ;
String file_name ;
String part_id ;
String call_id ;
String call_id_num ;
String caller_no ;
String callee_no ;
String wait_begin ;
String wait_end ;
String ack_begin ;
String ack_end ;
String call_begin ;
String call_end ;
String service_no ;
String trk_no ;
String trk_grp_no ;
String mod_no ;
String device_type ;
String device_no ;
String device_in ;
String call_type ;
String wait_cause ;
String release_cause ;
String sub_cc_no ;
String vdn ;
String media_type ;
String uv_id ;
String org_cc_no ;
String org_call_id ;
String org_callee_no ;
String org_service_no ;
String ser_cc_no ;
String ser_service ;
String user_level ;
String user_type ;
String call_in_cause ;
String enter_reason ;
String leave_reason ;
String bill_info ;
String pre_service_no ;
String pre_device_type ;
String pre_device_no ;
String pre_device_in ;
String media_info_type ;
String skill_id ;
String location_id ;
String bill_info1 ;
String bill_info2 ;
String bill_info3 ;
String bill_info4 ;
String obs_service_id ;
String obs_unique_id ;
String current_skill_id ;
String sub_media_type ;
String cust_star_level ;
String cust_star_level_name ;
String user_level_name ;
public CallListTable(String calllist){
//statis_date|prov_code|file_name|part_id|call_id|call_id_num|caller_no|callee_no|wait_begin|wait_end|ack_begin|ack_end|call_begin|call_end|service_no|trk_no|trk_grp_no|mod_no|device_type|device_no|device_in|call_type|wait_cause|release_cause|sub_cc_no|vdn|media_type|uv_id|org_cc_no|org_call_id|org_callee_no|org_service_no|ser_cc_no|ser_service|user_level|user_type|call_in_cause|enter_reason|leave_reason|bill_info|pre_service_no|pre_device_type|pre_device_no|pre_device_in|media_info_type|skill_id|location_id|bill_info1|bill_info2|bill_info3|bill_info4|obs_service_id|obs_unique_id|current_skill_id|sub_media_type|cust_star_level|cust_star_level_name|user_level_name|real_callee_no
//20190429|371|371_3_Prm20190429_0101_000105.dat|7|1556498471-674286|-1|10085|13783468713|2019-04-29 08:41:12|2019-04-29 08:41:12|2019-04-29 08:41:12|2019-04-29 08:41:16|2019-04-29 08:41:16|2019-04-29 08:41:16|1|65535|-1|80|2|322|DEVICE_AGENT|7|0|1287|3|1|5|1210936422|3|4294967295-4294967295||65535|3|1|0|0|-1|0|16|-1|0|0|0||1|1|0|-1|-1|-1|-1|-1299400938|-1299400907|1|0|101|255|1|
String[] tokens = calllist.split("\\|");
this.setStatis_date(tokens[0]);
this.setProv_code(tokens[1]);
this.setFile_name(tokens[2]);
this.setPart_id(tokens[3]);
this.setCall_id(tokens[4]);
this.setCall_id_num(tokens[5]);
this.setCaller_no(tokens[6]);
this.setCallee_no(tokens[7]);
this.setWait_begin(tokens[8]);
this.setWait_end(tokens[9]);
this.setAck_begin(tokens[10]);
this.setAck_end(tokens[11]);
this.setCall_begin(tokens[12]);
this.setCall_end(tokens[13]);
this.setService_no(tokens[14]);
this.setTrk_no(tokens[15]);
this.setTrk_grp_no(tokens[16]);
this.setMod_no(tokens[17]);
this.setDevice_type(tokens[18]);
this.setDevice_no(tokens[19]);
this.setDevice_in(tokens[20]);
this.setCall_type(tokens[21]);
this.setWait_cause(tokens[22]);
this.setRelease_cause(tokens[23]);
this.setSub_cc_no(tokens[24]);
this.setVdn(tokens[25]);
this.setMedia_type(tokens[26]);
this.setUv_id(tokens[27]);
this.setOrg_cc_no(tokens[28]);
this.setOrg_call_id(tokens[29]);
this.setOrg_callee_no(tokens[30]);
this.setOrg_service_no(tokens[31]);
this.setSer_cc_no(tokens[32]);
this.setSer_service(tokens[33]);
this.setUser_level(tokens[34]);
this.setUser_type(tokens[35]);
this.setCall_in_cause(tokens[36]);
this.setEnter_reason(tokens[37]);
this.setLeave_reason(tokens[38]);
this.setBill_info(tokens[39]);
this.setPre_service_no(tokens[40]);
this.setPre_device_type(tokens[41]);
this.setPre_device_no(tokens[42]);
this.setPre_device_in(tokens[43]);
this.setMedia_info_type(tokens[44]);
this.setSkill_id(tokens[45]);
this.setLocation_id(tokens[46]);
this.setBill_info1(tokens[47]);
this.setBill_info2(tokens[48]);
this.setBill_info3(tokens[49]);
this.setBill_info4(tokens[50]);
this.setObs_service_id(tokens[51]);
this.setObs_unique_id(tokens[52]);
this.setCurrent_skill_id(tokens[53]);
this.setSub_media_type(tokens[54]);
this.setCust_star_level(tokens[55]);
this.setCust_star_level_name(tokens[56]);
this.setUser_level_name(tokens[57]);
}
public String getStatis_date() {
return statis_date;
}
public void setStatis_date(String statis_date) {
this.statis_date = statis_date;
}
public String getProv_code() {
return prov_code;
}
public void setProv_code(String prov_code) {
this.prov_code = prov_code;
}
public String getFile_name() {
return file_name;
}
public void setFile_name(String file_name) {
this.file_name = file_name;
}
public String getPart_id() {
return part_id;
}
public void setPart_id(String part_id) {
this.part_id = part_id;
}
public String getCall_id() {
return call_id;
}
public void setCall_id(String call_id) {
this.call_id = call_id;
}
public String getCall_id_num() {
return call_id_num;
}
public void setCall_id_num(String call_id_num) {
this.call_id_num = call_id_num;
}
public String getCaller_no() {
return caller_no;
}
public void setCaller_no(String caller_no) {
this.caller_no = caller_no;
}
public String getCallee_no() {
return callee_no;
}
public void setCallee_no(String callee_no) {
this.callee_no = callee_no;
}
public String getWait_begin() {
return wait_begin;
}
public void setWait_begin(String wait_begin) {
this.wait_begin = wait_begin;
}
public String getWait_end() {
return wait_end;
}
public void setWait_end(String wait_end) {
this.wait_end = wait_end;
}
public String getAck_begin() {
return ack_begin;
}
public void setAck_begin(String ack_begin) {
this.ack_begin = ack_begin;
}
public String getAck_end() {
return ack_end;
}
public void setAck_end(String ack_end) {
this.ack_end = ack_end;
}
public String getCall_begin() {
return call_begin;
}
public void setCall_begin(String call_begin) {
this.call_begin = call_begin;
}
public String getCall_end() {
return call_end;
}
public void setCall_end(String call_end) {
this.call_end = call_end;
}
public String getService_no() {
return service_no;
}
public void setService_no(String service_no) {
this.service_no = service_no;
}
public String getTrk_no() {
return trk_no;
}
public void setTrk_no(String trk_no) {
this.trk_no = trk_no;
}
public String getTrk_grp_no() {
return trk_grp_no;
}
public void setTrk_grp_no(String trk_grp_no) {
this.trk_grp_no = trk_grp_no;
}
public String getMod_no() {
return mod_no;
}
public void setMod_no(String mod_no) {
this.mod_no = mod_no;
}
public String getDevice_type() {
return device_type;
}
public void setDevice_type(String device_type) {
this.device_type = device_type;
}
public String getDevice_no() {
return device_no;
}
public void setDevice_no(String device_no) {
this.device_no = device_no;
}
public String getDevice_in() {
return device_in;
}
public void setDevice_in(String device_in) {
this.device_in = device_in;
}
public String getCall_type() {
return call_type;
}
public void setCall_type(String call_type) {
this.call_type = call_type;
}
public String getWait_cause() {
return wait_cause;
}
public void setWait_cause(String wait_cause) {
this.wait_cause = wait_cause;
}
public String getRelease_cause() {
return release_cause;
}
public void setRelease_cause(String release_cause) {
this.release_cause = release_cause;
}
public String getSub_cc_no() {
return sub_cc_no;
}
public void setSub_cc_no(String sub_cc_no) {
this.sub_cc_no = sub_cc_no;
}
public String getVdn() {
return vdn;
}
public void setVdn(String vdn) {
this.vdn = vdn;
}
public String getMedia_type() {
return media_type;
}
public void setMedia_type(String media_type) {
this.media_type = media_type;
}
public String getUv_id() {
return uv_id;
}
public void setUv_id(String uv_id) {
this.uv_id = uv_id;
}
public String getOrg_cc_no() {
return org_cc_no;
}
public void setOrg_cc_no(String org_cc_no) {
this.org_cc_no = org_cc_no;
}
public String getOrg_call_id() {
return org_call_id;
}
public void setOrg_call_id(String org_call_id) {
this.org_call_id = org_call_id;
}
public String getOrg_callee_no() {
return org_callee_no;
}
public void setOrg_callee_no(String org_callee_no) {
this.org_callee_no = org_callee_no;
}
public String getOrg_service_no() {
return org_service_no;
}
public void setOrg_service_no(String org_service_no) {
this.org_service_no = org_service_no;
}
public String getSer_cc_no() {
return ser_cc_no;
}
public void setSer_cc_no(String ser_cc_no) {
this.ser_cc_no = ser_cc_no;
}
public String getSer_service() {
return ser_service;
}
public void setSer_service(String ser_service) {
this.ser_service = ser_service;
}
public String getUser_level() {
return user_level;
}
public void setUser_level(String user_level) {
this.user_level = user_level;
}
public String getUser_type() {
return user_type;
}
public void setUser_type(String user_type) {
this.user_type = user_type;
}
public String getCall_in_cause() {
return call_in_cause;
}
public void setCall_in_cause(String call_in_cause) {
this.call_in_cause = call_in_cause;
}
public String getEnter_reason() {
return enter_reason;
}
public void setEnter_reason(String enter_reason) {
this.enter_reason = enter_reason;
}
public String getLeave_reason() {
return leave_reason;
}
public void setLeave_reason(String leave_reason) {
this.leave_reason = leave_reason;
}
public String getBill_info() {
return bill_info;
}
public void setBill_info(String bill_info) {
this.bill_info = bill_info;
}
public String getPre_service_no() {
return pre_service_no;
}
public void setPre_service_no(String pre_service_no) {
this.pre_service_no = pre_service_no;
}
public String getPre_device_type() {
return pre_device_type;
}
public void setPre_device_type(String pre_device_type) {
this.pre_device_type = pre_device_type;
}
public String getPre_device_no() {
return pre_device_no;
}
public void setPre_device_no(String pre_device_no) {
this.pre_device_no = pre_device_no;
}
public String getPre_device_in() {
return pre_device_in;
}
public void setPre_device_in(String pre_device_in) {
this.pre_device_in = pre_device_in;
}
public String getMedia_info_type() {
return media_info_type;
}
public void setMedia_info_type(String media_info_type) {
this.media_info_type = media_info_type;
}
public String getSkill_id() {
return skill_id;
}
public void setSkill_id(String skill_id) {
this.skill_id = skill_id;
}
public String getLocation_id() {
return location_id;
}
public void setLocation_id(String location_id) {
this.location_id = location_id;
}
public String getBill_info1() {
return bill_info1;
}
public void setBill_info1(String bill_info1) {
this.bill_info1 = bill_info1;
}
public String getBill_info2() {
return bill_info2;
}
public void setBill_info2(String bill_info2) {
this.bill_info2 = bill_info2;
}
public String getBill_info3() {
return bill_info3;
}
public void setBill_info3(String bill_info3) {
this.bill_info3 = bill_info3;
}
public String getBill_info4() {
return bill_info4;
}
public void setBill_info4(String bill_info4) {
this.bill_info4 = bill_info4;
}
public String getObs_service_id() {
return obs_service_id;
}
public void setObs_service_id(String obs_service_id) {
this.obs_service_id = obs_service_id;
}
public String getObs_unique_id() {
return obs_unique_id;
}
public void setObs_unique_id(String obs_unique_id) {
this.obs_unique_id = obs_unique_id;
}
public String getCurrent_skill_id() {
return current_skill_id;
}
public void setCurrent_skill_id(String current_skill_id) {
this.current_skill_id = current_skill_id;
}
public String getSub_media_type() {
return sub_media_type;
}
public void setSub_media_type(String sub_media_type) {
this.sub_media_type = sub_media_type;
}
public String getCust_star_level() {
return cust_star_level;
}
public void setCust_star_level(String cust_star_level) {
this.cust_star_level = cust_star_level;
}
public String getCust_star_level_name() {
return cust_star_level_name;
}
public void setCust_star_level_name(String cust_star_level_name) {
this.cust_star_level_name = cust_star_level_name;
}
public String getUser_level_name() {
return user_level_name;
}
public void setUser_level_name(String user_level_name) {
this.user_level_name = user_level_name;
}
}
解析数据源,得到字段;
package CallList;
import java.text.SimpleDateFormat;
import java.util.Date;
public class GetList {
//20190429|371|371_3_Prm20190429_0101_000105.dat|7|1556498471-674286|-1|10085|13783468713|2019-04-29 08:41:12|2019-04-29 08:41:12|2019-04-29 08:41:12|2019-04-29 08:41:16|2019-04-29 08:41:16|2019-04-29 08:41:16|1|65535|-1|80|2|322|DEVICE_AGENT|7|0|1287|3|1|5|1210936422|3|4294967295-4294967295||65535|3|1|0|0|-1|0|16|-1|0|0|0||1|1|0|-1|-1|-1|-1|-1299400938|-1299400907|1|0|101|255|1|
// statis_date|prov_code|file_name|part_id|call_id|call_id_num|caller_no|callee_no|wait_begin|wait_end|ack_begin|ack_end|call_begin|call_end|service_no|trk_no|trk_grp_no|mod_no|device_type|device_no|device_in|call_type|wait_cause|release_cause|sub_cc_no|vdn|media_type|uv_id|org_cc_no|org_call_id|org_callee_no|org_service_no|ser_cc_no|ser_service|user_level|user_type|call_in_cause|enter_reason|leave_reason|bill_info|pre_service_no|pre_device_type|pre_device_no|pre_device_in|media_info_type|skill_id|location_id|bill_info1|bill_info2|bill_info3|bill_info4|obs_service_id|obs_unique_id|current_skill_id|sub_media_type|cust_star_level|cust_star_level_name|user_level_name|real_callee_no
public static String getStatisDateFromListData(String raw){
CallListTable listTable=parse(raw);
return listTable.getStatis_date();
}
public static String getProCodeFromListData(String raw){
CallListTable listTable=parse(raw);
return listTable.getProv_code();
}
public static String getCallIdFromListData(String raw){
CallListTable listTable=parse(raw);
return listTable.getCall_id();
}
public static Long getWaitBeginFromListData(String raw) {
Long WaitBeginFromListData=null;
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try{
WaitBeginFromListData=format.parse(parse(raw).getWait_begin()).getTime();
}catch (Exception e){
System.out.println(e.getStackTrace());
System.out.println("getWaitBeginFromListData timestamp error,check data!");
}
return WaitBeginFromListData;
}
public static Long getWaitEndFromListData(String raw) {
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long WaitEndFromListData=null;
try{
WaitEndFromListData=format.parse(parse(raw).getWait_end()).getTime();
}catch (Exception e){
System.out.println(e.getStackTrace());
System.out.println("getWaitEndFromListData timestamp error,check data!");
}
return WaitEndFromListData;
}
public static Long getAckBeginFromListData(String raw) {
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long AckBeginFromListData=null;
try{
AckBeginFromListData=format.parse(parse(raw).getAck_begin()).getTime();
}catch (Exception e){
System.out.println(e.getStackTrace());
System.out.println("getAckBeginFromListData timestamp error,check data!");
}
return AckBeginFromListData;
}
public static Long getAckEndFromListData(String raw){
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long WaitBeginFromListData=null;
try{
WaitBeginFromListData=format.parse(parse(raw).getAck_end()).getTime();
}catch (Exception e){
System.out.println(e.getStackTrace());
System.out.println("getAckEndFromListData timestamp error,check data!");
}
return WaitBeginFromListData;
}
public static Long getCallBeginFromListData(String raw){
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long CallBeginFromListData=null;
try{
CallBeginFromListData=format.parse(parse(raw).getCall_begin()).getTime();
}catch (Exception e){
System.out.println(e.getStackTrace());
System.out.println("getCallBeginFromListData timestamp error,check data!");
}
return CallBeginFromListData;
}
public static Long getCallEndFromListData(String raw){
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long CallEndFromListData=null;
try{
CallEndFromListData=format.parse(parse(raw).getCall_end()).getTime();
}catch (Exception e){
System.out.println(e.getStackTrace());
System.out.println("getCallEndFromListData timestamp error,check data!");
}
return CallEndFromListData;
}
public static String getDeviceTypeFromListData(String raw){
CallListTable listTable=parse(raw);
return listTable.getDevice_type();
}
public static CallListTable parse(String raw){
CallListTable listTable=null;
if((raw.split("\\|").length!=58)){
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println("The running time is :"+df.format(new Date()));// new Date()为获取当前系统时间
System.out.println("The calllist is less than 58, Please check for the original data! ");
}else {
listTable=new CallListTable(raw);
}
return listTable;
}
}
3、得到Tuple;
package CallList;
import org.apache.flink.api.java.tuple.Tuple8;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class GetListAsTuple {
public static Tuple8<String,String,String,String,Long,Long,Long,Long> GetListAsTupleForMap(String raw) throws ParseException {
CallListTable listTable=parse(raw);
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String getStatisDateFromListData=listTable.getStatis_date();
String getProCodeFromListData=listTable.getProv_code();
String getCallIdFromListData=listTable.getCall_id();
String getDeviceTypeFromListData=listTable.getDevice_type();
Long getAckBeginFromListData = format.parse(listTable.getAck_begin()).getTime();
Long getAckEndFromListData=format.parse(listTable.getAck_end()).getTime();
Long getWaitBeginFromListData=format.parse(listTable.getWait_begin() ).getTime();
Long getWaitEndFromListData=format.parse(listTable.getWait_end()).getTime();
Tuple8<String,String,String,String,Long,Long,Long,Long> tuple=new Tuple8<>(
getStatisDateFromListData
,getProCodeFromListData
,getCallIdFromListData
,getDeviceTypeFromListData
,getWaitBeginFromListData
,getWaitEndFromListData
,getAckBeginFromListData
,getAckEndFromListData);
return tuple;
}
public static CallListTable parse(String raw){
CallListTable listTable=null;
if((raw.split("\\|").length!=58)){
System.out.println("The calllist is less than 58, Please check for the original data! ");
}else {
listTable=new CallListTable(raw);
}
return listTable;
}
}