文章目录
1. flask+pymysql操作mysql示例
from flask import Flask
import os
import pymysql
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
def connect_db():
conn = pymysql.connect(**{
'host':'ip',
'user':'root',
'password':'123456',
'port': 3306 ,
'database':'db01', #指定你的数据库名
})
return conn.cursor()
class DB:
def __init__(self) -> None:
self.conn = connect_db()
def query(self, sql):
self.conn.execute(sql)
return self.conn.fetchall()
def close(self):
self.conn.close
@app.route('/')
def index():
sql = 'select * from t_book where bookName = "python";'
r = DB().query(sql)
if len(r) !=0:
return '数据表有数据'
return '数据表为空'
if __name__ == '__main__':
app.run(debug=True)
2. ORM及flask_sqlalchemy
2.1 ORM简介
ORM
:(Object Relationship Mapping),模型关系映射优点
:面向对象思维,元编程实现。将一个表抽象成一个类,一条数据抽象成一个对象,操作数据库像操作对象一样。
- 参考文档
https://blog.csdn.net/qq_25672165/article/details/111364597
https://blog.csdn.net/qq_25672165/article/details/111290235
2.2 flask_sqlalchemy简介
- 配置
SQLALCHEMY_DATABASE_URI
:用于连接数据库SQLALCHEMY_BINDS
:用于绑定数据库SQLALCHEMY_ECHO
:用于调试,记录所有发到标准输出(stderr)的语句SQLALCHEMY_POOL_SIZE
:数据库连接池的大小,默认5SQLALCHEMY_POOL_TIMEOUT
:数据库连接池超时时间,默认10SQLALCHEMY_POOL_RECYCLE
:自动回收连接的秒数SQLALCHEMY_MAX_OVERFLOW
:控制在连接池达到最大值后可以创建的连接数SQLALCHEMY_TRACK_MODIFICATIONS
:默认True,会追踪对象的修改并且发送信号
- 常用数据类型
- Integer:整形
- Float:浮点类型
- Boolean:传递True/False进去。
- DECIMAL:定点类型。
- enum:枚举类型。
- Date:传递datetime.date()进去。
- DateTime:传递datetime.datetime()进去。
- Time:传递datetime.time()进去。
- String:字符类型,使用时需要指定长度,区别于Text类型。
- Text:文本类型。
- LONGTEXT:长文本类型。
- PickleType:持久化的python对象
- LargeBinary:任意大的二进制数据
- Column常用参数
- db.ForeignKey(’*’)
- default:默认值。
- nullable:可以为空。
- primary_key:主键。
- unique:唯一。
- index:索引
- autoincrement:自动增长。
- onupdate:更新的时候执行的函数。
- name:该属性在数据库中的字段映射。
- 绑定数据库
- mysql: mysql+pymysql://username:password@host:port/database?charset=utf8mb4
- 其他数据库绑定:http://www.pythondoc.com/flask-sqlalchemy/config.html#uri
- 同时绑定多个库:http://www.pythondoc.com/flask-sqlalchemy/binds.html
3. flask_sqlalchemy使用
3.1 DDL建表
- 方式一:命令行方式
# config.py
# 连接数据库字符串
# dialect+driver://username:password@host:port/database?charset=utf8mb4
dialect = 'mysql'
driver = 'pymysql'
username = 'root'
password = '123456'
host = '47.96.67.53'
port = 3306
database = 'db01'
SQLALCHEMY_DATABASE_URI = '{}+{}://{}:{}@{}:{}/{}?charset=utf8mb4'.format(dialect, driver, username, password, host,port, database)
SQLALCHEMY_TRACK_MODIFICATIONS = False
# app.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import config
app = Flask(__name__)
app.config.from_object(config) # 配置mysql连接
db = SQLAlchemy(app) # 绑定app和db
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
email = db.Column(db.String(120), unique=True)
def create_all():
db.create_all()
$ set set FLASK_APP=app.py
$ flask shell # 进入shell
>>> from app import db
>>> db.create_all()
- 方式二:migrate 方式建表
# app.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
import config
app = Flask(__name__)
app.config.from_object(config) # 配置mysql连接
db = SQLAlchemy(app) # 绑定app和db
migrate = Migrate(app, db)
# 定义数据库结构
class User(db.Model):
__tablename__ = 'user' # 定义表名,不写表名则为类名小写
id = db.Column(db.Integer,primary_key=True,autoincrement=True)
username = db.Column(db.String(100),nullable=False)
class Article(db.Model):
__tablename__ = 'article' # 定义表名,不写表名则为类名小写
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(100),nullable=False)
content = db.Column(db.Text,nullable=False)
author_id = db.Column(db.Integer,db.ForeignKey('user.id'))
def create_all():
db.create_all
$ set FLASK_APP=app.py
$ flask db init # 初始化
$ flask db migrate # 生成脚本
$ flask db upgrade # 执行脚本
$ flask db downgrade # 回滚
3.2 DML操作
- insert
# insert一条数据
user1 = User(username='lisi')
db.session.add(user1)
db.session.commit()
# insert多条数据
article1 = Article(title='ccc',content='ddd', author_id=1)
article2 = Article(title='eee',content='fff', author_id=1)
db.session.add_all([article1,article2])
# 事务提交
db.session.commit()
- select
- query后可以接链式调用:eg:query.filter(). filter()…
- 参考文档:https://blog.csdn.net/weixin_41829272/article/details/80609968
# filter_by
article1 = Article.query.filter_by(title = 'aaa').first()
print('title:%s' % article1.title)
# filter(表达式)
# select * from article where article.title='aaa';
article1 = Article.query.filter(Article.title == 'aaa').first()
print('title:%s' % article1.title)
# select * from article where article.title='aaa';
article1 = Article.query.filter(Article.title.startswith('a')).first()
print('title:%s' % article1.title)
- update
# 1. 先把你要更改的数据查找出来
article1 = Article.query.filter(Article.title == 'updates_title').first()
# 2. 把这条数据,你需要修改的地方进行修改
article1.title = 'aaa'
# 3. 做事务的提交
db.session.commit()
- delete
# 1. 把需要删除的数据查找出来
article1 = Article.query.filter(Article.content == 'bbb').first()
# 2. 把这条数据删除掉
db.session.delete(article1)
# 3. 做事务提交
db.session.commit()
3.3 使用原生sql
sql = "select * from article where article.title='aaa';"
article1 = db.session.execute(sql)
data = article1.fetchall()
# 参数化
sql = "select * from article where article.title=:title;"
article1 = db.session.execute(sql, params={
"title": "aaa"})
data = article1.fetchall()
3.4 表数据模型
1对多
class User(db.Model):
# __tablename__ : 定义表名,不写表名则为类名小写
__tablename__ = 'user'
id = db.Column(db.Integer,primary_key=True,autoincrement=True)
username = db.Column(db.String(100),nullable=False)
class Article(db.Model):
__tablename__ = 'article'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(100),nullable=False)
content = db.Column(db.Text,nullable=False)
author_id = db.Column(db.Integer,db.ForeignKey('user.id'))
# 给`Article`这个模型添加一个`author`属性,可以访问这篇文章的作者的数据,像访问普通模型一样。`backref`是定义反向引用,可以通过`User.articles`访问这个模型所写的所有文章。
author = db.relationship('User',backref=db.backref('articles'))
多对多(中间表无额外参数列)
- backref----lazy的参数详解
- lazy不同参数值参数取不同值(dynamic, joined, select)
- select------默认,加载该属性的数据并返回列表
- joined-----关联的两个表进行获取到所有相关的对象
- dynamic-----返回一个query对象, 需要.all()等方法获取对象,且可以使用filter()过滤
- 参考文档:https://blog.csdn.net/qq_34146899/article/details/52559747
# 表1 中间表
# secondary只能接收 `db.Table`对象,不能通过`class`的方式实现
article_tag = db.Table('article_tag',
db.Column('article_id', db.Integer, db.ForeignKey('article.id'), primary_key=True),
db.Column('tag_id', db.Integer, db.ForeignKey('tag.id'), primary_key=True))
# 表2
class Tag(db.Model):
__tablename__ = 'tag'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(64), nullable=False)
# 表3
class Article(db.Model):
__tablename = 'article'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(128), nullable=False)
content = db.Column(db.Text, nullable=False)
# 设置关联-关键字参数`secondary=中间表`来进行关联
tags = db.relationship('Tag',secondary=article_tag,backref=db.backref('articles'))
db.create_all()
- 添加数据:
article1 = Article(title='aaa', content='content1')
article2 = Article(title='bbb', content='content2')
tag1 = Tag(name='111')
tag2 = Tag(name='222')
article1.tags.append(tag1)
article1.tags.append(tag2)
article2.tags.append(tag1)
article2.tags.append(tag2)
db.session.add_all([article1, article2, tag1, tag2])
db.session.commit()
- 访问数据:
article1 = Article.query.filter(Article.title == 'aaa').first()
tags = article1.tags
for tag in tags:
print(tag.name)
多对多(需要获取额外数据)
- 建表
class ArticleTag(db.Model):
__tablename__ = 'article_tag'
article_id = db.Column('article_id', db.Integer, db.ForeignKey('article.id'), primary_key=True)
tag_id = db.Column('tag_id', db.Integer, db.ForeignKey('tag.id'), primary_key=True)
# 额外新增的列表字段 此时无法使用db.Table
yn = db.Column('yn', db.Integer)
class Tag(db.Model):
__tablename__ = 'tag'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(64), nullable=False)
articles = db.relationship('ArticleTag', backref='tags')
class Article(db.Model):
__tablename = 'article'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(128), nullable=False)
content = db.Column(db.Text, nullable=False)
tags = db.relationship('ArticleTag', backref='articles')
- 访问
db.create_all()
article1 = Article.query.filter(Article.title == 'aaa').first()
print(article1.tags)
for tag in article1.tags:
print(tag.yn)
print(tag.articles)
>
1
<Article 1>
1
<Article 1>
多对多—自引用
- 参考文档
https://blog.csdn.net/ying847782627/article/details/51355714
多对多关联查询
data = db.session.query(Article, Tag).filter(Article.id == Tag.id).all()
4. flask_script使用
简单命令
- 命令封装为函数
from flask_script import Manager
from flask import Flask
app = Flask(__name__)
manager = Manager(app)
# 和数据库相关的操作,我都放在一起
@manager.command
def runserver():
print('服务器跑起来了!!!!!')
if __name__ == '__main__':
manager.run()
- 命令运行:
python file_name func_name
python demo.py runserver
>服务器跑起来了!!!!!
子命令
from flask import Flask
from sss import xmanager
from flask_script import Manager
app = Flask(__name__)
manager = Manager(app)
@manager.command
def runserver():
print('服务器跑起来了!!!!!')
# demo为主命令, init为子命令
manager.add_command('demo', xmanager)
if __name__ == '__main__':
manager.run()
# sss.py
from flask_script import Manager
xmanager = Manager()
@xmanager.command
def init():
print('初始化成功!!!!!')
- 命令运行:
python file_name blind_name func_name
# 运行主命令
python demo.py runserver
>服务器跑起来了!!!!!
# 运行子命令
python demo.py demo init
>初始化成功!!!!!