目标:把Oracle 下的数据文件迁移到 Hadoop , 数据表都是上亿条,每个数据表的空间都超过100G。
关键是建立数据文件是UTF8格式,这样数据中的汉字在Hadoop 里显示才正常。
平台:CentOS 6.10
工具:Python 2.7 cx_Oracle 库
Oracle 10.2.0.4.0
Hadoop 2.7.2-transwarp-5.2.1
1. 查看表结构
如果是分区表, 直接按分区导出 , 如果是非分区表,查看索引字段, 按索引字段分段导出。
记录DDL 语句。
2.编写Python 脚本。
oralce_lib.py
# coding:utf-8
# 服务器上的Oracle接口
# By 陈年椰子
import cx_Oracle
import socket
import os
import time
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
# 输出日志
def proc_log(log_info):
t_now = time.strftime('%H:%M:%S', time.localtime(time.time()))
log_str = t_now + log_info
print(log_str)
log_file = open('exp_work.log', 'a')
log_file.write("{}\n".format(log_str))
log_file.close()
# 连接XXX平台
def Conncet_JZXZ():
conn_dict = {'ip': '10.X.X.X', 'port': 1521, 'user': 'fx_user', 'pw': 'x123456x', 'sid': 'orcl'}
return Connect(conn_dict)
# 连接ORACLE
def Connect(conn_dict):
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.settimeout(3)
try:
sk.connect((conn_dict['ip'], conn_dict['port']))
except Exception as e:
proc_log('Server({0}) port {1} connect Fail!'.format(conn_dict['ip'], conn_dict['port']))
proc_log(repr(e))
return ""
sk.close()
try:
server_str = '{0}/{1}'.format(conn_dict['ip'], conn_dict['sid'])
oracle_db = cx_Oracle.connect(conn_dict['user'], conn_dict['pw'], server_str)
return oracle_db
except Exception as e:
proc_log("数据库Connect出错\n" + repr(e))
return ""
def pack_file(data_file):
command = "gzip {}".format(data_file)
if os.system(command) == 0:
proc_log('\t压缩{}成功'.format(data_file))
else:
proc_log('\t压缩{}失败'.format(data_file))
def exp_table_func_s(str_sql, file_name):
proc_log("{} -> {}".format(str_sql, file_name))
proc_log('\t开始获取数据')
try:
cnxn = Conncet_JZXZ()
cursor = cnxn.cursor()
cursor.execute(str_sql)
proc_log('\t开始写入数据')
w_i = 0
file_w = file_name
file_list = []
file_i = 0
for row in cursor:
l_str = ""
for x in row:
l_str = l_str + ',{}'.format('' if x is None else x)
w_i = w_i + 1
if w_i % 10000000 == 1:
if file_i > 0:
proc_log('\t关闭文件 {}'.format(file_w))
f.close()
pack_file(file_w)
file_i = file_i + 1
file_w = "{}_{}.{}".format(file_name[:-4], file_i, file_name[-3:])
f = open(file_w, 'w')
file_list.append(file_w)
proc_log('\t建立文件 {}'.format(file_w))
if w_i % 500000 == 0:
proc_log("\t\t写入第{}行".format(w_i))
f.write("{}\n".format(l_str[1:].replace("'", "").replace("None", "")))
f.close()
pack_file(file_w)
proc_log('\t写入数据完成,{}条'.format(w_i))
cursor.close()
cnxn.close()
# for f in file_list:
except Exception as e:
proc_log("导出平台数据出错" + repr(e))
return "fail"
return "ok"
exp.py
# coding=utf-8
"""
模块:批量导出平台数据
功能:
开发人员:陈年椰子
建立时间:2019/5/16
最后修改:2019/5/22
说明:
"""
import oracle_lib as ol
# 这里可以建立工作任务列表
db_src = '''select * from TBL_XXXXXX
where CLR_DATE < to_date('20190401','YYYYMMDD')
AND CLR_DATE >= to_date('20190101','YYYYMMDD') '''
ol.exp_table_func_s(db_src, 'tbl_q1.txt')
3. 运行导出脚本
直接用 python exp.py 或者用 nohup python exp.py & 挂到后台运行。
数据文件会压缩成gz 文件
4. 上传 hadoop 服务器
用 scp 工具上传
5. 上传到hadoop 目录
为了省事,也把在hadoop 下建目录,解压,上传文件写了脚本。
# coding:utf-8
# code by 陈年椰子 [email protected]
import os
import sys
def run_cmd(cmd):
p = os.popen(cmd)
x = p.read()
print(x)
p.close()
if len(sys.argv) < 3:
print("使用方法: python hadoop_table.py table_name txt_file_head [new]")
else:
table_name = sys.argv[1]
file_name = sys.argv[2]
create_table = sys.argv[3] if len(sys.argv)==4 else ''
if create_table == 'new':
print('建立新目录:{}'.format(table_name))
# 把youdir 替换成你的hadoop 用户目录名
cmd_list = [
'gunzip {}*.gz ./'.format(file_name),
'hadoop fs -put {}*.txt /user/youdir/{}'.format(file_name, table_name),
'hadoop fs -count -q -h /user/youdir/{}'.format(table_name),
'hadoop dfs -ls /user/youdir/{}'.format(table_name)]
if create_table == 'new':
create_cmd = 'hadoop fs -mkdir {}'.format(table_name)
run_cmd(create_cmd)
for c in cmd_list:
print('运行命令:{}'.format(c))
run_cmd(c)
运行脚本
新建目录 用
python hadoop_table.py 你的目录名 数据文件开头 new
已有目录 追加用
python hadoop_table.py 你的目录名 数据文件开头
6. 到SQL 界面去建立文本数据外表。 建立语句用 ORACLE里的DDL ,加上外表文件路径即可。
CREATE TABLE finance.T98_INT_ORG_APP_RELA_H
(
....
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/youdir/youtbldatadir';