How each Measure is computed internally
accuracy
Task: join the two tables to check whether their records match, and compute the related metrics.
- Entry point: AccuracyExpr2DQSteps
- __missRecords: source LEFT JOIN target; a row is a miss when its source-side fields are non-null but the joined target-side fields are all null (no match found)
- total_count: number of records in the source table
- __missCount: number of unmatched records
name = "__missRecords"
SELECT `source`.*
FROM `source`
LEFT JOIN `target`
ON coalesce(`source`.`user_id`, '') = coalesce(`target`.`user_id`, '')
AND upper(`source`.`first_name`) = upper(`target`.`first_name`)
AND coalesce(`source`.`last_name`, '') = coalesce(`target`.`last_name`, '')
AND coalesce(`source`.`address`, '') = coalesce(`target`.`address`, '')
AND coalesce(`source`.`email`, '') = coalesce(`target`.`email`, '')
AND coalesce(`source`.`phone`, '') = coalesce(`target`.`phone`, '')
AND coalesce(`source`.`post_code`, '') = coalesce(`target`.`post_code`, '')
WHERE (NOT (`source`.`user_id` IS NULL
AND `source`.`first_name` IS NULL
AND `source`.`last_name` IS NULL
AND `source`.`address` IS NULL
AND `source`.`email` IS NULL
AND `source`.`phone` IS NULL
AND `source`.`post_code` IS NULL))
AND (`target`.`user_id` IS NULL
AND `target`.`first_name` IS NULL
AND `target`.`last_name` IS NULL
AND `target`.`address` IS NULL
AND `target`.`email` IS NULL
AND `target`.`phone` IS NULL
AND `target`.`post_code` IS NULL)
name = "__missCount"
rule = "SELECT COUNT(*) AS `miss_count` FROM `__missRecords`"
name = "__totalCount"
rule = "SELECT COUNT(*) AS `total_count` FROM `source`"
name = "accu"
SELECT A.total AS `total_count`,
A.miss AS `miss_count`,
(A.total - A.miss) AS `matched_count`,
coalesce( (A.total - A.miss) / A.total, 1.0) AS `matchedFraction`
FROM (
SELECT `__totalCount`.`total_count` AS total,
coalesce(`__missCount`.`miss_count`, 0) AS miss
FROM `__totalCount` LEFT JOIN `__missCount`
) AS A
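The accu step above is plain arithmetic once the two counts are known; a minimal sketch in Java (class and method names are mine, not Griffin's):

```java
public class AccuracyCalc {
    // matchedFraction = (total - miss) / total, defaulting to 1.0 when total is 0,
    // mirroring the coalesce(..., 1.0) in the accu SQL above
    public static double matchedFraction(long total, long miss) {
        if (total == 0) {
            return 1.0;
        }
        return (double) (total - miss) / total;
    }

    public static void main(String[] args) {
        System.out.println(matchedFraction(100, 4)); // 0.96
    }
}
```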
completeness
Task: decide whether each record is complete by checking the configured fields for null, then compute the total, complete, and incomplete counts.
- Entry point: CompletenessExpr2DQSteps
- A record is complete when all of the configured fields are non-NULL; otherwise it is incomplete
- Outputs the total, incomplete, and complete counts
name = "__sourceAlias"
rule = "SELECT `email` AS `email`, `post_code` AS `post_code`, `first_name` AS `first_name` FROM `source`"
name = "__incompleteRecords"
rule = "SELECT * FROM `__sourceAlias` WHERE NOT (`email` IS NOT NULL AND `post_code` IS NOT NULL AND `first_name` IS NOT NULL)"
name = "__incompleteCount"
rule = "SELECT COUNT(*) AS `incomplete` FROM `__incompleteRecords`"
name = "__totalCount"
rule = "SELECT COUNT(*) AS `total` FROM `__sourceAlias`"
name = "comp"
SELECT `__totalCount`.`total` AS `total`,
coalesce(`__incompleteCount`.`incomplete`, 0) AS `incomplete`,
(`__totalCount`.`total` - coalesce(`__incompleteCount`.`incomplete`, 0)) AS `complete`
FROM `__totalCount` LEFT JOIN `__incompleteCount`
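The incomplete filter above is just the negation of "all configured fields are non-null"; a rough Java equivalent (illustrative names, not Griffin code):

```java
import java.util.Objects;
import java.util.stream.Stream;

public class CompletenessCheck {
    // A record is complete only when every configured field is non-null;
    // the SQL's NOT (f1 IS NOT NULL AND f2 IS NOT NULL ...) selects the rest
    public static boolean isComplete(Object... fields) {
        return Stream.of(fields).allMatch(Objects::nonNull);
    }

    public static void main(String[] args) {
        System.out.println(isComplete("a@b.com", "95014", "Ada")); // true
        System.out.println(isComplete("a@b.com", null, "Ada"));    // false
    }
}
```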
distinct
Task: group by the specified fields, compute duplicate statistics, and report the record cardinality.
- Entry point: DistinctnessExpr2DQSteps
- __selfGroup: group by the specified fields and count the records in each group
- __dupRecords: the duplicated keys, i.e. groups whose extra-occurrence count dup is greater than 0
name = "__sourceAlias"
rule = "SELECT `user_id` AS `user_id` FROM `source`"
name = "__totalMetric"
rule = "SELECT COUNT(*) AS `total` FROM `__sourceAlias`"
name = "__selfGroup"
SELECT `user_id`, (COUNT(*) - 1) AS `dup`,
TRUE AS `__distinct`
FROM `__sourceAlias` GROUP BY `user_id`
name = "__distMetric"
SELECT COUNT(*) AS `distinct`
FROM `__selfGroup` WHERE `__distinct`
name = "__dupRecords"
SELECT `user_id`, `dup`
FROM `__selfGroup` WHERE `dup` > 0
name = "__dupMetric"
SELECT `dup`, COUNT(*) AS `num`
FROM `__dupRecords` GROUP BY `dup`
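The group-and-count steps above can be sketched in plain Java (assumed names; this is not Griffin code):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DistinctCalc {
    // The distinct metric is the key cardinality (number of groups)
    public static long distinctCount(List<String> keys) {
        return keys.stream().distinct().count();
    }

    // dup value -> number of keys with that many duplicates (the __dupMetric step);
    // per key, dup = occurrences - 1 as in the __selfGroup step
    public static Map<Long, Long> dupMetric(List<String> keys) {
        Map<String, Long> counts = keys.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.values().stream()
            .filter(c -> c > 1) // keep only __dupRecords (dup > 0)
            .collect(Collectors.groupingBy(c -> c - 1, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> ids = List.of("u1", "u2", "u2", "u3", "u3", "u3");
        System.out.println(distinctCount(ids)); // 3
        System.out.println(dupMetric(ids));     // {1=1, 2=1}
    }
}
```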
timeliness
Task: using the specified time fields, compute the average latency, the records exceeding the threshold, the latency distribution, and the percentile statistics.
- Entry point: TimelinessExpr2DQSteps
- timeliness: compute latency from the begin_ts and end_ts fields, then report the total record count and the average latency
- __lateRecords: records whose latency exceeds the configured threshold (3m)
- __rangeMetric: since step.size = 2m is configured, records are bucketed by (latency / step) and counted per bucket
- __percentile: since percentile([0.95]) is configured, the corresponding latency percentile is computed
name = "__inTime"
SELECT *, (`ts`) AS `__begin_ts`,
(`end_ts`) AS `__end_ts`
FROM source WHERE (`ts`) IS NOT NULL AND (`end_ts`) IS NOT NULL
name = "__lat"
rule = "SELECT *, (`__end_ts` - `__begin_ts`) AS `latency` FROM `__inTime`"
name = "timeliness"
SELECT COUNT(*) AS `total`,
CAST(AVG(`latency`) AS BIGINT) AS `avg`
FROM `__lat`
---
name = "__lateRecords"
rule = "SELECT * FROM `__lat` WHERE `latency` > 180000"
---
name = "__range"
SELECT *, CAST((`latency` / 120000) AS BIGINT) AS `step`
FROM `__lat`
name = "__rangeMetric"
SELECT `step`, COUNT(*) AS `cnt`
FROM `__range` GROUP BY `step`
---
name = "__percentile"
SELECT floor(percentile_approx(latency, 0.95)) AS `percentile_95`
FROM `__lat`
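The threshold and step arithmetic above (threshold 3m = 180000 ms, step.size 2m = 120000 ms) can be sketched as:

```java
public class TimelinessCalc {
    static final long THRESHOLD_MS = 180_000; // threshold = 3m
    static final long STEP_MS = 120_000;      // step.size = 2m

    // mirrors the __lateRecords filter: latency > threshold
    public static boolean isLate(long latencyMs) {
        return latencyMs > THRESHOLD_MS;
    }

    // mirrors the __range step: bucket = latency / step
    public static long stepBucket(long latencyMs) {
        return latencyMs / STEP_MS;
    }

    public static void main(String[] args) {
        long latency = 250_000; // end_ts - begin_ts, in milliseconds
        System.out.println(isLate(latency));     // true
        System.out.println(stepBucket(latency)); // 2
    }
}
```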
uniqueness
Task: determine whether the table's records are unique, and report the unique record count.
- Entry point: UniquenessExpr2DQSteps
- __totalMetric: total record count of the source table
- __group: deduplicate the source table by key and left join it against the target table; keys whose joined row count is 1 form the __uniqueRecord set
name = "__source"
rule = "SELECT DISTINCT `user_id` AS `user_id` FROM source"
name = "__target"
rule = "SELECT `user_id` AS `user_id` FROM target"
name = "__joined"
SELECT `__source`.`user_id` AS `user_id`
FROM `__target`
RIGHT JOIN `__source`
ON coalesce(`__source`.`user_id`, '') = coalesce(`__target`.`user_id`, '')
name = "__group"
rule = "SELECT `user_id`, (COUNT(*) - 1) AS `dup` FROM `__joined` GROUP BY `user_id`"
name = "__totalMetric"
rule = "SELECT COUNT(*) AS `total` FROM `source`"
name = "__uniqueRecord"
rule = "SELECT * FROM `__group` WHERE `dup` = 0"
name = "__uniqueMetric"
rule = "SELECT COUNT(*) AS `unique` FROM `__uniqueRecord`"
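The __group/__uniqueRecord steps boil down to counting keys that occur exactly once (dup = 0); an illustrative sketch over a flat key list, not Griffin's actual join-based implementation:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class UniquenessCalc {
    // dup = COUNT(*) - 1 per key (the __group step);
    // a key is unique when dup = 0, i.e. it occurs exactly once
    public static long uniqueCount(List<String> keys) {
        Map<String, Long> counts = keys.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.values().stream().filter(c -> c == 1).count();
    }

    public static void main(String[] args) {
        System.out.println(uniqueCount(List.of("u1", "u2", "u2", "u3"))); // 2
    }
}
```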
profiling
Task: compute metrics from user-defined SQL.
- Entry point: ProfilingExpr2DQSteps
- Directly executes the SQL parsed and assembled from the rule
name = "prof"
rule = "SELECT `user_id` AS `user_id`, count(*) AS `cnt` FROM `source` GROUP BY `user_id` "
spark-sql
A step can be added with a custom SQL statement:
{
"dsl.type": "spark-sql",
"out.dataframe.name": "this",
"rule": "select name, age from t1"
}
pre.proc
Data can be pre-processed on the data source connectors by adding a "pre.proc" property:
"pre.proc": [
{
"dsl.type": "spark-sql",
"rule": "select * from this_table where user_id < 10014"
}
]
Service: the job management module
Configuring Livy
# livy
livy.uri=http://localhost:8998/batches
Job parsing and submission flow
SparkSubmitJob implements Quartz's Job interface; the scheduler triggers its execute() method:
// class SparkSubmitJob implements Job
@Override
public void execute(JobExecutionContext context) {
JobDetail jd = context.getJobDetail();
try {
initParam(jd);
setLivyConf();
if (!success(mPredicates)) {
updateJobInstanceState(context);
return;
}
if (isNeedLivyQueue) {
//livy batch limit
// schedule and execute through the waiting queue
livyTaskSubmitHelper.addTaskToWaitingQueue(jd);
} else {
// run the job directly; internally this calls post2LivyWithRetry(),
// which submits the task to Livy over HTTP
saveJobInstance(jd);
}
} catch (Exception e) {
LOGGER.error("Post spark task ERROR.", e);
}
}
Initialize the job object (GriffinMeasure) and the Livy address:
private void initParam(JobDetail jd) throws IOException {
mPredicates = new ArrayList<>();
jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap()
.getString(PREDICATE_JOB_NAME));
measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY),
GriffinMeasure.class);
livyUri = env.getProperty("livy.uri");
setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY));
// in order to keep metric name unique, we set job name
// as measure name at present
measure.setName(jd.getJobDataMap().getString(JOB_NAME));
}
Submitting the task to Livy:
// LivyTaskSubmitHelper.java
public String postToLivy(String uri) {
//...
if (needKerberos.equalsIgnoreCase("false")) {
logger.info("The livy server doesn't need Kerberos Authentication");
String result = null;
try {
// This is the crux of submission: all launch parameters are packed into one
// JSON object and POSTed to the Livy server; livyConfMap holds both the job's
// environment settings and the compute-task parameters
HttpEntity<String> springEntity = new HttpEntity<>(toJsonWithFormat(livyConfMap), headers);
result = restTemplate.postForObject(uri, springEntity, String.class);
logger.info(result);
} catch (JsonProcessingException e) {
logger.error("Post to livy ERROR. \n {}", e.getMessage());
}
return result;
} else {
// ...
}
}
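For reference, the body POSTed to livy.uri follows Livy's POST /batches format; a representative payload might look like the following (the jar path and resource settings are illustrative, not Griffin defaults):

```json
{
  "file": "hdfs:///griffin/griffin-measure.jar",
  "className": "org.apache.griffin.measure.Application",
  "args": ["<env json>", "<measure json>", "raw,raw"],
  "queue": "default",
  "numExecutors": 2,
  "executorCores": 1,
  "driverMemory": "1g",
  "executorMemory": "1g"
}
```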
Parsing and passing the Spark job parameters
PropertiesConfig's init() method loads the batch, streaming, and livy configuration from the environment:
- sparkProperties.json: Spark launch settings, including the jar and main class
- env_batch.json: environment configuration for batch jobs
- env_streaming.json: environment configuration for streaming jobs
The job fetches the jar from HDFS and runs the main class org.apache.griffin.measure.Application; the arguments passed to this entry point are assembled as follows.
// fill the measure info into the HTTP submission parameters
// three arguments are passed: arg1 = the env JSON, arg2 = the measure JSON,
// arg3 = "raw,raw"; passing whole JSON documents as CLI arguments is unusual,
// but the Application entry point parses them fine
private void setLivyArgs() throws IOException {
List<String> args = new ArrayList<>();
args.add(genEnv());
// serialize the measure object to formatted JSON
String measureJson = JsonUtil.toJsonWithFormat(measure);
// work around a Livy bug: backticks would otherwise be dropped by Livy
String finalMeasureJson = escapeCharacter(measureJson, "\\`");
LOGGER.info(finalMeasureJson);
args.add(finalMeasureJson);
args.add("raw,raw");
// args holds the command-line arguments for the Spark application
livyConfMap.put("args", args);
}