目录
简单使用
接口和pymysql基本一样
import sqlite3
con = sqlite3.connect('example.db')
cur = con.cursor()
cur.execute('''CREATE TABLE stocks
(date text, trans text, symbol text, qty real, price real)''')
cur.execute("INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")
con.commit()
con.close()
开始使用
sql语法请看:https://www.runoob.com/sqlite/sqlite-tutorial.html
占位符
如果要在sql中使用Python变量,不建议用Python的格式化操作,应该在sql中使用?
占位符,比如:
t = ('RHAT',)
cur.execute('SELECT * FROM stocks WHERE symbol=?', t)
print(cur.fetchone())
purchases = [('2006-03-28', 'BUY', 'IBM', 1000, 45.00),
('2006-04-05', 'BUY', 'MSFT', 1000, 72.00),
('2006-04-06', 'SELL', 'IBM', 500, 53.00),
]
cur.executemany('INSERT INTO stocks VALUES (?,?,?,?,?)', purchases)
获取select结果
cur.fetchone、cur.fetchall或者直接迭代
t = ('RHAT',)
cur.execute('SELECT * FROM stocks WHERE symbol=?', t)
print(cur.fetchone())
t = ('RHAT',)
cur.execute('SELECT * FROM stocks WHERE symbol=?', t)
print(cur.fetchall())
t = ('RHAT',)
for row in cur.execute('SELECT * FROM stocks WHERE symbol=?', t):
print(row)
t = ('RHAT',)
cur.execute('SELECT * FROM stocks WHERE symbol=?', t)
for row in cur:
print(row)
sqlite3.connect参数
sqlite3.connect(database[, timeout, detect_types, isolation_level, check_same_thread, factory, cached_statements, uri])
- database: 为数据库的路径,也可以使用
:memory:
表示在内存中打开 - timeout: 表示等待其他事务处理的超时时间
- isolation_level: 没看懂。取值:None、“DEFERRED”, “IMMEDIATE” 或 “EXCLUSIVE”
- detect_types: 可以通过设置
sqlite3.PARSE_DECLTYPES
和sqlite3.PARSE_COLNAMES
来启用类型检测,register_converter
可以创建转换器(将sql中的字节转换成Python支持的类型) - check_same_thread: 是否只在当前线程内使用该连接,默认True
- factory: *
- cached_statements: 语句缓存,默认缓存100条
- uri: 将database参数当成url来解析,url格式:‘file:path/to/database?mode=ro’, 具体请看:https://www.sqlite.org/uri.html
sqlite3.register_converter(typename, callable)
callable函数用以将数据库中指定为typename类型的字节数据转换成Python类型
举个例子,将数据库里的时间戳用select查询时自动转为时间Python的<class 'time.struct_time'>
类型
import sqlite3
import time
def convert_timestamp(s):
return time.localtime(int(s))
sqlite3.register_converter("time", convert_timestamp)
# 当你需要转换成Python数据类型时,detect_types应该包含sqlite3.PARSE_DECLTYPES
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
cur = con.cursor()
cur.execute("create table test(p time)")
# 插入一条时间戳数据
cur.execute("insert into test(p) values (?)", ("1647914557",))
cur.execute("select p from test")
result = cur.fetchone()
print("查询数据库结果:", result)
print('查询结果类型: ', type(result[0]))
cur.close()
con.close()
sqlite3.register_adapter(type, callable)
callable函数用以将python类型转换成sqlite支持的类型,可以是int、float、str或bytes,为了和register_converter对应,返回bytes最好
举个例子,和上面的相反,将<class 'time.struct_time'>
类型以时间戳的形式存到数据库里
import sqlite3
import time
def struct_time2timestamp(t):
return str(int(time.mktime(t))).encode()
sqlite3.register_adapter(time.struct_time, struct_time2timestamp)
# 当需要将Python类型转换为sqlite类型时,需指定detect_types=sqlite3.PARSE_COLNAMES
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES)
cur = con.cursor()
cur.execute("create table test(p)")
cur.execute("insert into test(p) values (?)", (time.localtime(),))
cur.execute('select p from test')
print("查询结果:", cur.fetchone())
cur.close()
con.close()
register_adapter和register_converter
import sqlite3
import time
def struct_time2timestamp(t):
return str(int(time.mktime(t))).encode()
def convert_timestamp(s):
return time.localtime(int(s))
sqlite3.enable_callback_tracebacks(True)
sqlite3.register_converter("time", convert_timestamp)
sqlite3.register_adapter(time.struct_time, struct_time2timestamp)
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES|sqlite3.PARSE_DECLTYPES)
cur = con.cursor()
cur.execute("create table test(p)")
cur.execute("insert into test(p) values (?)", (time.localtime(),))
# 创建数据库时并未指定p的类型,如果想以指定类型读取,应该是as "p [time]",方括号里放的是类型
cur.execute('select p as "p [time]" from test')
print("查询结果:", cur.fetchone())
print(con.isolation_level)
cur.close()
con.close()
sqlite3.Connection方法
- in_transaction: 如果是在事务中,返回True
- cursor(factory=Cursor): 返回cursor对象
- commit():提交当前事务
- rollback():回滚
- close():关闭数据库连接。注意:它不会自动commit
- execute(sql[, parameters]): 执行一条sql语句
- executemany(sql[, parameters]):执行多条sql语句
- executescript(sql_script):执行多行sql脚本
- create_function(name, num_params, func, *, deterministic=False):创建在sql中能运行的函数,name为函数名,func为执行的函数。
import sqlite3
import hashlib
def md5sum(t):
return hashlib.md5(t).hexdigest()
con = sqlite3.connect(":memory:")
con.create_function("md5", 1, md5sum)
cur = con.cursor()
cur.execute("select md5(?)", (b"foo",))
print(cur.fetchone())
con.close()
- create_aggregate(name, num_params, aggregate_class):创建自定义聚合函数。举个例子,求和函数:
import sqlite3
class MySum:
def __init__(self):
self.count = 0
def step(self, value):
self.count += value
def finalize(self):
return self.count
con = sqlite3.connect(":memory:")
con.create_aggregate("mysum", 1, MySum)
cur = con.cursor()
cur.execute("create table test(i)")
cur.execute("insert into test(i) values (1)")
cur.execute("insert into test(i) values (2)")
cur.execute("select mysum(i) from test")
print(cur.fetchone()[0])
con.close()
- create_collation(name, callable): 创建排序规则,例子:
import sqlite3
def collate_reverse(string1, string2):
if string1 == string2:
return 0
elif string1 < string2:
return 1
else:
return -1
con = sqlite3.connect(":memory:")
con.create_collation("reverse", collate_reverse)
# 移除排序规则
# con.create_collation("reverse", None)
cur = con.cursor()
cur.execute("create table test(x)")
cur.executemany("insert into test(x) values (?)", [("a",), ("b",)])
cur.execute("select x from test order by x collate reverse")
for row in cur:
print(row)
con.close()
- interrupt(): 终止该连接的所有查询操作
- set_authorizer(authorizer_callback): 对执行的sql设置权限验证,有啥用和怎么用我也没看懂。
def select_authorizer(*args):
print(args)
return sqlite3.SQLITE_OK #should allow all operations
con.set_authorizer(select_authorizer)
- enable_load_extension(enabled): 启用扩展,下面是加载fts3扩展,至于fts3.so在哪里下我也不知道。另外sqlite3应该已经内置了fts5,看这篇文章:使用fts5进行全文检索
import sqlite3
con = sqlite3.connect(":memory:")
# enable extension loading
con.enable_load_extension(True)
# Load the fulltext search extension
con.execute("select load_extension('./fts3.so')")
# alternatively you can load the extension using an API call:
# con.load_extension("./fts3.so")
# disable extension loading again
con.enable_load_extension(False)
# example from SQLite wiki
con.execute("create virtual table recipe using fts3(name, ingredients)")
con.executescript("""
insert into recipe (name, ingredients) values ('broccoli stew', 'broccoli peppers cheese tomatoes');
insert into recipe (name, ingredients) values ('pumpkin stew', 'pumpkin onions garlic celery');
insert into recipe (name, ingredients) values ('broccoli pie', 'broccoli cheese onions flour');
insert into recipe (name, ingredients) values ('pumpkin pie', 'pumpkin sugar flour butter');
""")
for row in con.execute("select rowid, name, ingredients from recipe where name match 'pie'"):
print(row)
con.close()
- load_extension(path):加载扩展
- row_factory:可以通过设置该属性,来自定义sql结果。比如自定义dict_factory并使用as来返回字典。或者设置为sqlite3.Row来优化性能
import sqlite3
def dict_factory(cursor, row):
d = {
}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print(cur.fetchone())
con.close()
- text_factory:设置TEXT类型返回的数据,比如设置成str返回字符串,设置成bytes返回字节串,设置成
lambda x: x.decode("utf-8") + "foo"
则返回后面拼接了"foo"的字符串 - total_changes: 打开数据库以来,修改插入和删除的数据库行的总数
- iterdump(): 将数据库保存成sql文件
import sqlite3
con = sqlite3.connect('existing_db.db')
with open('dump.sql', 'w') as f:
for line in con.iterdump():
f.write('%s\n' % line)
con.close()
- backup(target, *, pages=-1, progress=None, name=“main”, sleep=0.250): 备份数据库,target为另一个connection对象,比如拷贝到内存:
import sqlite3
source = sqlite3.connect('existing_db.db')
dest = sqlite3.connect(':memory:')
source.backup(dest)
sqlite3.Cursor
- execute(sql[, parameters]): 执行sql,支持两种格式的占位符,
?
和:name
import sqlite3
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table people (name_last, age)")
who = "Yeltsin"
age = 72
# This is the qmark style:
cur.execute("insert into people values (?, ?)", (who, age))
# And this is the named style:
cur.execute("select * from people where name_last=:who and age=:age", {
"who": who, "age": age})
print(cur.fetchone())
con.close()
- executemany(sql, seq_of_parameters): 多次执行一条sql,第二个参数可传自定义的iterator
import sqlite3
class IterChars:
def __init__(self):
self.count = ord('a')
def __iter__(self):
return self
def __next__(self):
if self.count > ord('z'):
raise StopIteration
self.count += 1
return (chr(self.count - 1),) # this is a 1-tuple
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table characters(c)")
theIter = IterChars()
cur.executemany("insert into characters(c) values (?)", theIter)
cur.execute("select c from characters")
print(cur.fetchall())
con.close()
也可以是生成器
import sqlite3
import string
def char_generator():
for c in string.ascii_lowercase:
yield (c,)
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table characters(c)")
cur.executemany("insert into characters(c) values (?)", char_generator())
cur.execute("select c from characters")
print(cur.fetchall())
con.close()
- executescript(sql_script): 执行多条sql脚本
import sqlite3
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.executescript("""
create table person(
firstname,
lastname,
age
);
create table book(
title,
author,
published
);
insert into book(title, author, published)
values (
'Dirk Gently''s Holistic Detective Agency',
'Douglas Adams',
1987
);
""")
con.close()
- fetchone(): 获取下一行查询结果
- fetchmany(size=cursor.arraysize): 获取下一个多行查询结果
- fetchall():获取所有的查询结果
- close(): *
- rowcount:*
- lastrowid: *
- arraysize:用于控制 fetchmany() 返回行数的可读取/写入属性。 该属性的默认值为 1,表示每次调用将获取单独一行
- description:这个只读属性将提供上一次查询的列名称,为一个包含七个元素的元组,后六个都是None
- connection:*
sqlite3.Row
之前介绍到它可以用来赋值给row_factory以提高性能,他也可以通过keys来得到字典,还是上面的例子,稍微改下:
import sqlite3
con = sqlite3.connect(":memory:")
con.row_factory = sqlite3.Row
cur = con.cursor()
cur.execute("select 1 as a")
row_result = cur.fetchone()
print(dict(row_result))
# 这样也可以
# print({key:row_result[n] for n,key in enumerate(row_result.keys())})
# print({key:row_result[key] for key in row_result.keys()})
con.close()
SQLite 与 Python 类型
Python 类型 | SQLite 类型 |
---|---|
None | NULL |
int | INTEGER |
float | REAL |
str | TEXT |
bytes | BLOB |
自带的转换器
datetime.date对应类型date,datetime.datetime对应类型timestamp
import sqlite3
import datetime
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
cur = con.cursor()
cur.execute("create table test(d date, ts timestamp)")
today = datetime.date.today()
now = datetime.datetime.now()
cur.execute("insert into test(d, ts) values (?, ?)", (today, now))
cur.execute("select d, ts from test")
row = cur.fetchone()
print(today, "=>", row[0], type(row[0]))
print(now, "=>", row[1], type(row[1]))
cur.execute('select current_date as "d [date]", current_timestamp as "ts [timestamp]"')
row = cur.fetchone()
print("current_date", row[0], type(row[0]))
print("current_timestamp", row[1], type(row[1]))
con.close()
不使用cursor
import sqlite3
persons = [
("Hugo", "Boss"),
("Calvin", "Klein")
]
con = sqlite3.connect(":memory:")
con.execute("create table person(firstname, lastname)")
con.executemany("insert into person(firstname, lastname) values (?, ?)", persons)
for row in con.execute("select firstname, lastname from person"):
print(row)
print("I just deleted", con.execute("delete from person").rowcount, "rows")
con.close()
使用with
使用with来执行sql会自动commit或者rollback
import sqlite3
con = sqlite3.connect(":memory:")
con.execute("create table person (id integer primary key, firstname varchar unique)")
# 执行成功会自动commit
with con:
con.execute("insert into person(firstname) values (?)", ("Joe",))
# 出现异常会自动rollback
try:
with con:
con.execute("insert into person(firstname) values (?)", ("Joe",))
except sqlite3.IntegrityError:
print("couldn't add Joe twice")
con.close()