文章目录
数据格式
日志数据
我们的日志结构大致可分为两类,一是普通页面埋点日志,二是启动日志。
普通页面日志结构如下:每条日志包含当前页面的页面信息、所有事件(动作)、所有曝光信息以及错误信息。除此之外,还包含一系列公共信息,包括设备信息、地理位置、应用信息等,即下边的 common 字段。
-
普通页面埋点日志格式
{
  "common": {                        -- 公共信息
    "ar": "230000",                  -- 地区编码
    "ba": "iPhone",                  -- 手机品牌
    "ch": "Appstore",                -- 渠道
    "is_new": "1",                   -- 是否首日使用:首次使用的当日,该字段值为1;过了24:00,该字段置为0
    "md": "iPhone 8",                -- 手机型号
    "mid": "YXfhjAYH6As2z9Iq",       -- 设备id
    "os": "iOS 13.2.9",              -- 操作系统
    "uid": "485",                    -- 会员id
    "vc": "v2.1.134"                 -- app版本号
  },
  "actions": [                       -- 动作(事件)
    {
      "action_id": "favor_add",      -- 动作id
      "item": "3",                   -- 目标id
      "item_type": "sku_id",         -- 目标类型
      "ts": 1585744376605            -- 动作时间戳
    }
  ],
  "displays": [
    {
      "display_type": "query",       -- 曝光类型
      "item": "3",                   -- 曝光对象id
      "item_type": "sku_id",         -- 曝光对象类型
      "order": 1,                    -- 出现顺序
      "pos_id": 2                    -- 曝光位置
    },
    { "display_type": "promotion", "item": "6", "item_type": "sku_id", "order": 2, "pos_id": 1 },
    { "display_type": "promotion", "item": "9", "item_type": "sku_id", "order": 3, "pos_id": 3 },
    { "display_type": "recommend", "item": "6", "item_type": "sku_id", "order": 4, "pos_id": 2 },
    { "display_type": "query", "item": "6", "item_type": "sku_id", "order": 5, "pos_id": 1 }
  ],
  "page": {                          -- 页面信息
    "during_time": 7648,             -- 持续时间毫秒
    "item": "3",                     -- 目标id
    "item_type": "sku_id",           -- 目标类型
    "last_page_id": "login",         -- 上页类型
    "page_id": "good_detail",        -- 页面ID
    "source_type": "promotion"       -- 来源类型
  },
  "err": {                           -- 错误
    "error_code": "1234",            -- 错误码
    "msg": "***********"             -- 错误信息
  },
  "ts": 1585744374423                -- 跳入时间戳
}
-
启动日志格式
启动日志结构相对简单,主要包含公共信息,启动信息和错误信息。
{
  "common": {
    "ar": "370000",
    "ba": "Honor",
    "ch": "wandoujia",
    "is_new": "1",
    "md": "Honor 20s",
    "mid": "eQF5boERMJFOujcp",
    "os": "Android 11.0",
    "uid": "76",
    "vc": "v2.1.134"
  },
  "start": {
    "entry": "icon",                 -- 启动入口:icon 手机图标;notice 通知;install 安装后启动
    "loading_time": 18803,           -- 启动加载时间
    "open_ad_id": 7,                 -- 广告页ID
    "open_ad_ms": 3449,              -- 广告总共播放时间
    "open_ad_skip_ms": 1989          -- 用户跳过广告时点
  },
  "err": {                           -- 错误
    "error_code": "1234",            -- 错误码
    "msg": "***********"             -- 错误信息
  },
  "ts": 1585744304000
}
示例:
日志数据:
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone X","mid":"mid_3442864","os":"iOS 12.4.1","uid":"928","vc":"v2.1.134"},"start":{
"entry":"icon","loading_time":1361,"open_ad_id":17,"open_ad_ms":2968,"open_ad_skip_ms":0},"ts":1651303983000}
{
"common":{
"ar":"440000",
"ba":"iPhone",
"ch":"Appstore",
"is_new":"0",
"md":"iPhone X",
"mid":"mid_3442864",
"os":"iOS 12.4.1",
"uid":"928",
"vc":"v2.1.134"
},
"start":{
"entry":"icon",
"loading_time":1361,
"open_ad_id":17,
"open_ad_ms":2968,
"open_ad_skip_ms":0
},
"ts":1651303983000
}
{
"common":{
"ar":"110000","ba":"Xiaomi","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","os":"Android 11.0","uid":"513","vc":"v2.1.134"},"err":{
"error_code":2633,"msg":" Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)"},"start":{
"entry":"notice","loading_time":12438,"open_ad_id":7,"open_ad_ms":4407,"open_ad_skip_ms":0},"ts":1651217959000}
{
"actions":[{
"action_id":"favor_add","item":"5","item_type":"sku_id","ts":1651217964522}],"common":{
"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","os":"iOS 13.3.1","uid":"981","vc":"v2.0.1"},"displays":[{
"display_type":"query","item":"15","item_type":"sku_id","order":1,"pos_id":1},{
"display_type":"query","item":"26","item_type":"sku_id","order":2,"pos_id":3},{
"display_type":"query","item":"31","item_type":"sku_id","order":3,"pos_id":2},{
"display_type":"promotion","item":"29","item_type":"sku_id","order":4,"pos_id":5},{
"display_type":"query","item":"9","item_type":"sku_id","order":5,"pos_id":2},{
"display_type":"recommend","item":"1","item_type":"sku_id","order":6,"pos_id":1}],"err":{
"error_code":1559,"msg":" Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)"},"page":{
"during_time":7045,"item":"5","item_type":"sku_id","last_page_id":"good_list","page_id":"good_detail","source_type":"activity"},"ts":1651217961000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone X","mid":"mid_3442864","os":"iOS 12.4.1","uid":"928","vc":"v2.1.134"},"displays":[{
"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":5},{
"display_type":"promotion","item":"2","item_type":"sku_id","order":2,"pos_id":3},{
"display_type":"query","item":"23","item_type":"sku_id","order":3,"pos_id":5},{
"display_type":"query","item":"9","item_type":"sku_id","order":4,"pos_id":5},{
"display_type":"query","item":"28","item_type":"sku_id","order":5,"pos_id":5},{
"display_type":"query","item":"14","item_type":"sku_id","order":6,"pos_id":3}],"page":{
"during_time":4970,"page_id":"home"},"ts":1651303983000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone X","mid":"mid_3442864","os":"iOS 12.4.1","uid":"928","vc":"v2.1.134"},"displays":[{
"display_type":"promotion","item":"8","item_type":"sku_id","order":1,"pos_id":5},{
"display_type":"promotion","item":"35","item_type":"sku_id","order":2,"pos_id":5},{
"display_type":"recommend","item":"17","item_type":"sku_id","order":3,"pos_id":3},{
"display_type":"promotion","item":"15","item_type":"sku_id","order":4,"pos_id":1},{
"display_type":"query","item":"32","item_type":"sku_id","order":5,"pos_id":4},{
"display_type":"recommend","item":"3","item_type":"sku_id","order":6,"pos_id":1},{
"display_type":"query","item":"33","item_type":"sku_id","order":7,"pos_id":2},{
"display_type":"query","item":"25","item_type":"sku_id","order":8,"pos_id":3},{
"display_type":"promotion","item":"4","item_type":"sku_id","order":9,"pos_id":5}],"page":{
"during_time":7626,"item":"小米盒子","item_type":"keyword","last_page_id":"home","page_id":"good_list"},"ts":1651303984000}
{
"actions":[{
"action_id":"favor_add","item":"33","item_type":"sku_id","ts":1651303989298},{
"action_id":"cart_add","item":"33","item_type":"sku_id","ts":1651303993596},{
"action_id":"get_coupon","item":"3","item_type":"coupon_id","ts":1651303997894}],"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone X","mid":"mid_3442864","os":"iOS 12.4.1","uid":"928","vc":"v2.1.134"},"displays":[{
"display_type":"promotion","item":"19","item_type":"sku_id","order":1,"pos_id":3},{
"display_type":"query","item":"27","item_type":"sku_id","order":2,"pos_id":4},{
"display_type":"query","item":"27","item_type":"sku_id","order":3,"pos_id":5},{
"display_type":"promotion","item":"5","item_type":"sku_id","order":4,"pos_id":4}],"page":{
"during_time":17195,"item":"33","item_type":"sku_id","last_page_id":"good_list","page_id":"good_detail","source_type":"promotion"},"ts":1651303985000}
{
"actions":[{
"action_id":"cart_remove","item":"30","item_type":"sku_id","ts":1651303990453}],"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone X","mid":"mid_3442864","os":"iOS 12.4.1","uid":"928","vc":"v2.1.134"},"page":{
"during_time":8907,"last_page_id":"good_detail","page_id":"cart"},"ts":1651303986000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone X","mid":"mid_3442864","os":"iOS 12.4.1","uid":"928","vc":"v2.1.134"},"page":{
"during_time":1957,"item":"19,3,20","item_type":"sku_ids","last_page_id":"cart","page_id":"trade"},"ts":1651303987000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone X","mid":"mid_3442864","os":"iOS 12.4.1","uid":"928","vc":"v2.1.134"},"page":{
"during_time":11912,"item":"34,4","item_type":"sku_ids","last_page_id":"trade","page_id":"payment"},"ts":1651303988000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"start":{
"entry":"icon","loading_time":11717,"open_ad_id":4,"open_ad_ms":4719,"open_ad_skip_ms":0},"ts":1651303983000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"displays":[{
"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":1},{
"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":1},{
"display_type":"query","item":"24","item_type":"sku_id","order":3,"pos_id":1},{
"display_type":"promotion","item":"21","item_type":"sku_id","order":4,"pos_id":5},{
"display_type":"query","item":"11","item_type":"sku_id","order":5,"pos_id":5},{
"display_type":"recommend","item":"28","item_type":"sku_id","order":6,"pos_id":4},{
"display_type":"query","item":"23","item_type":"sku_id","order":7,"pos_id":3},{
"display_type":"query","item":"34","item_type":"sku_id","order":8,"pos_id":5},{
"display_type":"query","item":"22","item_type":"sku_id","order":9,"pos_id":4},{
"display_type":"promotion","item":"5","item_type":"sku_id","order":10,"pos_id":4}],"page":{
"during_time":14351,"page_id":"home"},"ts":1651303983000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"page":{
"during_time":10266,"last_page_id":"home","page_id":"search"},"ts":1651303984000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"displays":[{
"display_type":"query","item":"35","item_type":"sku_id","order":1,"pos_id":2},{
"display_type":"query","item":"16","item_type":"sku_id","order":2,"pos_id":3},{
"display_type":"query","item":"34","item_type":"sku_id","order":3,"pos_id":1},{
"display_type":"query","item":"6","item_type":"sku_id","order":4,"pos_id":2},{
"display_type":"query","item":"5","item_type":"sku_id","order":5,"pos_id":2},{
"display_type":"query","item":"35","item_type":"sku_id","order":6,"pos_id":1},{
"display_type":"promotion","item":"32","item_type":"sku_id","order":7,"pos_id":5}],"page":{
"during_time":9012,"item":"小米","item_type":"keyword","last_page_id":"search","page_id":"good_list"},"ts":1651303985000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"displays":[{
"display_type":"promotion","item":"19","item_type":"sku_id","order":1,"pos_id":3},{
"display_type":"recommend","item":"14","item_type":"sku_id","order":2,"pos_id":5},{
"display_type":"promotion","item":"16","item_type":"sku_id","order":3,"pos_id":4},{
"display_type":"query","item":"34","item_type":"sku_id","order":4,"pos_id":3},{
"display_type":"recommend","item":"10","item_type":"sku_id","order":5,"pos_id":2},{
"display_type":"promotion","item":"31","item_type":"sku_id","order":6,"pos_id":4}],"page":{
"during_time":12336,"item":"26","item_type":"sku_id","last_page_id":"good_list","page_id":"good_detail","source_type":"recommend"},"ts":1651303986000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"page":{
"during_time":8465,"last_page_id":"good_detail","page_id":"login"},"ts":1651303987000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"page":{
"during_time":4460,"last_page_id":"login","page_id":"register"},"ts":1651303988000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"displays":[{
"display_type":"recommend","item":"28","item_type":"sku_id","order":1,"pos_id":2},{
"display_type":"query","item":"26","item_type":"sku_id","order":2,"pos_id":1},{
"display_type":"query","item":"12","item_type":"sku_id","order":3,"pos_id":4},{
"display_type":"query","item":"35","item_type":"sku_id","order":4,"pos_id":2},{
"display_type":"recommend","item":"31","item_type":"sku_id","order":5,"pos_id":1},{
"display_type":"query","item":"20","item_type":"sku_id","order":6,"pos_id":5},{
"display_type":"query","item":"19","item_type":"sku_id","order":7,"pos_id":2},{
"display_type":"promotion","item":"23","item_type":"sku_id","order":8,"pos_id":5},{
"display_type":"query","item":"3","item_type":"sku_id","order":9,"pos_id":5},{
"display_type":"query","item":"35","item_type":"sku_id","order":10,"pos_id":1}],"page":{
"during_time":9236,"item":"33","item_type":"sku_id","last_page_id":"register","page_id":"good_detail","source_type":"query"},"ts":1651303989000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"page":{
"during_time":8224,"last_page_id":"good_detail","page_id":"cart"},"ts":1651303990000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"page":{
"during_time":11863,"item":"34,27,14","item_type":"sku_ids","last_page_id":"cart","page_id":"trade"},"ts":1651303991000}
{
"common":{
"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"page":{
"during_time":19061,"item":"35,26","item_type":"sku_ids","last_page_id":"trade","page_id":"payment"},"ts":1651303992000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"start":{
"entry":"notice","loading_time":1087,"open_ad_id":1,"open_ad_ms":9832,"open_ad_skip_ms":0},"ts":1651303983000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"displays":[{
"display_type":"activity","item":"2","item_type":"activity_id","order":1,"pos_id":5},{
"display_type":"recommend","item":"35","item_type":"sku_id","order":2,"pos_id":5},{
"display_type":"query","item":"23","item_type":"sku_id","order":3,"pos_id":5},{
"display_type":"promotion","item":"18","item_type":"sku_id","order":4,"pos_id":4},{
"display_type":"recommend","item":"9","item_type":"sku_id","order":5,"pos_id":5},{
"display_type":"promotion","item":"22","item_type":"sku_id","order":6,"pos_id":2},{
"display_type":"query","item":"19","item_type":"sku_id","order":7,"pos_id":4},{
"display_type":"promotion","item":"18","item_type":"sku_id","order":8,"pos_id":1},{
"display_type":"query","item":"28","item_type":"sku_id","order":9,"pos_id":5},{
"display_type":"query","item":"31","item_type":"sku_id","order":10,"pos_id":3}],"page":{
"during_time":2952,"page_id":"home"},"ts":1651303983000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"page":{
"during_time":17033,"last_page_id":"home","page_id":"search"},"ts":1651303984000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"displays":[{
"display_type":"promotion","item":"33","item_type":"sku_id","order":1,"pos_id":3},{
"display_type":"promotion","item":"20","item_type":"sku_id","order":2,"pos_id":1},{
"display_type":"query","item":"14","item_type":"sku_id","order":3,"pos_id":1},{
"display_type":"query","item":"29","item_type":"sku_id","order":4,"pos_id":2},{
"display_type":"query","item":"34","item_type":"sku_id","order":5,"pos_id":2},{
"display_type":"query","item":"1","item_type":"sku_id","order":6,"pos_id":1}],"page":{
"during_time":11092,"item":"苹果手机","item_type":"keyword","last_page_id":"search","page_id":"good_list"},"ts":1651303985000}
{
"actions":[{
"action_id":"get_coupon","item":"3","item_type":"coupon_id","ts":1651303990081}],"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"displays":[{
"display_type":"query","item":"31","item_type":"sku_id","order":1,"pos_id":2},{
"display_type":"query","item":"19","item_type":"sku_id","order":2,"pos_id":1},{
"display_type":"query","item":"22","item_type":"sku_id","order":3,"pos_id":3},{
"display_type":"query","item":"26","item_type":"sku_id","order":4,"pos_id":4},{
"display_type":"query","item":"6","item_type":"sku_id","order":5,"pos_id":3},{
"display_type":"query","item":"11","item_type":"sku_id","order":6,"pos_id":2},{
"display_type":"query","item":"21","item_type":"sku_id","order":7,"pos_id":1},{
"display_type":"query","item":"22","item_type":"sku_id","order":8,"pos_id":4},{
"display_type":"query","item":"5","item_type":"sku_id","order":9,"pos_id":3},{
"display_type":"query","item":"25","item_type":"sku_id","order":10,"pos_id":1}],"page":{
"during_time":8162,"item":"31","item_type":"sku_id","last_page_id":"good_list","page_id":"good_detail","source_type":"promotion"},"ts":1651303986000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"page":{
"during_time":18138,"last_page_id":"good_detail","page_id":"login"},"ts":1651303987000}
{
"actions":[{
"action_id":"cart_add","item":"4","item_type":"sku_id","ts":1651303991527},{
"action_id":"get_coupon","item":"1","item_type":"coupon_id","ts":1651303995054}],"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"displays":[{
"display_type":"query","item":"28","item_type":"sku_id","order":1,"pos_id":1},{
"display_type":"recommend","item":"20","item_type":"sku_id","order":2,"pos_id":2},{
"display_type":"query","item":"3","item_type":"sku_id","order":3,"pos_id":2},{
"display_type":"query","item":"2","item_type":"sku_id","order":4,"pos_id":3},{
"display_type":"query","item":"4","item_type":"sku_id","order":5,"pos_id":4}],"page":{
"during_time":10583,"item":"4","item_type":"sku_id","last_page_id":"login","page_id":"good_detail","source_type":"recommend"},"ts":1651303988000}
{
"actions":[{
"action_id":"cart_minus_num","item":"32","item_type":"sku_id","ts":1651303991656}],"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"page":{
"during_time":5312,"last_page_id":"good_detail","page_id":"cart"},"ts":1651303989000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"page":{
"during_time":14512,"item":"21","item_type":"sku_ids","last_page_id":"cart","page_id":"trade"},"ts":1651303990000}
{
"common":{
"ar":"440000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_51315","os":"iOS 13.2.3","uid":"603","vc":"v2.1.132"},"page":{
"during_time":10632,"item":"34,9","item_type":"sku_ids","last_page_id":"trade","page_id":"payment"},"ts":1651303991000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_5445386","os":"iOS 13.3.1","uid":"271","vc":"v2.1.134"},"start":{
"entry":"notice","loading_time":13546,"open_ad_id":3,"open_ad_ms":8113,"open_ad_skip_ms":0},"ts":1651303983000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_5445386","os":"iOS 13.3.1","uid":"271","vc":"v2.1.134"},"displays":[{
"display_type":"activity","item":"2","item_type":"activity_id","order":1,"pos_id":1},{
"display_type":"activity","item":"1","item_type":"activity_id","order":2,"pos_id":1},{
"display_type":"query","item":"8","item_type":"sku_id","order":3,"pos_id":1},{
"display_type":"query","item":"11","item_type":"sku_id","order":4,"pos_id":3},{
"display_type":"query","item":"34","item_type":"sku_id","order":5,"pos_id":2},{
"display_type":"query","item":"3","item_type":"sku_id","order":6,"pos_id":1},{
"display_type":"promotion","item":"22","item_type":"sku_id","order":7,"pos_id":1},{
"display_type":"promotion","item":"8","item_type":"sku_id","order":8,"pos_id":5},{
"display_type":"query","item":"3","item_type":"sku_id","order":9,"pos_id":5},{
"display_type":"promotion","item":"10","item_type":"sku_id","order":10,"pos_id":1},{
"display_type":"query","item":"16","item_type":"sku_id","order":11,"pos_id":4},{
"display_type":"query","item":"29","item_type":"sku_id","order":12,"pos_id":2}],"page":{
"during_time":2933,"page_id":"home"},"ts":1651303983000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_5445386","os":"iOS 13.3.1","uid":"271","vc":"v2.1.134"},"page":{
"during_time":19036,"last_page_id":"home","page_id":"mine"},"ts":1651303984000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_5445386","os":"iOS 13.3.1","uid":"271","vc":"v2.1.134"},"page":{
"during_time":19944,"last_page_id":"mine","page_id":"orders_unpaid"},"ts":1651303985000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_5445386","os":"iOS 13.3.1","uid":"271","vc":"v2.1.134"},"page":{
"during_time":10109,"item":"3,7","item_type":"sku_ids","last_page_id":"orders_unpaid","page_id":"trade"},"ts":1651303986000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_5445386","os":"iOS 13.3.1","uid":"271","vc":"v2.1.134"},"page":{
"during_time":2496,"item":"20,23,26","item_type":"sku_ids","last_page_id":"trade","page_id":"payment"},"ts":1651303987000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_7826605","os":"iOS 13.3.1","uid":"884","vc":"v2.1.132"},"start":{
"entry":"icon","loading_time":6558,"open_ad_id":5,"open_ad_ms":2562,"open_ad_skip_ms":2071},"ts":1651303983000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_7826605","os":"iOS 13.3.1","uid":"884","vc":"v2.1.132"},"displays":[{
"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":2},{
"display_type":"promotion","item":"21","item_type":"sku_id","order":2,"pos_id":5},{
"display_type":"query","item":"20","item_type":"sku_id","order":3,"pos_id":2},{
"display_type":"recommend","item":"10","item_type":"sku_id","order":4,"pos_id":1},{
"display_type":"promotion","item":"35","item_type":"sku_id","order":5,"pos_id":2},{
"display_type":"promotion","item":"29","item_type":"sku_id","order":6,"pos_id":3},{
"display_type":"promotion","item":"30","item_type":"sku_id","order":7,"pos_id":5},{
"display_type":"query","item":"8","item_type":"sku_id","order":8,"pos_id":2},{
"display_type":"promotion","item":"18","item_type":"sku_id","order":9,"pos_id":4},{
"display_type":"promotion","item":"12","item_type":"sku_id","order":10,"pos_id":2}],"page":{
"during_time":13109,"page_id":"home"},"ts":1651303983000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_7826605","os":"iOS 13.3.1","uid":"884","vc":"v2.1.132"},"page":{
"during_time":6071,"last_page_id":"home","page_id":"mine"},"ts":1651303984000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_7826605","os":"iOS 13.3.1","uid":"884","vc":"v2.1.132"},"page":{
"during_time":4795,"last_page_id":"mine","page_id":"orders_unpaid"},"ts":1651303985000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_7826605","os":"iOS 13.3.1","uid":"884","vc":"v2.1.132"},"page":{
"during_time":5665,"item":"16,3,23","item_type":"sku_ids","last_page_id":"orders_unpaid","page_id":"trade"},"ts":1651303986000}
{
"common":{
"ar":"110000","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_7826605","os":"iOS 13.3.1","uid":"884","vc":"v2.1.132"},"page":{
"during_time":6308,"item":"34,23,7","item_type":"sku_ids","last_page_id":"trade","page_id":"payment"},"ts":1651303987000}
业务数据
Maxwell数据格式:
{
"database":"gmall",
"table":"cart_info",
"type":"update",
"ts":1592270938, //注意ts是maxwell加的,后面DWS开窗使用的是自身数据的create_time
"xid":13090,
"xoffset":1573,
"data":{
"id":100924,
"user_id":"93",
"sku_id":16,
"cart_price":4488,
"sku_num":1,
"img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg",
"sku_name":"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机",
"is_checked":null,
"create_time":"2020-06-14 09:28:57",
"operate_time":null,
"is_ordered":1,
"order_time":"2021-10-17 09:28:58",
"source_type":"2401",
"source_id":null
},
"old":{
"is_ordered":0,
"order_time":null
}
}
FlinkCDC读取数据
在DIM层用到了FlinkCDC。
{
"before":null,
"after":{
"source_table":"base_trademark",
"sink_table":"dim_base_trademark",
"sink_columns":"id,tm_name",
"sink_pk":"id",
"sink_extend":null
},
"source":{
"version":"1.5.4.Final",
"connector":"mysql",
"name":"mysql_binlog_source",
"ts_ms":1655172926148,
"snapshot":"false",
"db":"gmall-211227-config",
"sequence":null,
"table":"table_process",
"server_id":0,
"gtid":null,
"file":"",
"pos":0,
"row":0,
"thread":null,
"query":null
},
"op":"r",
"ts_ms":1655172926150,
"transaction":null
}
数仓分析
实时数仓分层:
- 计算框架:Flink;存储框架:消息队列(可以实时读取&可以实时写入)
- ODS:Kafka
  - 使用场景:每过来一条数据,读取到并加工处理
