python使用pymysql总是超时的解决方案

可以使用如下方法对代码进行包裹:

import pymysql
import yaml
from loguru import logger

mysql_config = {
    
    
        "db": "my_db",
        "host": "127.0.0.1",
        "user": "test_user",
        "password": "test_password",
    }

def _query(sql_string, fetchone=False):
    """查询操作"""
    select_max_num = 5 # 重复查询的最多次数
    while True:
        conn = None
        cursor = None
        try:
            conn = pymysql.connect(**mysql_config)
            cursor = conn.cursor()
            cursor.execute(f"""{
      
      sql_string}""")
            if fetchone:
                all_result = cursor.fetchone()
            else:
                all_result = cursor.fetchall()  # 得到所有结果
            return all_result
        except Exception as e:
            logger.exception(e)
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
        select_max_num -= 1
        if select_max_num <= 0:
            return None
        time.sleep(0.5)

使用如上代码,当出现意外时,会等待0.5s后重新进行查询

如果系统定型并且sql无误的时候,使用如上程序便可以确保连续5次查询,如果5次全部失败,则会返回None

猜你喜欢

转载自blog.csdn.net/weixin_35757704/article/details/132203168