如何基于Python实现MySQL查询的API设计,附上完整脚本

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接: https://blog.csdn.net/yangjianrong1985/article/details/102597983


这是学习笔记的第 2134 篇文章


640?wx_fmt=gif

  我们在平时的工作中不可避免会有连接到数据库的操作,通常来说我们会使用基于Shell的方式,或者基于数据库驱动的连接方式,比如JDBC,ODBC,PyMySQL,MySQLdb等。

如果是基于Shell的方式,很容易出现一个使用瓶颈,那就是如果通过shell去查看一个表的数据,那么输出是没有规范的格式的,Shell执行是最简单最基本的调用模式,我们也可以利用数据库服务端的特性来输出相应的数据格式,但是基于数据库版本的差异,有些低版本是不支持输出一些格式的,所以使用Shell来输出SQL查询的结果显然不是一个通用而且优雅的实现方式。 

 如果使用数据库启动,基于Python的模式就是一种很不错的选择,我们可以开发一个Python脚本,然后把这个Python脚本使用RESTful API的模式包装起来,这样对外的服务就是API而不是单一的脚本,而且可移植性和扩展性也要好很多。 

大概的设计方式如下:

640?wx_fmt=png

我们因为版本的选型在这里使用的是基于MySQLdb的实现方式,我们来逐个分析一下。 

首先对于SQL查询来说,输出结果,执行时长,结果集行数等这些是我们关注的一些数据,要实现这个功能,实际上要实现一揽子细小的功能。

1)使用MySQLdb或者其他驱动的默认情况下,输出的结果都是只有数据,而没有相应的列名等信息,我们需要做一些补充操作,输出为字段名和字段值的映射,比如{"id":100,"name":"aaa"}这样的形式,

2)基于Python驱动的输出时间类型是映射datetime,对于前端处理来说也不够友好,所以我们要一并处理。

3)对于数据返回行数,默认是long行,我们需要转换为整型

4)对于返回结果,默认是unicode,字符'abc'会显示类似为 u'abc'这种,我们也需要做一层转换。

5)为了前端程序方便解析,我们需要补充一个字段列表,比如有字段id,name,我们就返回一个数组或者列表,包含这些字段。


#! /usr/bin/env python

# _#_ coding:utf-8 _*_


import MySQLdb

import MySQLdb.cursors

import datetime

import json

import sys


def query(db_port,db_name,sql):

    try:

        return_dict = {}

        starttime = datetime.datetime.now()

        conn = MySQLdb.connect(host='127.0.0.1', user='jeanron', passwd='xxxx', db=db_name, port=int(db_port), charset='utf8')

        cur = conn.cursor()

        cur.execute(sql)

        rows = cur.fetchall()

        endtime = datetime.datetime.now()

        return_dict["execution_seconds"] = (endtime - starttime).seconds


        index = cur.description

        result = []

        for res in rows:

            row = {}

            for i in range(len(index)):

                if type(res[i]) == datetime.datetime:

                    row[index[i][0]] = res[i].strftime('%Y-%m-%d %H:%M:%S')

                elif type(res[i]) == int or type(res[i]) == long:

                    row[index[i][0]] = int(res[i])

                elif type(res[i]) == unicode:

                    row[index[i][0]] = res[i].encode('utf8')

                else:

                    print type(res[i])

                    row[index[i][0]] = res[i]

            result.append(row)

        if len(result) > 0:

            column_list = result[0].keys()


        return_dict["column_list"] = column_list

        return_dict["data_rows"] = int(cur.rowcount)

        return_dict["table_data"] = result

        cur.close()

        conn.close()

        return json.dumps(return_dict)

    except MySQLdb.Error as e:

        print(e)


调用可以使用类似下面的代码形式:

sql_text="explain select * from test_data ;"

db_name='test'

db_port=int('3306')

测试没有问题止呕,很自然的可以使用参数化的形式:

sql_text=sys.argv[3]

db_port=sys.argv[1]

db_name=sys.argv[2]


为了封装为一个API,逻辑的部分实现有以下几个要点:

1)调用Python脚本,这里我们是通过Ansible的adhoc来实现的,当然也可以基于原生的paramiko等实现方式。

2)对于数据结果的返回,因为是一个调用-返回的逻辑关系,通过驱动有时候无法得到一些明细的信息,比如错误信息,我们就需要刻意处理几个场景,比如表不存在,SQL语法错误等。

实现的代码如下:


@api_view(['POST'])

def mysql_db_query(request):

    try:

        request_data = request.data.get("data")

        sql_text_raw = request_data.get("sql_content")

        ip_addr = request_data.get("ip_addr")

        db_port = request_data.get("db_port")

        db_name = request_data.get("db_name")


        sql_text = sql_text_raw.replace("=","\=")

        command = "/usr/local/DBA_SCRIPTS/mysql/mysql_db_query.py  %s  %s %s " % (db_port, db_name,"\""+sql_text+"\"")

        result = ansible_adhoc("dba_mysql", ip_addr, "script", command, True)

        print result

        if result['success'][ip_addr][0].find("doesn\'t exist")!= -1:

            return MyJsonResponse(data={}, code=str(status.HTTP_500_INTERNAL_SERVER_ERROR),

                                  message='query failed,table doesn\'t exists')

        elif result['success'][ip_addr][0].find("error in your SQL syntax")!= -1:

            return MyJsonResponse(data={}, code=str(status.HTTP_500_INTERNAL_SERVER_ERROR),

                                  message='query failed,there is an error in your SQL syntax')

        else:

            return_data=json.loads(result['success'][ip_addr][0])

        #print return_data

        return MyJsonResponse(data=return_data, code=str(status.HTTP_200_OK), message='success')


    except MySQLdb.Error as e:

        print(e)


API的输入参数样例为为:

{

    "data": {

        "ip_addr": "xxxx",

        "db_port": "4306",

        "db_name": "test",

        "sql_content":"select * from test_data where id>0"

    }

}


一个比较理想的输出样例如下:

{

    "message": "success",

    "code": "200",

    "data": {

        "data_rows": 10,

        "execution_seconds": 0,

        "column_list": [

            "cdate",

            "id",

            "name"

        ],

        "table_data": [

            {

                "cdate": "2019-10-14 18:49:16",

                "id": 1,

                "name": "aa"

            },

            {

                "cdate": "2019-10-14 18:49:16",

                "id": 2,

                "name": "bb"

            }


        ]

    },

    "pagenation": null

}


大家在使用中有问题和意见也欢迎随时反馈。



相关链接:

个人新书 《MySQL DBA工作笔记》


QQ群号:763628645

QQ群二维码如下,个人微信号:jeanron100, 添加请注明:姓名+地区+职位,否则不予通过


640?wx_fmt=png640?wx_fmt=png


在看,让更多人看到

猜你喜欢

转载自blog.csdn.net/yangjianrong1985/article/details/102597983