- DIM:HBase
  - 使用场景:事实表会根据主键获取一行维表数据(1.永久存储、2.根据主键查询)
  - HBase:海量数据永久存储,根据主键快速查询 √
  - Redis:用户表数据量大,内存数据库 ×
  - ClickHouse:并发不行,列存 ×
  - ES:默认给所有字段创建索引 ×
  - Hive(HDFS):效率低下 ×
  - Mysql本身:压力太大,实在要用就使用从库 √
- DWD:Kafka
  - 使用场景:每过来一条数据,读取到并分组累加处理
- DWS:ClickHouse
  - 使用场景:每过来一条数据,读取到并重新分组、累加处理
- ADS:不落盘,实质上是接口模块中查询ClickHouse的SQL语句
  - 使用场景:读取最终结果数据展示

举例(以 GMV 为例):
- DWS:用户、省份、商品 GMV
- ADS:
  - 用户 GMV
  - 省份 GMV
  - 商品 GMV
  - 省份、商品 GMV
ODS层
采集到 Kafka 的 topic_log 和 topic_db 主题的数据即为实时数仓的 ODS 层,这一层的作用是对数据做原样展示和备份。
DIM层
DIM层设计要点:
-
DIM层的设计依据是维度建模理论,该层存储维度模型的维度表。
-
DIM层的数据存储在 HBase 表中
DIM 层表是用于维度关联的,要通过主键去获取相关维度信息,这种场景下 K-V 类型数据库的效率较高。常见的 K-V 类型数据库有 Redis、HBase,而 Redis 的数据常驻内存,会给内存造成较大压力,因而选用 HBase 存储维度数据。
-
DIM层表名的命名规范为dim_表名
配置表
本层的任务是将业务数据直接写入到不同的 HBase 表中。那么如何让程序知道流中的哪些数据是维度数据?维度数据又应该写到 HBase 的哪些表中?为了解决这个问题,我们有以下几种方法:
-
在代码中给定十几张维表的表名,但是如果增加维表,需要修改代码-重新编译-打包-上传、重启任务;
-
优化1:不修改代码、只重启任务。配置信息中保存需要的维表信息,配置信息只在程序启动的时候加载一次。
-
优化2:不修改代码、不重启任务。让程序在启动以后还可以获取配置信息中增加的内容。
具体实施:
-
定时任务:每隔一段时间加载一次配置信息,将定时任务写在 open 方法中
-
监控配置信息:一旦配置信息增加了数据,可以立马获取到
(1)MySQLBinlog:FlinkCDC监控直接创建流
a.配置信息处理成广播流:缺点 -> 如果配置信息过大,冗余太多
b.按照表名进行KeyBy处理:缺点 -> 有可能产生数据倾斜
(2)文件:Flume->Kafka->Flink消费创建流
-
我们选择在 MySQL 中构建一张配置表,通过 Flink CDC 将配置表信息读取到程序中。
配置表设计:
(1)我们将为配置表设计五个字段
- source_table:作为数据源的业务数据表名
- sink_table:作为数据目的地的 Phoenix 表名
- sink_columns:Phoenix 表字段
- sink_pk:Phoenix 表主键
- sink_extend:Phoenix 建表扩展,即建表时一些额外的配置语句
将 source_table 作为配置表的主键,可以通过它获取唯一的目标表名、字段、主键和建表扩展,从而得到完整的 Phoenix 建表语句。
(2)在Mysql中创建数据库建表并开启Binlog:
mysql -uroot -p000000 -e"create database gmall_config charset utf8 default collate utf8_general_ci"
-- Configuration table for the DIM layer: one row per source business table,
-- describing the Phoenix table that its rows are written to.
CREATE TABLE `table_process` (
-- Name of the business table acting as the data source (primary key).
`source_table` varchar(200) NOT NULL COMMENT '来源表',
-- Name of the destination Phoenix table.
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表',
-- Columns of the Phoenix table.
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
-- Primary-key column of the Phoenix table.
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
-- Extra clauses appended to the Phoenix CREATE TABLE statement.
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
-- Keyed by source table, so each source table has exactly one sink definition.
PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
在 MySQL 配置文件(my.cnf)中增加 binlog-do-db=gmall_config,为该库开启 Binlog,然后重启 MySQL 使配置生效。
导入数据:
/*
SQLyog
MySQL - 5.7.29-log : Database - gmall_realtime
*********************************************************************
*/
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
/*Table structure for table `table_process` */
-- Same definition as the configuration table created earlier: one row per
-- source business table, describing its destination Phoenix table.
CREATE TABLE `table_process` (
-- Name of the business table acting as the data source (primary key).
`source_table` varchar(200) NOT NULL COMMENT '来源表',
-- Name of the destination Phoenix table.
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表',
-- Columns of the Phoenix table.
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
-- Primary-key column of the Phoenix table.
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
-- Extra clauses appended to the Phoenix CREATE TABLE statement.
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
-- Keyed by source table, so each source table has exactly one sink definition.
PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*Data for the table `table_process` */
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'dim_base_category1', 'id,name', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'dim_base_category2', 'id,name,category1_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'dim_base_category3', 'id,name,category2_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'dim_base_region', 'id,region_name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'dim_base_trademark', 'id,tm_name', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', 'id', ' SALT_BUCKETS = 4');
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id','id', 'SALT_BUCKETS = 3');
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', 'id', ' SALT_BUCKETS = 3');
结果:
数据格式
Maxwell数据格式展示(主流数据):
bin/maxwell-bootstrap --database gmall-211126-flink --table base_trademark --config ./config.properties
保留的:
{
"database":"gmall-211126-flink","table":"base_trademark","type":"insert","ts":1652499161,"xid":167,"commit":true,"data":{
"id":13,"tm_name":"atguigu","logo_url":"/aaa/aaa"}}
{
"database":"gmall-211126-flink","table":"base_trademark","type":"update","ts":1652499176,"xid":188,"commit":true,"data":{
"id":13,"tm_name":"atguigu","logo_url":"/bbb/bbb"},"old":{
"logo_url":"/aaa/aaa"}}
{
"database":"gmall-211126-flink","table":"base_trademark","type":"bootstrap-insert","ts":1652499295,"data":{
"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}}
过滤掉:
{
"database":"gmall-211126-flink","table":"base_trademark","type":"delete","ts":1652499184,"xid":201,"commit":true,"data":{
"id":13,"tm_name":"atguigu","logo_url":"/bbb/bbb"}}
{
"database":"gmall-211126-flink","table":"base_trademark","type":"bootstrap-start","ts":1652499295,"data":{
}}
{
"database":"gmall-211126-flink","table":"base_trademark","type":"bootstrap-complete","ts":1652499295,"data":{
}}
配置信息数据(广播流):
{
"before":null,"after":{
"source_table":"aa","sink_table":"bb","sink_columns":"cc","sink_pk":"id","sink_extend":"xxx"},"source":{
"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1652513039549,"snapshot":"false","db":"gmall-211126-config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1652513039551,"transaction":null}
主要任务
(1)接收Kafka数据,过滤数据
对Maxwell抓取的数据进行ETL,有用的部分保留,没用的过滤掉。
- 空数据过滤掉
- 非json数据过滤掉
- 保留 insert、update、bootstrap-insert 数据,过滤掉 delete、bootstrap-start、bootstrap-complete 数据。
(2)动态拆分维度表功能
由于Maxwell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个维度表拆开处理。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张维度表,就要修改配置并重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。这可以通过以下三种方案实现:
- 一种是用Zookeeper存储,通过Watch感知数据变化;
- 另一种是用mysql数据库存储,周期性的同步;
- 再一种是用mysql数据库存储,使用广播流。
这里选择第三种方案,主要是因为 MySQL 对于配置数据的初始化和维护管理比较方便:使用 FlinkCDC 读取配置信息表,将配置流作为广播流与主流进行连接。
所以就有了如下图:
(3)把流中的数据保存到对应的维度表
维度数据保存到HBase的表中。
代码实现
接收Kafka数据,过滤数据
(1)创建 MyKafkaUtil 工具类
和 Kafka 交互要用到 Flink 提供的 FlinkKafkaConsumer、FlinkKafkaProducer 类,为了提高模板代码的复用性,将其封装到 MyKafkaUtil 工具类中。
此处从 Kafka 读取数据,创建 getFlinkKafkaConsumer(String topic, String groupId) 方法:
/**
 * Factory for the Kafka connectors used by the Flink jobs of this project.
 */
public class MyKafkaUtil {

    /** Kafka bootstrap server list used by every consumer created here. */
    private static final String KAFKA_SERVER = "hadoop102:9092";

    /**
     * Builds a Flink Kafka consumer that emits each record value as a string.
     *
     * <p>Records that are {@code null} or carry a {@code null} value are emitted as an empty
     * string. Values are decoded as UTF-8 explicitly, so the result does not depend on the
     * default charset of the JVM that runs the task.
     *
     * @param topic   Kafka topic to read from
     * @param groupId consumer group id
     * @return a consumer producing the record values as {@code String}
     */
    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new FlinkKafkaConsumer<String>(
                topic,
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        // The stream is unbounded: never signal end-of-stream.
                        return false;
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null) {
                            return "";
                        } else {
                            // Fully-qualified to avoid touching the file's import list.
                            return new String(record.value(), java.nio.charset.StandardCharsets.UTF_8);
                        }
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                },
                properties);
    }
}
(2)主程序
//数据流:web/app -> nginx -> 业务服务器 -> Mysql(binlog) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Phoenix
//程 序:Mock -> Mysql(binlog) -> Maxwell -> Kafka(ZK) -> DimApp(FlinkCDC/Mysql) -> Phoenix(HBase/ZK/HDFS)
public class DimApp {
public static void main(String[] args) throws Exception {
//TODO 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); //生产环境中设置为Kafka主题的分区数
//1.1 开启CheckPoint
//env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
//env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
//env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//1.2 设置状态后端
//env.setStateBackend(new HashMapStateBackend());
//env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/211126/ck");
//System.setProperty("HADOOP_USER_NAME", "atguigu");
//TODO 2.读取 Kafka topic_db 主题数据创建主流
String topic = "topic_db";
String groupId = "dim_app_211126";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));
//TODO 3.过滤掉非JSON数据&保留新增、变化以及初始化数据并将数据转换为JSON格式
//flatMap这里替换了filter和map操作
SingleOutputStreamOperator<JSONObject> filterJsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
@Override
public void flatMap(String value, Collector<JSONObject> out) throws Exception {
try {
//将数据转换为JSON格式
JSONObject jsonObject = JSON.parseObject(value);
//获取数据中的操作类型字段
String type = jsonObject.getString("type");
//保留新增、变化以及初始化数据
if ("insert".equals(type) || "update".equals(type) || "bootstrap-insert".equals(type)) {
out.collect(jsonObject);
}
} catch (Exception e) {
System.out.println("发现脏数据:" + value);
}
}
});
//TODO 4.使用FlinkCDC读取MySQL配置信息表创建配置流
//TODO 5.将配置流处理为广播流
//TODO 6.连接主流与广播流
//TODO 7.处理连接流,根据配置信息处理主流数据
//TODO 8.将数据写出到Phoenix
//TODO 9.启动任务
env.execute("DimApp");
}
根据MySQL的配置表,动态进行分流
(1)导入依赖
<!-- Flink JDBC connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CDC MySQL source, used to read the table_process configuration table -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Phoenix artifact; NOTE(review): presumably included for the Phoenix JDBC driver, confirm -->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Without a flink-table related dependency the job fails with:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.connector.base.source.reader.RecordEmitter
Adding the dependency below fixes it (some other flink-table dependencies work as well)
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.13.0</version>
</dependency>
(2)创建配置表实体类
/**
 * Bean describing one row of the {@code table_process} configuration table: which source table
 * is routed to which Phoenix table and with which columns.
 *
 * <p>Fields are private; access goes through the accessors generated by Lombok's {@code @Data}.
 */
