前言:当我们用pymysql链接数据库读取少量的数据时一条一条的读取数据并没有什么问题,但当数据量到百万级的时候就可能会出现超过数据库最大连接时间,内存溢出等问题。所以在用python操作数据库的时候往往会新建一个数据库连接类,实现对数据库的各种操作,并加上分页处理。
话不多说直接上代码
数据库链接类:
import pymysql
from setting import DB_CONFIG
class SQLManager(object):
# 连接数据库
# 将conn,cursor作为类的属性,通过connect方法触发生成
def __init__(self):
self.conn = None
self.cur = None
self.connect()
def connect(self):
self.conn = pymysql.connect(
host=DB_CONFIG['host'],
port=DB_CONFIG['port'],
user=DB_CONFIG['user'],
password=DB_CONFIG['password'],
db=DB_CONFIG['db'],
charset=DB_CONFIG['charset']
)
# 检查连接是否存在,断开的话会重连。
self.conn.ping(reconnect=True)
# 链接mysql 生成游标对象
self.cur = self.conn.cursor()
def db_limit(self, sql, offset, number=1000):
"""
# 分条查询 默认一次1000条
:param offset: 偏移量 1000的倍数
:param number: 每次获取1000条
:return:
"""
try:
self.conn.begin()
limit_param = f'limit {offset}, {number}'
ex_sql = " ".join([sql, limit_param])
self.cur.execute(ex_sql) # 修改,由arge改为*args
db_limit_res = self.cur.fetchall()
self.conn.commit()
print('数据库操作成功!')
return db_limit_res
except pymysql.Error as e:
print('数据库操作失败:' + str(e))
self.conn.rollback()
return False
def single_db(self, sql):
try:
self.conn.begin()
self.cur.execute(sql) # 修改,由arge改为*args
self.conn.commit()
print('数据库操作成功!')
return self.cur.fetchone()[0]
except pymysql.Error as e:
print('数据库操作失败:' + str(e))
self.conn.rollback()
return False
# 单条增删改数据,创建表
def modify(self, sql, *args):
try:
self.conn.begin()
self.cur.execute(sql, *args) # 修改,由arge改为*args
self.conn.commit()
print('数据库操作成功!')
return True
except pymysql.Error as e:
print('数据库操作失败:' + str(e))
self.conn.rollback()
return False
# 关闭数据库cursor和连接
def __del__(self):
self.cur.close()
self.conn.close()
print('数据库成功断开链接!')
业务实现类
from math import ceil
from common.sqlCon import SQLManager
import requests
class ToDb():
def __init__(self):
self.db = SQLManager()
self.sql = "select * from `table`"
self.url = 'http://127.0.0.1:8000/enter/create_and_update_data/insertOrUpdate'
def count_db(self):
"""
查询要读取的数据库总数
:return: offsets 用总数对1000向上取整
"""
sql = self.sql
res = self.db.single_db(sql)
return ceil(res / 1000)
def read_db(self):
"""
读数据库
:param read_db_name: 读取的数据库名
:return: 单条数据
"""
offsets = self.count_db()
for offset in range(0, offsets + 1):
db_limit_res = self.db.db_limit(sql=self.sql, offset=offset * 1000)
for limit_one in db_limit_res:
yield limit_one
def read_db_one(self):
read_db = self.read_db()
for limit_one in read_db:
# time.sleep(2)
print(limit_one)
try:
self.post_db(limit_one)
except Exception as e:
print(e)
with open('../logs/log.txt', 'a+', encoding='utf-8') as f:
f.write(str(limit_one) + '\n' + str(e) + '\n')
def post_db(self, limit_one):
"""
根据自己的需要去实现
:param limit_one: 数据库读出的单条数据
:return:
"""
pass
if __name__ == '__main__':
ToDb = ToDb()
ToDb.read_db_one()