版权声明:转载 请说明 https://blog.csdn.net/qq_38063791/article/details/82846702
mongdb
from pymongo import MongoClient
from bson import ObjectId
# _id是mongodb自动生成的id,其类型为ObjectId,想要使用就需要转换类型。
#help 远程的话 需要这样操作
#uri = "mongodb://%s:%s@%s" % (
#quote_plus(user), quote_plus(password), quote_plus(socket_path))
#client = MongoClient(uri)
conn = MongoClient() #本地可以不填
db = conn.test #连接测试数据库,没有则自动创建
##绑定 一个集合
def get_collection(collection_name):
return db.get_collection(collection_name)
##根据ID查找
def get_one_by_id(collection_name, id):
collect_cur = get_collection(collection_name)
return collect_cur.find_one({'_id': ObjectId(id)})
#查找一条信息
def get_one(collection_name, ifmation):
collect_cur = get_collection(collection_name)
return collect_cur.find_one(ifmation)
##查找所有信息
def get_all(collection_name, ifmation=None):
collect_cur = get_collection(collection_name)
return collect_cur.find(ifmation)
# 插入多条记录 ##这里的data 为数组
def insert_ifmation(collection_name, data):
collect_cur = get_collection(collection_name)
return collect_cur.insert_many(data)
# 更新一条信息
def update_one(collection_name, ifmation, data):
collect_cur = get_collection(collection_name)
return collect_cur.update_one(ifmation, {"$set": data})
# 更新所有信息
def update_many(collection_name, filter, data):
collect_cur = get_collection(collection_name)
return collect_cur.update_many(filter, {"$set": data})
##删除信息
def delete(collection_name, ifmation):
collect_cur = get_collection(collection_name)
return collect_cur.remove(ifmation)
##删除所有
def del_all(collection_name):
collect_del_all=get_collection(collection_name)
collect_del_all.remove()
if __name__ == '__main__':
# collection_name=get_collection(conn)
conn = MongoClient()
# 第二步 获取指定的数据库实例
db = conn.test # 连接study数据库,没有则自动创建
# 第三步 获取指定的集合
study = db.get_collection('text')
Mysql
import pymysql
建立游标 pymysql.connect(配置信息).cursor()
游标的常用方法:
1. close() 关闭
2. execute() 执行一条SQL语句
3. executemany() 执行多条SQL语句 传入一个列表
4. fetchall() 获取前面 前面必须使用execute() 进行查询 返回全部的信息
5. fetchmany(3) 可指定返回的条数
conn = pymysql.connect(
host='localhost',
port=33060,
user='root',
password='root',
db='test', #数据库名
charset='utf8'
)
class MySQL():
'''
MySQL数据库的操作
'''
##传入连接的数据库
def __init__(self, conn, table):
self.conn = conn
##建立游标
self.cou = self.conn.cursor()
##得到表名,数据
self.table = table
##执行你的sql语句
def finish_you(self, sql=None, *args, **kwargs):
if sql:
try:
self.cou.execute(sql)
self.conn.commit()
except Exception:
print('报错了')
self.conn.rollback()
else:
print('执行成功')
else:
pass
##查看,
def find_ifmations(self, number=None):
'''
:param number: 请输入查看信息的条数,默认查看全部
:return:
'''
##查看前必执行
sql = 'select * from {} '.format(self.table)
self.cou.execute(sql)
if not number:
li_messages = self.cou.fetchall()
for li_message in li_messages:
print('id:%d name:%s age:%d' % li_message)
else:
li1_messages = self.cou.fetchmany(number)
for li_message in li1_messages:
print('id:%d name:%s age:%d' % li_message)
print('查看完毕')
##获取表中一共有多少条数据
def get_numbers(self):
number_sql = 'select * from {}'.format(self.table)
numbers = self.cou.execute(number_sql)
print('{}这张表,一共有{}条数据'.format(self.table, numbers))
##增加多条
def insert_ifmations(self, contents):
##这里 是 字符串,
inserts_sql = "insert {} values (%s,%s,%s);".format(self.table)
try:
self.cou.executemany(inserts_sql, contents)
self.conn.commit()
except Exception:
print('报错了')
self.conn.rollback()
else:
print('成功插入数据')
##增加单条
def insert_ifmation(self, content):
inserts_sql = "insert {} values{};".format(self.table, content)
try:
self.cou.execute(inserts_sql)
self.conn.commit()
except Exception:
print('报错了')
self.conn.rollback()
else:
print('成功插入数据')
##删除信息
def del_ifmations(self, id):
del_sql = "delete from {} where id={};".format(self.table, id)
try:
self.cou.execute(del_sql)
self.conn.commit()
except Exception:
print('报错了')
self.conn.rollback()
else:
print('删除成功')
##更新信息
def update_ifmations(self, your_message, id):
update_sql = "update %s set name='%s' where id=%d" % (self.table, your_message, id)
try:
self.cou.execute(update_sql)
self.conn.commit()
except Exception:
print('报错了')
self.conn.rollback()
else:
print('更新成功')
##先关闭游标,再关闭数据库连接
def close_mysql(self):
self.cou.close()
self.conn.close()
print('关闭成功')
if __name__ == '__main__':
table = 'student'
content_one = "(12,'小小鸟',23)"
content_many = [(str(i), '大大鸟', '23') for i in range(45, 50)]
test = MySQL(conn, table, )
##测试
test.insert_ifmations(content_many)
test.close_mysql()