配置中心
package dbus.config;
import java.io.Serializable;
/**
 * Shared configuration constants for the demo jobs: JDBC connection settings, the JDBC
 * batch size and HBase/ZooKeeper settings.
 *
 * <p>In production these values are normally managed by a configuration center instead of
 * being hard-coded here.
 */
public class GlobalConfig implements Serializable {
    /**
     * JDBC driver class name.
     *
     * <p>NOTE(review): {@code com.mysql.jdbc.Driver} is the legacy name; MySQL Connector/J 8.x
     * uses {@code com.mysql.cj.jdbc.Driver}. Confirm against the connector version in use.
     */
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";

    /** JDBC URL of the MySQL database (UTF-8 encoding enabled via URL parameters). */
    public static final String DB_URL = "jdbc:mysql://note01:3306/test?useUnicode=true&characterEncoding=utf8";

    /**
     * Database user name.
     *
     * <p>Credentials are hard-coded for the demo only; never commit real credentials.
     */
    public static final String USER_NAME = "canal";

    /**
     * Misspelled alias of {@link #USER_NAME}, kept so existing callers keep compiling.
     *
     * @deprecated use {@link #USER_NAME} instead.
     */
    @Deprecated
    public static final String USER_MAME = USER_NAME;

    /** Database password (demo value only). */
    public static final String PASSWORD = "000000";

    /** Number of rows buffered before a JDBC batch is flushed. */
    public static final int BATCH_SIZE = 2;

    // HBase-related settings.

    /** Comma-separated ZooKeeper quorum hosts used by HBase. */
    public static final String HBASE_ZOOKEEPER_QUORUM = "note01,note02,note03";

    /** ZooKeeper client port used by HBase. */
    public static final String HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "2181";

    /** Parent znode under which HBase registers itself in ZooKeeper. */
    public static final String ZOOKEEPER_ZNODE_PARENT = "/hbase";
}
商品模拟数据
package dbus.demodata;
import dbus.config.GlobalConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.types.Row;
import java.math.BigDecimal;
import java.sql.Types;
/**
 * Batch job that writes a fixed set of sample goods into the {@code zyd_goods} table.
 */
public class GoodsSimulator {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Sample goods: goods id, goods name, selling price, stock count, review count.
        DataSet<Tuple5<Integer, String, BigDecimal, Integer, Integer>> sampleGoods = batchEnv.fromElements(
                Tuple5.of(1, "Apple iPhone X (A1865)", BigDecimal.valueOf(6319.00), 10000, 34564),
                Tuple5.of(2, "vivo iQOO", BigDecimal.valueOf(3298.00), 20000, 3433),
                Tuple5.of(3, "AppleMQHV2CH/A", BigDecimal.valueOf(2749.00), 2000, 342221),
                Tuple5.of(4, "AppleApple Watch", BigDecimal.valueOf(2099.00), 5587, 22111),
                Tuple5.of(5, "xiaomi10", BigDecimal.valueOf(2299.00), 10000, 1298));

        // Flatten each tuple into a Row whose column order matches the INSERT placeholders.
        DataSet<Row> goodsRows = sampleGoods.map(
                new MapFunction<Tuple5<Integer, String, BigDecimal, Integer, Integer>, Row>() {
                    @Override
                    public Row map(Tuple5<Integer, String, BigDecimal, Integer, Integer> tuple) {
                        return Row.of(tuple.f0, tuple.f1, tuple.f2, tuple.f3, tuple.f4);
                    }
                });

        // One SQL type per placeholder, in the same order as the Row fields.
        int[] columnTypes = {Types.INTEGER, Types.VARCHAR, Types.DECIMAL, Types.INTEGER, Types.INTEGER};

        JDBCOutputFormat goodsTableFormat = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername(GlobalConfig.DRIVER_CLASS)
                .setDBUrl(GlobalConfig.DB_URL)
                .setUsername(GlobalConfig.USER_MAME)
                .setPassword(GlobalConfig.PASSWORD)
                .setBatchInterval(GlobalConfig.BATCH_SIZE)
                .setQuery("insert into zyd_goods (goodsId ,goodsName ,sellingPrice ,goodsStock ,appraiseNum ) values (?,?,?,?,?)")
                .setSqlTypes(columnTypes)
                .finish();
        goodsRows.output(goodsTableFormat);

