表操作
models.py
from
sqlalchemy.ext.declarative
import
declarative_base
from
sqlalchemy
import
Column
from
sqlalchemy
import
Integer,String,Text,Date,DateTime
from
sqlalchemy
import
create_engine
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()


class Users(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    # Integer primary key column.
    id = Column(Integer, primary_key=True)
    # Required (NOT NULL), indexed name column declared as String(32).
    name = Column(String(32), index=True, nullable=False)

    def __repr__(self):
        """Return a readable representation listing the mapped columns."""
        return '%s(id=%r, name=%r)' % (
            type(self).__name__, self.id, self.name)
def _build_engine(url=None):
    """Build the engine used by ``create_all`` and ``drop_all``.

    Args:
        url: Database URL passed to ``create_engine``. When omitted, the
            ``DATABASE_URL`` environment variable is used instead.

    Returns:
        The engine object returned by ``create_engine``.

    Raises:
        RuntimeError: If no URL is given and ``DATABASE_URL`` is not set.
    """
    import os  # local import: only needed to look up the fallback URL

    if url is None:
        url = os.environ.get("DATABASE_URL")
    if not url:
        raise RuntimeError(
            "No database URL: pass url=... or set the DATABASE_URL "
            "environment variable"
        )
    return create_engine(
        url,  # create_engine() requires the database URL as first argument
        max_overflow=0,   # max connections created beyond the pool size
        pool_size=5,      # connection pool size
        pool_timeout=30,  # max wait for a free pooled connection, then error
        pool_recycle=-1,  # age at which pooled connections are recycled (reset); -1 disables it
    )


def create_all(url=None):
    """Create the tables of every model registered on ``Base``.

    Args:
        url: Optional database URL; see ``_build_engine`` for the fallback.
    """
    Base.metadata.create_all(_build_engine(url))


def drop_all(url=None):
    """Drop the tables of every model registered on ``Base``.

    Args:
        url: Optional database URL; see ``_build_engine`` for the fallback.
    """
    Base.metadata.drop_all(_build_engine(url))
# Script entry point: running this module directly calls create_all().
if __name__ == '__main__':
    create_all()
views.py
from
sqlalchemy.orm
import
sessionmaker
from
sqlalchemy
import
create_engine
from
models
import
Users
import os

# Create the engine. create_engine() requires the database URL as its first
# argument; it is read from the DATABASE_URL environment variable here.
_db_url = os.environ.get("DATABASE_URL")
if not _db_url:
    raise RuntimeError("Set the DATABASE_URL environment variable")
engine = create_engine(
    _db_url,
    max_overflow=0,   # max connections created beyond the pool size
    pool_size=5,      # connection pool size
    pool_timeout=30,  # max wait for a free pooled connection, then error
    pool_recycle=-1,  # age at which pooled connections are recycled (reset); -1 disables it
)

# Build a session factory bound to the engine.
SessionFactory = sessionmaker(bind=engine)

# Create one session object from the factory.
session = SessionFactory()
try:
    # Insert / delete / update / query rows of the users table through the
    # Users class here.
    pass
finally:
    # Close the session even when one of the operations above raised.
    session.close()
行操作
增
# Single insert: stage one Users object, then commit.
obj = Users(name='alex')
session.add(obj)
session.commit()

# Bulk insert: stage several Users objects with one add_all() call, then
# commit them together.
new_users = [Users(name=label) for label in ('小东北', '龙泰')]
session.add_all(new_users)
session.commit()
删
# Bulk delete: select the users rows with id >= 2, delete them, commit.
rows_to_delete = session.query(Users).filter(Users.id >= 2)
rows_to_delete.delete()
session.commit()
改
# All three statements below update the users row whose id equals 4.
user_4 = session.query(Users).filter(Users.id == 4)

# Values dict keyed by the mapped column attribute.
user_4.update({Users.name: '东北'})

# Values dict keyed by the column name as a string.
user_4.update({'name': '小东北'})

# New value built from the current column value (name + "DSB");
# synchronize_session=False is passed for this expression-based update.
user_4.update({'name': Users.name + "DSB"}, synchronize_session=False)

session.commit()
查
# Every row: all() is called and the returned rows are printed one by one.
result = session.query(Users).all()
for user in result:
    print(user.id, user.name)

# Rows with id >= 2: the query object itself is iterated (no all() call).
result = session.query(Users).filter(Users.id >= 2)
for user in result:
    print(user.id, user.name)

# Same filter, but only first() is requested and printed.
result = session.query(Users).filter(Users.id >= 2).first()
print(result)
其他常用操作
- 指定查询列
# Query two columns only; the name column is labelled 'cname'.
columns = (Users.id, Users.name.label('cname'))
result = session.query(*columns).all()
for record in result:
    # Each record is read by position, by column name and by label.
    print(record[0], record.id, record.cname)
- 多个查询条件(默认and)
# Several criteria passed to a single filter() call (combined with AND).
session.query(Users).filter(
    Users.id > 1,
    Users.name == 'eric',
).all()
- between
# id BETWEEN 1 AND 3, combined with a name equality criterion.
session.query(Users).filter(
    Users.id.between(1, 3),
    Users.name == 'eric',
).all()
- in
wanted_ids = [1, 3, 4]

# Rows whose id is in the list.
session.query(Users).filter(Users.id.in_(wanted_ids)).all()

# "~" negates the criterion: rows whose id is NOT in the list.
session.query(Users).filter(~Users.id.in_(wanted_ids)).all()
- 子查询
# Inner query: ids of the users named 'eric'.
eric_ids = session.query(Users.id).filter(Users.name == 'eric')

# Outer query: users whose id is in the inner query's result.
session.query(Users).filter(Users.id.in_(eric_ids)).all()
- and和or
from sqlalchemy import and_, or_

# Criteria listed positionally in filter().
session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()

# The same two criteria wrapped in an explicit and_().
session.query(Users).filter(
    and_(Users.id > 3, Users.name == 'eric')
).all()

# Either criterion may match: or_().
session.query(Users).filter(
    or_(Users.id < 2, Users.name == 'eric')
).all()

# and_() nested inside or_().
# NOTE(review): Users.extra is not declared on the Users model shown in
# models.py -- confirm the column exists before running this query.
eric_above_3 = and_(Users.name == 'eric', Users.id > 3)
session.query(Users).filter(
    or_(Users.id < 2, eric_above_3, Users.extra != "")
).all()
- filter_by
# filter_by() takes keyword arguments named after the model's attributes.
criteria = {'name': 'alex'}
session.query(Users).filter_by(**criteria).all()
- 通配符
# LIKE pattern 'e%'.
name_like_e = Users.name.like('e%')

# Rows matching the pattern.
ret = session.query(Users).filter(name_like_e).all()

# "~" negates the criterion: rows NOT matching the pattern.
ret = session.query(Users).filter(~name_like_e).all()
- 切片
# Slice the query with start index 1 and stop index 2.
users_query = session.query(Users)
result = users_query[1:2]
- 排序
# Order by name, descending.
ret = session.query(Users).order_by(
    Users.name.desc(),
).all()

# Order by name descending, then by id ascending.
ret = session.query(Users).order_by(
    Users.name.desc(),
    Users.id.asc(),
).all()
- group by
# func must be imported before its first use; the original snippet imported
# it only after the first func.count() call.
from sqlalchemy.sql import func

# NOTE(review): Users.depart_id is not declared on the Users model shown in
# models.py -- confirm the column exists before running these queries.

# Count of user ids per depart_id.
ret = session.query(
    Users.depart_id,
    func.count(Users.id),
).group_by(Users.depart_id).all()
for item in ret:
    print(item)

# Same grouping, keeping only groups whose count is at least 2 (HAVING).
ret = session.query(
    Users.depart_id,
    func.count(Users.id),
).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
for item in ret:
    print(item)
- union和union all
# NOTE(review): Favor is neither defined nor imported anywhere in the code
# shown -- confirm where this model comes from.

# The two queries whose results are combined.
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)

