# -*- coding: utf-8 -*-
"""
6G production-line data pipeline.

Scans per-process CSV log files on a network share, merges files that are
new since the last run (tracked in a per-process pickle cache), writes the
merged CSV, and bulk-loads it into a Greenplum/PostgreSQL database via a
temp table + COPY, then refreshes the processed/summary tables.

Created on Mon Nov 25 17:07:26 2019

@author: user
"""

import pandas as pd
from pathlib import Path
import numpy as np
from tqdm import tqdm
import re
import os
import datetime
from datetime import date
import pickle
import itertools
import time
from io import StringIO
from sqlalchemy import create_engine, Column, String
import schedule


start = datetime.datetime.now()
# Root folder (one level above CAN_B so both production lines are visible).
root = r'\\10.197.104.95\6g_data\6G'
p = Path(root)

# Create output folders (idempotent).
os.makedirs('output/merge_data', exist_ok=True)
os.makedirs('output/processed_data/duplicated', exist_ok=True)
os.makedirs('cache', exist_ok=True)
os.makedirs('log', exist_ok=True)

# Acquisition window: files with mtime in [st_date, ed_date).
st_date = date(2018, 10, 1)
ed_date = date.today()

# Per-process column numbers, names and dtypes from a config CSV.
usecol_list = pd.read_csv('input/usecol_list.csv', encoding='cp932')


def into_DB(filename, name):
    """Bulk-load the merged DataFrame for process `name` into the database.

    `filename` is actually a DataFrame (misleading historical name, kept
    for interface compatibility). Flow: COPY into the staging table
    temp_<name>_raw, INSERT-select into the fact table sz_take_<name>_raw,
    then incrementally refresh the processed table sz_take_<name> from
    process_<process>.

    NOTE(review): the processed-table refresh reads the module-level
    global `process`, not the `name` parameter — confirm this is intended
    for the remapped processes (electro_optical_ax_adjst, dome_angle_of_view_insp).
    """
    timeid = time.strftime('%H:%M:%S', time.localtime(time.time()))
    print('DB处理开始' + timeid)
    db_engine = create_engine(
        'postgresql+psycopg2://' + 'gpadmin' + ':' + 'gpadmin' + '@' + '10.39.4.175' + '/' + 'ceshi')

    print('已连接数据库')
    output = StringIO()
    connection = db_engine.raw_connection()
    cursor = connection.cursor()
    print('sz_take_' + str(name) + '_raw')

    # Remap the two long process names onto their short table names.
    if name == 'electro_optical_ax_adjst':
        name = 'electro_optical'
    elif name == 'dome_angle_of_view_insp':
        name = 'dome_angle'
    # table is the fact table, table2 the temp (staging) table.
    table = 'sz_take_' + str(name) + '_raw'
    table2 = 'temp_' + str(name) + '_raw'

    # Pick the key column for the incremental delete, plus any
    # process-specific cleanup of the merged data.
    if name in ('lens_assy', 'performance_test', 'modulation_insp'):
        process_id = 'dm_no'
    elif name == 'focus_adjst':
        process_id = 'sn'
    elif name in ('welding', 'air_leak_test', 'laser_mark'):
        process_id = 'front_qr_code'
        filename = filename.dropna(subset=['model'])
        # Expand 2-digit years to 4 digits (e.g. 19/01/01 -> 2019/01/01).
        filename['need_to_drop'] = filename['date'].str.split("/").str[0].str.len()
        filename = filename.reset_index()
        # BUG FIX: DataFrame.ix was removed from pandas; .loc is equivalent here.
        filename.loc[(filename['need_to_drop'] != 4), 'date'] = '20' + filename['date']
        filename = filename.drop(['need_to_drop', 'index'], axis=1)
        filename['front_qr_code'] = filename['front_qr_code'].str.strip()
    elif name in ('camera_assy', 'bracket_mount', 'low_temp_test',
                  'angle_of_view_insp', 'electro_optical', 'dome_angle'):
        process_id = 'qr_no'
    elif name == 'three_ax_adjst':
        filename['module_id'] = filename['module_id'].str[-15:]
        process_id = 'dm_no'
    elif name == 'optical_ax_adjst':
        process_id = 'sn'
        filename['exe_ver'] = filename['exe_ver'].str.strip()
        filename['exe_ver'] = filename['exe_ver'].str.replace('\t', '')
        filename['dll_ver'] = filename['dll_ver'].str.strip()
        filename['rotation_at_judge'] = filename['rotation_at_judge'].str.strip()
        filename['start_time'] = filename['start_time'].str.strip()
        try:
            # Best-effort drop of known-bad rows; ignore if it fails
            # (e.g. the column is missing in this batch).
            filename = filename.drop(filename[filename.exe_ver == '8'].index)
        except Exception:
            pass
    elif name == 'high_temp_test':
        # filename['v_sync'] = filename['v_sync'].str[0:9]
        filename['qr_no'] = filename['qr_no'].str[-17:]
        process_id = 'qr_no'
    else:
        # BUG FIX: process_id was left unbound for unknown names, which
        # crashed later with NameError; fail fast with a clear message.
        raise ValueError('into_DB: unknown process name: ' + str(name))

    # COPY the frame into the staging table, then move it to the fact table.
    filename.to_csv(output, sep='\t', index=False, header=False)
    cursor.execute('truncate table ' + table2)
    output.seek(0)
    cursor.copy_from(output, table2, null='')
    cursor.execute('insert into ' + table + ' select * from ' + table2)
    print(len(filename))

    # Incremental refresh of the processed table: delete rows whose key
    # appears in this batch, then re-insert from process_<process>.
    # NOTE: identifiers are code-controlled (not user input) and cannot be
    # bound as SQL parameters, so string concatenation is acceptable here.
    sql_delete = ('delete from ' + 'sz_take_' + str(name) + ' where ' + process_id
                  + ' in ' + '( select distinct ' + process_id
                  + ' from ' + 'process_' + str(process) + ' )')
    cursor.execute(sql_delete)
    cursor.execute(
        'insert into ' + 'sz_take_' + str(name) + ' select * from ' + 'process_' + str(process))
    print('整体结束' + time.strftime('%H:%M:%S', time.localtime(time.time())))
    connection.commit()
    cursor.close()
    # BUG FIX: the raw connection was never closed (resource leak).
    connection.close()