@Data
public class TableProcess {
    /** Source table name. */
    private String sourceTable;
    /** Output (sink) table name. */
    private String sinkTable;
    /** Output columns, as a comma-separated list. */
    private String sinkColumns;
    /** Primary-key column of the output table. */
    private String sinkPk;
    /** Extra clause appended to the CREATE TABLE statement. */
    private String sinkExtend;
}
(3)编写操作读取配置表形成广播流
//TODO 4. Read the MySQL configuration table with FlinkCDC to create the config stream
// Source over gmall-211126-config.table_process; StartupOptions.initial() is the configured startup mode.
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("000000")
.databaseList("gmall-211126-config")
.tableList("gmall-211126-config.table_process")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource,
WatermarkStrategy.noWatermarks(),
"MysqlSource");
//TODO 5. Turn the config stream into a broadcast stream
// The descriptor maps a String key to its TableProcess row; TableProcessFunction keys it by source table name.
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);
//TODO 6. Connect the main stream with the broadcast stream
BroadcastConnectedStream<JSONObject, String> connectedStream = filterJsonObjDS.connect(broadcastStream);
//TODO 7. Process the connected stream, handling main-stream records according to the config
SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));
(4)定义一个项目中常用的配置常量类GmallConfig
/**
 * Project-wide configuration constants for the Phoenix/HBase sink.
 *
 * <p>This is a pure constants holder: it is {@code final} and cannot be instantiated.
 */
