Playhouse,Peewee 的扩展
Peewee 附带许多扩展模块,这些模块收集在 playhouse
命名空间下。尽管名字很傻,但有一些非常有用的扩展,特别是那些公开特定于供应商的数据库功能的扩展,例如 SQLite 扩展 和 Postgresql 扩展 扩展。
下面您将找到构成 playhouse
的各种模块的松散组织列表。
数据库驱动程序/特定于供应商的数据库功能
SQLite 扩展(在它自己的页面上)
高级功能
数据库管理和框架集成
Sqlite 扩展
Sqlite 扩展已移至 它们自己的页面。
SqliteQ
playhouse.sqliteq
模块提供 SqliteExtDatabase
的子类,它将并发写入序列化到 SQLite 数据库。 SqliteQueueDatabase
可用作常规 SqliteDatabase
的直接替换,如果您希望从 **多个线程** 对 SQLite 数据库进行简单的 **读写** 访问。
SQLite 仅允许在任何给定时间只有一个连接写入数据库。因此,如果您有一个需要写入数据库的多线程应用程序(例如 Web 服务器),您可能会在尝试写入的一个或多个线程无法获取锁时看到偶尔的错误。
SqliteQueueDatabase
旨在通过将所有写入查询发送到单个、长生命周期的连接来简化操作。好处是您获得了多个线程写入数据库而不会发生冲突或超时的外观。但是,缺点是您不能发出包含多个查询的写入事务 - 所有写入都在自动提交模式下运行,本质上。
注意
该模块的名称源于所有写入查询都放入线程安全队列的事实。单个工作线程监听队列并执行发送给它的所有查询。
事务
由于所有查询都由单个工作线程序列化和执行,因此来自不同线程的事务性 SQL 可能以乱序执行。在下面的示例中,线程“B”启动的事务被线程“A”回滚(后果很严重!)
线程 A:UPDATE transplants SET organ=’liver’, …;
线程 B:BEGIN TRANSACTION;
线程 B:UPDATE life_support_system SET timer += 60 …;
线程 A:ROLLBACK; - 哦,不…….
由于来自不同事务的查询有可能交织在一起,因此 transaction()
和 atomic()
方法在 SqliteQueueDatabase
上被禁用。
对于您希望从不同线程临时写入数据库的情况,可以使用 pause()
和 unpause()
方法。这些方法会阻塞调用者,直到编写器线程完成其当前工作负载。然后,编写器断开连接,调用者接管,直到调用 unpause
。
可以使用 stop()
、start()
和 is_stopped()
方法来控制写入线程。
注意
查看 SQLite 的 隔离 文档,以获取有关 SQLite 如何处理并发连接的更多信息。
代码示例
创建数据库实例不需要任何特殊处理。 SqliteQueueDatabase
接受一些特殊的参数,您应该注意这些参数。如果您使用的是 gevent,则在实例化数据库时必须指定 use_gevent=True
- 这样 Peewee 就会知道使用适当的对象来处理排队、线程创建和锁定。
from playhouse.sqliteq import SqliteQueueDatabase
db = SqliteQueueDatabase(
'my_app.db',
use_gevent=False, # Use the standard library "threading" module.
autostart=False, # The worker thread now must be started manually.
queue_max_size=64, # Max. # of pending writes that can accumulate.
results_timeout=5.0) # Max. time to wait for query to be executed.
如果 autostart=False
,如上面的示例所示,您需要调用 start()
来启动将执行实际写入查询的 worker 线程。
@app.before_first_request
def _start_worker_threads():
db.start()
如果您打算执行 SELECT 查询或通常希望访问数据库,您需要调用 connect()
和 close()
,就像您对任何其他数据库实例所做的那样。
当您的应用程序准备终止时,使用 stop()
方法关闭 worker 线程。如果有工作积压,则此方法将阻塞,直到所有待处理的工作完成(尽管不允许新的工作)。
import atexit
@atexit.register
def _stop_worker_threads():
db.stop()
最后,is_stopped()
方法可用于确定数据库写入器是否正在运行。
Sqlite 用户定义函数
sqlite_udf
playhouse 模块包含许多用户定义函数、聚合函数和表值函数,您可能会发现这些函数很有用。这些函数按集合分组,您可以单独注册这些用户定义的扩展,也可以按集合注册,或者注册所有内容。
标量函数是接受多个参数并返回单个值的函数。例如,将字符串转换为大写,或计算 MD5 十六进制摘要。
聚合函数类似于标量函数,它们对多行数据进行操作,生成单个结果。例如,计算整数列表的总和,或查找特定列中的最小值。
表值函数只是可以返回多行数据的函数。例如,一个正则表达式搜索函数,它返回给定字符串中的所有匹配项,或者一个接受两个日期并生成所有中间日期的函数。
注意
要使用表值函数,您需要构建 playhouse._sqlite_ext
C 扩展。
注册用户定义函数
db = SqliteDatabase('my_app.db')
# Register *all* functions.
register_all(db)
# Alternatively, you can register individual groups. This will just
# register the DATE and MATH groups of functions.
register_groups(db, 'DATE', 'MATH')
# If you only wish to register, say, the aggregate functions for a
# particular group or groups, you can:
register_aggregate_groups(db, 'DATE')
# If you only wish to register a single function, then you can:
from playhouse.sqlite_udf import gzip, gunzip
db.register_function(gzip, 'gzip')
db.register_function(gunzip, 'gunzip')
使用库函数(“hostname”)
# Assume we have a model, Link, that contains lots of arbitrary URLs.
# We want to discover the most common hosts that have been linked.
query = (Link
.select(fn.hostname(Link.url).alias('host'), fn.COUNT(Link.id))
.group_by(fn.hostname(Link.url))
.order_by(fn.COUNT(Link.id).desc())
.tuples())
# Print the hostname along with number of links associated with it.
for host, count in query:
print('%s: %s' % (host, count))
按集合名称列出的函数
标量函数用 (f)
表示,聚合函数用 (a)
表示,表值函数用 (t)
表示。
CONTROL_FLOW
- if_then_else(cond, truthy[, falsey=None])
简单的三元运算符,根据
cond
参数的真假性,返回truthy
或falsey
值。
DATE
- strip_tz(date_str)
- 参数
date_str – 一个日期时间,编码为字符串。
- 返回值
删除任何时区信息的日期时间。
时间不会以任何方式调整,只是简单地删除了时区。
- humandelta(nseconds[, glue=', '])
- 参数
nseconds (int) – 时间增量中总的秒数。
glue (str) – 用于连接值的片段。
- 返回值
时间增量的易于阅读的描述。
例如,86471 -> “1 天,1 分钟,11 秒”
- mintdiff(datetime_value)
- 参数
datetime_value – 一个日期时间。
- 返回值
列表中任何两个值之间的最小差值。
计算列表中任何两个日期时间之间的最小差值的聚合函数。
- avgtdiff(datetime_value)
- 参数
datetime_value – 一个日期时间。
- 返回值
列表中值之间的平均差值。
计算列表中连续值之间的平均差值的聚合函数。
- duration(datetime_value)
- 参数
datetime_value – 一个日期时间。
- 返回值
列表中最小值到最大值之间的持续时间,以秒为单位。
计算列表中最小值到最大值之间的持续时间的聚合函数,以秒为单位返回。
- date_series(start, stop[, step_seconds=86400])
- 参数
start (datetime) – 开始日期时间
stop (datetime) – 停止日期时间
step_seconds (int) – 构成一步的秒数。
表值函数,它返回由从开始到停止迭代时遇到的日期/+时间值组成的行,每次
step_seconds
。此外,如果开始没有时间部分,并且 step_seconds 大于或等于一天(86400 秒),则返回的值将是日期。相反,如果开始没有日期部分,则值将作为时间返回。否则,值将作为日期时间返回。
示例
SELECT * FROM date_series('2017-01-28', '2017-02-02'); value ----- 2017-01-28 2017-01-29 2017-01-30 2017-01-31 2017-02-01 2017-02-02
FILE
- file_ext(filename)
- 参数
filename (str) – 要从中提取扩展名的文件名。
- 返回值
返回文件扩展名,包括前导“.”。
- file_read(filename)
- 参数
filename (str) – 要读取的文件名。
- 返回值
文件的内容。
HELPER
- gzip(data[, compression=9])
- 参数
data (bytes) – 要压缩的数据。
compression (int) – 压缩级别(9 为最大)。
- 返回值
压缩的二进制数据。
- gunzip(data)
- 参数
data (bytes) – 压缩数据。
- 返回值
解压缩的二进制数据。
- hostname(url)
- 参数
url (str) – 要从中提取主机名的 URL。
- 返回值
URL 的主机名部分
- toggle(key)
- 参数
key – 要切换的键。
在 True/False 状态之间切换键。示例
>>> toggle('my-key') True >>> toggle('my-key') False >>> toggle('my-key') True
- setting(key[, value=None])
- 参数
key – 要设置/检索的键。
value – 要设置的值。
- 返回值
与键关联的值。
在内存中存储/检索设置并在应用程序的生命周期内持久化。要获取当前值,只需指定键。要设置新值,请使用键和新值调用。
数学
- randomrange(start[, stop=None[, step=None]])
- 参数
start (int) – 范围的开始(包含)
end (int) – 范围的结束(不包含)
step (int) – 返回值的间隔。
返回
[start, end)
之间的随机整数。
- gauss_distribution(mean, sigma)
- 参数
mean (float) – 平均值
sigma (float) – 标准差
- sqrt(n)
计算
n
的平方根。
- tonumber(s)
- 参数
s (str) – 要转换为数字的字符串。
- 返回值
整数、浮点数或失败时的 NULL。
- mode(val)
- 参数
val – 列表中的数字。
- 返回值
观察到的众数或最常见的数字。
聚合函数,计算值的众数。
- minrange(val)
- 参数
val – 值
- 返回值
两个值之间的最小差值。
聚合函数,计算序列中两个数字之间的最小距离。
- avgrange(val)
- 参数
val – 值
- 返回值
值之间的平均差值。
聚合函数,计算序列中两个连续数字之间的平均距离。
- range(val)
- 参数
val – 值
- 返回值
序列中最小值到最大值的范围。
聚合函数,返回观察到的值的范围。
- median(val)
- 参数
val – 值
- 返回值
序列中的中位数或中间值。
聚合函数,计算序列中的中间值。
注意
仅在编译了
_sqlite_udf
扩展时可用。
字符串
- substr_count(haystack, needle)
返回
needle
在haystack
中出现的次数。
- strip_chars(haystack, chars)
从
haystack
的开头和结尾删除chars
中的任何字符。
- damerau_levenshtein_dist(s1, s2)
使用 levenshtein 算法的 damerau 变体计算从 s1 到 s2 的编辑距离。
注意
仅在编译了
_sqlite_udf
扩展时可用。
- levenshtein_dist(s1, s2)
使用 levenshtein 算法计算从 s1 到 s2 的编辑距离。
注意
仅在编译了
_sqlite_udf
扩展时可用。
- str_dist(s1, s2)
使用标准库 SequenceMatcher 的算法计算从 s1 到 s2 的编辑距离。
注意
仅在编译了
_sqlite_udf
扩展时可用。
- regex_search(regex, search_string)
- 参数
regex (str) – 正则表达式
search_string (str) – 要搜索 regex 实例的字符串。
表值函数,在字符串中搜索与提供的
regex
匹配的子字符串。为找到的每个匹配项返回行。示例
SELECT * FROM regex_search('\w+', 'extract words, ignore! symbols'); value ----- extract words ignore symbols
apsw,一个高级 sqlite 驱动程序
apsw_ext
模块包含一个适合与 apsw sqlite 驱动程序一起使用的数据库类。
APSW 项目页面: https://github.com/rogerbinns/apsw
APSW 是一个非常棒的库,它在 SQLite 的 C 接口之上提供了一个薄包装器,使您可以使用 SQLite 的所有高级功能。
以下是一些使用 APSW 的原因,摘自文档
APSW 提供了 SQLite 的所有功能,包括虚拟表、虚拟文件系统、blob i/o、备份和文件控制。
连接可以在多个线程之间共享,无需任何额外的锁定。
事务由您的代码显式管理。
APSW 可以处理嵌套事务。
Unicode 处理正确。
APSW 速度更快。
有关 apsw 和 pysqlite 之间差异的更多信息,请查看 apsw 文档。
如何使用 APSWDatabase
from apsw_ext import *
db = APSWDatabase(':memory:')
class BaseModel(Model):
class Meta:
database = db
class SomeModel(BaseModel):
col1 = CharField()
col2 = DateTimeField()
apsw_ext API 说明
APSWDatabase
扩展了 SqliteExtDatabase
并继承了它的高级功能。
- class APSWDatabase(database, **connect_kwargs)
- 参数
database (string) – sqlite 数据库的文件名
connect_kwargs – 打开连接时传递给 apsw 的关键字参数
- register_module(mod_name, mod_inst)
提供了一种全局注册模块的方法。有关更多信息,请参阅 虚拟表文档。
- 参数
mod_name (string) – 模块要使用的名称
mod_inst (对象) – 实现 虚拟表 接口的对象
- unregister_module(mod_name)
注销模块。
- 参数
mod_name (string) – 模块要使用的名称
注意
请确保使用 apsw_ext
模块中定义的 Field
子类,因为它们将正确处理数据类型以进行存储。
例如,不要使用 peewee.DateTimeField
,请确保您正在导入和使用 playhouse.apsw_ext.DateTimeField
。
Sqlcipher 后端
注意
虽然此扩展的代码很短,但尚未经过适当的同行评审,可能存在漏洞。
另请注意,此代码依赖于 sqlcipher3(python 绑定)和 sqlcipher,并且那里的代码也可能存在漏洞,但由于这些是广泛使用的加密模块,我们可以预期“短零日”在那里。
sqlcipher_ext API 说明
- class SqlCipherDatabase(database, passphrase, **kwargs)
SqliteDatabase
的子类,用于存储加密的数据库。它不使用标准的sqlite3
后端,而是使用 sqlcipher3:一个针对 sqlcipher 的 python 包装器,它反过来是一个围绕sqlite3
的加密包装器,因此 API 与SqliteDatabase
的 API 完全相同,除了对象构造参数- 参数
database – 要打开(或创建)的加密数据库文件名的路径。
passphrase – 数据库加密密码:至少应为 8 个字符,但强烈建议在您的实现中强制执行更好的 密码强度 标准。
如果
database
文件不存在,它将使用从passhprase
派生的密钥创建并进行加密。尝试打开现有数据库时,
passhprase
应与创建时使用的密码相同。如果密码不正确,则在首次尝试访问数据库时将引发错误。
- rekey(passphrase)
- 参数
passphrase (str) – 数据库的新密码。
更改数据库的密码。
注意
SQLCipher 可以使用许多扩展 PRAGMA 进行配置。可以在 SQLCipher 文档 中找到 PRAGMA 列表及其说明。
例如,指定用于密钥派生的 PBKDF2 迭代次数(SQLCipher 3.x 中为 64K,SQLCipher 4.x 中默认情况下为 256K)
# Use 1,000,000 iterations.
db = SqlCipherDatabase('my_app.db', pragmas={'kdf_iter': 1000000})
使用 16KB 的密码页面大小和 10,000 页的缓存大小
db = SqlCipherDatabase('my_app.db', passphrase='secret!!!', pragmas={
'cipher_page_size': 1024 * 16,
'cache_size': 10000}) # 10,000 16KB pages, or 160MB.
提示用户输入密码的示例
db = SqlCipherDatabase(None)
class BaseModel(Model):
"""Parent for all app's models"""
class Meta:
# We won't have a valid db until user enters passhrase.
database = db
# Derive our model subclasses
class Person(BaseModel):
name = TextField(primary_key=True)
right_passphrase = False
while not right_passphrase:
db.init(
'testsqlcipher.db',
passphrase=get_passphrase_from_user())
try: # Actually execute a query against the db to test passphrase.
db.get_tables()
except DatabaseError as exc:
# This error indicates the password was wrong.
if exc.args[0] == 'file is encrypted or is not a database':
tell_user_the_passphrase_was_wrong()
db.init(None) # Reset the db.
else:
raise exc
else:
# The password was correct.
right_passphrase = True
另请参见:一个稍微更详细的 示例。
PostgreSQL 扩展
postgresql 扩展模块提供了一些“仅限 postgres”功能,目前
json 支持,包括适用于 Postgres 9.4 的jsonb。
ArrayField
字段类型,用于存储数组。HStoreField
字段类型,用于存储键值对。IntervalField
字段类型,用于存储timedelta
对象。JSONField
字段类型,用于存储 JSON 数据。BinaryJSONField
字段类型,用于jsonb
JSON 数据类型。TSVectorField
字段类型,用于存储全文搜索数据。DateTimeTZField
字段类型,一个时区感知的日期时间字段。
将来,我希望添加对更多 postgresql 功能的支持。如果您想看到添加的特定功能,请 打开 Github 问题。
警告
为了开始使用下面描述的功能,您需要使用扩展 PostgresqlExtDatabase
类,而不是 PostgresqlDatabase
。
下面的代码将假设您正在使用以下数据库和基本模型
from playhouse.postgres_ext import *
ext_db = PostgresqlExtDatabase('peewee_test', user='postgres')
class BaseExtModel(Model):
class Meta:
database = ext_db
JSON 支持
peewee 对 Postgres 的原生 JSON 数据类型提供基本支持,形式为 JSONField
。从 2.4.7 版本开始,peewee 还支持 Postgres 9.4 二进制 json jsonb
类型,通过 BinaryJSONField
。
警告
从 9.2 版本开始,Postgres 本地支持 JSON 数据类型(在 9.3 版本中完全支持)。为了使用此功能,您必须使用具有 psycopg2 2.5 或更高版本的正确 Postgres 版本。
要使用 BinaryJSONField
,它具有许多性能和查询优势,您必须拥有 Postgres 9.4 或更高版本。
注意
您必须确保您的数据库是 PostgresqlExtDatabase
的实例,才能使用 JSONField。
以下是如何声明具有 JSON 字段的模型的示例
import json
import urllib2
from playhouse.postgres_ext import *
db = PostgresqlExtDatabase('my_database')
class APIResponse(Model):
url = CharField()
response = JSONField()
class Meta:
database = db
@classmethod
def request(cls, url):
fh = urllib2.urlopen(url)
return cls.create(url=url, response=json.loads(fh.read()))
APIResponse.create_table()
# Store a JSON response.
offense = APIResponse.request('http://crime-api.com/api/offense/')
booking = APIResponse.request('http://crime-api.com/api/booking/')
# Query a JSON data structure using a nested key lookup:
offense_responses = APIResponse.select().where(
APIResponse.response['meta']['model'] == 'offense')
# Retrieve a sub-key for each APIResponse. By calling .as_json(), the
# data at the sub-key will be returned as Python objects (dicts, lists,
# etc) instead of serialized JSON.
q = (APIResponse
.select(
APIResponse.data['booking']['person'].as_json().alias('person'))
.where(APIResponse.data['meta']['model'] == 'booking'))
for result in q:
print(result.person['name'], result.person['dob'])
BinaryJSONField
的工作方式与常规 JSONField
相同,并支持相同的操作,但为测试包含提供了几个额外的操作。使用二进制 json 字段,您可以测试您的 JSON 数据是否包含其他部分 JSON 结构 (contains()
,contains_any()
,contains_all()
),或者它是否是更大 JSON 文档的子集 (contained_by()
)。
有关更多示例,请参阅下面的 JSONField
和 BinaryJSONField
API 文档。
hstore 支持
Postgresql hstore 是一个嵌入式键值存储。使用 hstore,您可以在数据库中与结构化关系数据一起存储任意键值对。
要使用 hstore
,您需要在实例化 PostgresqlExtDatabase
时指定一个额外的参数
# Specify "register_hstore=True":
db = PostgresqlExtDatabase('my_db', register_hstore=True)
目前,postgres_ext
模块支持以下操作
存储和检索任意字典
按键(或部分字典)过滤
更新/添加一个或多个键到现有字典
从现有字典中删除一个或多个键
选择键、值或压缩键和值
检索键/值的切片
测试键是否存在
测试键是否具有非空值
使用 hstore
首先,您需要从 playhouse.postgres_ext
导入自定义数据库类和 hstore 函数(参见上面的代码片段)。然后,只需将 HStoreField
添加到您的模型即可
class House(BaseExtModel):
address = CharField()
features = HStoreField()
您现在可以在 House
实例上存储任意键值对
>>> h = House.create(
... address='123 Main St',
... features={'garage': '2 cars', 'bath': '2 bath'})
...
>>> h_from_db = House.get(House.id == h.id)
>>> h_from_db.features
{'bath': '2 bath', 'garage': '2 cars'}
您可以按单个键、多个键或部分字典进行过滤
>>> query = House.select()
>>> garage = query.where(House.features.contains('garage'))
>>> garage_and_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
假设您想对房屋进行原子更新
>>> new_features = House.features.update({'bath': '2.5 bath', 'sqft': '1100'})
>>> query = House.update(features=new_features)
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'bath': '2.5 bath', 'garage': '2 cars', 'sqft': '1100'}
或者,或者进行原子删除
>>> query = House.update(features=House.features.delete('bath'))
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'garage': '2 cars', 'sqft': '1100'}
可以同时删除多个键
>>> query = House.update(features=House.features.delete('garage', 'sqft'))
您可以只选择键、只选择值,或者压缩两者
>>> for h in House.select(House.address, House.features.keys().alias('keys')):
... print(h.address, h.keys)
123 Main St [u'bath', u'garage']
>>> for h in House.select(House.address, House.features.values().alias('vals')):
... print(h.address, h.vals)
123 Main St [u'2 bath', u'2 cars']
>>> for h in House.select(House.address, House.features.items().alias('mtx')):
... print(h.address, h.mtx)
123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
您可以检索数据的切片,例如,所有车库数据
>>> query = House.select(House.address, House.features.slice('garage').alias('garage_data'))
>>> for house in query:
... print(house.address, house.garage_data)
123 Main St {'garage': '2 cars'}
您可以检查键是否存在并相应地过滤行
>>> has_garage = House.features.exists('garage')
>>> for house in House.select(House.address, has_garage.alias('has_garage')):
... print(house.address, house.has_garage)
123 Main St True
>>> for house in House.select().where(House.features.exists('garage')):
... print(house.address, house.features['garage']) # <-- just houses w/garage data
123 Main St 2 cars
间隔支持
Postgres 通过 INTERVAL
数据类型支持持续时间 (文档)。
- class IntervalField([null=False[, ...]])
能够存储 Python
datetime.timedelta
实例的字段类。示例
from datetime import timedelta from playhouse.postgres_ext import * db = PostgresqlExtDatabase('my_db') class Event(Model): location = CharField() duration = IntervalField() start_time = DateTimeField() class Meta: database = db @classmethod def get_long_meetings(cls): return cls.select().where(cls.duration > timedelta(hours=1))
服务器端游标
当 psycopg2 执行查询时,通常所有结果都会被后端获取并返回给客户端。这会导致您的应用程序在执行大型查询时使用大量内存。使用服务器端游标,结果会一次返回一小部分(默认情况下为 2000 条记录)。有关权威参考,请参阅 psycopg2 文档。
注意
要使用服务器端(或命名)游标,您必须使用 PostgresqlExtDatabase
。
要使用服务器端游标执行查询,只需使用 ServerSide()
助手包装您的 select 查询
large_query = PageView.select() # Build query normally.
# Iterate over large query inside a transaction.
for page_view in ServerSide(large_query):
# do some interesting analysis here.
pass
# Server-side resources are released.
如果您希望所有 SELECT
查询自动使用服务器端游标,您可以在创建 PostgresqlExtDatabase
时指定这一点
from postgres_ext import PostgresqlExtDatabase
ss_db = PostgresqlExtDatabase('my_db', server_side_cursors=True)
注意
服务器端游标仅在事务期间存在,因此,出于这个原因,peewee 不会在执行 SELECT
查询后自动调用 commit()
。如果您在完成迭代后没有 commit
,您将不会释放服务器端资源,直到连接关闭(或事务稍后提交)。此外,由于 peewee 默认情况下会缓存游标返回的行,因此您应该始终在迭代大型查询时调用 .iterator()
。
如果您使用的是 ServerSide()
助手,事务和对 iterator()
的调用将被透明地处理。
全文搜索
Postgresql 使用特殊数据类型(tsvector
和 tsquery
)提供 复杂的全文搜索。文档应存储或转换为 tsvector
类型,搜索查询应转换为 tsquery
。
对于简单情况,您可以简单地使用 Match()
函数,该函数将自动执行适当的转换,并且不需要任何模式更改
def blog_search(search_term):
return Blog.select().where(
(Blog.status == Blog.STATUS_PUBLISHED) &
Match(Blog.content, search_term))
Match()
函数将自动将左侧操作数转换为 tsvector
,并将右侧操作数转换为 tsquery
。为了获得更好的性能,建议您在要搜索的列上创建一个 GIN
索引
CREATE INDEX blog_full_text_search ON blog USING gin(to_tsvector(content));
或者,您可以使用 TSVectorField
来维护一个专门用于存储 tsvector
数据的列
class Blog(Model):
content = TextField()
search_content = TSVectorField()
注意
TSVectorField
将自动创建带有 GIN 索引。
在插入或更新 search_content
字段时,您需要显式地将传入的文本数据转换为 tsvector
content = 'Excellent blog post about peewee ORM.'
blog_entry = Blog.create(
content=content,
search_content=fn.to_tsvector(content))
要执行全文搜索,请使用 TSVectorField.match()
terms = 'python & (sqlite | postgres)'
results = Blog.select().where(Blog.search_content.match(terms))
有关更多信息,请参阅 Postgres 全文搜索文档。
postgres_ext API 说明
- class PostgresqlExtDatabase(database[, server_side_cursors=False[, register_hstore=False[, ...]]])
与
PostgresqlDatabase
相同,但需要支持- 参数
database (str) – 要连接的数据库名称。
server_side_cursors (bool) –
SELECT
查询是否应该使用服务器端游标。register_hstore (bool) – 使用连接注册 HStore 扩展。
如果您希望使用 HStore 扩展,则必须指定
register_hstore=True
。如果使用
server_side_cursors
,请确保也使用ServerSide()
包装您的查询。
- ServerSide(select_query)
- 参数
select_query – 一个
SelectQuery
实例。- Rtype 生成器
将给定的 select 查询包装在事务中,并调用其
iterator()
方法以避免缓存行实例。为了释放服务器端资源,请确保耗尽生成器(遍历所有行)。用法
large_query = PageView.select() for page_view in ServerSide(large_query): # Do something interesting. pass # At this point server side resources are released.
- class ArrayField([field_class=IntegerField[, field_kwargs=None[, dimensions=1[, convert_values=False]]]])
- 参数
field_class –
Field
的子类,例如IntegerField
。field_kwargs (dict) – 初始化
field_class
的参数。dimensions (int) – 数组的维度。
convert_values (bool) – 将
field_class
值转换应用于数组数据。
能够存储提供的 field_class 数组的字段。
注意
默认情况下,ArrayField 将使用 GIN 索引。要禁用此功能,请使用
index=False
初始化字段。您可以存储和检索列表(或列表的列表)
class BlogPost(BaseModel): content = TextField() tags = ArrayField(CharField) post = BlogPost(content='awesome', tags=['foo', 'bar', 'baz'])
此外,您可以使用
__getitem__
API 查询数据库中的值或切片# Get the first tag on a given blog post. first_tag = (BlogPost .select(BlogPost.tags[0].alias('first_tag')) .where(BlogPost.id == 1) .dicts() .get()) # first_tag = {'first_tag': 'foo'}
获取值的切片
# Get the first two tags. two_tags = (BlogPost .select(BlogPost.tags[:2].alias('two')) .dicts() .get()) # two_tags = {'two': ['foo', 'bar']}
- contains(*items)
- 参数
items – 必须在给定数组字段中的一个或多个项目。
# Get all blog posts that are tagged with both "python" and "django". Blog.select().where(Blog.tags.contains('python', 'django'))
- contains_any(*items)
- 参数
items – 要在给定数组字段中搜索的一个或多个项目。
与
contains()
相似,但将匹配数组包含给定项目中的任何项目的行。# Get all blog posts that are tagged with "flask" and/or "django". Blog.select().where(Blog.tags.contains_any('flask', 'django'))
- class DateTimeTZField(*args, **kwargs)
DateTimeField
的时区感知子类。
- class HStoreField(*args, **kwargs)
用于存储和检索任意键值对的字段。有关使用详细信息,请参阅 hstore 支持。
注意
要使用
HStoreField
,您需要确保 hstore 扩展已在连接中注册。为此,请使用register_hstore=True
初始化PostgresqlExtDatabase
。注意
默认情况下,
HStoreField
将使用 GiST 索引。要禁用此功能,请使用index=False
初始化字段。- keys()
返回给定行的键。
>>> for h in House.select(House.address, House.features.keys().alias('keys')): ... print(h.address, h.keys) 123 Main St [u'bath', u'garage']
- values()
返回给定行的值。
>>> for h in House.select(House.address, House.features.values().alias('vals')): ... print(h.address, h.vals) 123 Main St [u'2 bath', u'2 cars']
- items()
类似于 Python 的
dict
,以列表形式返回键和值。>>> for h in House.select(House.address, House.features.items().alias('mtx')): ... print(h.address, h.mtx) 123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
- slice(*args)
根据键列表返回数据切片。
>>> for h in House.select(House.address, House.features.slice('garage').alias('garage_data')): ... print(h.address, h.garage_data) 123 Main St {'garage': '2 cars'}
- exists(key)
查询给定键是否存在。
>>> for h in House.select(House.address, House.features.exists('garage').alias('has_garage')): ... print(h.address, h.has_garage) 123 Main St True >>> for h in House.select().where(House.features.exists('garage')): ... print(h.address, h.features['garage']) # <-- just houses w/garage data 123 Main St 2 cars
- defined(key)
查询给定键是否与值相关联。
- update(**data)
对给定行或多行的键值对执行原子更新。
>>> query = House.update(features=House.features.update( ... sqft=2000, ... year_built=2012)) >>> query.where(House.id == 1).execute()
- delete(*keys)
删除给定行或多行的指定键。
注意
我们将使用
UPDATE
查询。>>> query = House.update(features=House.features.delete( ... 'sqft', 'year_built')) >>> query.where(House.id == 1).execute()
- contains(value)
- 参数
value – 可以是
dict
、list
(包含键)或单个键。
查询行中是否存在以下内容:
部分字典。
键列表。
单个键。
>>> query = House.select() >>> has_garage = query.where(House.features.contains('garage')) >>> garage_bath = query.where(House.features.contains(['garage', 'bath'])) >>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
- contains_any(*keys)
- 参数
keys – 要搜索的一个或多个键。
查询行中是否存在任何键。
- class JSONField(dumps=None, *args, **kwargs)
- 参数
dumps – 默认情况下调用 json.dumps() 或 dumps 函数。您可以覆盖此方法以创建自定义 JSON 包装器。
适合存储和查询任意 JSON 的字段类。在模型上使用此字段时,将字段的值设置为 Python 对象(
dict
或list
)。从数据库中检索值时,它将以 Python 数据结构的形式返回。注意
您必须使用 Postgres 9.2 / psycopg2 2.5 或更高版本。
注意
如果您使用的是 Postgres 9.4,强烈建议使用
BinaryJSONField
,因为它提供了更好的性能和更强大的查询选项。示例模型声明
db = PostgresqlExtDatabase('my_db') class APIResponse(Model): url = CharField() response = JSONField() class Meta: database = db
存储 JSON 数据的示例
url = 'http://foo.com/api/resource/' resp = json.loads(urllib2.urlopen(url).read()) APIResponse.create(url=url, response=resp) APIResponse.create(url='http://foo.com/baz/', response={'key': 'value'})
要查询,请使用 Python 的
[]
运算符指定嵌套键或数组查找APIResponse.select().where( APIResponse.response['key1']['nested-key'] == 'some-value')
为了说明
[]
运算符的使用,假设我们在APIResponse
中存储了以下数据{ "foo": { "bar": ["i1", "i2", "i3"], "baz": { "huey": "mickey", "peewee": "nugget" } } }
以下是几个查询的结果
def get_data(expression): # Helper function to just retrieve the results of a # particular expression. query = (APIResponse .select(expression.alias('my_data')) .dicts() .get()) return query['my_data'] # Accessing the foo -> bar subkey will return a JSON # representation of the list. get_data(APIResponse.data['foo']['bar']) # '["i1", "i2", "i3"]' # In order to retrieve this list as a Python list, # we will call .as_json() on the expression. get_data(APIResponse.data['foo']['bar'].as_json()) # ['i1', 'i2', 'i3'] # Similarly, accessing the foo -> baz subkey will # return a JSON representation of the dictionary. get_data(APIResponse.data['foo']['baz']) # '{"huey": "mickey", "peewee": "nugget"}' # Again, calling .as_json() will return an actual # python dictionary. get_data(APIResponse.data['foo']['baz'].as_json()) # {'huey': 'mickey', 'peewee': 'nugget'} # When dealing with simple values, either way works as # you expect. get_data(APIResponse.data['foo']['bar'][0]) # 'i1' # Calling .as_json() when the result is a simple value # will return the same thing as the previous example. get_data(APIResponse.data['foo']['bar'][0].as_json()) # 'i1'
- class BinaryJSONField(dumps=None, *args, **kwargs)
- 参数
dumps – 默认情况下调用 json.dumps() 或 dumps 函数。您可以覆盖此方法以创建自定义 JSON 包装器。
存储和查询任意 JSON 文档。数据应使用普通的 Python
dict
和list
对象存储,从数据库中返回数据时,也将使用dict
和list
返回。有关基本查询操作的示例,请参阅上面针对
JSONField
的代码示例。以下示例查询将使用上面描述的相同APIResponse
模型。注意
默认情况下,BinaryJSONField 将使用 GiST 索引。要禁用此功能,请使用
index=False
初始化字段。注意
您必须使用 Postgres 9.4 / psycopg2 2.5 或更高版本。如果您使用的是 Postgres 9.2 或 9.3,则可以使用常规的
JSONField
。- contains(other)
测试给定的 JSON 数据是否包含给定的 JSON 片段或键。
示例
search_fragment = { 'foo': {'bar': ['i2']} } query = (APIResponse .select() .where(APIResponse.data.contains(search_fragment))) # If we're searching for a list, the list items do not need to # be ordered in a particular way: query = (APIResponse .select() .where(APIResponse.data.contains({ 'foo': {'bar': ['i2', 'i1']}})))
我们也可以传入简单的键。要查找在顶层包含键
foo
的 APIResponseAPIResponse.select().where(APIResponse.data.contains('foo'))
我们也可以使用方括号搜索子键
APIResponse.select().where( APIResponse.data['foo']['bar'].contains(['i2', 'i1']))
- contains_any(*items)
搜索一个或多个给定项的存在。
APIResponse.select().where( APIResponse.data.contains_any('foo', 'baz', 'nugget'))
与
contains()
一样,我们也可以搜索子键APIResponse.select().where( APIResponse.data['foo']['bar'].contains_any('i2', 'ix'))
- contains_all(*items)
搜索所有给定项的存在。
APIResponse.select().where( APIResponse.data.contains_all('foo'))
与
contains_any()
一样,我们也可以搜索子键APIResponse.select().where( APIResponse.data['foo']['bar'].contains_all('i1', 'i2', 'i3'))
- contained_by(other)
测试给定的 JSON 文档是否包含在(是给定 JSON 文档的子集)给定的 JSON 文档中。此方法是
contains()
的反函数。big_doc = { 'foo': { 'bar': ['i1', 'i2', 'i3'], 'baz': { 'huey': 'mickey', 'peewee': 'nugget', } }, 'other_key': ['nugget', 'bear', 'kitten'], } APIResponse.select().where( APIResponse.data.contained_by(big_doc))
- concat(data)
将两个字段数据和提供的数据连接起来。请注意,此操作不会合并或执行“深度连接”。
- has_key(key)
测试键是否存在于 JSON 对象的顶层。
- remove(*keys)
从 JSON 对象的顶层删除一个或多个键。
- Match(field, query)
生成全文搜索表达式,自动将左侧操作数转换为
tsvector
,将右侧操作数转换为tsquery
。示例
def blog_search(search_term): return Blog.select().where( (Blog.status == Blog.STATUS_PUBLISHED) & Match(Blog.content, search_term))
- class TSVectorField
适合存储
tsvector
数据的字段类型。此字段将自动创建GIN
索引,以提高搜索性能。注意
存储在此字段中的数据仍然需要手动转换为
tsvector
类型。注意
默认情况下,TSVectorField 将使用 GIN 索引。要禁用此功能,请使用
index=False
初始化字段。示例用法
class Blog(Model): content = TextField() search_content = TSVectorField() content = 'this is a sample blog entry.' blog_entry = Blog.create( content=content, search_content=fn.to_tsvector(content)) # Note `to_tsvector()`.
- match(query[, language=None[, plain=False]])
- 参数
query (str) – 全文搜索查询。
language (str) – 语言名称(可选)。
plain (bool) – 使用普通(简单)解析器解析搜索查询。
- 返回值
表示全文搜索/匹配的表达式。
示例
# Perform a search using the "match" method. terms = 'python & (sqlite | postgres)' results = Blog.select().where(Blog.search_content.match(terms))
Cockroach 数据库
CockroachDB (CRDB) 受到 peewee 的良好支持。
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app', user='root', host='10.1.0.8')
如果您使用的是 Cockroach Cloud,您可能会发现使用连接字符串指定连接参数更容易。
db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')
注意
CockroachDB 需要 psycopg2
(postgres)Python 驱动程序。
注意
CockroachDB 安装和入门指南可以在这里找到:https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html
SSL 配置
在运行 Cockroach 集群时强烈建议使用 SSL 证书。Psycopg2 原生支持 SSL,但您可能需要在初始化数据库时指定一些其他选项。
db = CockroachDatabase(
'my_app',
user='root',
host='10.1.0.8',
sslmode='verify-full', # Verify the cert common-name.
sslrootcert='/path/to/root.crt')
# Or, alternatively, specified as part of a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/dbname'
'?sslmode=verify-full&sslrootcert=/path/to/root.crt'
'&options=--cluster=my-cluster-xyz')
有关客户端验证的更多详细信息,请参阅 libpq 文档。
Cockroach 扩展 API
playhouse.cockroachdb
扩展模块提供以下类和帮助程序。
CockroachDatabase
-PostgresqlDatabase
的子类,专门设计用于与 CRDB 协同工作。PooledCockroachDatabase
- 与上面类似,但实现了连接池。run_transaction()
- 在事务中运行函数,并提供自动客户端端重试逻辑。
在使用 CRDB 时可能会有用的特殊字段类型。
UUIDKeyField
- 使用 CRDB 的UUID
类型(具有默认的随机生成的 UUID)的 primary-key 字段实现。RowIDField
- 使用 CRDB 的INT
类型(具有默认的unique_rowid()
)的 primary-key 字段实现。JSONField
- 与 PostgresBinaryJSONField
相同,因为 CRDB 将 JSON 视为 JSONB。ArrayField
- 与 Postgres 扩展相同(但不支持多维数组)。
CRDB 与 Postgres 的线协议兼容,并公开非常相似的 SQL 接口,因此可以使用 PostgresqlDatabase
与 CRDB 协同工作(但不推荐)。
CRDB 不支持嵌套事务(保存点),因此
atomic()
方法已实现,以便在使用CockroachDatabase
时强制执行此方法。有关更多信息,请参阅 CRDB 事务。CRDB 在字段类型、日期函数和自省方面可能与 Postgres 有细微的差异。
CRDB 特定功能由
CockroachDatabase
公开,例如指定事务优先级或AS OF SYSTEM TIME
子句。
CRDB 事务
CRDB 不支持嵌套事务(保存点),因此 atomic()
方法在 CockroachDatabase
上已修改,如果遇到无效嵌套,则会引发异常。如果您希望能够嵌套事务代码,可以使用 transaction()
方法,该方法将确保最外层块将管理事务(例如,退出嵌套块不会导致提前提交)。
示例
@db.transaction()
def create_user(username):
return User.create(username=username)
def some_other_function():
with db.transaction() as txn:
# do some stuff...
# This function is wrapped in a transaction, but the nested
# transaction will be ignored and folded into the outer
# transaction, as we are already in a wrapped-block (via the
# context manager).
create_user('[email protected]')
# do other stuff.
# At this point we have exited the outer-most block and the transaction
# will be committed.
return
CRDB 提供客户端端事务重试,可以使用特殊的 run_transaction()
帮助程序获得这些重试。此帮助程序方法接受一个可调用对象,该对象负责执行可能需要重试的任何事务性语句。
run_transaction()
的最简单示例。
def create_user(email):
# Callable that accepts a single argument (the database instance) and
# which is responsible for executing the transactional SQL.
def callback(db_ref):
return User.create(email=email)
return db.run_transaction(callback, max_attempts=10)
huey = create_user('[email protected]')
注意
如果事务在给定次数的尝试后无法提交,则将引发 cockroachdb.ExceededMaxAttempts
异常。如果 SQL 格式错误、违反约束等,则该函数将向调用方引发异常。
使用 run_transaction()
为将金额从一个帐户转移到另一个帐户的事务实现客户端端重试的示例。
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app')
def transfer_funds(from_id, to_id, amt):
"""
Returns a 3-tuple of (success?, from balance, to balance). If there are
not sufficient funds, then the original balances are returned.
"""
def thunk(db_ref):
src, dest = (Account
.select()
.where(Account.id.in_([from_id, to_id])))
if src.id != from_id:
src, dest = dest, src # Swap order.
# Cannot perform transfer, insufficient funds!
if src.balance < amt:
return False, src.balance, dest.balance
# Update each account, returning the new balance.
src, = (Account
.update(balance=Account.balance - amt)
.where(Account.id == from_id)
.returning(Account.balance)
.execute())
dest, = (Account
.update(balance=Account.balance + amt)
.where(Account.id == to_id)
.returning(Account.balance)
.execute())
return True, src.balance, dest.balance
# Perform the queries that comprise a logical transaction. In the
# event the transaction fails due to contention, it will be auto-
# matically retried (up to 10 times).
return db.run_transaction(thunk, max_attempts=10)
CRDB API
- class CockroachDatabase(database[, **kwargs])
CockroachDB 实现,基于
PostgresqlDatabase
并使用psycopg2
驱动程序。其他关键字参数传递给 psycopg2 连接构造函数,可用于指定数据库
user
、port
等。或者,连接详细信息可以在 URL 格式中指定。
- run_transaction(callback[, max_attempts=None[, system_time=None[, priority=None]]])
- 参数
callback – 接受单个
db
参数(将是调用此方法的数据库实例)的可调用对象。max_attempts (int) – 在放弃之前尝试的最大次数。
system_time (datetime) – 相对于给定值执行事务
AS OF SYSTEM TIME
。priority (str) – “low”、“normal” 或 “high” 之一。
- 返回值
返回回调返回的值。
- 引发
ExceededMaxAttempts
如果超过了max_attempts
。
在事务中运行 SQL,并自动进行客户端端重试。
用户提供的
callback
必须接受一个参数,即表示事务正在运行的连接的
db
实例。必须不尝试提交、回滚或以其他方式管理事务。
可能被调用多次。
应该理想情况下只包含 SQL 操作。
此外,在调用此函数时,数据库中不能有任何打开的事务,因为 CRDB 不支持嵌套事务。尝试这样做会导致
NotImplementedError
错误。最简单的示例
def create_user(email): def callback(db_ref): return User.create(email=email) return db.run_transaction(callback, max_attempts=10) user = create_user('[email protected]')
- class PooledCockroachDatabase(database[, **kwargs])
基于
PooledPostgresqlDatabase
的 CockroachDB 连接池实现。实现与CockroachDatabase
相同的 API,但将执行客户端连接池。
- run_transaction(db, callback[, max_attempts=None[, system_time=None[, priority=None]]])
在事务中运行 SQL,并自动进行客户端重试。有关详细信息,请参阅
CockroachDatabase.run_transaction()
。- 参数
db (CockroachDatabase) – 数据库实例。
callback – 可调用对象,接受单个
db
参数(它将与上面传递的值相同)。
注意
此函数等效于
CockroachDatabase
类上的同名方法。
- class UUIDKeyField
UUID 主键字段,使用 CRDB
gen_random_uuid()
函数自动填充初始值。
- class RowIDField
自动递增的整数主键字段,使用 CRDB
unique_rowid()
函数自动填充初始值。
另请参阅
BinaryJSONField
来自 Postgresql 扩展(在cockroachdb
扩展模块中可用,并别名为JSONField
)。ArrayField
来自 Postgresql 扩展。
MySQL 扩展
Peewee 为使用 mysql-connector 驱动程序或 mariadb-connector 提供了备用数据库实现。这些实现可以在 playhouse.mysql_ext
中找到。
- class MySQLConnectorDatabase(database, **kwargs)
使用 mysql-connector 的数据库实现。支持的 连接参数 的完整列表。
mysql-connector 的示例用法
from playhouse.mysql_ext import MySQLConnectorDatabase # MySQL database implementation that utilizes mysql-connector driver. db = MySQLConnectorDatabase('my_database', host='1.2.3.4', user='mysql')
- class MariaDBConnectorDatabase(database, **kwargs)
使用 mariadb-connector 的数据库实现。支持的 连接参数 的完整列表。
mariadb-connector 的示例用法
from playhouse.mysql_ext import MariaDBConnectorDatabase # MySQL database implementation that utilizes mysql-connector driver. db = MariaDBConnectorDatabase('my_database', host='1.2.3.4', user='mysql')
其他 MySQL 特定助手
DataSet
dataset 模块包含一个高级 API,用于处理以流行的 同名项目 为模型的数据库。dataset 模块的目标是提供
一个简化的 API,用于处理关系数据,类似于处理 JSON。
一种将关系数据导出为 JSON 或 CSV 的简单方法。
一种将 JSON 或 CSV 数据导入关系数据库的简单方法。
最小的数据加载脚本可能如下所示
from playhouse.dataset import DataSet
db = DataSet('sqlite:///:memory:')
table = db['sometable']
table.insert(name='Huey', age=3)
table.insert(name='Mickey', age=5, gender='male')
huey = table.find_one(name='Huey')
print(huey)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
for obj in table:
print(obj)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# {'age': 5, 'gender': 'male', 'id': 2, 'name': 'Mickey'}
您也可以使用字典 API 插入、更新或删除数据
huey = table.find_one(name='Huey')
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# Perform an update by supplying a partial record of changes.
table[1] = {'gender': 'male', 'age': 4}
print(table[1])
# {'age': 4, 'gender': 'male', 'id': 1, 'name': 'Huey'}
# Or insert a new record:
table[3] = {'name': 'Zaizee', 'age': 2}
print(table[3])
# {'age': 2, 'gender': None, 'id': 3, 'name': 'Zaizee'}
# Or delete a record:
del table[3] # Remove the row we just added.
您可以使用 freeze()
和 thaw()
导出或导入数据
# Export table content to the `users.json` file.
db.freeze(table.all(), format='json', filename='users.json')
# Import data from a CSV file into a new table. Columns will be automatically
# created for each field in the CSV file.
new_table = db['stats']
new_table.thaw(format='csv', filename='monthly_stats.csv')
入门
DataSet
对象通过传入格式为 dialect://user:password@host/dbname
的数据库 URL 来初始化。有关连接到各种数据库的示例,请参阅 数据库 URL 部分。
# Create an in-memory SQLite database.
db = DataSet('sqlite:///:memory:')
存储数据
要存储数据,我们必须首先获取对表的引用。如果表不存在,它将自动创建
# Get a table reference, creating the table if it does not exist.
table = db['users']
我们现在可以 insert()
新行到表中。如果列不存在,它们将自动创建
table.insert(name='Huey', age=3, color='white')
table.insert(name='Mickey', age=5, gender='male')
要更新表中现有的条目,请传入一个包含新值和过滤条件的字典。在 columns 参数中指定用作过滤器的列列表。如果没有指定过滤列,则将更新所有行。
# Update the gender for "Huey".
table.update(name='Huey', gender='male', columns=['name'])
# Update all records. If the column does not exist, it will be created.
table.update(favorite_orm='peewee')
导入数据
要从外部来源(如 JSON 或 CSV 文件)导入数据,可以使用 thaw()
方法。默认情况下,将为遇到的任何属性创建新列。如果您希望只填充表上已定义的列,则可以传入 strict=True
。
# Load data from a JSON file containing a list of objects.
table = dataset['stock_prices']
table.thaw(filename='stocks.json', format='json')
table.all()[:3]
# Might print...
[{'id': 1, 'ticker': 'GOOG', 'price': 703},
{'id': 2, 'ticker': 'AAPL', 'price': 109},
{'id': 3, 'ticker': 'AMZN', 'price': 300}]
使用事务
DataSet 支持使用简单的上下文管理器嵌套事务。
table = db['users']
with db.transaction() as txn:
table.insert(name='Charlie')
with db.transaction() as nested_txn:
# Set Charlie's favorite ORM to Django.
table.update(name='Charlie', favorite_orm='django', columns=['name'])
# jk/lol
nested_txn.rollback()
检查数据库
您可以使用 tables()
方法列出当前数据库中的表
>>> print(db.tables)
['sometable', 'user']
对于给定的表,您可以打印列
>>> table = db['user']
>>> print(table.columns)
['id', 'age', 'name', 'gender', 'favorite_orm']
我们还可以找出表中有多少行
>>> print(len(db['user']))
3
读取数据
要检索所有行,可以使用 all()
方法
# Retrieve all the users.
users = db['user'].all()
# We can iterate over all rows without calling `.all()`
for user in db['user']:
print(user['name'])
可以使用 find()
和 find_one()
检索特定对象。
# Find all the users who like peewee.
peewee_users = db['user'].find(favorite_orm='peewee')
# Find Huey.
huey = db['user'].find_one(name='Huey')
导出数据
要导出数据,请使用 freeze()
方法,传入您要导出的查询
peewee_users = db['user'].find(favorite_orm='peewee')
db.freeze(peewee_users, format='json', filename='peewee_users.json')
API
- class DataSet(url, **kwargs)
- 参数
url – 数据库 URL 或
Database
实例。有关使用 URL 的详细信息,请参阅 数据库 URL 获取示例。kwargs – 反射数据库时传递给
Introspector.generate_models()
的其他关键字参数。
DataSet 类提供了一个用于处理关系数据库的高级 API。
- tables
返回存储在数据库中的表列表。此列表在每次访问时动态计算。
- query(sql[, params=None[, commit=True]])
- 参数
sql (str) – SQL 查询。
params (list) – 查询的可选参数。
commit (bool) – 查询是否应该在执行后提交。
- 返回值
数据库游标。
对数据库执行提供的查询。
- transaction()
创建一个表示新事务(或保存点)的上下文管理器。
- freeze(query[, format='csv'[, filename=None[, file_obj=None[, encoding='utf8'[, **kwargs]]]]])
- 参数
query –
SelectQuery
,使用all()
或 ~Table.find 生成。format – 输出格式。默认情况下,支持 csv 和 json。
filename – 要写入输出的文件名。
file_obj – 要写入输出的文件类对象。
encoding (str) – 文件编码。
kwargs – 用于导出特定功能的任意参数。
- thaw(table[, format='csv'[, filename=None[, file_obj=None[, strict=False[, encoding='utf8'[, **kwargs]]]]]])
- 参数
table (str) – 要将数据加载到的表的名称。
format – 输入格式。默认情况下,支持 csv 和 json。
filename – 要从中读取数据的文件名。
file_obj – 要从中读取数据的文件类对象。
strict (bool) – 是否存储不存在于表上的列的值。
encoding (str) – 文件编码。
kwargs – 用于导入特定功能的任意参数。
- connect()
打开与底层数据库的连接。如果未显式打开连接,则在第一次执行查询时将打开连接。
- close()
关闭与底层数据库的连接。
- class Table(dataset, name, model_class)
- Noindex
提供了一个用于处理给定表中行的 API。
- columns
返回给定表中的列列表。
- create_index(columns[, unique=False])
在给定的列上创建索引。
# Create a unique index on the `username` column. db['users'].create_index(['username'], unique=True)
- insert(**data)
将给定的数据字典插入到表中,根据需要创建新列。
- update(columns=None, conjunction=None, **data)
使用提供的数据更新表。如果在columns参数中指定了一个或多个列,则data字典中这些列的值将用于确定要更新的行。
# Update all rows. db['users'].update(favorite_orm='peewee') # Only update Huey's record, setting his age to 3. db['users'].update(name='Huey', age=3, columns=['name'])
- find(**query)
查询表中与指定相等条件匹配的行。如果未指定查询,则返回所有行。
peewee_users = db['users'].find(favorite_orm='peewee')
- find_one(**query)
返回与指定相等条件匹配的单个行。如果未找到匹配的行,则返回
None
。huey = db['users'].find_one(name='Huey')
- all()
返回给定表中的所有行。
- delete(**query)
删除与给定相等条件匹配的所有行。如果未提供查询,则删除所有行。
# Adios, Django! db['users'].delete(favorite_orm='Django') # Delete all the secret messages. db['secret_messages'].delete()
- freeze([format='csv'[, filename=None[, file_obj=None[, **kwargs]]]])
- 参数
format – 输出格式。默认情况下,支持 csv 和 json。
filename – 要写入输出的文件名。
file_obj – 要写入输出的文件类对象。
kwargs – 用于导出特定功能的任意参数。
- thaw([format='csv'[, filename=None[, file_obj=None[, strict=False[, **kwargs]]]]])
- 参数
format – 输入格式。默认情况下,支持 csv 和 json。
filename – 要从中读取数据的文件名。
file_obj – 要从中读取数据的文件类对象。
strict (bool) – 是否存储不存在于表上的列的值。
kwargs – 用于导入特定功能的任意参数。
字段
这些字段可以在playhouse.fields
模块中找到。
混合属性
混合属性封装了在Python和SQL级别上运行的功能。混合属性的想法来自SQLAlchemy中同名功能。考虑以下示例
class Interval(Model):
start = IntegerField()
end = IntegerField()
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_method
def contains(self, point):
return (self.start <= point) & (point < self.end)
混合属性的名称来源于length
属性的行为方式取决于它是通过Interval
类还是Interval
实例访问的。
如果通过实例访问,则它的行为与预期一致。
但是,如果通过Interval.length
类属性访问,则长度计算将表示为SQL表达式。例如
query = Interval.select().where(Interval.length > 5)
此查询等效于以下SQL
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."end" - "t1"."start") > 5)
The playhouse.hybrid
module also contains a decorator for implementing hybrid methods which can accept parameters. As with hybrid properties, when accessed via a model instance, then the function executes normally as-written. When the hybrid method is called on the class, however, it will generate a SQL expression.
示例
query = Interval.select().where(Interval.contains(2))
此查询等效于以下SQL
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
对于python实现略微不同于SQL实现的情况,还有一个额外的API。让我们在Interval
模型中添加一个radius
方法。由于此方法计算绝对值,我们将使用Python abs()
函数用于实例部分,并使用fn.ABS()
SQL函数用于类部分。
class Interval(Model):
start = IntegerField()
end = IntegerField()
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_property
def radius(self):
return abs(self.length) / 2
@radius.expression
def radius(cls):
return fn.ABS(cls.length) / 2
有趣的是,两种radius
实现都引用了length
混合属性!当通过Interval
实例访问时,半径计算将在Python中执行。当通过Interval
类调用时,我们将获得相应的SQL。
示例
query = Interval.select().where(Interval.radius < 3)
此查询等效于以下SQL
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE ((abs("t1"."end" - "t1"."start") / 2) < 3)
很酷,对吧?感谢SQLAlchemy的这个酷炫的想法!
混合API
- class hybrid_method(func[, expr=None])
方法装饰器,允许定义具有实例级和类级行为的Python对象方法。
示例
class Interval(Model): start = IntegerField() end = IntegerField() @hybrid_method def contains(self, point): return (self.start <= point) & (point < self.end)
当使用
Interval
实例调用时,contains
方法的行为将与预期一致。但是,当作为类方法调用时,将生成SQL表达式query = Interval.select().where(Interval.contains(2))
将生成以下SQL
SELECT "t1"."id", "t1"."start", "t1"."end" FROM "interval" AS t1 WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
- expression(expr)
用于指定生成SQL表达式的函数的方法装饰器。
- class hybrid_property(fget[, fset=None[, fdel=None[, expr=None]]])
方法装饰器,允许定义具有实例级和类级行为的Python对象属性。
示例
class Interval(Model): start = IntegerField() end = IntegerField() @hybrid_property def length(self): return self.end - self.start @hybrid_property def radius(self): return abs(self.length) / 2 @radius.expression def radius(cls): return fn.ABS(cls.length) / 2
当在
Interval
实例上访问时,length
和radius
属性的行为将与预期一致。但是,当作为类属性访问时,将生成SQL表达式。query = (Interval .select() .where( (Interval.length > 6) & (Interval.radius >= 3)))
将生成以下SQL
SELECT "t1"."id", "t1"."start", "t1"."end" FROM "interval" AS t1 WHERE ( (("t1"."end" - "t1"."start") > 6) AND ((abs("t1"."end" - "t1"."start") / 2) >= 3) )
键值存储
The playhouse.kv
module contains the implementation of a persistent dictionary.
- class KeyValue([key_field=None[, value_field=None[, ordered=False[, database=None[, table_name='keyvalue']]]]])
- 参数
key_field (Field) – 用于键的字段。默认为
CharField
。必须具有primary_key=True
。value_field (Field) – 用于值的字段。默认为
PickleField
.ordered (bool) – 数据应按键排序返回。
database (Database) – 存储键值数据的数据库。如果未指定,将使用内存中的 SQLite 数据库。
table_name (str) – 数据存储的表名。
用于存储键值数据的类似字典的 API。与字典一样,支持预期的 API,但还具有接受表达式来获取、设置和删除项的附加功能。
当实例化
KeyValue
时,表会自动创建(如果不存在)。使用高效的 upsert 实现来设置和更新/覆盖键值对。
基本示例
# Create a key/value store, which uses an in-memory SQLite database # for data storage. KV = KeyValue() # Set (or overwrite) the value for "k1". KV['k1'] = 'v1' # Set (or update) multiple keys at once (uses an efficient upsert). KV.update(k2='v2', k3='v3') # Getting values works as you'd expect. assert KV['k2'] == 'v2' # We can also do this: for value in KV[KV.key > 'k1']: print(value) # 'v2' # 'v3' # Update multiple values at once using expression: KV[KV.key > 'k1'] = 'vx' # What's stored in the KV? print(dict(KV)) # {'k1': 'v1', 'k2': 'vx', 'k3': 'vx'} # Delete a single item. del KV['k2'] # How many items are stored in the KV? print(len(KV)) # 2 # Delete items that match the given condition. del KV[KV.key > 'k1']
- __contains__(expr)
- 参数
expr – 单个键或表达式
- 返回值
键/表达式是否存在。
示例
>>> kv = KeyValue() >>> kv.update(k1='v1', k2='v2') >>> 'k1' in kv True >>> 'kx' in kv False >>> (KV.key < 'k2') in KV True >>> (KV.key > 'k2') in KV False
- __len__()
- 返回值
存储的项目数量。
- __getitem__(expr)
- 参数
expr – 单个键或表达式。
- 返回值
与键/表达式对应的值。
- 引发
KeyError
如果给出单个键且未找到。
示例
>>> KV = KeyValue() >>> KV.update(k1='v1', k2='v2', k3='v3') >>> KV['k1'] 'v1' >>> KV['kx'] KeyError: "kx" not found >>> KV[KV.key > 'k1'] ['v2', 'v3'] >>> KV[KV.key < 'k1'] []
- __setitem__(expr, value)
- 参数
expr – 单个键或表达式。
value – 要为键设置的值
为给定键设置值。如果
expr
是一个表达式,则与表达式匹配的任何键的值都将被更新。示例
>>> KV = KeyValue() >>> KV.update(k1='v1', k2='v2', k3='v3') >>> KV['k1'] = 'v1-x' >>> print(KV['k1']) 'v1-x' >>> KV[KV.key >= 'k2'] = 'v99' >>> dict(KV) {'k1': 'v1-x', 'k2': 'v99', 'k3': 'v99'}
- __delitem__(expr)
- 参数
expr – 单个键或表达式。
删除给定键。如果给出表达式,则删除与表达式匹配的所有键。
示例
>>> KV = KeyValue() >>> KV.update(k1=1, k2=2, k3=3) >>> del KV['k1'] # Deletes "k1". >>> del KV['k1'] KeyError: "k1" does not exist >>> del KV[KV.key > 'k2'] # Deletes "k3". >>> del KV[KV.key > 'k99'] # Nothing deleted, no keys match.
- keys()
- 返回值
表中所有键的可迭代对象。
- values()
- 返回值
表中所有值的迭代器。
- items()
- 返回值
表中所有键值对的可迭代对象。
- update([__data=None[, **mapping]])
有效地批量插入或替换给定的键值对。
示例
>>> KV = KeyValue() >>> KV.update(k1=1, k2=2) # Sets 'k1'=1, 'k2'=2. >>> dict(KV) {'k1': 1, 'k2': 2} >>> KV.update(k2=22, k3=3) # Updates 'k2'->22, sets 'k3'=3. >>> dict(KV) {'k1': 1, 'k2': 22, 'k3': 3} >>> KV.update({'k2': -2, 'k4': 4}) # Also can pass a dictionary. >>> dict(KV) {'k1': 1, 'k2': -2, 'k3': 3, 'k4': 4}
- get(expr[, default=None])
- 参数
expr – 单个键或表达式。
default – 如果键未找到,则为默认值。
- 返回值
给定键/expr 的值或默认值(如果单个键未找到)。
获取给定键的值。如果键不存在,则返回默认值,除非键是表达式,在这种情况下将返回空列表。
- pop(expr[, default=Sentinel])
- 参数
expr – 单个键或表达式。
default – 如果键不存在,则为默认值。
- 返回值
给定键/expr 的值或默认值(如果单个键未找到)。
获取值并删除给定键。如果键不存在,则返回默认值,除非键是表达式,在这种情况下将返回空列表。
- clear()
从键值表中删除所有项目。
快捷方式
此模块包含用于表达否则使用 peewee 的 API 会比较冗长或繁琐的事物的辅助函数。还有一些用于将模型序列化为字典以及反之的辅助程序。
- model_to_dict(model[, recurse=True[, backrefs=False[, only=None[, exclude=None[, extra_attrs=None[, fields_from_query=None[, max_depth=None[, manytomany=False]]]]]]])
- 参数
recurse (bool) – 是否应递归外键。
backrefs (bool) – 是否应递归相关对象的列表。
only – 应包含在结果字典中的字段实例列表(或集合)。
exclude – 应从结果字典中排除的字段实例列表(或集合)。
extra_attrs – 实例上应包含在字典中的属性或方法名称列表。
fields_from_query (Select) – 创建此模型实例的
SelectQuery
。仅查询显式选择的字段和值将被序列化。max_depth (int) – 递归时的最大深度。
manytomany (bool) – 处理多对多字段。
将模型实例(以及可选的任何相关实例)转换为字典。
示例
>>> user = User.create(username='charlie') >>> model_to_dict(user) {'id': 1, 'username': 'charlie'} >>> model_to_dict(user, backrefs=True) {'id': 1, 'tweets': [], 'username': 'charlie'} >>> t1 = Tweet.create(user=user, message='tweet-1') >>> t2 = Tweet.create(user=user, message='tweet-2') >>> model_to_dict(user, backrefs=True) { 'id': 1, 'tweets': [ {'id': 1, 'message': 'tweet-1'}, {'id': 2, 'message': 'tweet-2'}, ], 'username': 'charlie' } >>> model_to_dict(t1) { 'id': 1, 'message': 'tweet-1', 'user': { 'id': 1, 'username': 'charlie' } } >>> model_to_dict(t2, recurse=False) {'id': 1, 'message': 'tweet-2', 'user': 1}
model_to_dict
的实现相当复杂,因为它试图支持各种用法。如果您有特殊用法,我强烈建议您不要尝试将一些疯狂的参数组合塞入此函数中。只需编写一个简单的函数来完成您要尝试做的事情。
- dict_to_model(model_class, data[, ignore_unknown=False])
- 参数
model_class (Model) – 要构造的模型类。
data (dict) – 数据字典。外键可以包含为嵌套字典,反向引用可以包含为字典列表。
ignore_unknown (bool) – 是否允许识别不出(非字段)属性。
将数据字典转换为模型实例,在适当的情况下创建相关实例。
示例
>>> user_data = {'id': 1, 'username': 'charlie'} >>> user = dict_to_model(User, user_data) >>> user <__main__.User at 0x7fea8fa4d490> >>> user.username 'charlie' >>> note_data = {'id': 2, 'text': 'note text', 'user': user_data} >>> note = dict_to_model(Note, note_data) >>> note.text 'note text' >>> note.user.username 'charlie' >>> user_with_notes = { ... 'id': 1, ... 'username': 'charlie', ... 'notes': [{'id': 1, 'text': 'note-1'}, {'id': 2, 'text': 'note-2'}]} >>> user = dict_to_model(User, user_with_notes) >>> user.notes[0].text 'note-1' >>> user.notes[0].user.username 'charlie'
- update_model_from_dict(instance, data[, ignore_unknown=False])
- 参数
instance (Model) – 要更新的模型实例。
data (dict) – 数据字典。外键可以包含为嵌套字典,反向引用可以包含为字典列表。
ignore_unknown (bool) – 是否允许识别不出(非字段)属性。
使用给定的数据字典更新模型实例。
- resolve_multimodel_query(query[, key='_model_identifier'])
- 参数
query – 复合选择查询。
key (str) – 用于存储模型标识符的键
- 返回值
一个可迭代的游标,它为复合选择查询中选择的每一行生成正确的模型实例。
用于将复合选择查询中返回的行解析为正确的模型实例类型。例如,如果您有两个不同表的联合,此辅助程序将在迭代查询结果时将每一行解析为正确的模型。
- class ThreadSafeDatabaseMetadata
模型
Metadata
实现,它提供对database
属性的线程安全访问,允许应用程序在多线程应用程序中安全地运行时交换数据库。用法
from playhouse.shortcuts import ThreadSafeDatabaseMetadata # Our multi-threaded application will sometimes swap out the primary # for the read-replica at run-time. primary = PostgresqlDatabase(...) read_replica = PostgresqlDatabase(...) class BaseModel(Model): class Meta: database = primary model_metadata_class = ThreadSafeDatabaseMetadata
信号支持
具有信号挂钩(类似于 django)的模型在 playhouse.signals
中提供。要使用信号,您需要将项目的所有模型都设为 playhouse.signals.Model
的子类,该子类会覆盖必要的方法以提供对各种信号的支持。
from playhouse.signals import Model, post_save
class MyModel(Model):
data = IntegerField()
@post_save(sender=MyModel)
def on_save_handler(model_class, instance, created):
put_data_in_cache(instance.data)
警告
出于我认为显而易见的原因,当您使用 Model.insert()
、Model.update()
或 Model.delete()
方法时,Peewee 信号不起作用。这些方法会生成超出 ORM 范围的查询,并且 ORM 不知道在查询执行时哪些模型实例可能会或可能不会受到影响。
信号通过连接到更高层的 peewee API 来工作,例如 Model.save()
和 Model.delete_instance()
,其中受影响的模型实例是预先知道的。
提供以下信号
pre_save
在对象保存到数据库之前立即调用。提供一个额外的关键字参数
created
,指示模型是第一次保存还是更新。post_save
在对象保存到数据库之后立即调用。提供一个额外的关键字参数
created
,指示模型是第一次保存还是更新。pre_delete
在使用
Model.delete_instance()
从数据库中删除对象之前立即调用。post_delete
在使用
Model.delete_instance()
从数据库中删除对象之后立即调用。pre_init
在第一次实例化模型类时调用
连接处理程序
每当调度信号时,它将调用已注册的任何处理程序。这允许完全独立的代码响应模型保存和删除等事件。
该 Signal
类提供了一个 connect()
方法,它接受一个回调函数和两个可选参数“sender”和“name”。如果指定,“sender”参数应为单个模型类,并允许您的回调仅接收来自该模型类的信号。“name”参数用作方便的别名,以防您希望注销信号处理程序。
示例用法
from playhouse.signals import *
def post_save_handler(sender, instance, created):
print('%s was just saved' % instance)
# our handler will only be called when we save instances of SomeModel
post_save.connect(post_save_handler, sender=SomeModel)
所有信号处理程序都接受其前两个参数 sender
和 instance
,其中 sender
是模型类,而 instance
是正在操作的实际模型。
如果您愿意,您也可以使用装饰器来连接信号处理程序。这在功能上等同于上面的示例
@post_save(sender=SomeModel)
def post_save_handler(sender, instance, created):
print('%s was just saved' % instance)
信号 API
- class Signal
存储接收器(回调)列表,并在调用“send”方法时调用它们。
- connect(receiver[, name=None[, sender=None]])
- 参数
receiver (callable) – 一个可调用对象,它至少接受两个参数,“sender”,它是触发信号的 Model 子类,以及“instance”,它是实际的模型实例。
name (string) – 一个简短的别名
sender (Model) – 如果指定,只有此模型类的实例才会触发接收器回调。
将接收器添加到内部接收器列表中,该列表将在发送信号时调用。
from playhouse.signals import post_save from project.handlers import cache_buster post_save.connect(cache_buster, name='project.cache_buster')
- disconnect([receiver=None[, name=None[, sender=None]]])
- 参数
receiver (callable) – 要断开的回调
name (string) – 一个简短的别名
sender (Model) – 断开特定于模型的处理程序。
断开给定的接收器(或具有给定名称别名的接收器),使其不再被调用。必须提供接收器或名称。
post_save.disconnect(name='project.cache_buster')
- send(instance, *args, **kwargs)
- 参数
instance – 一个模型实例
遍历接收器,并按它们连接的顺序调用它们。如果接收器指定了发送者,则只有在实例是发送者的实例时才会调用它。
pwiz,一个模型生成器
pwiz
是一个与 peewee 一起提供的脚本,它能够内省现有的数据库并生成适合与底层数据交互的模型代码。如果您已经拥有数据库,pwiz 可以通过生成具有正确列亲和力和外键的骨架代码来为您提供很好的帮助。
如果您使用 setup.py install
安装 peewee,pwiz 将作为“脚本”安装,您只需运行
python -m pwiz -e postgresql -u postgres my_postgres_db
这将打印一堆模型到标准输出。所以你可以这样做
python -m pwiz -e postgresql my_postgres_db > mymodels.py
python # <-- fire up an interactive shell
>>> from mymodels import Blog, Entry, Tag, Whatever
>>> print([blog.name for blog in Blog.select()])
命令行选项
pwiz 接受以下命令行选项
选项 |
含义 |
示例 |
---|---|---|
-h |
显示帮助 |
|
-e |
数据库后端 |
-e mysql |
-H |
要连接的主机 |
-H remote.db.server |
-p |
要连接的端口 |
-p 9001 |
-u |
数据库用户 |
-u postgres |
-P |
数据库密码 |
-P (将提示输入密码) |
-s |
模式 |
-s public |
-t |
要生成的表 |
-t tweet,users,relationships |
-v |
为 VIEW 生成模型 |
(无参数) |
-i |
将信息元数据添加到生成的文件中 |
(无参数) |
-o |
保留表列顺序 |
(无参数) |
以下是 engine
(-e
) 的有效参数
sqlite
mysql
postgresql
pwiz 示例
内省各种数据库的示例
# Introspect a Sqlite database.
python -m pwiz -e sqlite path/to/sqlite_database.db
# Introspect a MySQL database, logging in as root. You will be prompted
# for a password ("-P").
python -m pwiz -e mysql -u root -P mysql_db_name
# Introspect a Postgresql database on a remote server.
python -m pwiz -e postgres -u postgres -H 10.1.0.3 pg_db_name
完整示例
$ sqlite3 example.db << EOM
CREATE TABLE "user" ("id" INTEGER NOT NULL PRIMARY KEY, "username" TEXT NOT NULL);
CREATE TABLE "tweet" (
"id" INTEGER NOT NULL PRIMARY KEY,
"content" TEXT NOT NULL,
"timestamp" DATETIME NOT NULL,
"user_id" INTEGER NOT NULL,
FOREIGN KEY ("user_id") REFERENCES "user" ("id"));
CREATE UNIQUE INDEX "user_username" ON "user" ("username");
EOM
$ python -m pwiz -e sqlite example.db
产生以下输出
from peewee import *
database = SqliteDatabase('example.db', **{})
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class User(BaseModel):
username = TextField(unique=True)
class Meta:
table_name = 'user'
class Tweet(BaseModel):
content = TextField()
timestamp = DateTimeField()
user = ForeignKeyField(column_name='user_id', field='id', model=User)
class Meta:
table_name = 'tweet'
观察结果
外键
Tweet.user_id
被正确检测并映射。该
User.username
UNIQUE 约束被检测到。每个模型都明确声明其表名,即使在不需要的情况下也是如此(因为 Peewee 会自动将类名转换为相应的表名)。
所有参数
ForeignKeyField
被明确声明,即使它们遵循 Peewee 默认使用的约定。
注意
该 UnknownField
是一个占位符,用于在您的模式包含 Peewee 不知道如何映射到字段类的列声明的情况下。
模式迁移
Peewee 现在支持模式迁移,并对 Postgresql、SQLite 和 MySQL 提供了经过良好测试的支持。与其他模式迁移工具不同,peewee 的迁移不处理内省和数据库“版本控制”。相反,peewee 提供了许多用于生成和运行模式更改语句的辅助函数。该引擎提供了基础,可以在其基础上构建更复杂的工具。
迁移可以写成简单的 Python 脚本,并从命令行执行。由于迁移仅依赖于应用程序的 Database
对象,因此应该很容易管理更改模型定义并维护一组迁移脚本,而不会引入依赖关系。
示例用法
首先从 migrate 模块导入助手
from playhouse.migrate import *
实例化一个 migrator
。 SchemaMigrator
类负责生成更改架构的操作,然后可以通过 migrate()
助手按顺序运行。
# Postgres example:
my_db = PostgresqlDatabase(...)
migrator = PostgresqlMigrator(my_db)
# SQLite example:
my_db = SqliteDatabase('my_database.db')
migrator = SqliteMigrator(my_db)
使用 migrate()
执行一个或多个操作
title_field = CharField(default='')
status_field = IntegerField(null=True)
migrate(
migrator.add_column('some_table', 'title', title_field),
migrator.add_column('some_table', 'status', status_field),
migrator.drop_column('some_table', 'old_column'),
)
警告
迁移不会在事务中运行。如果您希望迁移在事务中运行,则需要将对 migrate 的调用包装在 atomic()
上下文管理器中,例如:
with my_db.atomic():
migrate(...)
支持的操作
向现有模型添加新字段
# Create your field instances. For non-null fields you must specify a
# default value.
pubdate_field = DateTimeField(null=True)
comment_field = TextField(default='')
# Run the migration, specifying the database table, field name and field.
migrate(
migrator.add_column('comment_tbl', 'pub_date', pubdate_field),
migrator.add_column('comment_tbl', 'comment', comment_field),
)
注意
Peewee 默认遵循 Django 的约定,在给定 ForeignKeyField
的列名后附加 _id
。添加外键时,您需要确保为其提供正确的列名。例如,如果我想向 Tweet
模型添加一个 user
外键
# Our desired model will look like this:
class Tweet(BaseModel):
user = ForeignKeyField(User) # I want to add this field.
# ... other fields ...
# Migration code:
user = ForeignKeyField(User, field=User.id, null=True)
migrate(
# Note that the column name given is "user_id".
migrator.add_column(Tweet._meta.table_name, 'user_id', user),
)
重命名字段
# Specify the table, original name of the column, and its new name.
migrate(
migrator.rename_column('story', 'pub_date', 'publish_date'),
migrator.rename_column('story', 'mod_date', 'modified_date'),
)
删除字段
migrate(
migrator.drop_column('story', 'some_old_field'),
)
使字段可为空或不可为空
# Note that when making a field not null that field must not have any
# NULL values present.
migrate(
# Make `pub_date` allow NULL values.
migrator.drop_not_null('story', 'pub_date'),
# Prevent `modified_date` from containing NULL values.
migrator.add_not_null('story', 'modified_date'),
)
更改字段的数据类型
# Change a VARCHAR(50) field to a TEXT field.
migrate(
migrator.alter_column_type('person', 'email', TextField())
)
重命名表
migrate(
migrator.rename_table('story', 'stories_tbl'),
)
添加索引
# Specify the table, column names, and whether the index should be
# UNIQUE or not.
migrate(
# Create an index on the `pub_date` column.
migrator.add_index('story', ('pub_date',), False),
# Create a multi-column index on the `pub_date` and `status` fields.
migrator.add_index('story', ('pub_date', 'status'), False),
# Create a unique index on the category and title fields.
migrator.add_index('story', ('category_id', 'title'), True),
)
删除索引
# Specify the index name.
migrate(migrator.drop_index('story', 'story_pub_date_status'))
添加或删除表约束
# Add a CHECK() constraint to enforce the price cannot be negative.
migrate(migrator.add_constraint(
'products',
'price_check',
Check('price >= 0')))
# Remove the price check constraint.
migrate(migrator.drop_constraint('products', 'price_check'))
# Add a UNIQUE constraint on the first and last names.
migrate(migrator.add_unique('person', 'first_name', 'last_name'))
为列添加或删除数据库级别的默认值
# Add a default value for a status column.
migrate(migrator.add_column_default(
'entries',
'status',
'draft'))
# Remove the default.
migrate(migrator.drop_column_default('entries', 'status'))
# Use a function for the default value (does not work with Sqlite):
migrate(migrator.add_column_default(
'entries',
'timestamp',
fn.now()))
# Or alternatively (works with Sqlite):
migrate(migrator.add_column_default(
'entries',
'timestamp',
'now()'))
注意
Postgres 用户可能需要在使用非标准模式时设置搜索路径。这可以通过以下方式完成
new_field = TextField(default='', null=False)
migrator = PostgresqlMigrator(db)
migrate(migrator.set_search_path('my_schema_name'),
migrator.add_column('table', 'field_name', new_field))
迁移 API
- migrate(*operations)
执行一个或多个更改架构的操作。
用法
migrate( migrator.add_column('some_table', 'new_column', CharField(default='')), migrator.create_index('some_table', ('new_column',)), )
- class SchemaMigrator(database)
- 参数
database – 一个
Database
实例。
SchemaMigrator
负责生成更改架构的语句。- add_column(table, column_name, field)
-
向提供的表添加新列。提供的
field
将用于生成适当的列定义。注意
如果字段不可为空,则必须指定默认值。
注意
对于非空字段,该字段将最初被添加为一个空字段,然后将执行一个
UPDATE
语句,用默认值填充该列。最后,该列将被标记为不可为空。
- drop_column(table, column_name[, cascade=True])
- 参数
table (str) – 要删除列的表的名称。
column_name (str) – 要删除的列的名称。
cascade (bool) – 是否应该使用 CASCADE 删除列。
- rename_column(table, old_name, new_name)
- 参数
table (str) – 包含要重命名的列的表的名称。
old_name (str) – 列的当前名称。
new_name (str) – 列的新名称。
- add_not_null(table, column)
- 参数
table (str) – 包含列的表的名称。
column (str) – 要设置为不可为空的列的名称。
- drop_not_null(table, column)
- 参数
table (str) – 包含列的表的名称。
column (str) – 要设置为可为空的列的名称。
- add_column_default(table, column, default)
- 参数
table (str) – 包含列的表的名称。
column (str) – 要添加默认值的列的名称。
default – 列的新默认值。请参阅下面的说明。
Peewee 尝试在默认值看起来像字符串文字时正确地引用它。否则,默认值将被视为字面量。Postgres 和 MySQL 支持将默认值指定为 peewee 表达式,例如
fn.NOW()
,但 Sqlite 用户需要使用default='now()'
代替。
- drop_column_default(table, column)
- 参数
table (str) – 包含列的表的名称。
column (str) – 要删除默认值的列的名称。
- alter_column_type(table, column, field[, cast=None])
- 参数
更改列的数据类型。此方法应谨慎使用,因为使用不兼容的类型可能不受数据库的良好支持。
- rename_table(old_name, new_name)
- 参数
old_name (str) – 表的当前名称。
new_name (str) – 表的新名称。
- add_index(table, columns[, unique=False[, using=None]])
- 参数
table (str) – 要创建索引的表的名称。
columns (list) – 应该被索引的列的列表。
unique (bool) – 新索引是否应该指定唯一约束。
using (str) – 索引类型(在支持的情况下),例如 GiST 或 GIN。
- drop_index(table, index_name)
- 参数
table (str) – 包含要删除的索引的表的名称。
index_name (str) – 要删除的索引的名称。
- add_constraint(table, name, constraint)
- drop_constraint(table, name)
- 参数
table (str) – 要删除约束的表。
name (str) – 要删除的约束的名称。
- add_unique(table, *column_names)
- 参数
table (str) – 要添加约束的表。
column_names (str) – 一个或多个用于 UNIQUE 约束的列。
- class PostgresqlMigrator(database)
为 Postgresql 数据库生成迁移。
- set_search_path(schema_name)
- 参数
schema_name (str) – 要使用的模式。
设置后续操作的搜索路径(模式)。
- class SqliteMigrator(database)
为 SQLite 数据库生成迁移。
SQLite 对
ALTER TABLE
查询的支持有限,因此以下操作目前不支持 SQLiteadd_constraint
drop_constraint
add_unique
- class MySQLMigrator(database)
为 MySQL 数据库生成迁移。
反射
反射模块包含用于内省现有数据库的帮助程序。此模块在 playhouse 的其他几个模块中被内部使用,包括 DataSet 和 pwiz,一个模型生成器。
- generate_models(database[, schema=None[, **options]])
- 参数
database (Database) – 要内省的数据库实例。
schema (str) – 要内省的可选模式。
options – 任意选项,有关详细信息,请参阅
Introspector.generate_models()
。
- 返回值
一个
dict
,将表名映射到模型类。
为给定数据库中的表生成模型。有关如何使用此函数的示例,请参阅 交互式使用 Peewee 部分。
示例
>>> from peewee import * >>> from playhouse.reflection import generate_models >>> db = PostgresqlDatabase('my_app') >>> models = generate_models(db) >>> list(models.keys()) ['account', 'customer', 'order', 'orderitem', 'product'] >>> globals().update(models) # Inject models into namespace. >>> for cust in customer.select(): # Query using generated model. ... print(cust.name) ... Huey Kitty Mickey Dog
- print_model(model)
- 参数
model (Model) – 要打印的模型类
- 返回值
无返回值
打印模型类的用户友好描述,这对于调试或交互式使用很有用。目前,这将打印表名以及所有字段及其数据类型。 交互式使用 Peewee 部分包含一个示例。
示例输出
>>> from playhouse.reflection import print_model >>> print_model(User) user id AUTO PK email TEXT name TEXT dob DATE index(es) email UNIQUE >>> print_model(Tweet) tweet id AUTO PK user INT FK: User.id title TEXT content TEXT timestamp DATETIME is_published BOOL index(es) user_id is_published, timestamp
- print_table_sql(model)
- 参数
model (Model) – 要打印的模型
- 返回值
无返回值
打印给定模型类的 SQL
CREATE TABLE
,这对于调试或交互式使用可能很有用。有关示例用法,请参阅 交互式使用 Peewee 部分。请注意,索引和约束不包含在此函数的输出中。示例输出
>>> from playhouse.reflection import print_table_sql >>> print_table_sql(User) CREATE TABLE IF NOT EXISTS "user" ( "id" INTEGER NOT NULL PRIMARY KEY, "email" TEXT NOT NULL, "name" TEXT NOT NULL, "dob" DATE NOT NULL ) >>> print_table_sql(Tweet) CREATE TABLE IF NOT EXISTS "tweet" ( "id" INTEGER NOT NULL PRIMARY KEY, "user_id" INTEGER NOT NULL, "title" TEXT NOT NULL, "content" TEXT NOT NULL, "timestamp" DATETIME NOT NULL, "is_published" INTEGER NOT NULL, FOREIGN KEY ("user_id") REFERENCES "user" ("id") )
- class Introspector(metadata[, schema=None])
可以通过实例化一个
Introspector
从数据库中提取元数据。建议使用工厂方法from_database()
,而不是直接实例化此类。- classmethod from_database(database[, schema=None])
- 参数
database – 一个
Database
实例。schema (str) – 可选模式(某些数据库支持)。
创建一个适用于给定数据库的
Introspector
实例。用法
db = SqliteDatabase('my_app.db') introspector = Introspector.from_database(db) models = introspector.generate_models() # User and Tweet (assumed to exist in the database) are # peewee Model classes generated from the database schema. User = models['user'] Tweet = models['tweet']
- generate_models([skip_invalid=False[, table_names=None[, literal_column_names=False[, bare_fields=False[, include_views=False]]]]])
- 参数
skip_invalid (bool) – 跳过名称为无效 Python 标识符的表。
table_names (list) – 要生成的表名列表。如果未指定,则为所有表生成模型。
literal_column_names (bool) – 按原样使用列名。默认情况下,列名将“python 化”,即混合大小写将变为小写。
bare_fields – 仅限 SQLite。不要为内省的列指定数据类型。
include_views – 也为 VIEW 生成模型。
- 返回值
一个字典,将表名映射到模型类。
内省数据库,读取表、列和外键约束,然后生成一个字典,将每个数据库表映射到动态生成的
Model
类。
数据库 URL
此模块包含一个帮助程序函数,用于从 URL 连接字符串生成数据库连接。
- connect(url, **connect_params)
从给定的连接 URL 创建一个
Database
实例。示例
sqlite:///my_database.db 将在当前目录中为文件
my_database.db
创建一个SqliteDatabase
实例。sqlite:///:memory: 将创建一个内存中的
SqliteDatabase
实例。postgresql://postgres:my_password@localhost:5432/my_database 将创建一个
PostgresqlDatabase
实例。提供了用户名和密码,以及要连接到的主机和端口。mysql://user:passwd@ip:port/my_db 将为本地 MySQL 数据库 my_db 创建一个
MySQLDatabase
实例。mysql+pool://user:passwd@ip:port/my_db?max_connections=20&stale_timeout=300 将为本地 MySQL 数据库 my_db 创建一个
PooledMySQLDatabase
实例,并将 max_connections 设置为 20,stale_timeout 设置为 300 秒。
支持的方案
apsw
:APSWDatabase
mysql
:MySQLDatabase
mysql+pool
:PooledMySQLDatabase
postgres
:PostgresqlDatabase
postgres+pool
:PooledPostgresqlDatabase
postgresext
:PostgresqlExtDatabase
postgresext+pool
:PooledPostgresqlExtDatabase
sqlite
:SqliteDatabase
sqliteext
:SqliteExtDatabase
sqlite+pool
:PooledSqliteDatabase
sqliteext+pool
:PooledSqliteExtDatabase
用法
import os from playhouse.db_url import connect # Connect to the database URL defined in the environment, falling # back to a local Sqlite database if no database URL is specified. db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')
- parse(url)
将给定 URL 中的信息解析为包含
database
、host
、port
、user
和/或password
的字典。其他连接参数可以在 URL 查询字符串中传递。如果您使用的是自定义数据库类,则可以使用
parse()
函数从 URL 中提取信息,然后将其传递给您的数据库对象。
- register_database(db_class, *names)
- 参数
db_class –
Database
的子类。names – 用作 URL 中方案的名称列表,例如 'sqlite' 或 'firebird'
在指定的名称下注册其他数据库类。此函数可用于扩展
connect()
函数以支持其他方案。假设您有一个名为FirebirdDatabase
的Firebird
的自定义数据库类。from playhouse.db_url import connect, register_database register_database(FirebirdDatabase, 'firebird') db = connect('firebird://my-firebird-db')
连接池
该 pool
模块包含一些 Database
类,它们为 PostgreSQL、MySQL 和 SQLite 数据库提供连接池。该池通过覆盖 Database
类上打开和关闭与后端连接的方法来工作。该池可以指定连接被回收后的超时时间,以及打开连接数的上限。
在多线程应用程序中,最多将打开 max_connections 个连接。每个线程(或者,如果使用 gevent,则为 greenlet)将拥有自己的连接。
在单线程应用程序中,只创建一个连接。它将不断被回收,直到它超过了陈旧超时时间或被显式关闭(使用 .manual_close())。
默认情况下,您的应用程序只需要确保在您完成连接后关闭它们,它们将被返回到池中。对于 Web 应用程序,这通常意味着在请求开始时,您将打开一个连接,并在返回响应时,您将关闭连接。
简单的 Postgres 池示例代码
# Use the special postgresql extensions.
from playhouse.pool import PooledPostgresqlExtDatabase
db = PooledPostgresqlExtDatabase(
'my_app',
max_connections=32,
stale_timeout=300, # 5 minutes.
user='postgres')
class BaseModel(Model):
class Meta:
database = db
就是这样!如果您希望对连接池进行更细粒度的控制,请查看 连接管理 部分。
池 API
- class PooledDatabase(database[, max_connections=20[, stale_timeout=None[, timeout=None[, **kwargs]]]])
- 参数
database (str) – 数据库或数据库文件的名称。
max_connections (int) – 最大连接数。提供
None
表示无限制。stale_timeout (int) – 允许连接使用的秒数。
timeout (int) – 池满时阻塞的秒数。默认情况下,peewee 在池满时不会阻塞,而是简单地抛出异常。要无限期阻塞,请将此值设置为
0
。kwargs – 传递给数据库类的任意关键字参数。
旨在与
Database
的子类一起使用的混合类。注意
连接不会在它们超过 stale_timeout 时被精确关闭。相反,陈旧的连接只会在请求新连接时被关闭。
注意
如果打开的连接数超过 max_connections,则会引发 ValueError。
- manual_close()
关闭当前打开的连接,而不将其返回到池中。
- close_idle()
关闭所有空闲连接。这并不包括当前正在使用的任何连接——只包括那些以前创建但后来被返回到池中的连接。
- close_stale([age=600])
- 参数
age (int) – 连接被视为陈旧的年龄。
- 返回值
关闭的连接数。
关闭正在使用但超过给定年龄的连接。在调用此方法时要谨慎!
- close_all()
关闭所有连接。这包括当时可能正在使用的任何连接。使用此方法时请谨慎!
- class PooledPostgresqlDatabase
继承自
PostgresqlDatabase
的子类,混合了PooledDatabase
助手。
- class PooledPostgresqlExtDatabase
继承自
PostgresqlExtDatabase
的子类,混合了PooledDatabase
助手。PostgresqlExtDatabase
是 Postgresql 扩展 模块的一部分,并为许多 Postgres 特定功能提供支持。
- class PooledMySQLDatabase
继承自
MySQLDatabase
的子类,混合了PooledDatabase
助手。
- class PooledSqliteDatabase
用于 SQLite 应用程序的持久连接。
- class PooledSqliteExtDatabase
用于 SQLite 应用程序的持久连接,使用 SQLite 扩展 高级数据库驱动程序
SqliteExtDatabase
。
测试工具
包含在测试 peewee 项目时有用的工具。
- class count_queries([only_select=False])
上下文管理器,将在上下文中计算执行的查询数量。
- 参数
only_select (bool) – 仅计算 SELECT 查询。
with count_queries() as counter: huey = User.get(User.username == 'huey') huey_tweets = [tweet.message for tweet in huey.tweets] assert counter.count == 2
- count
执行的查询数量。
- get_queries()
返回一个包含 SQL 查询和参数列表的 2 元组列表。
- assert_query_count(expected[, only_select=False])
函数或方法装饰器,如果在装饰函数中执行的查询数量不等于预期数量,则会引发
AssertionError
。class TestMyApp(unittest.TestCase): @assert_query_count(1) def test_get_popular_blogs(self): popular_blogs = Blog.get_popular() self.assertEqual( [blog.title for blog in popular_blogs], ["Peewee's Playhouse!", "All About Huey", "Mickey's Adventures"])
此函数也可以用作上下文管理器
class TestMyApp(unittest.TestCase): def test_expensive_operation(self): with assert_query_count(1): perform_expensive_operation()
Flask 工具
playhouse.flask_utils
模块包含几个用于将 peewee 与 Flask Web 框架集成的助手。
数据库包装器
FlaskDB
类是用于在 Flask 应用程序中配置和引用 Peewee 数据库的包装器。不要让它的名字欺骗你:它与 peewee 数据库不同。 FlaskDB
旨在从你的 Flask 应用程序中删除以下样板代码
根据应用程序配置数据动态创建 Peewee 数据库实例。
创建一个基类,你的应用程序的所有模型都将从中继承。
在请求开始和结束时注册钩子,以处理打开和关闭数据库连接。
基本用法
import datetime
from flask import Flask
from peewee import *
from playhouse.flask_utils import FlaskDB
DATABASE = 'postgresql://postgres:password@localhost:5432/my_database'
# If we want to exclude particular views from the automatic connection
# management, we list them this way:
FLASKDB_EXCLUDED_ROUTES = ('logout',)
app = Flask(__name__)
app.config.from_object(__name__)
db_wrapper = FlaskDB(app)
class User(db_wrapper.Model):
username = CharField(unique=True)
class Tweet(db_wrapper.Model):
user = ForeignKeyField(User, backref='tweets')
content = TextField()
timestamp = DateTimeField(default=datetime.datetime.now)
上面的代码示例将创建并实例化一个由给定数据库 URL 指定的 peewee PostgresqlDatabase
。请求钩子将被配置为在收到请求时建立连接,并在发送响应时自动关闭连接。最后, FlaskDB
类公开了一个 FlaskDB.Model
属性,该属性可以用作应用程序模型的基类。
以下是如何访问由 FlaskDB
包装器为你配置的包装 Peewee 数据库实例
# Obtain a reference to the Peewee database instance.
peewee_db = db_wrapper.database
@app.route('/transfer-funds/', methods=['POST'])
def transfer_funds():
with peewee_db.atomic():
# ...
return jsonify({'transfer-id': xid})
注意
可以使用 FlaskDB.database
属性访问实际的 peewee 数据库。
以下是用 FlaskDB
配置 Peewee 数据库的另一种方法
app = Flask(__name__)
db_wrapper = FlaskDB(app, 'sqlite:///my_app.db')
虽然上面的示例展示了使用数据库 URL,但对于更高级的用法,你可以指定一个配置选项字典,或者简单地传入一个 peewee Database
实例
DATABASE = {
'name': 'my_app_db',
'engine': 'playhouse.pool.PooledPostgresqlDatabase',
'user': 'postgres',
'max_connections': 32,
'stale_timeout': 600,
}
app = Flask(__name__)
app.config.from_object(__name__)
wrapper = FlaskDB(app)
pooled_postgres_db = wrapper.database
使用 peewee Database
对象
peewee_db = PostgresqlExtDatabase('my_app')
app = Flask(__name__)
db_wrapper = FlaskDB(app, peewee_db)
使用应用程序工厂的数据库
如果你更喜欢使用 应用程序工厂模式, FlaskDB
类实现了一个 init_app()
方法。
用作工厂
db_wrapper = FlaskDB()
# Even though the database is not yet initialized, you can still use the
# `Model` property to create model classes.
class User(db_wrapper.Model):
username = CharField(unique=True)
def create_app():
app = Flask(__name__)
app.config['DATABASE'] = 'sqlite:////home/code/apps/my-database.db'
db_wrapper.init_app(app)
return app
查询工具
flask_utils
模块提供了一些用于在 Web 应用程序中管理查询的助手。一些常见模式包括
- get_object_or_404(query_or_model, *query)
- 参数
query_or_model –
Model
类或预过滤的SelectQuery
。query – 一个任意复杂的 peewee 表达式。
检索与给定查询匹配的对象,或返回 404 未找到响应。一个常见的用例可能是 Weblog 的详细信息页面。你希望检索与给定 URL 匹配的帖子,或者返回 404。
示例
@app.route('/blog/<slug>/') def post_detail(slug): public_posts = Post.select().where(Post.published == True) post = get_object_or_404(public_posts, (Post.slug == slug)) return render_template('post_detail.html', post=post)
- object_list(template_name, query[, context_variable='object_list'[, paginate_by=20[, page_var='page'[, check_bounds=True[, **kwargs]]]]])
- 参数
template_name – 要渲染的模板的名称。
query – 要分页的
SelectQuery
实例。context_variable – 用于分页对象列表的上下文变量名称。
paginate_by – 每页的对象数量。
page_var – 包含页面的
GET
参数的名称。check_bounds – 是否检查给定的页面是否为有效页面。如果
check_bounds
为True
并且指定了无效页面,则将返回 404。kwargs – 要传递到模板上下文的任意键值对。
检索由给定查询指定的分页对象列表。分页对象列表将使用给定的
context_variable
放入上下文,以及有关当前页面和总页面数的元数据,最后是作为关键字参数传递的任何任意上下文数据。页面使用
page
GET
参数指定,例如/my-object-list/?page=3
将返回第三页对象。示例
@app.route('/blog/') def post_index(): public_posts = (Post .select() .where(Post.published == True) .order_by(Post.timestamp.desc())) return object_list( 'post_index.html', query=public_posts, context_variable='post_list', paginate_by=10)
模板将具有以下上下文
post_list
,它包含最多 10 个帖子的列表。page
,它包含基于page
GET
参数的值的当前页面。pagination
,一个PaginatedQuery
实例。
- class PaginatedQuery(query_or_model, paginate_by[, page_var='page'[, check_bounds=False]])
- 参数
query_or_model – 可以是
Model
或SelectQuery
实例,包含您想要分页的记录集合。paginate_by – 每页的对象数量。
page_var – 包含页面的
GET
参数的名称。check_bounds – 是否检查给定的页面是否为有效页面。如果
check_bounds
为True
并且指定了无效页面,则将返回 404。
辅助类,用于根据
GET
参数执行分页。- get_page()
返回当前选定的页面,由
page_var
GET
参数的值指示。如果未显式选择页面,则此方法将返回 1,表示第一页。
- get_page_count()
返回可能的总页数。
- get_object_list()
使用
get_page()
的值,返回用户请求的页面对象。返回值是一个SelectQuery
,带有适当的LIMIT
和OFFSET
子句。如果
check_bounds
设置为True
并且请求的页面不包含任何对象,则会引发 404 错误。