MySQL 之 SQLAlchemy
安装
pip install SQLAlchemy
查看安装版本
import sqlalchemy
sqlalchemy.__version__
连接数据库
from sqlalchemy import create_engine
import pymysql
# Let PyMySQL stand in for the MySQLdb module that the "mysql+mysqldb" dialect imports.
pymysql.install_as_MySQLdb()
# Connection URL template: replace the <...> placeholders and the optional [params] part before use.
DB_CON_STR = 'mysql+mysqldb://<user>:<password>@<host_ip>/<database>?[params]'
# echo=False keeps the engine from logging the SQL it emits.
engine = create_engine(DB_CON_STR, echo=False)
SQL表达式
定义和创建数据表
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
# One MetaData registry collects every Table defined below
# (the original created it twice; the second call simply discarded the first).
metadata = MetaData()

# "users" table: one row per user.
users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(36)),
    Column('fullname', String(36)),
)

# "address" table: e-mail addresses, each optionally linked to a users row.
address = Table(
    "address", metadata,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer, ForeignKey('users.id')),
    Column("email_address", String(120), nullable=False)
)
# Several later snippets refer to this table as ``addresses``; bind both names
# to the same Table so those snippets do not raise NameError.
addresses = address

# Emit CREATE TABLE for every table registered on ``metadata``
# (tables that already exist are skipped).
metadata.create_all(engine)
插入语句
# Build an INSERT construct for the users table.
ins = users.insert()
# Use str() to view the generated statement
str(ins)
# Use values() to restrict which columns/parameters are inserted
ins = ins.values(name='jack', fullname='Jack Jones')
str(ins) # the supplied data does not appear in the string form
# After compiling the statement, inspect the bound parameters via params
ins.compile().params
执行
# First obtain a database connection
conn = engine.connect()
result = conn.execute(ins)
# result (a ResultProxy) is similar to a cursor; useful information can be read from it
result.inserted_primary_key # get the generated primary key
执行多条语句
# insert() does not have to be combined with values(); parameters can be given at execute time
ins = users.insert()
conn.execute(ins, id=2, name='wendy', fullname='Wendy Williams')
conn.execute(ins, name='xiaoming', fullname='xiaoming.li')
# Insert several rows at once by passing a list of dicts
conn.execute(ins, [
{"name": 'xiaolu', 'fullname': "小璐.李"},
{"name": 'rongrong', 'fullname': "容.马"}
])
# Same multi-row form for the address table; user_id refers to users.id
conn.execute(address.insert(), [
{'user_id': 1, 'email_address' : '[email protected]'},
{'user_id': 1, 'email_address' : '[email protected]'},
{'user_id': 2, 'email_address' : '[email protected]'},
{'user_id': 2, 'email_address' : '[email protected]'},
])
查询
from sqlalchemy.sql import select
s = select([users])
result = conn.execute(s)  # result behaves like a cursor
# Iterate over the rows
for row in result:
    print(row)
# The loop above consumed every row, so run the statement again; otherwise
# fetchone() has nothing left to return and the row[...] lookups below fail.
result = conn.execute(s)
# Access the row's data by column name, like a dict
row = result.fetchone()
print("name:", row['name'], "; fullname:", row['fullname'])
# Access by positional index
print("name:", row[1], "; fullname:", row[2])
# Access via the Column objects
print("name:", row[users.c.name], "; fullname:", row[users.c.fullname])
# Close the result once finished with it
result.close()
# Select only specific columns
s = select([users.c.name, users.c.fullname])
...
多表查询
# Selecting from two tables without a join condition produces a Cartesian product
for row in conn.execute(select([users, address])):
    print(row)
