sqlalchemy 基本操作

表操作

models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from  sqlalchemy.ext.declarative  import  declarative_base
from  sqlalchemy  import  Column
from  sqlalchemy  import  Integer,String,Text,Date,DateTime
from  sqlalchemy  import  create_engine
 
 
Base  =  declarative_base()
 
class  Users(Base):
     __tablename__  =  'users'
 
     id  =  Column(Integer, primary_key = True )
     name  =  Column(String( 32 ), index = True , nullable = False )
 
 
def  create_all():
     engine  =  create_engine(
         "mysql+pymysql://root:[email protected]:3306/s9day120?charset=utf8" ,
         max_overflow = 0 ,   # 超过连接池大小外最多创建的连接
         pool_size = 5 ,   # 连接池大小
         pool_timeout = 30 ,   # 池中没有线程最多等待的时间,否则报错
         pool_recycle = - 1   # 多久之后对线程池中的线程进行一次连接的回收(重置)
     )
 
     Base.metadata.create_all(engine)
 
def  drop_all():
     engine  =  create_engine(
         "mysql+pymysql://root:[email protected]:3306/s9day120?charset=utf8" ,
         max_overflow = 0 ,   # 超过连接池大小外最多创建的连接
         pool_size = 5 ,   # 连接池大小
         pool_timeout = 30 ,   # 池中没有线程最多等待的时间,否则报错
         pool_recycle = - 1   # 多久之后对线程池中的线程进行一次连接的回收(重置)
     )
     Base.metadata.drop_all(engine)
 
if  __name__  = =  '__main__' :
     create_all()

views.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from  sqlalchemy.orm  import  sessionmaker
from  sqlalchemy  import  create_engine
from  models  import  Users
 
# 创建引擎
engine  =  create_engine(
         "mysql+pymysql://root:[email protected]:3306/s9day120?charset=utf8" ,
         max_overflow = 0 ,   # 超过连接池大小外最多创建的连接
         pool_size = 5 ,   # 连接池大小
         pool_timeout = 30 ,   # 池中没有线程最多等待的时间,否则报错
         pool_recycle = - 1   # 多久之后对线程池中的线程进行一次连接的回收(重置)
     )
 
# 根据引擎创建session工厂
SessionFactory  =  sessionmaker(bind = engine)
 
# 用session工厂创建一个session对象
session  =  SessionFactory()
 
..........
# 根据Users类对users表进行增删改查
..........
 
# 关闭session
session.close()

 行操作

1
2
3
4
5
6
7
8
9
obj  =  Users(name = 'alex' )
session.add(obj)
session.commit()
 
session.add_all([
         Users(name = '小东北' ),
         Users(name = '龙泰' )
])
session.commit()

1
2
session.query(Users). filter (Users. id  > =  2 ).delete()
session.commit()

1
2
3
4
session.query(Users). filter (Users. id  = =  4 ).update({Users.name: '东北' })
session.query(Users). filter (Users. id  = =  4 ).update({ 'name' : '小东北' })
session.query(Users). filter (Users. id  = =  4 ).update({ 'name' :Users.name + "DSB" },synchronize_session = False )
session.commit()

1
2
3
4
5
6
7
8
9
10
result  =  session.query(Users). all ()
for  row  in  result:
         print (row. id ,row.name)
 
result  =  session.query(Users). filter (Users. id  > =  2 )
for  row  in  result:
         print (row. id ,row.name)
 
result  =  session.query(Users). filter (Users. id  > =  2 ).first()
print (result)