public final class GmallConfig {
    /** Phoenix schema (database) name. */
    public static final String HBASE_SCHEMA = "GMALL211126_REALTIME";
    /** Fully-qualified class name of the Phoenix JDBC driver. */
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    /** Phoenix JDBC connection URL. */
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";

    private GmallConfig() {
        // constants only, no instances
    }
}
(5)自定义函数TableProcessFunction
/**
 * Joins the main stream of table change records with the broadcast stream of
 * {@code table_process} configuration rows.
 *
 * <ul>
 *   <li>Broadcast side: each configuration row creates the target Phoenix table if needed and
 *       is stored in broadcast state under its source table name; a deleted configuration row
 *       is removed from the state.</li>
 *   <li>Main side: a record whose {@code table} has a configuration entry gets its {@code data}
 *       trimmed to the configured columns, is tagged with {@code sinkTable} and emitted; other
 *       records are dropped with a console message.</li>
 * </ul>
 */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    /** Phoenix JDBC connection, opened in {@link #open} and released in {@link #close}. */
    private Connection connection;
    private final MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    public TableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.mapStateDescriptor = mapStateDescriptor;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    /** Releases the Phoenix connection opened in {@link #open}. */
    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    /**
     * Handles one change event of the configuration table.
     *
     * <p>Sample value:
     * {@code {"before":null,"after":{"source_table":"aa","sink_table":"bb","sink_columns":"cc","sink_pk":"id","sink_extend":"xxx"},"source":{...},"op":"r","ts_ms":1652513039551,"transaction":null}}
     *
     * @param value change event as a JSON string
     * @param ctx   gives write access to the broadcast state
     * @param out   unused; configuration events produce no output
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //1. Parse the event
        JSONObject jsonObject = JSON.parseObject(value);
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

        // A delete event has no "after" image (the row is carried in "before"); without this
        // branch the parse below yields null and the job fails with a NullPointerException.
        if ("d".equals(jsonObject.getString("op"))) {
            TableProcess removed = JSON.parseObject(jsonObject.getString("before"), TableProcess.class);
            if (removed != null) {
                broadcastState.remove(removed.getSourceTable());
            }
            return;
        }

        TableProcess tableProcess = JSON.parseObject(jsonObject.getString("after"), TableProcess.class);

        //2. Validate and create the Phoenix table
        checkTable(tableProcess.getSinkTable(),
                tableProcess.getSinkColumns(),
                tableProcess.getSinkPk(),
                tableProcess.getSinkExtend());

        //3. Store the row in broadcast state so the main-stream side can look it up
        broadcastState.put(tableProcess.getSourceTable(), tableProcess);
    }

    /**
     * Creates the Phoenix table if it does not exist yet:
     * {@code create table if not exists db.tn(id varchar primary key,bb varchar,cc varchar) xxx}
     *
     * @param sinkTable   Phoenix table name
     * @param sinkColumns comma-separated Phoenix column names
     * @param sinkPk      primary-key column; defaults to {@code id} when null or empty
     * @param sinkExtend  clause appended after the column list; defaults to empty when null
     * @throws RuntimeException if the statement cannot be prepared or executed; the
     *                          {@link SQLException} is attached as the cause
     */
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        PreparedStatement preparedStatement = null;
        try {
            // Apply defaults for the optional fields
            if (sinkPk == null || "".equals(sinkPk)) {
                sinkPk = "id";
            }
            if (sinkExtend == null) {
                sinkExtend = "";
            }

            // Assemble the DDL; every column is created as varchar
            StringBuilder createTableSql = new StringBuilder("create table if not exists ")
                    .append(GmallConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append("(");
            String[] columns = sinkColumns.split(",");
            for (int i = 0; i < columns.length; i++) {
                String column = columns[i];
                if (sinkPk.equals(column)) {
                    createTableSql.append(column).append(" varchar primary key");
                } else {
                    createTableSql.append(column).append(" varchar");
                }
                // Separator between columns, none after the last one
                if (i < columns.length - 1) {
                    createTableSql.append(",");
                }
            }
            createTableSql.append(")").append(sinkExtend);

            System.out.println("建表语句为:" + createTableSql);
            preparedStatement = connection.prepareStatement(createTableSql.toString());
            preparedStatement.execute();
        } catch (SQLException e) {
            // Keep the driver's exception as the cause so the real reason is not lost
            throw new RuntimeException("建表失败:" + sinkTable, e);
        } finally {
            // Best-effort release of the statement
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Handles one main-stream change record.
     *
     * <p>Sample value:
     * {@code {"database":"gmall-211126-flink","table":"base_trademark","type":"update","ts":1652499176,"xid":188,"commit":true,"data":{"id":13,"tm_name":"atguigu","logo_url":"/bbb/bbb"},"old":{"logo_url":"/aaa/aaa"}}}
     *
     * @param value change record
     * @param ctx   gives read-only access to the broadcast state
     * @param out   receives the record when its table is configured
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //1. Look up the configuration for this record's table
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String table = value.getString("table");
        TableProcess tableProcess = broadcastState.get(table);
        if (tableProcess != null) {
            //2. Keep only the configured columns
            filterColumn(value.getJSONObject("data"), tableProcess.getSinkColumns());
            //3. Tag the record with its sink table and emit it
            value.put("sinkTable", tableProcess.getSinkTable());
            out.collect(value);
        } else {
            System.out.println("找不到对应的Key:" + table);
        }
    }

    /**
     * Removes from {@code data}, in place, every entry whose key is not a configured column.
     *
     * @param data        e.g. {@code {"id":13,"tm_name":"atguigu","logo_url":"/bbb/bbb"}}
     * @param sinkColumns e.g. {@code "id,tm_name"}
     */
    private void filterColumn(JSONObject data, String sinkColumns) {
        List<String> columnList = Arrays.asList(sinkColumns.split(","));
        data.entrySet().removeIf(entry -> !columnList.contains(entry.getKey()));
    }
}
保存维度到HBase(Phoenix)
DimSinkFunction 继承了 RichSinkFunction,这个 function 的执行分两条时间线:
- 一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行;
- 另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。
(1)创建 PhoenixUtil 工具类,在其中创建 upsertValues()方法
/**
 * Helper for writing rows to Phoenix.
 */
