#!/usr/bin/perl
###############################################################################
# Program : ruleCheck.pl 根据规则组,检查规则组的通过率
# Description: 执行本数据字段批量检查需要以下几个步骤
# 1.查询出所有的规则组,进行第一次循环
# 2.查询当前规则组下的所有规则(一般都是针对同一个字段),循环
# 每一个规则,从中提取规则sql
# 3.一组规则的每个规则都会在前一个规则的执行基础之上计算通过率,
# 有效检查记录总数,计算的粒度细分到机构,规则编号
# Writer : zengst
# Version :
###############################################################################
use strict;#对语法做严格限制,如变量必须先用my声明
use DBI;
use Time::Local;
# ---------------------------------------------------------------------------
# File-scoped state shared by all subroutines below.
# ---------------------------------------------------------------------------
my $dbh;                 # database connection handle
my $USER;                # login user name
my $PASSWD;              # login password
my $RUN_DATE;            # quality-check date
my $Topic_Area;          # topic area being checked
my $Sys_Id;              # id of the system being checked
my $Map_Id;              # rule-group id
my $table_Name;          # name of the table being checked
my $Check_Column_Name;   # column(s) being checked; one rule group checks one column
my $PK_Column_Name;      # primary-key column(s) of the checked table
my $Org_Column_Name;     # NAME (not value) of the organisation column; it may differ between source systems
my $Rule_Name;           # rule description, i.e. what is being checked
my $checkID;             # check (rule) number
my $SQLClause;           # SQL of the rule currently being checked
my $RuleTable='EASTDQM.Rule';              # rule definition table
my $RuleGroupTable='EASTDQM.RuleGroup';    # rule-group definition table
my $Rule_Info;           # rule rows (array ref of rows)
my $Rule_Group;          # rule-group rows (array ref of rows)
my $check_Result = 'EASTDQM.Check_Result';          # lightly aggregated result table
my $check_detail='EASTDQM.Check_Result_Detail';     # check result detail table
my $RUNDATE=GetNowDate();  # quality-check date (today, YYYY-MM-DD)
my $modifydate;
my $CheckDimension;      # rule dimension
my $CheckSegmentation;   #
my $Org_Id;              # organisation number
my $logFile;
my $check_detail_sample = 'EASTDQM.check_detail_sample';
my $ORG_CHK='ORG_CHK';   # alias intended to avoid trouble when the same NBJGH column appears several times in a select list
my $job_params = $ARGV[0];  # job parameter passed on the command line

# Validate the argument BEFORE slicing it.
# - String emptiness must be tested with `eq`; the numeric `==` treats every
#   non-numeric argument as 0 and therefore as "empty".
# - `exit` is used because `return` is not allowed outside a subroutine.
if (!defined $job_params || $job_params eq '') {
    print (GetNowTime() . " please input the params that requird !\n");
    exit 1;
}

my $JOB = substr($job_params,8,length($job_params)-21);     # table (job) name
my $TX_DATE=substr($job_params,length($job_params)-12,8);   # data date
# NOTE(review): both values below are cut from the same 4 characters of the
# parameter, so the table filter and the column filter are always identical
# - confirm that this is intended.
my $parallelTable = substr($job_params,4,4);
my $parallelColumn = substr($job_params,4,4);
my $whereClause;

# Log file: '>' truncates an existing file, '>>' would append to it.
my $lfnowtime =GetNowTimeNospechar();
open $logFile, '>', "checkLog$lfnowtime.txt"
    or die "Cannot open log file checkLog$lfnowtime.txt: $!";