# Add a WHERE condition to relate the two tables
s = select([users, address]).where(users.c.id == address.c.user_id)
# A comparison between columns is itself an SQL expression object, not a bool
str(users.c.id == address.c.user_id) # > 'users.id = address.user_id'
运算符
# The table is bound to the name ``address`` in these notes (not ``addresses``).
print(users.c.id == address.c.user_id) # > users.id = address.user_id
print(users.c.id == 7) # users.id = :id_1
(users.c.id == 7).compile().params # > {u'id_1': 7}
print(users.c.id != 7) # users.id != :id_1
print(users.c.name == None) # users.name IS NULL
print('fred' > users.c.name) # users.name < :name_1
print(users.c.name + users.c.fullname) # users.name || users.fullname  (string concatenation)
print((users.c.name + users.c.fullname).compile(bind=create_engine('mysql://')))
# concat(users.name, users.fullname): the operator is rendered in the syntax of the target dialect.
# If an operator really is not available, use the Operators.op() method
print(users.c.name.op('tiddlywinks')('foo')) # users.name tiddlywinks :name_1
users.c.id.op('&')(0xFF) # users.id & :id_1
conn.execute(select([users]).where(users.c.name.op('like')("%xiaolu%")))
连词
from sqlalchemy.sql import and_, or_, not_
# Combine conditions with the and_() / or_() / not_() functions
print(and_(
    users.c.name.like('j%'),
    users.c.id == address.c.user_id,
    or_(
        address.c.email_address == '[email protected]',
        address.c.email_address == '[email protected]'
    ),
    not_(users.c.id > 5)
)
)
# The same condition written with the & | ~ operators; each comparison is
# parenthesized because these operators bind more tightly than ==.
# The table is bound to the name ``address`` (not ``addresses``).
print(users.c.name.like('j%') & (users.c.id == address.c.user_id) &
    (
        (address.c.email_address == '[email protected]') |
        (address.c.email_address == '[email protected]')
    )
    & ~(users.c.id>5)
)
#users.name LIKE :name_1 AND users.id = address.user_id AND (address.email_address = :email_address_1 OR address.email_address = :email_address_2) AND users.id <= :id_1
# between() produces a BETWEEN clause; label() produces an AS alias
print(select([users.c.fullname.label('n')]).where(users.c.id.between(10, 20)))
'''
SELECT users.fullname AS n
FROM users
WHERE users.id BETWEEN :id_1 AND :id_2
'''
别名和子查询
# NOTE(review): the table is created under the name `address` earlier in these notes;
# `addresses` must be bound to that Table for this snippet to run, and the SQL
# shown in the strings below will then use the real table name.
# Two independent aliases of the same table, so it can appear twice in one query.
a1 = addresses.alias()
a2 = addresses.alias()
# Users that have BOTH e-mail addresses: each alias matches one of them.
s = select([users]).\
where(and_(
users.c.id == a1.c.user_id,
users.c.id == a2.c.user_id,
a1.c.email_address == '[email protected]',
a2.c.email_address == '[email protected]'
))
'''
SELECT users.id, users.name, users.fullname
FROM users, addresses AS addresses_1, addresses AS addresses_2
WHERE users.id = addresses_1.user_id
AND users.id = addresses_2.user_id
AND addresses_1.email_address = ?
AND addresses_2.email_address = ?
('[email protected]', '[email protected]')
'''
# Aliasing a whole SELECT turns it into a subquery usable in another FROM clause.
addresses_subq = s.alias()
s = select([users.c.name]).where(users.c.id == addresses_subq.c.id)
'''
SELECT users.name
FROM users,
(SELECT users.id AS id, users.name AS name, users.fullname AS fullname
FROM users, addresses AS addresses_1, addresses AS addresses_2
WHERE users.id = addresses_1.user_id AND users.id = addresses_2.user_id
AND addresses_1.email_address = ?
AND addresses_2.email_address = ?) AS anon_1
WHERE users.id = anon_1.id
('[email protected]', '[email protected]')
'''
使用连接
# With no explicit condition, join() derives the ON clause from the foreign key.
print(users.join(addresses)) # users JOIN addresses ON users.id = addresses.user_id
# An arbitrary expression can be passed as the ON clause instead.
print(users.join(addresses, addresses.c.email_address.like(users.c.name + '%' )))
# users JOIN addresses ON addresses.email_address LIKE users.name || :name_1
绑定参数对象
from sqlalchemy.sql import bindparam
# bindparam('username') is a named placeholder whose value is supplied at execute time.
s = users.select(users.c.name == bindparam('username'))
conn.execute(s, username='wendy').fetchall()
排序,分组,限制,偏移…
# func is used below but was never imported anywhere in these notes.
from sqlalchemy.sql import func