public class PhoenixUtil {
    /**
     * Upserts one row into {@code GmallConfig.HBASE_SCHEMA.sinkTable} and commits.
     *
     * <p>The statement has the form {@code upsert into db.tn(id,name,sex) values (?,?,?)}.
     * Values are bound as parameters rather than concatenated into the SQL text, so a value
     * containing a single quote neither breaks nor alters the statement. Every non-null value
     * is bound as its string form; a null value is bound as SQL NULL.
     *
     * @param connection Phoenix connection; it is not closed here
     * @param sinkTable  table name, e.g. {@code tn}
     * @param data       row to write, e.g. {@code {"id":"1001","name":"zhangsan","sex":"male"}}
     * @throws SQLException if preparing, executing or committing the statement fails
     */
    public static void upsertValues(DruidPooledConnection connection, String sinkTable, JSONObject data) throws SQLException {
        //1. Build the SQL with one placeholder per column
        Set<String> columns = data.keySet();
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                placeholders.append(",");
            }
            placeholders.append("?");
        }
        //StringUtils.join(columns, ",") ==> id,name,sex
        String sql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "(" +
                StringUtils.join(columns, ",") + ") values (" + placeholders + ")";

        //2. Prepare, bind, execute; the statement is closed even when execution fails
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            int index = 1;
            // Values are looked up per column so they line up with the column list
            for (String column : columns) {
                Object columnValue = data.get(column);
                if (columnValue == null) {
                    preparedStatement.setNull(index, java.sql.Types.VARCHAR);
                } else {
                    preparedStatement.setString(index, columnValue.toString());
                }
                index++;
            }
            preparedStatement.execute();
            connection.commit();
        }
    }
}
(2)DimSinkFunction
自定义 SinkFunction 子类 DimSinkFunction,在其中调用 Phoenix 工具类的 upsertValues 方法,将维度数据写出到 Phoenix 的维度表中。为了提升效率,减少频繁创建销毁连接带来的性能损耗,创建连接池。
1)添加德鲁伊连接池依赖:
<!-- Alibaba Druid connection pool, used by DruidDSUtil and DimSinkFunction -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.16</version>
</dependency>
2)连接池创建工具类:
/**
 * Factory for Druid connection pools targeting Phoenix.
 */