# Make the log file the default output handle and switch off its buffering.
select $logFile;
$|=1;
# Open the ODBC connection used by the whole script and store it in $dbh.
#
# The handle is created with RaiseError => 1 (DBI errors die) and
# AutoCommit => 0 (callers must commit explicitly).
# Returns the connected database handle.
#
# NOTE(review): the DSN, user and password are hard-coded here; consider moving
# them to configuration.
sub DBconnect {
    # With RaiseError set, connect() normally dies by itself on failure; the
    # explicit die is a safety net and reports the documented $DBI::errstr.
    $dbh = DBI->connect("dbi:ODBC:ahnxods", "odsuser", "ytods",{RaiseError => 1, AutoCommit => 0 })
        || die "Couldn't connect to database: ".$DBI::errstr;
    print $logFile (GetNowTime()." get connection success! \n ");
    return $dbh;
}
##得到创建临时表的DDL,临时表是session级别的
#sub createtemptableDDL{
#my $createDDL = "create table ${check_detail_temp} as (select * from ${check_detail}) definition only ";
#return $createDDL;
#}
# Load the rule groups to process into $Rule_Group.
#
# Rule groups are fetched one level above the rules because every group is
# scored on its own; fetching all rules at once would make the processing
# logic considerably more complicated.
#
# Only groups with Status = 1 whose Table_Name / Check_Column_Name match the
# values cut from the job parameter are selected, ordered by Map_Id.
# Returns the array ref of rows (Map_Id, Topic_Area), or -1 when prepare or
# execute reports failure.
sub getRuleGroupInfo {
    # The filter values come from the command line, so they are passed as bind
    # parameters instead of being pasted (unquoted) into the SQL text.
    my $groupInfoSQL = "select Map_Id,Topic_Area from ${RuleGroupTable} where Status= 1 and Table_Name=? and Check_Column_Name=? order by Map_Id";
    #my $groupInfoSQL = "select Map_Id,Topic_Area from ${RuleGroupTable} where Status= 1 and map_id in (select map_id from EASTDQM.RULE_UPDATE_ADV)";
    #my $groupInfoSQL = "select Map_Id,Topic_Area from ${RuleGroupTable} where Status= 1 and map_id in ( select map_id from eastdqm.rule where check_id in ('4012','4013','4014','4025','4026','4027','9007','9013','9015','9023','3072',,'3106')) order by Map_Id";
    my $sth = $dbh->prepare($groupInfoSQL) or return -1;
    $sth->execute($parallelTable, $parallelColumn) or return -1;
    ${Rule_Group} = $sth->fetchall_arrayref();
    $sth->finish();
    return ${Rule_Group};
}
# Load the active rules (status = '1') of one rule group into $Rule_Info,
# ordered by Rule_Prior ascending.
#
# Parameter: $groupId - Map_id of the rule group.
# Returns 0 on success, -1 when prepare or execute reports failure.
#
# Column order of each fetched row (relied upon by exeRuleSql):
#   0 check_id, 1 Topic_Area, 2 sys_id, 3 table_Name, 4 Check_Column_Name,
#   5 PK_Column_Name, 6 Org_Column_Name, 7 rule_name, 8 modifydate,
#   9 SQLClause, 10 map_id, 11 WhereClause
sub getRuleInfo {
    my ($groupId) = @_;
    # The group id is bound as a parameter rather than interpolated into the SQL.
    my $ruleConfigSql =
        "select check_id,Topic_Area,r.sys_id,r.table_Name,r.Check_Column_Name,".
        "r.PK_Column_Name,r.Org_Column_Name,rule_name,modifydate,r.SQLClause,map_id,WhereClause ".
        "from ${RuleTable} r".
        " where r.status='1' and map_id=? ".
        " order by Rule_Prior asc";
    my $sth = $dbh->prepare($ruleConfigSql) or return -1;
    $sth->execute($groupId) or return -1;
    ${Rule_Info} = $sth->fetchall_arrayref();
    $sth->finish();
    return 0;
}
# Turn a comma-separated column list into ONE SQL expression by concatenating
# the columns with a '#' separator, e.g. "a,b" -> a||'#'||b.
# A single column is returned unchanged; an empty list yields ''.
sub _concatColumns {
    my ($columnList) = @_;
    return join(q{||'#'||}, split(/,/, defined $columnList ? $columnList : ''));
}

# Prepare and execute one statement on the shared handle $dbh.
# Returns the value of execute(), or nothing when prepare/execute reports
# failure.
sub _runStatement {
    my ($sql) = @_;
    my $sth  = $dbh->prepare($sql) or return;
    my $rows = $sth->execute()     or return;
    $sth->finish();
    return $rows;
}