# ORDER BY ascending, then descending
stmt = select([users.c.name]).order_by(users.c.name)
stmt = select([users.c.name]).order_by(users.c.name.desc())
# GROUP BY: number of addresses per user name
stmt = select([users.c.name, func.count(addresses.c.id)]).\
    select_from(users.join(addresses)).\
    group_by(users.c.name)
# HAVING: filter the groups after aggregation
stmt = select([users.c.name, func.count(addresses.c.id)]).\
    select_from(users.join(addresses)).\
    group_by(users.c.name).\
    having(func.length(users.c.name) > 4)
# DISTINCT
stmt = select([users.c.name]).\
    where(addresses.c.email_address.
          contains(users.c.name)).\
    distinct()
# LIMIT / OFFSET
stmt = select([users.c.name, addresses.c.email_address]).\
    select_from(users.join(addresses)).\
    limit(1).offset(1)
更新和删除
# NOTE(review): `xxx` is a placeholder, not a defined name; substitute a real
# condition such as users.c.id == 1 before running.
stmt = users.update().values(fullname="Fullname: " + users.c.name).where(xxx)
# Execute one statement many times with different parameter sets
stmt = users.insert().values(name=bindparam('_name') + " .. name")
conn.execute(stmt, [
{'id':4, '_name':'name1'}, # the bound value is concatenated with " .. name"
{'id':5, '_name':'name2'},
{'id':6, '_name':'name3'},
])
# UPDATE with bound parameters in both the WHERE clause and the SET clause
stmt = users.update().\
where(users.c.name == bindparam('oldname')).\
values(name=bindparam('newname'))
conn.execute(stmt, [
{'oldname':'jack', 'newname':'ed'},
{'oldname':'wendy', 'newname':'mary'},
{'oldname':'jim', 'newname':'jake'},
])
相关更新
# Scalar subquery that refers to the row being updated (users.c.id) in its WHERE clause.
stmt = select([addresses.c.email_address]).\
where(addresses.c.user_id == users.c.id).\
limit(1)
# Use the subquery as the new value of fullname for every users row.
conn.execute(users.update().values(fullname=stmt))
删除
# Keep the result so the number of matched rows can be read from result.rowcount afterwards.
result = conn.execute(users.delete().where(users.c.name > 'm'))
匹配的行数
result.rowcount
引用自: https://docs.sqlalchemy.org/en/13/core/tutorial.html
ORM
声明一个映射
from sqlalchemy.ext.declarative import declarative_base
# Base class that every mapped model in these notes inherits from.
Base = declarative_base()
定义一个类
from sqlalchemy import Column, Integer, String
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String(60))
    nickname = Column(String(60))

    def __repr__(self):
        """Return a debug string showing name, fullname and nickname."""
        shown = (self.name, self.fullname, self.nickname)
        return "<User(name='%s', fullname='%s', nickname='%s')>" % shown


# Create the tables of every model registered on Base.
Base.metadata.create_all(engine)
创建一个对象
# Build a User instance from keyword arguments named after the mapped columns.
ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
创建会话
from sqlalchemy.orm import sessionmaker
# sessionmaker() returns a Session factory bound to the engine
Session = sessionmaker(bind=engine)
# Instantiate a Session
session = Session()
添加和更新对象
ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
session.add(ed_user)
# Several User objects can be added in one call with add_all():
session.add_all([
    User(name='wendy', fullname='Wendy Williams', nickname='windy'),
    User(name='mary', fullname='Mary Contrary', nickname='mary'),
    User(name='fred', fullname='Fred Flintstone', nickname='freddy')])
# Change the nickname
ed_user.nickname = 'eddie'
# NOTE(review): session.dirty lists modified objects that are already persistent;
# ed_user shows up here only if it was flushed before the change (e.g. by a query),
# otherwise it is still counted in session.new.
print(session.dirty)  # modified objects (fixed typo: was `ession`)
print(session.new)  # objects waiting to be inserted
session.commit()  # write the pending changes to the database
回滚
# Make two uncommitted changes: modify an existing object and add a new one.
ed_user.name = 'Edwardo'
fake_user = User(name='fakeuser', fullname='Invalid', nickname='12345')
session.add(fake_user)
# rollback() discards both changes.
session.rollback()
print(ed_user.name) # u'ed'
print(fake_user in session) # False
查询
# Querying an entity yields User instances
for instance in session.query(User).order_by(User.id):
    print(instance.name, instance.fullname)