public class DruidDSUtil {
    /**
     * Creates and configures a new Druid connection pool on every call.
     *
     * <p>The pool is built in a local variable. Building it through a shared static field
     * would let concurrent callers in one JVM overwrite each other's instance and receive a
     * pool that another thread is still configuring.
     *
     * @return a newly configured data source; the caller owns it
     */
    public static DruidDataSource createDataSource() {
        // Create the pool
        DruidDataSource druidDataSource = new DruidDataSource();
        // Fully-qualified driver class name
        druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);
        // Connection URL
        druidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);
        // Number of connections created when the pool is initialised
        druidDataSource.setInitialSize(5);
        // Maximum number of simultaneously active connections
        druidDataSource.setMaxActive(20);
        // Minimum number of idle connections; must lie between 0 and the maximum, default 0
        druidDataSource.setMinIdle(1);
        // Wait time when no connection is free; throws on timeout, -1 waits forever
        druidDataSource.setMaxWait(-1);
        // SQL statement used to validate a connection
        druidDataSource.setValidationQuery("select 1");
        // Whether idle connections are validated by the idle-connection evictor (if any);
        // connections failing the check are removed from the pool.
        // Note: the default is true, and without a validationQuery it reports
        // "testWhileIdle is true, validationQuery not set"
        druidDataSource.setTestWhileIdle(true);
        // Do not validate on borrow: it costs a lot of performance
        druidDataSource.setTestOnBorrow(false);
        // Do not validate on return
        druidDataSource.setTestOnReturn(false);
        // Run the idle-connection evictor every 30s
        druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
        // Evict connections idle for 30 min (which is also the default)
        druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);
        return druidDataSource;
    }
}
3)DimSinkFunction
/**
 * Sink that writes dimension records to Phoenix through a Druid connection pool.
 *
 * <p>The pool is created in {@link #open} and closed in {@link #close}; each record borrows
 * one connection, which is always returned to the pool, including when the write fails.
 */