# Execute every rule currently held in $Rule_Info (one rule group).
#
# For each rule, in priority order:
#   1. delete this rule's rows for the run date from the result and detail
#      tables, so that a re-run does not duplicate data;
#   2. insert the rows matching the rule's WhereClause into the detail table,
#      excluding rows already matched by earlier rules of the same group;
#   3. copy up to 5 rows per organisation into the sample table;
#   4. insert per-organisation totals and rates into the result table, where
#      the number of effective records is the table's row count minus the rows
#      matched by the earlier rules;
#   5. commit.
#
# Returns 0 on success, -1 as soon as a statement reports failure.
sub exeRuleSql {
    # WhereClause of every rule already processed in this group, in run order.
    my @previousWhere = ();
    my $rules = ref($Rule_Info) eq 'ARRAY' ? $Rule_Info : [];

    for my $i (0 .. $#{$rules}) {
        # $rules is a two-dimensional array; $rule is its i-th row.
        my $rule = $rules->[$i];
        $checkID = $rule->[0];    # check number
        print $logFile (GetNowTime()." Begin To Run Rule[$i]:$checkID \n");
        $Topic_Area        = $rule->[1];
        $Sys_Id            = $rule->[2];
        $table_Name        = $rule->[3];
        $Check_Column_Name = $rule->[4];
        $PK_Column_Name    = $rule->[5];
        $Org_Column_Name   = $rule->[6];
        $Rule_Name         = $rule->[7];
        $modifydate        = $rule->[8];
        $Map_Id            = $rule->[10];
        $whereClause       = $rule->[11];
        $SQLClause = "select ${PK_Column_Name},${Org_Column_Name},${Check_Column_Name} from ${table_Name} where ${whereClause}";

        # To make re-runs safe, clear this rule's rows (all organisations) for
        # the run date from the result table and from the detail table.
        print $logFile (GetNowTime() . " **************************Clear The Result Table:**************************\n");
        my $ret = _runStatement("Delete From ${check_Result} Where Check_ID = ${checkID} and rundate='${RUNDATE}'")
            or return -1;
        print $logFile (GetNowTime() . " **************************Clear The Result Table:Succeed[Rows:$ret] **************************\n");
        print $logFile (GetNowTime() . " **************************Clear The Detail Table:**************************\n");
        $ret = _runStatement("Delete From ${check_detail} Where Check_ID = ${checkID} and RUNDATE='${RUNDATE}'")
            or return -1;
        print $logFile (GetNowTime() . " **************************Clear The Detail Table:Succeed[Rows:$ret]**************************\n");

        # Rows caught by the earlier rules of this group are excluded from this
        # rule. The clauses are taken straight from the rule rows instead of
        # being re-parsed out of the generated SQL: the generated SQL uses
        # lower-case keywords, so searching it for 'FROM'/'WHERE' never matched.
        my $exclusion = '';
        $exclusion .= ' and not(' . $_ . ')' for @previousWhere;
        # The rule's own condition is parenthesised so that an OR inside it
        # cannot absorb the exclusion terms.
        my $fromclause = " ${table_Name} where (${whereClause})" . $exclusion;
        push @previousWhere, $whereClause;

        # Multi-column keys / checked columns are folded into a single value.
        my $pkselect = _concatColumns($PK_Column_Name);
        my $ckselect = _concatColumns($Check_Column_Name);

        print $logFile (GetNowTime() ." ************************** insert into detail begin **************************\n");
        my $detailInsertSql = "Insert Into ${check_detail}" .
            " select ${checkID},'${RUNDATE}', ${Map_Id}," .
            " '${Topic_Area}','${Sys_Id}','${table_Name}','${Check_Column_Name}','${CheckDimension}','${CheckSegmentation}',".
            " ${pkselect},${Org_Column_Name},${ckselect} from ".$fromclause;
        _runStatement($detailInsertSql) or return -1;
        print $logFile (GetNowTime() ." ************************** insert into detail end **************************\n");

        print $logFile (GetNowTime() ." ************************** insert into sample data begin **************************\n");
        # Take at most 5 detail rows per organisation for THIS rule and run date.
        # The check_id filter matters: without it detail rows of other rules
        # from the same run date would be copied under the current check id.
        my $sampInsert = "insert into ${check_detail_sample}".
            " select ${checkID},'${RUNDATE}',${Map_Id},'${Topic_Area}','${Sys_Id}','${table_Name}','${Check_Column_Name}',".
            "'${CheckDimension}','${CheckSegmentation}',pk_column_value,org_id,check_column_value from ".
            "(select crt.*,ROW_NUMBER() over( partition by org_id,check_id) as seq from ${check_detail} crt where rundate='${RUNDATE}' and check_id=${checkID}) detailtmp ".
            " where detailtmp.seq<6";
        _runStatement($sampInsert) or return -1;
        print $logFile (GetNowTime() ." ************************** insert into sample data end **************************\n");

        print $logFile (GetNowTime() ." ************************** begin insert check_result**************************\n");
        # One result row per organisation: effective count, pass count, fail
        # count, fail rate and pass rate. Organisations without failing rows
        # are kept by the left join and get a fail count of 0.
        my $InsSql = " Insert Into ${check_Result}".
            " select ${checkID},'${RUNDATE}', '${Rule_Name}',${Map_Id},'${Topic_Area}','${Sys_Id}','${table_Name}','${Check_Column_Name}','${CheckDimension}','${CheckSegmentation}',".
            " mainorg ,effectCount,effectCount-decode(FailCount,null,0,FailCount), decode(FailCount,null,0,FailCount),".
            " cast(decode(FailCount,null,0,FailCount) as double)/effectCount,cast((effectCount-decode(FailCount,null,0,FailCount)) as double)/effectCount from ".
            "(select count(*) as effectCount, $Org_Column_Name as mainorg from ${table_Name} where 1=1".$exclusion." group by ${Org_Column_Name}) tmain ".
            " left join ".
            "(select count(*) as FailCount, org_id,check_id from ${check_detail} detail where detail.rundate = '${RUNDATE}'and check_id=${checkID} group by org_id,check_id) tdetail".
            " on tmain.mainorg = tdetail.org_id";
        _runStatement($InsSql) or return -1;
        print $logFile (GetNowTime() ." ************************** end insert check_result**************************\n");

        # AutoCommit is off: nothing is stored until this commit.
        $dbh->commit();
    }
    return 0;
}
# Program driver: connect, load the rule groups, then for every group load its
# rules and execute them; finally disconnect and close the log file.
sub main {
    # Obtain the database connection.
    $dbh = DBconnect();

    # Load the rule groups.
    print $logFile (GetNowTime() . " *********************init ruleGroup info ! ***************************\n");
    getRuleGroupInfo();
    # $Rule_Group is only an array ref when the query succeeded; an empty
    # result gives an empty array.
    my $groups = ref($Rule_Group) eq 'ARRAY' ? $Rule_Group : [];
    my $ruleGrouplength = scalar @{$groups};
    print $logFile (GetNowTime() . " *********************ruleGroup length: $ruleGrouplength ***************************\n");

    # Report the outcome (comparison, not assignment).
    if ($ruleGrouplength == 0) {
        print $logFile (GetNowTime() . " *********************No Rule_Group Info !*********************\n");
    }
    else {
        print $logFile (GetNowTime() . " *********************Get Rule_Group:Succeed !********************* \n");
    }

    # Process every rule group: fetch its runnable rules (status 1) and run them.
    for my $i (0 .. $#{$groups}) {
        my $groupTemp = $groups->[$i]->[0];
        my $groupNo   = $i + 1;    # 1-based position for the log line
        print $logFile (GetNowTime() . "********************* Begin To Run Rule_Group[$groupNo],groupId:${groupTemp}********************* \n");

        print $logFile (GetNowTime() . " *********************Begin To get Rule Info! ***************************\n");
        if (getRuleInfo($groupTemp) != 0) {
            # Do not run stale rules left over from the previous group.
            print $logFile (GetNowTime() . " *********************get Rule Info failed, groupId:${groupTemp} ***************************\n");
            next;
        }
        print $logFile (GetNowTime() . " *********************end To get Rule Info! ***************************\n");

        print $logFile (GetNowTime() . " *********************Begin To execute Rule check ! ***************************\n");
        if (exeRuleSql() != 0) {
            # Discard the partial work of the failed rule; earlier rules were
            # already committed one by one.
            $dbh->rollback();
            print $logFile (GetNowTime() . " *********************execute Rule check failed, groupId:${groupTemp} ***************************\n");
        }
        print $logFile (GetNowTime() . " *********************end To execute Rule check ! ***************************\n");
    }

    print $logFile (GetNowTime() . " ********************* end ***************************\n");
    # Release the connection explicitly, then close the log file.
    $dbh->disconnect();
    close $logFile;
}
# Current local date and time without separators between date and time,
# formatted as "YYYY-MM-DDhhmmss" (used in the log file name).
# Date and time come from ONE localtime() snapshot so they cannot belong to
# different days when the call happens around midnight.
sub GetNowTimeNospechar {
    my ($sec, $min, $hour, $day, $mon, $year) = localtime();
    # localtime() months are 0-based and years are counted from 1900.
    return sprintf("%d-%02d-%02d%02d%02d%02d",
        $year + 1900, $mon + 1, $day, $hour, $min, $sec);
}
# Current local date and time formatted as "YYYY-MM-DD hh:mm:ss" (log prefix).
# Date and time come from ONE localtime() snapshot so they cannot belong to
# different days when the call happens around midnight.
sub GetNowTime {
    my ($sec, $min, $hour, $day, $mon, $year) = localtime();
    # localtime() months are 0-based and years are counted from 1900.
    return sprintf("%d-%02d-%02d %02d:%02d:%02d",
        $year + 1900, $mon + 1, $day, $hour, $min, $sec);
}
# Current local date as "YYYY-MM-DD".
# localtime() reports the month 0-based and the year relative to 1900, hence
# the +1 and +1900 adjustments.
sub GetNowDate {
    my @now = localtime();
    my ($dayOfMonth, $monthIndex, $yearOffset) = @now[3, 4, 5];
    return join("-",
        $yearOffset + 1900,
        sprintf("%02d", $monthIndex + 1),
        sprintf("%02d", $dayOfMonth));
}
# Program entry point: run the driver.
main();