# Combine with union().
ret = q1.union(q2).all()

# Combine the same two queries with union_all().
ret = q1.union_all(q2).all()
models.py
from
sqlalchemy.ext.declarative
import
declarative_base
from
sqlalchemy
import
Column
from
sqlalchemy
import
Integer,String,Text,Date,DateTime
from
sqlalchemy
import
create_engine
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()


class Users(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    # Integer primary key column.
    id = Column(Integer, primary_key=True)
    # Required (NOT NULL), indexed name column declared as String(32).
    name = Column(String(32), index=True, nullable=False)

    def __repr__(self):
        """Return a readable representation listing the mapped columns."""
        return '%s(id=%r, name=%r)' % (
            type(self).__name__, self.id, self.name)
def _build_engine(url=None):
    """Build the engine used by ``create_all`` and ``drop_all``.

    Args:
        url: Database URL passed to ``create_engine``. When omitted, the
            ``DATABASE_URL`` environment variable is used instead.

    Returns:
        The engine object returned by ``create_engine``.

    Raises:
        RuntimeError: If no URL is given and ``DATABASE_URL`` is not set.
    """
    import os  # local import: only needed to look up the fallback URL

    if url is None:
        url = os.environ.get("DATABASE_URL")
    if not url:
        raise RuntimeError(
            "No database URL: pass url=... or set the DATABASE_URL "
            "environment variable"
        )
    return create_engine(
        url,  # create_engine() requires the database URL as first argument
        max_overflow=0,   # max connections created beyond the pool size
        pool_size=5,      # connection pool size
        pool_timeout=30,  # max wait for a free pooled connection, then error
        pool_recycle=-1,  # age at which pooled connections are recycled (reset); -1 disables it
    )


def create_all(url=None):
    """Create the tables of every model registered on ``Base``.

    Args:
        url: Optional database URL; see ``_build_engine`` for the fallback.
    """
    Base.metadata.create_all(_build_engine(url))


def drop_all(url=None):
    """Drop the tables of every model registered on ``Base``.

    Args:
        url: Optional database URL; see ``_build_engine`` for the fallback.
    """
    Base.metadata.drop_all(_build_engine(url))
# Script entry point: running this module directly calls create_all().
if __name__ == '__main__':
    create_all()
views.py
from
sqlalchemy.orm
import
sessionmaker
from
sqlalchemy
import
create_engine
from
models
import
Users
import os

# Create the engine. create_engine() requires the database URL as its first
# argument; it is read from the DATABASE_URL environment variable here.
_db_url = os.environ.get("DATABASE_URL")
if not _db_url:
    raise RuntimeError("Set the DATABASE_URL environment variable")
engine = create_engine(
    _db_url,
    max_overflow=0,   # max connections created beyond the pool size
    pool_size=5,      # connection pool size
    pool_timeout=30,  # max wait for a free pooled connection, then error
    pool_recycle=-1,  # age at which pooled connections are recycled (reset); -1 disables it
)

# Build a session factory bound to the engine.
SessionFactory = sessionmaker(bind=engine)

# Create one session object from the factory.
session = SessionFactory()
try:
    # Insert / delete / update / query rows of the users table through the
    # Users class here.
    pass
finally:
    # Close the session even when one of the operations above raised.
    session.close()
行操作
增
# Single insert: stage one Users object, then commit.
obj = Users(name='alex')
session.add(obj)
session.commit()

# Bulk insert: stage several Users objects with one add_all() call, then
# commit them together.
new_users = [Users(name=label) for label in ('小东北', '龙泰')]
session.add_all(new_users)
session.commit()
删
# Bulk delete: select the users rows with id >= 2, delete them, commit.
rows_to_delete = session.query(Users).filter(Users.id >= 2)
rows_to_delete.delete()
session.commit()
改
# All three statements below update the users row whose id equals 4.
user_4 = session.query(Users).filter(Users.id == 4)

# Values dict keyed by the mapped column attribute.
user_4.update({Users.name: '东北'})

# Values dict keyed by the column name as a string.
user_4.update({'name': '小东北'})

# New value built from the current column value (name + "DSB");
# synchronize_session=False is passed for this expression-based update.
user_4.update({'name': Users.name + "DSB"}, synchronize_session=False)

session.commit()
查
# Every row: all() is called and the returned rows are printed one by one.
result = session.query(Users).all()
for user in result:
    print(user.id, user.name)

# Rows with id >= 2: the query object itself is iterated (no all() call).
result = session.query(Users).filter(Users.id >= 2)
for user in result:
    print(user.id, user.name)

# Same filter, but only first() is requested and printed.
result = session.query(Users).filter(Users.id >= 2).first()
print(result)
其他常用操作
- 指定查询列
# Query two columns only; the name column is labelled 'cname'.
columns = (Users.id, Users.name.label('cname'))
result = session.query(*columns).all()
for record in result:
    # Each record is read by position, by column name and by label.
    print(record[0], record.id, record.cname)
- 多个查询条件(默认and)
# Several criteria passed to a single filter() call (combined with AND).
session.query(Users).filter(
    Users.id > 1,
    Users.name == 'eric',
).all()
- between
# id BETWEEN 1 AND 3, combined with a name equality criterion.
session.query(Users).filter(
    Users.id.between(1, 3),
    Users.name == 'eric',
).all()
- in
wanted_ids = [1, 3, 4]

# Rows whose id is in the list.
session.query(Users).filter(Users.id.in_(wanted_ids)).all()

# "~" negates the criterion: rows whose id is NOT in the list.
session.query(Users).filter(~Users.id.in_(wanted_ids)).all()
- 子查询
# Inner query: ids of the users named 'eric'.
eric_ids = session.query(Users.id).filter(Users.name == 'eric')

# Outer query: users whose id is in the inner query's result.
session.query(Users).filter(Users.id.in_(eric_ids)).all()
- and和or
from sqlalchemy import and_, or_

# Criteria listed positionally in filter().
session.query(Users).filter(Users.id > 3, Users.name == 'eric').all()

# The same two criteria wrapped in an explicit and_().
session.query(Users).filter(
    and_(Users.id > 3, Users.name == 'eric')
).all()

# Either criterion may match: or_().
session.query(Users).filter(
    or_(Users.id < 2, Users.name == 'eric')
).all()

# and_() nested inside or_().
# NOTE(review): Users.extra is not declared on the Users model shown in
# models.py -- confirm the column exists before running this query.
eric_above_3 = and_(Users.name == 'eric', Users.id > 3)
session.query(Users).filter(
    or_(Users.id < 2, eric_above_3, Users.extra != "")
).all()
- filter_by
# filter_by() takes keyword arguments named after the model's attributes.
criteria = {'name': 'alex'}
session.query(Users).filter_by(**criteria).all()
- 通配符
# LIKE pattern 'e%'.
name_like_e = Users.name.like('e%')

# Rows matching the pattern.
ret = session.query(Users).filter(name_like_e).all()

# "~" negates the criterion: rows NOT matching the pattern.
ret = session.query(Users).filter(~name_like_e).all()
- 切片
# Slice the query with start index 1 and stop index 2.
users_query = session.query(Users)
result = users_query[1:2]
- 排序
# Order by name, descending.
ret = session.query(Users).order_by(
    Users.name.desc(),
).all()

# Order by name descending, then by id ascending.
ret = session.query(Users).order_by(
    Users.name.desc(),
    Users.id.asc(),
).all()
- group by
# func must be imported before its first use; the original snippet imported
# it only after the first func.count() call.
from sqlalchemy.sql import func

# NOTE(review): Users.depart_id is not declared on the Users model shown in
# models.py -- confirm the column exists before running these queries.

# Count of user ids per depart_id.
ret = session.query(
    Users.depart_id,
    func.count(Users.id),
).group_by(Users.depart_id).all()
for item in ret:
    print(item)

# Same grouping, keeping only groups whose count is at least 2 (HAVING).
ret = session.query(
    Users.depart_id,
    func.count(Users.id),
).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
for item in ret:
    print(item)
- union和union all
# NOTE(review): Favor is neither defined nor imported anywhere in the code
# shown -- confirm where this model comes from.

# The two queries whose results are combined.
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)

# Combine with union().
ret = q1.union(q2).all()

# Combine the same two queries with union_all().
ret = q1.union_all(q2).all()