public class DimSinkFunction extends RichSinkFunction<JSONObject> {
    private DruidDataSource druidDataSource = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        druidDataSource = DruidDSUtil.createDataSource();
    }

    /** Shuts the connection pool down when the task ends. */
    @Override
    public void close() throws Exception {
        if (druidDataSource != null) {
            druidDataSource.close();
            druidDataSource = null;
        }
    }

    /**
     * Writes one record to its Phoenix table.
     *
     * <p>Sample value:
     * {@code {"database":"gmall-211126-flink","table":"base_trademark","type":"update","ts":1652499176,"xid":188,"commit":true,"data":{"id":13,"tm_name":"atguigu"},"old":{"logo_url":"/aaa/aaa"},"sinkTable":"dim_xxx"}}
     *
     * @param value   record carrying {@code sinkTable}, {@code type} and {@code data}
     * @param context sink context (unused)
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // try-with-resources returns the connection to the pool even if the write throws
        try (DruidPooledConnection connection = druidDataSource.getConnection()) {
            String sinkTable = value.getString("sinkTable");
            JSONObject data = value.getJSONObject("data");
            // Operation type of the record
            String type = value.getString("type");
            // For an update, the corresponding data in Redis has to be deleted
            if ("update".equals(type)) {
                DimUtil.delDimInfo(sinkTable.toUpperCase(), data.getString("id"));
            }
            // Write the row
            PhoenixUtil.upsertValues(connection, sinkTable, data);
        }
    }
}
(3)在主程序 DimApp 中调用 DimSinkFunction
//TODO 8. Write the data out to Phoenix
// The stream is also printed to the console with a ">>>>>>>>>>>>" prefix.
dimDS.print(">>>>>>>>>>>>");
dimDS.addSink(new DimSinkFunction());
最终主程序
//数据流:web/app -> nginx -> 业务服务器 -> Mysql(binlog) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Phoenix
//程 序:Mock -> Mysql(binlog) -> Maxwell -> Kafka(ZK) -> DimApp(FlinkCDC/Mysql) -> Phoenix(HBase/ZK/HDFS)
/**
 * Entry point of the DIM-layer job: reads change records from the {@code topic_db} Kafka topic,
 * routes dimension-table records according to the {@code table_process} configuration table
 * (read with FlinkCDC and broadcast), and writes them to Phoenix.
 */