其他常用操作

  1. 指定查询列

    1
    2
    3
    result  =  session.query(Users. id ,Users.name.label( 'cname' )). all ()
    for  item  in  result:
             print (item[ 0 ],item. id ,item.cname)
  2. 多个查询条件(默认and)

    1
    session.query(Users). filter (Users. id  1 , Users.name  = =  'eric' ). all ()
  3. between

    1
    session.query(Users). filter (Users. id .between( 1 3 ), Users.name  = =  'eric' ). all ()
  4. in

    1
    2
    session.query(Users). filter (Users. id .in_([ 1 , 3 , 4 ])). all ()
    session.query(Users). filter (~Users. id .in_([ 1 , 3 , 4 ])). all ()  # 非
  5. 子查询

    1
    session.query(Users). filter (Users. id .in_(session.query(Users. id ). filter (Users.name = = 'eric' ))). all ()
  6. and和or

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from  sqlalchemy  import  and_, or_
    session.query(Users). filter (Users. id  3 , Users.name  = =  'eric' ). all ()
    session.query(Users). filter (and_(Users. id  3 , Users.name  = =  'eric' )). all ()
    session.query(Users). filter (or_(Users. id  2 , Users.name  = =  'eric' )). all ()
    session.query(Users). filter (
         or_(
             Users. id  2 ,
             and_(Users.name  = =  'eric' , Users. id  3 ),
             Users.extra ! =  ""
         )). all ()
  7. filter_by

    1
    session.query(Users).filter_by(name = 'alex' ). all ()
  8. 通配符

    1
    2
    ret  =  session.query(Users). filter (Users.name.like( 'e%' )). all ()
    ret  =  session.query(Users). filter (~Users.name.like( 'e%' )). all ()
  9. 切片

    1
    result  =  session.query(Users)[ 1 : 2 ]
  10. 排序

    1
    2
    ret  =  session.query(Users).order_by(Users.name.desc()). all ()
    ret  =  session.query(Users).order_by(Users.name.desc(), Users. id .asc()). all ()
  11. group by

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    ret  =  session.query(
             Users.depart_id,
             func.count(Users. id ),
    ).group_by(Users.depart_id). all ()
    for  item  in  ret:
             print (item)
     
    from  sqlalchemy.sql  import  func
     
    ret  =  session.query(
             Users.depart_id,
             func.count(Users. id ),
    ).group_by(Users.depart_id).having(func.count(Users. id ) > =  2 ). all ()
    for  item  in  ret:
             print (item)
  12. union和union all

    1
    2
    3
    4
    5
    6
    7
    q1  =  session.query(Users.name). filter (Users. id  2 )
    q2  =  session.query(Favor.caption). filter (Favor.nid <  2 )
    ret  =  q1.union(q2). all ()
     
    q1  =  session.query(Users.name). filter (Users. id  2 )
    q2  =  session.query(Favor.caption). filter (Favor.nid <  2 )
    ret  =  q1.union_all(q2). all ()

models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from  sqlalchemy.ext.declarative  import  declarative_base
from  sqlalchemy  import  Column
from  sqlalchemy  import  Integer,String,Text,Date,DateTime
from  sqlalchemy  import  create_engine
 
 
Base  =  declarative_base()
 
class  Users(Base):
     __tablename__  =  'users'
 
     id  =  Column(Integer, primary_key = True )
     name  =  Column(String( 32 ), index = True , nullable = False )
 
 
def  create_all():
     engine  =  create_engine(
         "mysql+pymysql://root:[email protected]:3306/s9day120?charset=utf8" ,
         max_overflow = 0 ,   # 超过连接池大小外最多创建的连接
         pool_size = 5 ,   # 连接池大小
         pool_timeout = 30 ,   # 池中没有线程最多等待的时间,否则报错
         pool_recycle = - 1   # 多久之后对线程池中的线程进行一次连接的回收(重置)
     )
 
     Base.metadata.create_all(engine)
 
def  drop_all():
     engine  =  create_engine(
         "mysql+pymysql://root:[email protected]:3306/s9day120?charset=utf8" ,
         max_overflow = 0 ,   # 超过连接池大小外最多创建的连接
         pool_size = 5 ,   # 连接池大小
         pool_timeout = 30 ,   # 池中没有线程最多等待的时间,否则报错
         pool_recycle = - 1   # 多久之后对线程池中的线程进行一次连接的回收(重置)
     )
     Base.metadata.drop_all(engine)
 
if  __name__  = =  '__main__' :
     create_all()

views.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from  sqlalchemy.orm  import  sessionmaker
from  sqlalchemy  import  create_engine
from  models  import  Users
 
# 创建引擎
engine  =  create_engine(
         "mysql+pymysql://root:[email protected]:3306/s9day120?charset=utf8" ,
         max_overflow = 0 ,   # 超过连接池大小外最多创建的连接
         pool_size = 5 ,   # 连接池大小
         pool_timeout = 30 ,   # 池中没有线程最多等待的时间,否则报错
         pool_recycle = - 1   # 多久之后对线程池中的线程进行一次连接的回收(重置)
     )
 