def get_new_file(process, all_file):
    """Return paths from `all_file` whose mtime is inside the acquisition
    window and that are not already in the per-process pickle cache."""
    all_file = [fpath for fpath in all_file
                if st_date <= date.fromtimestamp(os.path.getmtime(fpath)) < ed_date]

    print(f'''----------------> 更新日が取得対象期間内のファイル数:{len(all_file)}''')

    # Load the cache of files already processed on previous runs.
    try:
        with open(f'''cache/{process}_file_list.pckl''', mode='rb') as f:
            cache = pickle.load(f)
    except FileNotFoundError:
        cache = []

    print('■cacheとファイル一覧を比較し、新たに追加するファイルを抽出')
    diff = list(set(all_file) - set(cache))

    print(f'''----------------> 前回処理時から新たに追加されたファイル数:{len(diff)}''')

    return diff


def file_merge(process, new_file, enc, skiprows, select_col):
    """Read and concatenate the CSV files in `new_file` for `process`.

    select_col == 'number': headerless files, columns picked by index and
    renamed from the config; select_col == 'name': columns picked by their
    original header name, then renamed. Returns the concatenated frame
    (empty DataFrame when nothing was readable).
    """
    print('■ファイル結合')

    df = pd.DataFrame()
    df_list = []
    uc_list = usecol_list[usecol_list['process'] == process]

    for fpath in tqdm(new_file):
        if select_col == 'number':
            try:
                # NOTE: error_bad_lines was removed in pandas>=2.0; switch
                # to on_bad_lines='skip' when the runtime pandas is upgraded.
                tmp = pd.read_csv(fpath, engine='python', index_col=False, header=None,
                                  skiprows=skiprows,
                                  encoding=enc,
                                  usecols=uc_list['col_no'].astype(int),
                                  names=uc_list['col_name'],
                                  dtype=uc_list.set_index('col_name').col_dtype.to_dict(),
                                  error_bad_lines=False)
                tmp = tmp.dropna(how='all')
                df_list.append(tmp)
            except pd.errors.EmptyDataError:
                print(f'''empty_data_error:{fpath}''')

        elif select_col == 'name':
            try:
                tmp = pd.read_csv(fpath, engine='python', index_col=False, skiprows=skiprows,
                                  encoding=enc,
                                  usecols=uc_list['col_name_org'],
                                  dtype=uc_list.set_index('col_name_org').col_dtype.to_dict(),
                                  error_bad_lines=False)
                tmp.rename(columns=uc_list.set_index('col_name_org').col_name.to_dict(),
                           inplace=True)
                tmp = tmp.dropna(how='all')
                df_list.append(tmp)
            except pd.errors.EmptyDataError:
                print(f'''empty_data_error:{fpath}''')
            except ValueError:
                print(f'''value_error:{fpath}''')

    if df_list:
        df = pd.concat(df_list)

    cnt = len(df)
    print(f'''----------------> 追加ファイルの総レコード数:{cnt}''')

    if cnt > 0:
        # Report duplicate counts on each configured ID column.
        for c in uc_list[uc_list['id_flg'] == 1].col_name.unique():
            cnt = len(df[df[c].duplicated()])
            print(f'''------------------------> {c} が重複しているレコード数:{cnt}''')

    return df