        batchEnv.execute("GoodsSimulator");
    }
}
订单模拟数据
需要引入以下依赖。其中 simulatedata-generator 需要自行从码云(Gitee)下载源码编译,并安装到本地 Maven 仓库(例如执行 mvn install)后才能引用。
<dependency>
<groupId>com.cloudwise.toushibao</groupId>
<artifactId>simulatedata-generator</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
订单实体类
package dbus.model;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Timestamp;
@Data
@ToString
public class Orders implements Serializable {
private Integer orderId;
private String orderNo;
private Integer userId;
private Integer goodId;
private BigDecimal goodsMoney;
private BigDecimal realTotalMoney;
private Integer payFrom;
private String province;
private Timestamp createTime;
}
随机数规则字典(以下 .dic 文件放在名为 dictionaries 的文件夹中,由主程序中的 DicInitializer.init() 加载;模板中的 $Dic{xxx} 按同名规则生成随机值)
orders.dic
orderNo=$Func{uuid()}
userId=$Func{intRand(10000,99999)}
goodId=$Func{intRand(1,5)}
goodsMoney=$Func{doubleRand(2099, 100000, 2)}
realTotalMoney=$Func{doubleRand(1000, 100000, 2)}
payFrom=1|||2|||1|||1|||2
province=$Func{intRand(1,34)}
UserEvent.dic
userId=$Func{intRand(10,99)}
channel=APP
eventType=VIEW_PRODUCT|||ADD_TO_CART|||REMOVE_FROM_CART
eventTime=$Func{timestamp()}
productId=$Func{intRand(100,150)}
price=600|||500|||300|||340|||246
amount=$Func{intRand(1,50)}
主程序
package dbus.demodata;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.google.common.collect.ImmutableMap;
import com.cloudwise.sdg.dic.DicInitializer;
import com.cloudwise.sdg.template.TemplateAnalyzer;
import dbus.config.GlobalConfig;
import dbus.model.Orders;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.types.Row;
import java.util.Map;
public class OrdersSimulator {
//构建字典映射
public static final Map<String, String> PROVINCE_MAP = new ImmutableMap
.Builder<String, String>()
.put("1", "北京")
.put("2", "上海")
.put("3", "天津")
.put("4", "重庆")
.put("5", "黑龙江")
.put("6", "吉林")
.put("7", "辽宁")
.put("8", "内蒙古")
.put("9", "河北")
.put("10", "新疆")
.put("11", "甘肃")
.put("12", "青海")
.put("13", "陕西")
.put("14", "宁夏")
.put("15", "河南")
.put("16", "山东")
.put("17", "山西")
.put("18", "安徽")
.put("19", "湖北")
.put("20", "湖南")
.put("21", "江苏")
.put("22", "四川")
.put("23", "贵州")
.put("24", "云南")
.put("25", "广西")
.put("26", "西藏")
.put("27", "浙江")
.put("28", "江西")
.put("29", "广东")
.put("30", "福建")
.put("31", "台湾")
.put("32", "海南")
.put("33", "香港")
.put("34", "澳门")
.build();
//数据库中字段名称
private static final String[] FIELD_NAMES = new String[]{
"orderNo",
"userId",
"goodId",
"goodsMoney",
"realTotalMoney",
"payFrom",
"province"
};
//对应的字段类型映射
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.BIG_DEC_TYPE_INFO,
BasicTypeInfo.BIG_DEC_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
};
private static final RowTypeInfo ROW_TYPE = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
public static void main(String[] args) throws Exception {
//获取执行环境
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername(GlobalConfig.DRIVER_CLASS)
.setDBUrl(GlobalConfig.DB_URL)
.setUsername(GlobalConfig.USER_MAME)
.setPassword(GlobalConfig.PASSWORD)
.setBatchSize(GlobalConfig.BATCH_SIZE)
.setQuery("insert into zyd_orders (orderNo,userId ,goodId ,goodsMoney ,realTotalMoney ,payFrom ,province) values (?,?,?,?,?,?,?)")
.setParameterTypes(new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.BIG_DEC_TYPE_INFO,
BasicTypeInfo.BIG_DEC_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
}).build();
//模拟生成Orders
DataStreamSource<Row> orders = sEnv.addSource(new RichParallelSourceFunction<Row>() {
//定义状态标识位
private volatile boolean isRunning = true;
private TemplateAnalyzer ordersTplAnalyzer;
private Orders orders;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//加载数据字典,对于数据字段,文件夹名称dictionaries
DicInitializer.init();
//编辑模板
String ordersTpl = "{\"orderNo\":\"$Dic{orderNo}\",\"userId\":$Dic{userId},\"goodId\":$Dic{goodId},\"goodsMoney\":" +
"$Dic{goodsMoney},\"realTotalMoney\":$Dic{realTotalMoney},\"payFrom\":$Dic{payFrom},\"province\":$Dic{province}}";
//创建模板分词器
ordersTplAnalyzer = new TemplateAnalyzer("orders", ordersTpl);
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public void run(SourceContext<Row> sc) throws Exception {
while (isRunning) {
//使用fastjson 自动映射map
orders = JSON.parseObject(ordersTplAnalyzer.analyse(), new TypeReference<Orders>() {
});
sc.collect(Row.of(
orders.getOrderNo(),
orders.getUserId(),
orders.getGoodId(),
orders.getGoodsMoney(),
orders.getRealTotalMoney(),
orders.getPayFrom(),
PROVINCE_MAP.get(orders.getProvince())
));
}
long sleep = (long) (Math.random() * 2000);
Thread.sleep(sleep);
}
@Override
public void cancel() {
isRunning = false;
}
}, ROW_TYPE);
orders.print();
sink.emitDataStream(orders);
sEnv.execute();
}
}