# 根据引擎创建session工厂
SessionFactory  =  sessionmaker(bind = engine)
 
# 用session工厂创建一个session对象
session  =  SessionFactory()
 
..........
# 根据Users类对users表进行增删改查
..........
 
# 关闭session
session.close()

 行操作

1
2
3
4
5
6
7
8
9
obj  =  Users(name = 'alex' )
session.add(obj)
session.commit()
 
session.add_all([
         Users(name = '小东北' ),
         Users(name = '龙泰' )
])
session.commit()

1
2
session.query(Users). filter (Users. id  > =  2 ).delete()
session.commit()

1
2
3
4
session.query(Users). filter (Users. id  = =  4 ).update({Users.name: '东北' })
session.query(Users). filter (Users. id  = =  4 ).update({ 'name' : '小东北' })
session.query(Users). filter (Users. id  = =  4 ).update({ 'name' :Users.name + "DSB" },synchronize_session = False )
session.commit()

1
2
3
4
5
6
7
8
9
10
result  =  session.query(Users). all ()
for  row  in  result:
         print (row. id ,row.name)
 
result  =  session.query(Users). filter (Users. id  > =  2 )
for  row  in  result:
         print (row. id ,row.name)
 
result  =  session.query(Users). filter (Users. id  > =  2 ).first()
print (result)

其他常用操作

  1. 指定查询列

    1
    2
    3
    result  =  session.query(Users. id ,Users.name.label( 'cname' )). all ()
    for  item  in  result:
             print (item[ 0 ],item. id ,item.cname)
  2. 多个查询条件(默认and)

    1
    session.query(Users). filter (Users. id  1 , Users.name  = =  'eric' ). all ()
  3. between

    1
    session.query(Users). filter (Users. id .between( 1 3 ), Users.name  = =  'eric' ). all ()
  4. in

    1
    2
    session.query(Users). filter (Users. id .in_([ 1 , 3 , 4 ])). all ()
    session.query(Users). filter (~Users. id .in_([ 1 , 3 , 4 ])). all ()  # 非
  5. 子查询

    1
    session.query(Users). filter (Users. id .in_(session.query(Users. id ). filter (Users.name = = 'eric' ))). all ()
  6. and和or

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from  sqlalchemy  import  and_, or_
    session.query(Users). filter (Users. id  3 , Users.name  = =  'eric' ). all ()
    session.query(Users). filter (and_(Users. id  3 , Users.name  = =  'eric' )). all ()
    session.query(Users). filter (or_(Users. id  2 , Users.name  = =  'eric' )). all ()
    session.query(Users). filter (
         or_(
             Users. id  2 ,
             and_(Users.name  = =  'eric' , Users. id  3 ),
             Users.extra ! =  ""
         )). all ()
  7. filter_by

    1
    session.query(Users).filter_by(name = 'alex' ). all ()
  8. 通配符

    1
    2
    ret  =  session.query(Users). filter (Users.name.like( 'e%' )). all ()
    ret  =  session.query(Users). filter (~Users.name.like( 'e%' )). all ()
  9. 切片

    1
    result  =  session.query(Users)[ 1 : 2 ]
  10. 排序

    1
    2
    ret  =  session.query(Users).order_by(Users.name.desc()). all ()
    ret  =  session.query(Users).order_by(Users.name.desc(), Users. id .asc()). all ()
  11. group by

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    ret  =  session.query(
             Users.depart_id,
             func.count(Users. id ),
    ).group_by(Users.depart_id). all ()
    for  item  in  ret:
             print (item)
     
    from  sqlalchemy.sql  import  func
     
    ret  =  session.query(
             Users.depart_id,
             func.count(Users. id ),
    ).group_by(Users.depart_id).having(func.count(Users. id ) > =  2 ). all ()
    for  item  in  ret:
             print (item)
  12. union和union all

    1
    2
    3
    4
    5
    6
    7
    q1  =  session.query(Users.name). filter (Users. id  2 )
    q2  =  session.query(Favor.caption). filter (Favor.nid <  2 )
    ret  =  q1.union(q2). all ()
     
    q1  =  session.query(Users.name). filter (Users. id  2 )
    q2  =  session.query(Favor.caption). filter (Favor.nid <  2 )
    ret  =  q1.union_all(q2). all ()

猜你喜欢

转载自www.cnblogs.com/ray-mmss/p/9424978.html