public class DimApp {
    public static void main(String[] args) throws Exception {
        //TODO 1. Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // in production, set this to the partition count of the Kafka topic
        //1.1 Enable checkpointing
        //env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        //1.2 Configure the state backend
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/211126/ck");
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2. Read the Kafka topic_db topic to create the main stream
        String topic = "topic_db";
        String groupId = "dim_app_211126";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        //TODO 3. Drop non-JSON records, keep insert/update/bootstrap-insert records, convert to JSONObject
        // flatMap replaces a separate filter + map pair here
        SingleOutputStreamOperator<JSONObject> filterJsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                // Only the parse is guarded, so a failure raised while emitting the record
                // is not misreported as dirty input.
                JSONObject jsonObject;
                try {
                    jsonObject = JSON.parseObject(value);
                } catch (Exception e) {
                    System.out.println("发现脏数据:" + value);
                    return;
                }
                // A payload that parses to nothing (e.g. an empty message) is dirty data too.
                if (jsonObject == null) {
                    System.out.println("发现脏数据:" + value);
                    return;
                }
                // Operation type of the change record
                String type = jsonObject.getString("type");
                // Keep inserts, updates and bootstrap (initial load) rows only
                if ("insert".equals(type) || "update".equals(type) || "bootstrap-insert".equals(type)) {
                    out.collect(jsonObject);
                }
            }
        });

        //TODO 4. Read the MySQL configuration table with FlinkCDC to create the config stream
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-211126-config")
                .tableList("gmall-211126-config.table_process")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MysqlSource");

        //TODO 5. Turn the config stream into a broadcast stream
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);

        //TODO 6. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectedStream = filterJsonObjDS.connect(broadcastStream);

        //TODO 7. Process the connected stream, handling main-stream records according to the config
        SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));

        //TODO 8. Write the data out to Phoenix
        dimDS.print(">>>>>>>>>>>>");
        dimDS.addSink(new DimSinkFunction());

        //TODO 9. Start the job
        env.execute("DimApp");
    }
}
测试
(1)启动HDFS、ZK、Kafka、Maxwell、HBase
(2)运行 IDEA 中的 DimApp
(3)执行 mysql_to_kafka_init.sh 脚本
mysql_to_kafka_init.sh all
(4)通过phoenix查看hbase的schema以及表情况