Python读取excel创建10000行mysql测试表

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/sunyuhua_keyboard/article/details/85234317

任务如下:
创建一个表,里面有10000万测试数据,可以最好写成脚本,一遍不断的修改
思路:
1、使用excel的编辑功能,弄10000行insert into的语句
2、整理一个初步的excel,写脚本读取excel,使用excel来来控制建表和测试数据。
方法:
采用第二条
步骤:
1、编辑excel,第一行:字段名,第二行:字段类型;第三行:字段长度;第四行:not null
在这里插入图片描述
2、第二步编写Python代码,读取excel,完成数据的读取和插入操作


import pymysql

import xlrd

import traceback
#读取文件
data=xlrd.open_workbook('data.xlsx')
#通过索引顺序获取
table=data.sheets()[0]
#获取行数
nrows=table.nrows
#获取列数
ncols=table.ncols

table_cell_date=[]

create_sql_part=''
sql_insert=''
#获取第一行的数据
rows_first_name = table.row_values(0)
#循环第一行,获取第二行,第三行对应的类型及其字段长度
for i in  range(len(rows_first_name)):
    table_name=table.row(0)[i].value
    table_type=table.row(1)[i].value
    tbale_type_length=table.row(2)[i].value
    table_not_null=table.row(3)[i].value
    create_sql_part=create_sql_part+"`"+table_name+"` "+table_type+"("+str(int(tbale_type_length))+") "+table_not_null+","
    sql_insert=sql_insert+table_name+","
create_sql="CREATE TABLE IF NOT EXISTS TEST_DB ("+create_sql_part.rstrip(',')+")"
print("create_sql===="+create_sql)
#打开数据库连接
db =pymysql.connect("192.168.66.102","geexek","neusoft","TESTDB",charset='utf8')
#使用 cursor()方法创建游标对象cursor
cursor=db.cursor()

#使用execte()方法执行sql,如果表存在则删除
cursor.execute(create_sql)



sql_insert_db=""
sql_insert_db_part=""
#获取从第五行到最后一行的数据,然后构建插入语句,然后到数据库中。
for i in range(4,nrows):
    cells_date = table.row_values(i)
    for j in range(len(cells_date)):
        ctype=table.cell(i, j).ctype
        cell=table.cell_value(i, j)
        if ctype == 2 and cell % 1 == 0:
            cell_value=int(cell)
        else:
            cell_value="'"+str(cell)+"'"
        sql_insert_db_part=sql_insert_db_part+str(cell_value)+","
    sql_insert_db=sql_insert_db+"("+sql_insert_db_part.rstrip(',')+"),"
    sql_insert_db_part=""
sql_insert_head = "INSERT INTO TEST_DB (" + sql_insert.rstrip(',') + ") VALUES "+sql_insert_db.rstrip(',')

print("sql_insert_head="+sql_insert_head)
try:
    #执行sql语句
    # 读取数据,然后循环插入
    for k in range(1000):
        cursor.execute(sql_insert_head)
        #提交到数据库执行
        db.commit()
except:
    #如果发生错误则回滚
    traceback.print_exc()
    db.rollback()
finally:
    #关闭数据库连接
    db.close()

第三步、查看数据库执行结果
在这里插入图片描述

另外一种方法:

import pymysql
import gevent
import time


class MyPyMysql:
    def __init__(self, host, port, username, password, db, charset='utf8'):
        self.host = host          # mysql主机地址
        self.port = port          # mysql端口
        self.username = username  # mysql远程连接用户名
        self.password = password  # mysql远程连接密码
        self.db = db              # mysql使用的数据库名
        self.charset = charset    # mysql使用的字符编码,默认为utf8
        self.pymysql_connect()    # __init__初始化之后,执行的函数

    def pymysql_connect(self):
        # pymysql连接mysql数据库
        # 需要的参数host,port,user,password,db,charset
        self.conn = pymysql.connect(host=self.host,
                                    port=self.port,
                                    user=self.username,
                                    password=self.password,
                                    db=self.db,
                                    charset=self.charset
                               )
        # 连接mysql后执行的函数
        self.asynchronous()

    def run(self, nmin, nmax):
        # 创建游标
        self.cur = self.conn.cursor()
        
        # 定义sql语句,插入数据id,name,gender,email
        sql = "insert into userinfo(id,name,gender,email) values (%s,%s,%s,%s)"

        # 定义总插入行数为一个空列表
        data_list = []
        for i in range(nmin, nmax):
            # 添加所有任务到总的任务列表
            result = (i, 'zhangsan' + str(i), 'male', 'zhangsan' + str(i) + '@qq.com')
            data_list.append(result)
            
        # 执行多行插入,executemany(sql语句,数据(需一个元组类型))
        content = self.cur.executemany(sql, data_list)
        if content:
             print('成功插入第{}条数据'.format(nmax-1))
            
        # 提交数据,必须提交,不然数据不会保存
        self.conn.commit()


    def asynchronous(self):
        # g_l 任务列表
        # 定义了异步的函数: 这里用到了一个gevent.spawn方法
        max_line = 10000  # 定义每次最大插入行数(max_line=10000,即一次插入10000行)
        g_l = [gevent.spawn(self.run, i, i+max_line) for i in range(1, 3000001, max_line)]

        # gevent.joinall 等待所以操作都执行完毕
        gevent.joinall(g_l)
        self.cur.close()  # 关闭游标
        self.conn.close()  # 关闭pymysql连接


if __name__ == '__main__':
    start_time = time.time()  # 计算程序开始时间
    st = MyPyMysql('192.168.66.102', 3306, 'geexek', 'neusoft', 'db20')  # 实例化类,传入必要参数
    print('程序耗时{:.2f}'.format(time.time() - start_time))  # 计算程序总耗时

猜你喜欢

转载自blog.csdn.net/sunyuhua_keyboard/article/details/85234317