# ...
# Querying individual columns yields tuples that can be unpacked
for name, fullname in session.query(User.name, User.fullname):
    ...
过滤器运算符
-
equals
query.filter(User.name == 'ed')
-
not equals
query.filter(User.name != 'ed')
-
like/ilike
query.filter(User.name.like('%ed%'))
-
in
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
-
not in
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
-
is null / is not null
# `== None` / `!= None` are intentional: SQLAlchemy renders them as IS NULL / IS NOT NULL.
query.filter(User.name == None)
query.filter(User.name != None)
-
AND
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
-
OR
from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy'))
返回
-
all()
query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
query.all()  # returns every matching row as a list
-
first()
query.first()
-
one()
user = query.one() # 找到多行 或 0行 抛异常
计数
session.query(User).filter(User.name.like('%ed')).count()
关系
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Address(Base):
    """ORM model mapped to the ``addresses`` table; each row may reference one User."""

    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    # MySQL needs an explicit VARCHAR length; a bare String fails when the
    # table is created. 120 matches the Core table definition in these notes.
    email_address = Column(String(120), nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    # Many-to-one side of the link; paired with User.addresses below.
    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        """Return a debug string showing the e-mail address."""
        return "<Address(email_address='%s')>" % self.email_address


# One-to-many side, attached after Address exists so that order_by can
# reference Address.id (the stray ">>>" REPL prompt was removed).
User.addresses = relationship(
    "Address", order_by=Address.id, back_populates="user")

Base.metadata.create_all(engine)
相关对象
jack = User(name='jack', fullname='Jack Bean', nickname='gjffdd')
# Assign Address objects through the User.addresses relationship.
jack.addresses = [
Address(email_address='[email protected]'),
Address(email_address='[email protected]')]
# Only jack is added to the session explicitly.
session.add(jack)
session.commit()
# Query
jack = session.query(User).filter_by(name='jack').one()
print(jack.addresses)
连接
# Implicit join: relate the two entities through a filter on the foreign key
for u, a in session.query(User, Address).\
        filter(User.id==Address.user_id).\
        filter(Address.email_address=='[email protected]').\
        all():
    ...
删除
# 先查询 --> 后删除
session.delete(jack)
session.commit()
多对多关系
# Text (and Table, re-imported here so the snippet is self-contained) are
# required below; Text was never imported in these notes.
from sqlalchemy import Table, Text

# Association table for the many-to-many BlogPost <-> Keyword link. Both
# relationships below name it as ``secondary``, so it must exist before
# either class is defined.
post_keywords = Table(
    'post_keywords', Base.metadata,
    Column('post_id', ForeignKey('posts.id'), primary_key=True),
    Column('keyword_id', ForeignKey('keywords.id'), primary_key=True),
)


class BlogPost(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    headline = Column(String(255), nullable=False)
    body = Column(Text)

    # many to many BlogPost<->Keyword
    keywords = relationship('Keyword',
                            secondary=post_keywords,
                            back_populates='posts')

    def __init__(self, headline, body, author):
        """Create a post with the given headline, body and author (a User)."""
        self.author = author
        self.headline = headline
        self.body = body

    def __repr__(self):
        """Return a debug string showing headline, body and author."""
        return "BlogPost(%r, %r, %r)" % (self.headline, self.body, self.author)


class Keyword(Base):
    """ORM model mapped to the ``keywords`` table; each keyword is unique."""

    __tablename__ = 'keywords'

    id = Column(Integer, primary_key=True)
    keyword = Column(String(50), nullable=False, unique=True)

    # many to many BlogPost<->Keyword
    posts = relationship('BlogPost',
                         secondary=post_keywords,
                         back_populates='keywords')

    def __init__(self, keyword):
        """Create a keyword holding the given text."""
        self.keyword = keyword


# BlogPost.__init__ assigns ``author``; map it as a relationship so that the
# assignment populates posts.user_id instead of being a plain, unpersisted attribute.
BlogPost.author = relationship(User, back_populates="posts")
User.posts = relationship(BlogPost, back_populates="author")
引用自:https://docs.sqlalchemy.org/en/13/orm/tutorial.html