一、Migrating MongoDB Data to MySQL
1、Export a .csv file with mongoexport
MongoDB ships with the mongoexport tool, which makes it easy to export data in CSV, JSON, and other formats:
mongoexport -h 127.0.0.1 -u username -p password -d userInfoDB -c regInfo -f _id,field1,field2 --type=csv -o /tmp/mongoStore/userInfo.csv
Here -d is the database name, -c the collection name, -f the comma-separated list of fields to export, and -o the output path. Pick the fields you need; the _id field can be left out.
2、Create the database and table
Design the table schema as needed. Note that the column order of the table must match the keys in the first (header) row of the CSV file one-to-one, so leave the auto-increment id out of the table for now; otherwise the imported values will land in the wrong columns.
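Writing out a schema with many columns by hand is error-prone. Here is a minimal sketch that generates a matching CREATE TABLE statement from the CSV header row; the table name, field names, and the blanket VARCHAR(255) type are illustrative assumptions, so adjust the types per field:

```python
# Sketch: build a CREATE TABLE statement whose column order matches the
# CSV header row, so LOAD DATA maps every value to the right column.
# VARCHAR(255) is a placeholder type; adjust per field as needed.

def create_table_sql(table, header_line):
    columns = [c.strip() for c in header_line.split(",")]
    col_defs = ",\n  ".join(f"`{c}` VARCHAR(255)" for c in columns)
    return f"CREATE TABLE `{table}` (\n  {col_defs}\n) DEFAULT CHARSET=utf8;"

header = "regAuthority,entranceName,orgNumber,termStart,businessScope"
print(create_table_sql("userInfo", header))
```

Paste the printed statement into the mysql client, then add the auto-increment id with ALTER TABLE after the import is done.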
3、Create the SQL script that loads the CSV into MySQL
Create a load_csv.sql file (change the file path and table name to your own):
load data local infile '/tmp/mongoStore/userInfo.csv'
into table `userInfo` character set utf8
fields terminated by ',' optionally enclosed by '"'
lines terminated by '\n'
ignore 1 lines;
Then execute the load with the mysql client:
mysql -uroot -pmysql -DuserInfoDB --default-character-set=utf8 --local-infile=1 < ~/load_csv.sql
With that, the data has been migrated from MongoDB to MySQL.
If a MongoDB collection has many keys, the tedious part is that you have to list every field once when exporting the CSV and again when creating the MySQL table. The following is a quick way to get the full field list.
Fetch one document from MongoDB:
$ mongo
> use userInfoDB
> db.regInfo.find().limit(1)
{ "_id" : ObjectId("5ac3ac86af5b4e34af40xxxx"), "regAuthority" : "XXXX", "entranceName" : 1, "have_data_flag" : 1, "orgNumber" : "091xxxx", "termStart" : "2014-02-12", "businessScope" : "咨询"}
Copy this document into a Python interpreter and use json to extract the keys (the _id value is an ObjectId, which is not valid JSON, so delete that field first):
>>> import json
>>> s = """
... {"regAuthority" : "XXXX", "entranceName" : 1, "have_data_flag" : 1, "orgNumber" : "091xxxx", "termStart" : "2014-02-12", "businessScope" : "咨询"}"""
>>> s_dict = json.loads(s)
>>> list(s_dict.keys())
['regAuthority', 'entranceName', 'have_data_flag', 'orgNumber', 'termStart', 'businessScope']
On Python 3, dict.keys() returns a view, so wrap it in list(); since Python 3.7 the keys come back in document order, which is the order you want for the mongoexport field list and the table columns.
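The same idea can be scripted so it also produces the comma-separated string that mongoexport's -f flag expects (the sample document is the one shown above):

```python
import json

# One document copied from the mongo shell, minus the _id field
# (ObjectId(...) is not valid JSON).
doc = '''{"regAuthority": "XXXX", "entranceName": 1, "have_data_flag": 1,
          "orgNumber": "091xxxx", "termStart": "2014-02-12", "businessScope": "咨询"}'''

keys = list(json.loads(doc).keys())  # document order on Python 3.7+
field_arg = ",".join(keys)           # ready to paste after mongoexport -f
print(field_arg)
# regAuthority,entranceName,have_data_flag,orgNumber,termStart,businessScope
```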
二、Migrating MySQL Data to MongoDB with Python
1、Python modules
Install the required modules:
pip install pymysql
pip install pymongo
2、The script
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time

import pymongo
import pymysql

# Connect to MySQL; DictCursor returns each row as a dict,
# which can be inserted into MongoDB directly.
mysql = pymysql.connect(host='127.0.0.1', database='database',
                        user='username', password='password',
                        cursorclass=pymysql.cursors.DictCursor)
cursor = mysql.cursor()

# Connect to MongoDB. Credentials go in the URI; the old
# Database.authenticate() call was removed in pymongo 4.
mongo = pymongo.MongoClient('mongodb://username:password@ip').database

# Find the highest id so we know where the batches end.
cursor.execute('SELECT max(table_field) AS max_id FROM table_name')
count = cursor.fetchone()['max_id']
print(count)

start_time = time.time()

# Copy rows from MySQL to MongoDB in batches of 100.
for i in range(0, count, 100):
    j = i + 100
    cursor.execute(
        'SELECT * FROM table_name WHERE table_field > %s AND table_field <= %s',
        (i, j))
    submission = cursor.fetchall()
    if submission:
        # collection_name mirrors the MySQL table_name
        mongo.collection_name.insert_many(submission)

# Report total migration time.
deltatime = time.time() - start_time
totalhour = int(deltatime / 3600)
totalminute = int((deltatime - totalhour * 3600) / 60)
totalsecond = int(deltatime - totalhour * 3600 - totalminute * 60)
print("Data Migrate Finished, Total Time Consuming: %d Hour %d Minute %d Seconds"
      % (totalhour, totalminute, totalsecond))

cursor.close()
mysql.close()
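The loop above reads rows whose id falls in the half-open window (i, i+100]. In isolation, and assuming a hypothetical maximum id of 250, the windows it visits are:

```python
# Batch windows (i, i+100] for a hypothetical max id of 250.
# The last window overshoots the max id, which is harmless because
# the SQL predicate "table_field <= j" simply matches fewer rows.
count = 250
windows = [(i, i + 100) for i in range(0, count, 100)]
print(windows)  # [(0, 100), (100, 200), (200, 300)]
```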
三、Importing MySQL Data into MongoDB with pandas
Requirement: import 700,000 rows of MySQL data into MongoDB and deduplicate them, inserting a url field as the second column whose values are the same as the third column (phone).
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time

import pandas as pd
from pymongo import MongoClient
from sqlalchemy import create_engine


class MongoBase:
    def __init__(self, collection):
        self.collection_name = collection
        self.OpenDB()

    def read_mysql(self):
        # Create the engine with SQLAlchemy
        engine = create_engine(
            'mysql+pymysql://username:passwd@ip:port/dbname?charset=utf8')
        start = time.time()
        max_id = self.get_max_id()
        # Read the table into a DataFrame, picking up where the last run stopped
        df1 = pd.read_sql(
            f'select primary_key,phone,plat_code,crawl_time,jrjt_del_dt '
            f'from test_info where primary_key>{max_id}', engine)
        end = time.time()
        print("rows fetched:", len(df1['phone']), "elapsed:", end - start)
        # Deduplicate on phone, keeping the first occurrence
        df1.drop_duplicates('phone', keep='first', inplace=True)
        # Insert a url column right after the first column, mirroring phone
        df1.insert(1, 'url', df1['phone'])
        return df1

    def OpenDB(self):
        self.con = MongoClient(host='127.0.0.1')  # adjust to your MongoDB host
        self.db = self.con[self.collection_name]
        self.collection = self.db['test']

    def closeDB(self):
        self.con.close()

    def get_max_id(self):
        # Highest primary_key already migrated; 0 if the collection is empty
        doc = self.collection.find_one(sort=[('primary_key', -1)])
        return doc.get('primary_key') if doc else 0


if __name__ == '__main__':
    start = time.time()
    mongo = MongoBase('spider_data')
    df = mongo.read_mysql()
    # insert() is deprecated in pymongo; insert_many takes a list of dicts
    mongo.collection.insert_many(df.to_dict('records'))
    mongo.closeDB()
    end = time.time()
    print("total elapsed:", end - start)
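The DataFrame-to-documents step can be checked in isolation on a toy frame standing in for the MySQL result set; `to_dict('records')` yields one dict per row with keys in column order, the shape that MongoDB inserts expect:

```python
import pandas as pd

# Toy frame standing in for the MySQL result set.
df = pd.DataFrame({'primary_key': [1, 2], 'phone': ['123', '456']})
# Insert a url column in the second position, mirroring phone.
df.insert(1, 'url', df['phone'])

# One dict per row, keys in column order -- ready for insert_many().
docs = df.to_dict('records')
print(docs)
```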