def file_update(process, add_data, add_cache, enc):
    """Write the merged data to CSV, push it to the DB, and merge the
    newly processed file paths into the per-process pickle cache."""
    print('■ファイル更新')
    df = add_data

    df.to_csv(f'''output/merge_data/{process}.csv''', index=False, encoding=enc)
    into_DB(df, process)
    try:
        with open(f'''cache/{process}_file_list.pckl''', mode='rb') as f:
            before_cache = pickle.load(f)
        output = list(set(before_cache + add_cache))
    except FileNotFoundError:
        output = add_cache
    with open(f'''cache/{process}_file_list.pckl''', 'wb') as f:
        pickle.dump(output, f)


def log_output(text):
    """Print `text` and append it to today's log file."""
    print(text)

    path_w = 'log/' + ed_date.strftime('%Y%m%d')
    with open(path_w, mode='a') as f:
        f.write(text)
        f.write('\n')


# Count of processes that received new data this run.
T = 0

# =============================================================================
# 55_Camera assembly

# Merged-data accumulator across production lines.
df = pd.DataFrame()
# Accumulator of newly seen file paths (for the cache).
new_file = []

process = 'camera_assy'
print(f'''【{process}】 {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}''')

for line_name in ['CAN_B', 'CAN_G']:
    print(line_name)
    print('■対象ファイル一覧を取得')

    # Step 1: list today's candidate files with their modification dates.
    # BUG FIX: both glob loops had an extra closing ')' (SyntaxError).
    current_files_list = []
    for fileName in p.glob(f'''{line_name}/称重/GP-KDE3[A-F]*RC*/LOG/*.csv'''):
        getmtime = date.fromtimestamp(os.path.getmtime(fileName))
        current_files_list.append((str(fileName), getmtime))
    for fileName in p.glob(f'''{line_name}/称重/GP-KDE301GF/LOG/*.csv'''):
        getmtime = date.fromtimestamp(os.path.getmtime(fileName))
        current_files_list.append((str(fileName), getmtime))

    current_files_df = pd.DataFrame(current_files_list, columns=['filename', 'updateTime'])
    print(f'''---------------->全ファイル数:{len(current_files_df)}''')

    # Step 2: load the previous run's cache of already-processed paths.
    try:
        with open(f'''cache/{process}_file_list.pckl''', mode='rb') as f:
            cache = pickle.load(f)
        last_files_list = list(map(str, cache))
    except FileNotFoundError:
        # BUG FIX: last_files_list was unbound on a cold start (no cache
        # file), crashing with NameError on the DataFrame build below.
        cache = []
        last_files_list = []
    last_files_df = pd.DataFrame(last_files_list, columns=['filename'])

    # Step 3: new files = current minus cached, restricted to the window.
    from pandasql import sqldf
    diff = sqldf(f"""
    select t1.filename
    from current_files_df t1
    LEFT JOIN last_files_df t2 on (t1.filename = t2.filename)
    where t2.filename is null
    and updateTime >= '{st_date}'
    and updateTime < '{ed_date}';
    """, globals()).filename.tolist()
    print(f'''----------------> 前回処理時から新たに追加されたファイル数:{len(diff)}''')
    new_file_tmp = diff

    if len(new_file_tmp) >= 1:
        # Merge the new files for this line and tag them with the line name.
        tmp = file_merge(process, new_file_tmp, enc='cp932', skiprows=0, select_col='number')
        tmp['line'] = line_name

        # BUG FIX: DataFrame.append was removed in pandas>=2; pd.concat is
        # the equivalent, backward-compatible form.
        df = pd.concat([df, tmp])
        new_file = list(new_file + new_file_tmp)

# Process-level update, once per process after both lines are scanned.
# NOTE(review): indentation was lost in the source; this is placed outside
# the line loop to match the accumulator pattern above — confirm.
if len(new_file) >= 1:
    file_update(process, df, new_file, enc='cp932')
    T = T + 1
elif len(new_file) == 0:
    print('ファイル更新なし')

print(T)

if T != 0:
    # Rebuild the cross-process union and defect-summary tables.
    db_engine = create_engine(
        'postgresql+psycopg2://' + 'gpadmin' + ':' + 'gpadmin' + '@' + '10.39.4.175' + '/' + 'ceshi')
    connection = db_engine.raw_connection()
    cursor = connection.cursor()
    cursor.execute('truncate table sz_take_all_process')
    cursor.execute('insert into sz_take_all_process select * from sz_take_all_process_union')
    print('union', datetime.datetime.now())
    cursor.execute('truncate table sz_summary_defects')
    cursor.execute('insert into sz_summary_defects select * from sz_take_all_process_summary')
    print('summary', datetime.datetime.now())
    connection.commit()
    cursor.close()
    # BUG FIX: the raw connection was never closed (resource leak).
    connection.close()

end = datetime.datetime.now()
print("final is in ", end - start)