查询

本节将介绍在关系数据库上通常执行的基本 CRUD 操作

注意

还有一大批示例查询取自Postgresql Exercises网站。示例列在query examples文档中。

创建新记录

您可以使用 Model.create() 创建新的模型实例。此方法接受关键字参数,其中键对应于模型字段的名称。将返回一个新实例,并将一行添加到表中。

>>> User.create(username='Charlie')
<__main__.User object at 0x2529350>

这将在数据库中插入新行。主键将自动检索并存储在模型实例上。

或者,您可以以编程方式构建模型实例,然后调用 save()

>>> user = User(username='Charlie')
>>> user.save()  # save() returns the number of rows modified.
1
>>> user.id
1
>>> huey = User()
>>> huey.username = 'Huey'
>>> huey.save()
1
>>> huey.id
2

当模型具有外键时,您可以在创建新记录时直接将模型实例分配给外键字段。

>>> tweet = Tweet.create(user=huey, message='Hello!')

您还可以使用相关对象的的主键值

>>> tweet = Tweet.create(user=2, message='Hello again!')

如果您只想插入数据而不需要创建模型实例,则可以使用 Model.insert()

>>> User.insert(username='Mickey').execute()
3

在执行插入查询后,将返回新行的主键。

注意

您可以通过多种方式加快批量插入操作。有关更多信息,请查看 批量插入 配方部分。

批量插入

有几种方法可以快速加载大量数据。简单的方法是在循环中调用 Model.create()

data_source = [
    {'field1': 'val1-1', 'field2': 'val1-2'},
    {'field1': 'val2-1', 'field2': 'val2-2'},
    # ...
]

for data_dict in data_source:
    MyModel.create(**data_dict)

上述方法很慢,原因有以下几个

  1. 如果您没有将循环包装在事务中,那么每次调用 create() 都会在其自己的事务中发生。这会非常慢!

  2. 有大量的 Python 逻辑会妨碍您,并且每个 InsertQuery 都必须生成并解析为 SQL。

  3. 您要发送到数据库进行解析的数据(以原始 SQL 字节数为单位)很多。

  4. 我们正在检索最后插入的 ID,这在某些情况下会导致执行其他查询。

您可以通过使用 atomic() 将其包装在事务中来显著提高速度。

# This is much faster.
with db.atomic():
    for data_dict in data_source:
        MyModel.create(**data_dict)

上述代码仍然存在问题 2、3 和 4。我们可以通过使用 insert_many() 来获得另一个巨大的提升。此方法接受元组或字典列表,并通过单个查询插入多行

data_source = [
    {'field1': 'val1-1', 'field2': 'val1-2'},
    {'field1': 'val2-1', 'field2': 'val2-2'},
    # ...
]

# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()

insert_many() 方法还接受行元组列表,前提是您还指定了相应的字段

# We can INSERT tuples as well...
data = [('val1-1', 'val1-2'),
        ('val2-1', 'val2-2'),
        ('val3-1', 'val3-2')]

# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()

将批量插入包装在事务中也是一种好做法

# You can, of course, wrap this in a transaction as well:
with db.atomic():
    MyModel.insert_many(data, fields=fields).execute()

注意

SQLite 用户在使用批量插入时应注意一些注意事项。具体来说,您的 SQLite3 版本必须为 3.7.11.0 或更高版本才能利用批量插入 API。此外,默认情况下,SQLite 将 SQL 查询中的绑定变量数量限制为 999,适用于 3.32.0(2020-05-22)之前的 SQLite 版本,以及 32766,适用于 3.32.0 之后的 SQLite 版本。

批量插入行

根据数据源中的行数,您可能需要将其分成块。特别是 SQLite 通常有 每个查询限制 999 或 32766 个变量(批量大小将是 999 // 行长度或 32766 // 行长度)。

您可以编写一个循环,将数据分批(在这种情况下,强烈建议您使用事务)

# Insert rows 100 at a time.
with db.atomic():
    for idx in range(0, len(data_source), 100):
        MyModel.insert_many(data_source[idx:idx+100]).execute()

Peewee 附带一个 chunked() 帮助函数,您可以使用它来高效地将通用可迭代对象分块为一系列批量大小的可迭代对象

from peewee import chunked

# Insert rows 100 at a time.
with db.atomic():
    for batch in chunked(data_source, 100):
        MyModel.insert_many(batch).execute()

替代方案

Model.bulk_create() 方法的行为与 Model.insert_many() 非常相似,但它接受一个未保存的模型实例列表进行插入,并且可以选择接受一个批量大小参数。要使用 bulk_create() API

# Read list of usernames from a file, for example.
with open('user_list.txt') as fh:
    # Create a list of unsaved User instances.
    users = [User(username=line.strip()) for line in fh.readlines()]

# Wrap the operation in a transaction and batch INSERT the users
# 100 at a time.
with db.atomic():
    User.bulk_create(users, batch_size=100)

注意

如果您使用的是 Postgresql(它支持 RETURNING 子句),那么以前未保存的模型实例将自动填充其新的主键值。

此外,Peewee 还提供 Model.bulk_update(),它可以有效地更新模型列表中的一个或多个列。例如

# First, create 3 users with usernames u1, u2, u3.
u1, u2, u3 = [User.create(username='u%s' % i) for i in (1, 2, 3)]

# Now we'll modify the user instances.
u1.username = 'u1-x'
u2.username = 'u2-y'
u3.username = 'u3-z'

# Update all three users with a single UPDATE query.
User.bulk_update([u1, u2, u3], fields=[User.username])

这将导致执行以下 SQL

UPDATE "users" SET "username" = CASE "users"."id"
    WHEN 1 THEN "u1-x"
    WHEN 2 THEN "u2-y"
    WHEN 3 THEN "u3-z" END
WHERE "users"."id" IN (1, 2, 3);

注意

对于大型对象列表,您应该指定一个合理的 batch_size,并用 Database.atomic() 包装对 bulk_update() 的调用

with database.atomic():
    User.bulk_update(list_of_users, fields=['username'], batch_size=50)

警告

Model.bulk_update() 可能不是更新大量记录的最有效方法。此功能的实现方式是,我们使用 SQL CASE 语句为所有正在更新的行创建主键到相应字段值的“映射”。

或者,你可以使用 Database.batch_commit() 助手来处理批次大小事务中的行块。除了 Postgresql 之外,此方法还为数据库提供了一个解决方法,当必须获取新创建行的主键时。

# List of row data to insert.
row_data = [{'username': 'u1'}, {'username': 'u2'}, ...]

# Assume there are 789 items in row_data. The following code will result in
# 8 total transactions (7x100 rows + 1x89 rows).
for row in db.batch_commit(row_data, 100):
    User.create(**row)

从另一个表批量加载

如果你想要批量加载的数据存储在另一个表中,你还可以创建源为SELECT查询的INSERT查询。使用 Model.insert_from() 方法

res = (TweetArchive
       .insert_from(
           Tweet.select(Tweet.user, Tweet.message),
           fields=[TweetArchive.user, TweetArchive.message])
       .execute())

上面的查询等效于以下 SQL

INSERT INTO "tweet_archive" ("user_id", "message")
SELECT "user_id", "message" FROM "tweet";

更新现有记录

一旦模型实例具有主键,对 save() 的任何后续调用都将导致UPDATE而不是另一个INSERT。模型的主键不会更改

>>> user.save()  # save() returns the number of rows modified.
1
>>> user.id
1
>>> user.save()
>>> user.id
1
>>> huey.save()
1
>>> huey.id
2

如果你想更新多条记录,请发出UPDATE查询。以下示例将更新所有 Tweet 对象,如果它们是在今天之前创建的,则将它们标记为已发布Model.update() 接受关键字参数,其中键对应于模型的字段名称

>>> today = datetime.today()
>>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today)
>>> query.execute()  # Returns the number of rows that were updated.
4

有关更多信息,请参阅 Model.update()UpdateModel.bulk_update() 的文档。

注意

如果你想了解有关执行原子更新(例如增加列的值)的更多信息,请查看 原子更新 配方。

原子更新

Peewee 允许你执行原子更新。假设我们需要更新一些计数器。朴素的方法是编写类似这样的内容

>>> for stat in Stat.select().where(Stat.url == request.url):
...     stat.counter += 1
...     stat.save()

不要这样做!这不仅很慢,而且如果多个进程同时更新计数器,它还容易受到竞争条件的影响。

相反,你可以使用 update() 以原子方式更新计数器

>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()

你可以根据需要使这些更新语句变得复杂。让我们给所有员工发放奖金,该奖金等于他们之前的奖金加上其工资的 10%

>>> query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
>>> query.execute()  # Give everyone a bonus!

我们甚至可以使用子查询来更新列的值。假设我们在 User 模型上有一个非规范化列,该列存储用户已发送的推文数,并且我们定期更新此值。以下是如何编写这样的查询

>>> subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
>>> update = User.update(num_tweets=subquery)
>>> update.execute()

Upsert

Peewee 提供对不同类型更新功能的支持。对于 3.24.0 之前的 SQLite 和 MySQL,Peewee 提供 replace(),它允许你插入一条记录或在发生约束冲突时替换现有记录。对于 Sqlite 3.24+ 和 Postgres,peewee 提供对 ON CONFLICT 查询的完全支持。

使用 replace()on_conflict_replace() 的示例

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)

# Insert or update the user. The "last_login" value will be updated
# regardless of whether the user existed previously.
user_id = (User
           .replace(username='the-user', last_login=datetime.now())
           .execute())

# This query is equivalent:
user_id = (User
           .insert(username='the-user', last_login=datetime.now())
           .on_conflict_replace()
           .execute())

注意

除了 replace,SQLite、MySQL 和 Postgresql 提供一个 ignore 操作(参见:on_conflict_ignore()),如果你只想插入并忽略任何潜在的约束冲突。

MySQL 通过 ON DUPLICATE KEY UPDATE 子句支持更新。例如

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)
    login_count = IntegerField()

# Insert a new user.
User.create(username='huey', login_count=0)

# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
         .insert(username='huey', last_login=now, login_count=1)
         .on_conflict(
             preserve=[User.last_login],  # Use the value we would have inserted.
             update={User.login_count: User.login_count + 1})
         .execute())

在上述示例中,我们可以安全地按需多次调用更新查询。登录计数将以原子方式递增,最后登录列将更新,并且不会创建重复行。

Postgresql 和 SQLite(3.24.0 及更高版本)提供了不同的语法,允许更细粒度地控制哪个约束冲突应触发冲突解决,以及应更新或保留哪些值。

使用 on_conflict() 执行 Postgresql 风格更新(或 SQLite 3.24+)的示例

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)
    login_count = IntegerField()

# Insert a new user.
User.create(username='huey', login_count=0)

# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
         .insert(username='huey', last_login=now, login_count=1)
         .on_conflict(
             conflict_target=[User.username],  # Which constraint?
             preserve=[User.last_login],  # Use the value we would have inserted.
             update={User.login_count: User.login_count + 1})
         .execute())

在上述示例中,我们可以安全地按需多次调用更新查询。登录计数将以原子方式递增,最后登录列将更新,并且不会创建重复行。

注意

MySQL 和 Postgresql/SQLite 之间的主要区别在于 Postgresql 和 SQLite 要求你指定一个 conflict_target

下面是一个更高级(如果人为)的示例,使用 EXCLUDED 命名空间。EXCLUDED 助手允许我们引用冲突数据中的值。对于我们的示例,我们假设一个简单的表将唯一键(字符串)映射到一个值(整数)

class KV(Model):
    key = CharField(unique=True)
    value = IntegerField()

# Create one row.
KV.create(key='k1', value=1)

# Demonstrate usage of EXCLUDED.
# Here we will attempt to insert a new value for a given key. If that
# key already exists, then we will update its value with the *sum* of its
# original value and the value we attempted to insert -- provided that
# the new value is larger than the original value.
query = (KV.insert(key='k1', value=10)
         .on_conflict(conflict_target=[KV.key],
                      update={KV.value: KV.value + EXCLUDED.value},
                      where=(EXCLUDED.value > KV.value)))

# Executing the above query will result in the following data being
# present in the "kv" table:
# (key='k1', value=11)
query.execute()

# If we attempted to execute the query *again*, then nothing would be
# updated, as the new value (10) is now less than the value in the
# original row (11).

使用 ON CONFLICT 时有几个重要的概念需要理解

  • conflict_target=:具有唯一约束的列。对于用户表,这可能是用户的电子邮件。

  • preserve=:如果发生冲突,此参数用于指示我们希望更新的数据中的哪些值。

  • update=:如果发生冲突,这是要应用于已存在行的映射数据。

  • EXCLUDED:此“魔术”命名空间允许你引用如果约束未失败,本应插入的新数据。

完整示例

class User(Model):
    email = CharField(unique=True)  # Unique identifier for user.
    last_login = DateTimeField()
    login_count = IntegerField(default=0)
    ip_log = TextField(default='')


# Demonstrates the above 4 concepts.
def login(email, ip):
    rowid = (User
             .insert({User.email: email,
                      User.last_login: datetime.now(),
                      User.login_count: 1,
                      User.ip_log: ip})
             .on_conflict(
                 # If the INSERT fails due to a constraint violation on the
                 # user email, then perform an UPDATE instead.
                 conflict_target=[User.email],

                 # Set the "last_login" to the value we would have inserted
                 # (our call to datetime.now()).
                 preserve=[User.last_login],

                 # Increment the user's login count and prepend the new IP
                 # to the user's ip history.
                 update={User.login_count: User.login_count + 1,
                         User.ip_log: fn.CONCAT(EXCLUDED.ip_log, ',', User.ip_log)})
             .execute())

    return rowid

# This will insert the initial row, returning the new row id (1).
print(login('[email protected]', '127.1'))

# Because [email protected] exists, this will trigger the UPSERT. The row id
# from above is returned again (1).
print(login('[email protected]', '127.2'))

u = User.get()
print(u.login_count, u.ip_log)

# Prints "2 127.2,127.1"

有关更多信息,请参阅 Insert.on_conflict()OnConflict

删除记录

要删除单个模型实例,可以使用 Model.delete_instance() 快捷方式。 delete_instance() 将删除给定的模型实例,并且可以选择递归删除任何依赖对象(通过指定 recursive=True)。

>>> user = User.get(User.id == 1)
>>> user.delete_instance()  # Returns the number of rows deleted.
1

>>> User.get(User.id == 1)
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."id" = ?
PARAMS: [1]

要删除一组任意行,可以发出DELETE 查询。以下内容将删除所有超过一年的 Tweet 对象

>>> query = Tweet.delete().where(Tweet.creation_date < one_year_ago)
>>> query.execute()  # Returns the number of rows deleted.
7

有关更多信息,请参阅以下文档

选择单个记录

可以使用 Model.get() 方法来检索与给定查询匹配的单个实例。对于主键查找,还可以使用快捷方式方法 Model.get_by_id()

此方法是一个快捷方式,它使用给定的查询调用 Model.select(),但将结果集限制为单个行。此外,如果没有模型与给定查询匹配,将引发 DoesNotExist 异常。

>>> User.get(User.id == 1)
<__main__.User object at 0x25294d0>

>>> User.get_by_id(1)  # Same as above.
<__main__.User object at 0x252df10>

>>> User[1]  # Also same as above.
<__main__.User object at 0x252dd10>

>>> User.get(User.id == 1).username
u'Charlie'

>>> User.get(User.username == 'Charlie')
<__main__.User object at 0x2529410>

>>> User.get(User.username == 'nobody')
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."username" = ?
PARAMS: ['nobody']

对于更高级的操作,可以使用 SelectBase.get()。以下查询检索名为charlie 的用户的最新推文

>>> (Tweet
...  .select()
...  .join(User)
...  .where(User.username == 'charlie')
...  .order_by(Tweet.created_date.desc())
...  .get())
<__main__.Tweet object at 0x2623410>

有关更多信息,请参阅以下文档

创建或获取

Peewee 有一个帮助程序方法用于执行“获取/创建”类型操作: Model.get_or_create(),它首先尝试检索匹配的行。如果失败,将创建一个新行。

对于“创建或获取”类型逻辑,通常会依赖唯一约束或主键来防止创建重复对象。例如,假设我们希望使用 示例用户模型 实现注册新用户帐户。用户模型对用户名字段具有唯一约束,因此我们将依赖数据库的完整性保证来确保我们不会最终获得重复的用户名

try:
    with db.atomic():
        return User.create(username=username)
except peewee.IntegrityError:
    # `username` is a unique column, so this username already exists,
    # making it safe to call .get().
    return User.get(User.username == username)

您可以轻松地将此类逻辑封装为 classmethod 在您自己的 Model 类中。

上述示例首先尝试创建,然后回退到检索,依靠数据库来强制执行唯一约束。如果您更愿意首先尝试检索记录,则可以使用 get_or_create()。此方法的实现与同名 Django 函数类似。您可以使用 Django 风格关键字参数过滤器来指定 WHERE 条件。该函数返回一个 2 元组,其中包含实例和一个布尔值,指示是否创建了对象。

以下是如何使用 get_or_create() 实现用户帐户创建:

user, created = User.get_or_create(username=username)

假设我们有一个不同的模型 Person,并且想要获取或创建一个 person 对象。在检索 Person 时我们唯一关心的条件是他们的名字和姓氏,但是如果我们最终需要创建一个新记录,我们还将指定他们的出生日期和最喜欢的颜色

person, created = Person.get_or_create(
    first_name=first_name,
    last_name=last_name,
    defaults={'dob': dob, 'favorite_color': 'green'})

传递给 get_or_create() 的任何关键字参数都将在逻辑的 get() 部分中使用,除了 defaults 字典,该字典将用于填充新创建实例中的值。

有关更多详细信息,请阅读 Model.get_or_create() 的文档。

选择多条记录

我们可以使用 Model.select() 从表中检索行。当您构建一个 SELECT 查询时,数据库将返回与您的查询相对应的任何行。Peewee 允许您遍历这些行,以及使用索引和切片操作

>>> query = User.select()
>>> [user.username for user in query]
['Charlie', 'Huey', 'Peewee']

>>> query[1]
<__main__.User at 0x7f83e80f5550>

>>> query[1].username
'Huey'

>>> query[:2]
[<__main__.User at 0x7f83e80f53a8>, <__main__.User at 0x7f83e80f5550>]

Select 查询很智能,因为您可以多次迭代、索引和切片查询,但查询只执行一次。

在以下示例中,我们将简单地调用 select() 并遍历返回值,该返回值是 Select 的一个实例。这将返回 User 表中的所有行

>>> for user in User.select():
...     print(user.username)
...
Charlie
Huey
Peewee

注意

同一查询的后续迭代不会命中数据库,因为结果已被缓存。要禁用此行为(以减少内存使用),请在迭代时调用 Select.iterator()

在遍历包含外键的模型时,请小心访问相关模型上的值的方式。意外解析外键或遍历反向引用会导致 N+1 查询行为

当您创建外键(例如 Tweet.user)时,您可以使用 backref 创建反向引用(User.tweets)。反向引用显示为 Select 实例

>>> tweet = Tweet.get()
>>> tweet.user  # Accessing a foreign key returns the related model.
<tw.User at 0x7f3ceb017f50>

>>> user = User.get()
>>> user.tweets  # Accessing a back-reference returns a query.
<peewee.ModelSelect at 0x7f73db3bafd0>

您可以像其他任何 Select 一样迭代 user.tweets 反向引用

>>> for tweet in user.tweets:
...     print(tweet.message)
...
hello world
this is fun
look at this picture of my food

除了返回模型实例之外,Select 查询还可以返回字典、元组和命名元组。根据您的用例,您可能会发现将行作为字典来处理更容易,例如

>>> query = User.select().dicts()
>>> for row in query:
...     print(row)

{'id': 1, 'username': 'Charlie'}
{'id': 2, 'username': 'Huey'}
{'id': 3, 'username': 'Peewee'}

有关更多信息,请参阅 namedtuples()tuples()dicts()

迭代大型结果集

默认情况下,peewee 将在迭代 Select 查询时返回的行进行缓存。这是一个优化,可以允许多次迭代以及索引和切片,而不会导致其他查询。但是,当您计划迭代大量行时,这种缓存可能会有问题。

要减少 peewee 在迭代查询时使用的内存量,请使用 iterator() 方法。此方法允许您在不缓存返回的每个模型的情况下进行迭代,从而在迭代大型结果集时使用更少的内存。

# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()

# Our imaginary serializer class
serializer = CSVSerializer()

# Loop over all the stats and serialize.
for stat in stats.iterator():
    serializer.serialize_object(stat)

对于简单的查询,您可以通过将行返回为字典、命名元组或元组来进一步提高速度。以下方法可用于任何 Select 查询,以更改结果行类型

别忘了附加 iterator() 方法调用以减少内存消耗。例如,上面的代码可能如下所示

# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()

# Our imaginary serializer class
serializer = CSVSerializer()

# Loop over all the stats (rendered as tuples, without caching) and serialize.
for stat_tuple in stats.tuples().iterator():
    serializer.serialize_tuple(stat_tuple)

在迭代包含来自多个表中的列的大量行时,peewee 将为返回的每一行重建模型图。对于复杂图形,此操作可能会很慢。例如,如果我们选择一个推文列表以及推文作者的用户名和头像,Peewee 必须为每一行创建两个对象(一个推文和一个用户)。除了上述行类型之外,还有一个第四个方法 objects(),它将返回行作为模型实例,但不会尝试解析模型图。

例如

query = (Tweet
         .select(Tweet, User)  # Select tweet and user data.
         .join(User))

# Note that the user columns are stored in a separate User instance
# accessible at tweet.user:
for tweet in query:
    print(tweet.user.username, tweet.content)

# Using ".objects()" will not create the tweet.user object and assigns all
# user attributes to the tweet instance:
for tweet in query.objects():
    print(tweet.username, tweet.content)

为了获得最佳性能,您可以执行查询,然后使用底层数据库游标迭代结果。 Database.execute() 接受一个查询对象,执行查询,并返回一个 DB-API 2.0 Cursor 对象。游标将返回原始行元组

query = Tweet.select(Tweet.content, User.username).join(User)
cursor = database.execute(query)
for (content, username) in cursor:
    print(username, '->', content)

过滤记录

您可以使用正常的 python 运算符过滤特定记录。Peewee 支持各种 查询运算符

>>> user = User.get(User.username == 'Charlie')
>>> for tweet in Tweet.select().where(Tweet.user == user, Tweet.is_published == True):
...     print(tweet.user.username, '->', tweet.message)
...
Charlie -> hello world
Charlie -> this is fun

>>> for tweet in Tweet.select().where(Tweet.created_date < datetime.datetime(2011, 1, 1)):
...     print(tweet.message, tweet.created_date)
...
Really old tweet 2010-01-01 00:00:00

您还可以跨联接进行筛选

>>> for tweet in Tweet.select().join(User).where(User.username == 'Charlie'):
...     print(tweet.message)
hello world
this is fun
look at this picture of my food

如果您想表达一个复杂的查询,请使用括号和 python 的按位 运算符

>>> Tweet.select().join(User).where(
...     (User.username == 'Charlie') |
...     (User.username == 'Peewee Herman'))

注意

请注意,Peewee 使用按位运算符(&|),而不是逻辑运算符(andor)。原因是 Python 将逻辑运算的返回值强制转换为布尔值。这也是为什么“IN”查询必须使用 .in_() 而不是 in 运算符的原因。

查看查询运算表以了解哪些类型的查询是可能的。

注意

查询的 where 子句中可以包含很多有趣的东西,例如

  • 字段表达式,例如 User.username == 'Charlie'

  • 函数表达式,例如 fn.Lower(fn.Substr(User.username, 1, 1)) == 'a'

  • 将一列与另一列进行比较,例如 Employee.salary < (Employee.tenure * 1000) + 40000

您还可以嵌套查询,例如用户名以“a”开头的用户的推文

# get users whose username starts with "a"
a_users = User.select().where(fn.Lower(fn.Substr(User.username, 1, 1)) == 'a')

# the ".in_()" method signifies an "IN" query
a_user_tweets = Tweet.select().where(Tweet.user.in_(a_users))

更多查询示例

注意

有关各种示例查询,请参阅查询示例文档,其中展示了如何从PostgreSQL Exercises网站实现查询。

获取活跃用户

User.select().where(User.active == True)

获取既是工作人员又是超级用户的用户

User.select().where(
    (User.is_staff == True) | (User.is_superuser == True))

获取名为“charlie”的用户发布的推文

Tweet.select().join(User).where(User.username == 'charlie')

获取工作人员或超级用户发布的推文(假设 FK 关系)

Tweet.select().join(User).where(
    (User.is_staff == True) | (User.is_superuser == True))

使用子查询获取工作人员或超级用户发布的推文

staff_super = User.select(User.id).where(
    (User.is_staff == True) | (User.is_superuser == True))
Tweet.select().where(Tweet.user.in_(staff_super))

对记录进行排序

要按顺序返回行,请使用 order_by() 方法

>>> for t in Tweet.select().order_by(Tweet.created_date):
...     print(t.pub_date)
...
2010-01-01 00:00:00
2011-06-07 14:08:48
2011-06-07 14:12:57

>>> for t in Tweet.select().order_by(Tweet.created_date.desc()):
...     print(t.pub_date)
...
2011-06-07 14:12:57
2011-06-07 14:08:48
2010-01-01 00:00:00

您还可以使用 +- 前缀运算符来指示排序

# The following queries are equivalent:
Tweet.select().order_by(Tweet.created_date.desc())

Tweet.select().order_by(-Tweet.created_date)  # Note the "-" prefix.

# Similarly you can use "+" to indicate ascending order, though ascending
# is the default when no ordering is otherwise specified.
User.select().order_by(+User.username)

您还可以跨联接进行排序。假设您想按作者的用户名对推文进行排序,然后按 created_date 排序

query = (Tweet
         .select()
         .join(User)
         .order_by(User.username, Tweet.created_date.desc()))
SELECT t1."id", t1."user_id", t1."message", t1."is_published", t1."created_date"
FROM "tweet" AS t1
INNER JOIN "user" AS t2
  ON t1."user_id" = t2."id"
ORDER BY t2."username", t1."created_date" DESC

在对计算值进行排序时,您可以包含必要的 SQL 表达式,或引用分配给该值的别名。以下两个示例说明了这些方法

# Let's start with our base query. We want to get all usernames and the number of
# tweets they've made. We wish to sort this list from users with most tweets to
# users with fewest tweets.
query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username))

您可以使用 select 子句中使用的相同 COUNT 表达式进行排序。在下面的示例中,我们按推文 ID 的 COUNT() 降序排序

query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(fn.COUNT(Tweet.id).desc()))

或者,您可以在 select 子句中引用分配给计算值的别名。此方法的好处是更容易阅读。请注意,我们没有直接引用命名别名,而是使用 SQL 帮助程序对其进行包装

query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(SQL('num_tweets').desc()))

或者,以“peewee”的方式做事

ntweets = fn.COUNT(Tweet.id)
query = (User
         .select(User.username, ntweets.alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(ntweets.desc())

获取随机记录

有时,您可能希望从数据库中提取一个随机记录。您可以通过按randomrand函数(取决于您的数据库)排序来实现此目的

Postgresql和Sqlite使用Random函数

# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Random()).limit(5)

MySQL使用Rand

# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Rand()).limit(5)

分页记录

paginate()方法可以轻松获取页面或记录。 paginate()采用两个参数,page_numberitems_per_page

注意

页码从 1 开始,因此结果的第一页将是第 1 页。

>>> for tweet in Tweet.select().order_by(Tweet.id).paginate(2, 10):
...     print(tweet.message)
...
tweet 10
tweet 11
tweet 12
tweet 13
tweet 14
tweet 15
tweet 16
tweet 17
tweet 18
tweet 19

如果您希望获得更精细的控制,则始终可以使用 limit()offset()

计数记录

您可以在任何选择查询中计数行数

>>> Tweet.select().count()
100
>>> Tweet.select().where(Tweet.id > 50).count()
50

Peewee会将您的查询包装在一个执行计数的外查询中,这会产生类似以下内容的 SQL

SELECT COUNT(1) FROM ( ... your query ... );

聚合记录

假设您有一些用户,并且希望获取它们的列表以及每个用户中的推文计数。

query = (User
         .select(User, fn.Count(Tweet.id).alias('count'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User))

结果查询将返回User对象,其中包含它们的所有常规属性以及一个附加属性count,其中将包含每个用户的推文计数。我们使用左外联接来包括没有推文的用户。

假设您有一个标记应用程序,并且希望查找与一定数量相关对象关联的标记。对于此示例,我们将在多对多配置中使用一些不同的模型

class Photo(Model):
    image = CharField()

class Tag(Model):
    name = CharField()

class PhotoTag(Model):
    photo = ForeignKeyField(Photo)
    tag = ForeignKeyField(Tag)

现在,假设我们希望查找与之关联至少 5 张照片的标记

query = (Tag
         .select()
         .join(PhotoTag)
         .join(Photo)
         .group_by(Tag)
         .having(fn.Count(Photo.id) > 5))

此查询等效于以下 SQL

SELECT t1."id", t1."name"
FROM "tag" AS t1
INNER JOIN "phototag" AS t2 ON t1."id" = t2."tag_id"
INNER JOIN "photo" AS t3 ON t2."photo_id" = t3."id"
GROUP BY t1."id", t1."name"
HAVING Count(t3."id") > 5

假设我们希望获取关联的计数并将其存储在标记上

query = (Tag
         .select(Tag, fn.Count(Photo.id).alias('count'))
         .join(PhotoTag)
         .join(Photo)
         .group_by(Tag)
         .having(fn.Count(Photo.id) > 5))

检索标量值

您可以通过调用Query.scalar()来检索标量值。例如

>>> PageView.select(fn.Count(fn.Distinct(PageView.url))).scalar()
100

您可以通过传递as_tuple=True来检索多个标量值

>>> Employee.select(
...     fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)

窗口函数

Window函数是指一个聚合函数,它对作为SELECT查询一部分进行处理的数据的滑动窗口进行操作。窗口函数可以执行以下操作

  1. 对结果集的子集执行聚合。

  2. 计算运行总计。

  3. 对结果进行排名。

  4. 将行值与前(或后)行中的值进行比较。

peewee 提供对 SQL 窗口函数的支持,可以通过调用 Function.over() 并传入分区或排序参数来创建。

对于以下示例,我们将使用以下模型和示例数据

class Sample(Model):
    counter = IntegerField()
    value = FloatField()

data = [(1, 10),
        (1, 20),
        (2, 1),
        (2, 3),
        (3, 100)]
Sample.insert_many(data, fields=[Sample.counter, Sample.value]).execute()

我们的示例表现在包含

id

counter

value

1

1

10.0

2

1

20.0

3

2

1.0

4

2

3.0

5

3

100.0

有序窗口

让我们计算 value 字段的运行总和。为了使其成为“运行”总和,我们需要对其进行排序,因此我们将根据 Sample 的 id 字段进行排序

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(order_by=[Sample.id]).alias('total'))

for sample in query:
    print(sample.counter, sample.value, sample.total)

# 1    10.    10.
# 1    20.    30.
# 2     1.    31.
# 2     3.    34.
# 3   100    134.

对于另一个示例,我们将计算按 id 排序时当前值与前一个值之间的差。

difference = Sample.value - fn.LAG(Sample.value, 1).over(order_by=[Sample.id])
query = Sample.select(
    Sample.counter,
    Sample.value,
    difference.alias('diff'))

for sample in query:
    print(sample.counter, sample.value, sample.diff)

# 1    10.   NULL
# 1    20.    10.  -- (20 - 10)
# 2     1.   -19.  -- (1 - 20)
# 2     3.     2.  -- (3 - 1)
# 3   100     97.  -- (100 - 3)

分区窗口

让我们计算每个不同的“counter”值的平均 value。请注意,counter 字段有三个可能的值(1、2 和 3)。我们可以通过计算 value 列的 AVG() 在根据 counter 字段进行分区的一个窗口中来实现这一点

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.AVG(Sample.value).over(partition_by=[Sample.counter]).alias('cavg'))

for sample in query:
    print(sample.counter, sample.value, sample.cavg)

# 1    10.    15.
# 1    20.    15.
# 2     1.     2.
# 2     3.     2.
# 3   100    100.

我们可以通过同时指定 order_bypartition_by 参数在分区内使用排序。例如,让我们在每个不同的 counter 组中按值对样本进行排名。

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.RANK().over(
        order_by=[Sample.value],
        partition_by=[Sample.counter]).alias('rank'))

for sample in query:
    print(sample.counter, sample.value, sample.rank)

# 1    10.    1
# 1    20.    2
# 2     1.    1
# 2     3.    2
# 3   100     1

有界窗口

默认情况下,窗口函数使用窗口的无界前置开始和当前行作为结束进行评估。我们可以通过在对 Function.over() 的调用中指定 start 和/或 end 来更改聚合函数操作的窗口边界。此外,Peewee 在 Window 对象上提供了帮助程序方法,用于生成适当的边界引用

为了检查边界如何工作,我们将计算value列的累加和,按id排序,但是我们只会查看当前行及其前面两行的累加和

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.id],
        start=Window.preceding(2),
        end=Window.CURRENT_ROW).alias('rsum'))

for sample in query:
    print(sample.counter, sample.value, sample.rsum)

# 1    10.    10.
# 1    20.    30.  -- (20 + 10)
# 2     1.    31.  -- (1 + 20 + 10)
# 2     3.    24.  -- (3 + 1 + 20)
# 3   100    104.  -- (100 + 3 + 1)

注意

从技术上讲,我们不需要指定end=Window.CURRENT,因为这是默认值。它在示例中显示用于演示。

我们来看另一个示例。在此示例中,我们将计算累加和的“相反数”,其中按id排序的所有值的总和将减少样本值。为了实现这一点,我们将计算从当前行到最后一行之间的总和。

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.id],
        start=Window.CURRENT_ROW,
        end=Window.following()).alias('rsum'))

# 1    10.   134.  -- (10 + 20 + 1 + 3 + 100)
# 1    20.   124.  -- (20 + 1 + 3 + 100)
# 2     1.   104.  -- (1 + 3 + 100)
# 2     3.   103.  -- (3 + 100)
# 3   100    100.  -- (100)

过滤的聚合

聚合函数还可以支持过滤器函数(Postgres 和 Sqlite 3.25+),这些函数会转换为FILTER (WHERE...)子句。使用Function.filter()方法将过滤器表达式添加到聚合函数。

例如,我们将计算value字段相对于id的累加和,但我们将过滤掉任何counter=2的样本。

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).filter(Sample.counter != 2).over(
        order_by=[Sample.id]).alias('csum'))

for sample in query:
    print(sample.counter, sample.value, sample.csum)

# 1    10.    10.
# 1    20.    30.
# 2     1.    30.
# 2     3.    30.
# 3   100    130.

注意

filter()的调用必须先于对over()的调用。

重复使用窗口定义

如果您打算对多个聚合使用相同的窗口定义,则可以创建一个Window对象。Window对象采用与Function.over()相同的参数,并且可以传递给over()方法,以代替各个参数。

这里我们将声明一个窗口,按样本id排序,并使用该窗口定义调用多个窗口函数

win = Window(order_by=[Sample.id])
query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.LEAD(Sample.value).over(win),
    fn.LAG(Sample.value).over(win),
    fn.SUM(Sample.value).over(win)
).window(win)  # Include our window definition in query.

for row in query.tuples():
    print(row)

# counter  value  lead()  lag()  sum()
# 1          10.     20.   NULL    10.
# 1          20.      1.    10.    30.
# 2           1.      3.    20.    31.
# 2           3.    100.     1.    34.
# 3         100.    NULL     3.   134.

多个窗口定义

在前面的示例中,我们了解了如何声明Window定义并将其重复用于多个不同的聚合。您可以在查询中包含任意多个窗口定义,但必须确保每个窗口都有一个唯一的别名

w1 = Window(order_by=[Sample.id]).alias('w1')
w2 = Window(partition_by=[Sample.counter]).alias('w2')
query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(w1).alias('rsum'),  # Running total.
    fn.AVG(Sample.value).over(w2).alias('cavg')   # Avg per category.
).window(w1, w2)  # Include our window definitions.

for sample in query:
    print(sample.counter, sample.value, sample.rsum, sample.cavg)

# counter  value   rsum     cavg
# 1          10.     10.     15.
# 1          20.     30.     15.
# 2           1.     31.      2.
# 2           3.     34.      2.
# 3         100     134.    100.

类似地,如果您有多个具有相似定义的窗口定义,则可以扩展先前定义的窗口定义。例如,这里我们将按计数器值对数据集进行分区,因此我们将针对计数器进行聚合。然后,我们将定义一个扩展此分区的第二个窗口,并添加一个排序子句

w1 = Window(partition_by=[Sample.counter]).alias('w1')

# By extending w1, this window definition will also be partitioned
# by "counter".
w2 = Window(extends=w1, order_by=[Sample.value.desc()]).alias('w2')

query = (Sample
         .select(Sample.counter, Sample.value,
                 fn.SUM(Sample.value).over(w1).alias('group_sum'),
                 fn.RANK().over(w2).alias('revrank'))
         .window(w1, w2)
         .order_by(Sample.id))

for sample in query:
    print(sample.counter, sample.value, sample.group_sum, sample.revrank)

# counter  value   group_sum   revrank
# 1        10.     30.         2
# 1        20.     30.         1
# 2        1.      4.          2
# 2        3.      4.          1
# 3        100.    100.        1

帧类型:RANGE 与 ROWS 与 GROUPS

根据帧类型,数据库将以不同方式处理有序组。让我们创建两行其他 Sample 行以显示差异

>>> Sample.create(counter=1, value=20.)
<Sample 6>
>>> Sample.create(counter=2, value=1.)
<Sample 7>

我们的表格现在包含

id

counter

value

1

1

10.0

2

1

20.0

3

2

1.0

4

2

3.0

5

3

100.0

6

1

20.0

7

2

1.0

让我们通过计算样本的“累加和”来检查差异,该累加和根据 countervalue 字段进行排序。要指定帧类型,我们可以使用

当存在逻辑重复项时,RANGE 的行为可能会导致意外结果

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.counter, Sample.value],
        frame_type=Window.RANGE).alias('rsum'))

for sample in query.order_by(Sample.counter, Sample.value):
    print(sample.counter, sample.value, sample.rsum)

# counter  value   rsum
# 1          10.     10.
# 1          20.     50.
# 1          20.     50.
# 2           1.     52.
# 2           1.     52.
# 2           3.     55.
# 3         100     155.

随着新行的加入,我们现在有一些行具有重复的 categoryvalue 值。RANGE 帧类型导致这些重复项一起评估,而不是单独评估。

可以使用 ROWS 作为帧类型来获得更预期的结果

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.counter, Sample.value],
        frame_type=Window.ROWS).alias('rsum'))

for sample in query.order_by(Sample.counter, Sample.value):
    print(sample.counter, sample.value, sample.rsum)

# counter  value   rsum
# 1          10.     10.
# 1          20.     30.
# 1          20.     50.
# 2           1.     51.
# 2           1.     52.
# 2           3.     55.
# 3         100     155.

Peewee 使用这些规则来确定要使用什么帧类型

  • 如果用户指定 frame_type,将使用该帧类型。

  • 如果指定 start 和/或 end 边界,Peewee 将默认使用 ROWS

  • 如果用户未指定帧类型或开始/结束边界,Peewee 将使用数据库默认值,即 RANGE

Window.GROUPS 帧类型根据排序项查看窗口范围规范,以按组查看行。使用 GROUPS,我们可以定义帧,以便其涵盖不同的行分组。我们来看一个示例

query = (Sample
         .select(Sample.counter, Sample.value,
                 fn.SUM(Sample.value).over(
                    order_by=[Sample.counter, Sample.value],
                    frame_type=Window.GROUPS,
                    start=Window.preceding(1)).alias('gsum'))
         .order_by(Sample.counter, Sample.value))

for sample in query:
    print(sample.counter, sample.value, sample.gsum)

#  counter   value    gsum
#  1         10       10
#  1         20       50
#  1         20       50   (10) + (20+0)
#  2         1        42
#  2         1        42   (20+20) + (1+1)
#  2         3        5    (1+1) + 3
#  3         100      103  (3) + 100

正如您所希望的那样,窗口按其排序项分组,即 (counter, value)。我们正在查看一个窗口,该窗口从一个前一组延伸到当前组。

注意

有关窗口函数 API 的信息,请参阅

有关窗口函数的一般信息,请阅读 postgres 窗口函数教程

此外,postgres 文档sqlite 文档 包含大量有用的信息。

检索行元组/字典/命名元组

有时,您不需要创建模型实例的开销,而只是想迭代行数据,而不需要 Model 提供的所有 API。要执行此操作,请使用

stats = (Stat
         .select(Stat.url, fn.Count(Stat.url))
         .group_by(Stat.url)
         .tuples())

# iterate over a list of 2-tuples containing the url and count
for stat_url, stat_count in stats:
    print(stat_url, stat_count)

同样,您可以使用 dicts() 将游标中的行作为字典返回

stats = (Stat
         .select(Stat.url, fn.Count(Stat.url).alias('ct'))
         .group_by(Stat.url)
         .dicts())

# iterate over a list of 2-tuples containing the url and count
for stat in stats:
    print(stat['url'], stat['ct'])

返回子句

PostgresqlDatabase 支持 RETURNING 子句,用于 UPDATEINSERTDELETE 查询。指定 RETURNING 子句允许您迭代查询访问的行。

默认情况下,执行不同查询时的返回值为

  • INSERT - 新插入行的自动增量主键值。如果不使用自动增量主键,Postgres 将返回新行的主键,但 SQLite 和 MySQL 不会。

  • UPDATE - 修改的行数

  • DELETE - 删除的行数

当使用返回子句时,执行查询时的返回值将是可迭代的游标对象。

Postgresql 允许通过 RETURNING 子句从查询插入或修改的行中返回数据。

例如,假设您有一个 Update,它停用所有注册已过期的用户帐户。在停用它们后,您希望向每个用户发送电子邮件,让他们知道他们的帐户已被停用。您可以通过带有 RETURNING 子句的单个 UPDATE 查询来完成此操作,而不是编写两个查询,一个 SELECT 和一个 UPDATE

query = (User
         .update(is_active=False)
         .where(User.registration_expired == True)
         .returning(User))

# Send an email to every user that was deactivated.
for deactivate_user in query.execute():
    send_deactivation_email(deactivated_user.email)

RETURNING 子句也可用于 InsertDelete。当与 INSERT 一起使用时,将返回新创建的行。当与 DELETE 一起使用时,将返回已删除的行。

RETURNING 子句的唯一限制是它只能包含查询的 FROM 子句中列出的表的列。要从特定表中选择所有列,您只需传入 Model 类。

作为另一个示例,我们添加一个用户并将他们的创建日期设置为服务器生成的当前时间戳。我们将在一个查询中创建并检索新用户的 ID、电子邮件和创建时间戳

query = (User
         .insert(email='[email protected]', created=fn.now())
         .returning(User))  # Shorthand for all columns on User.

# When using RETURNING, execute() returns a cursor.
cursor = query.execute()

# Get the user object we just inserted and log the data:
user = cursor[0]
logger.info('Created user %s (id=%s) at %s', user.email, user.id, user.created)

默认情况下,游标将返回 Model 实例,但您可以指定不同的行类型

data = [{'name': 'charlie'}, {'name': 'huey'}, {'name': 'mickey'}]
query = (User
         .insert_many(data)
         .returning(User.id, User.username)
         .dicts())

for new_user in query.execute():
    print('Added user "%s", id=%s' % (new_user['username'], new_user['id']))

Select 查询一样,您可以指定各种 结果行类型

公用表表达式

Peewee 支持在所有类型的查询中包含公用表表达式 (CTE)。CTE 可能对以下情况有用

  • 分解一个公用子查询。

  • 按 CTE 结果集中派生的列进行分组或筛选。

  • 编写递归查询。

要声明一个 Select 查询以用作 CTE,请使用 cte() 方法,它将查询包装在一个 CTE 对象中。要指示应将 CTE 包含在查询中,请使用 Query.with_cte() 方法,传递 CTE 对象列表。

简单示例

例如,假设我们有一些数据点,它们由键和浮点值组成。我们定义我们的模型并填充一些测试数据

class Sample(Model):
    key = TextField()
    value = FloatField()

data = (
    ('a', (1.25, 1.5, 1.75)),
    ('b', (2.1, 2.3, 2.5, 2.7, 2.9)),
    ('c', (3.5, 3.5)))

# Populate data.
for key, values in data:
    Sample.insert_many([(key, value) for value in values],
                       fields=[Sample.key, Sample.value]).execute()

让我们使用 CTE 计算每个不同键的哪些值高于该键的平均值。

# First we'll declare the query that will be used as a CTE. This query
# simply determines the average value for each key.
cte = (Sample
       .select(Sample.key, fn.AVG(Sample.value).alias('avg_value'))
       .group_by(Sample.key)
       .cte('key_avgs', columns=('key', 'avg_value')))

# Now we'll query the sample table, using our CTE to find rows whose value
# exceeds the average for the given key. We'll calculate how far above the
# average the given sample's value is, as well.
query = (Sample
         .select(Sample.key, Sample.value)
         .join(cte, on=(Sample.key == cte.c.key))
         .where(Sample.value > cte.c.avg_value)
         .order_by(Sample.value)
         .with_cte(cte))

我们可以迭代查询返回的样本,以查看哪些样本对其给定组具有高于平均值的值

>>> for sample in query:
...     print(sample.key, sample.value)

# 'a', 1.75
# 'b', 2.7
# 'b', 2.9

复杂示例

对于一个更完整的示例,让我们考虑以下查询,它使用多个 CTE 在仅限于顶级销售区域中查找按产品划分的销售总额。我们的模型如下所示

class Order(Model):
    region = TextField()
    amount = FloatField()
    product = TextField()
    quantity = IntegerField()

以下是此查询在 SQL 中的编写方式。此示例可以在 postgresql 文档 中找到。

WITH regional_sales AS (
    SELECT region, SUM(amount) AS total_sales
    FROM orders
    GROUP BY region
  ), top_regions AS (
    SELECT region
    FROM regional_sales
    WHERE total_sales > (SELECT SUM(total_sales) / 10 FROM regional_sales)
  )
SELECT region,
       product,
       SUM(quantity) AS product_units,
       SUM(amount) AS product_sales
FROM orders
WHERE region IN (SELECT region FROM top_regions)
GROUP BY region, product;

使用 Peewee,我们将编写

reg_sales = (Order
             .select(Order.region,
                     fn.SUM(Order.amount).alias('total_sales'))
             .group_by(Order.region)
             .cte('regional_sales'))

top_regions = (reg_sales
               .select(reg_sales.c.region)
               .where(reg_sales.c.total_sales > (
                   reg_sales.select(fn.SUM(reg_sales.c.total_sales) / 10)))
               .cte('top_regions'))

query = (Order
         .select(Order.region,
                 Order.product,
                 fn.SUM(Order.quantity).alias('product_units'),
                 fn.SUM(Order.amount).alias('product_sales'))
         .where(Order.region.in_(top_regions.select(top_regions.c.region)))
         .group_by(Order.region, Order.product)
         .with_cte(reg_sales, top_regions))

递归 CTE

Peewee 支持递归 CTE。当您拥有由父链接外键表示的树形数据结构时,递归 CTE 很有用。例如,假设我们有一个在线书店的类别层次结构。我们希望生成一个表,显示所有类别及其绝对深度,以及从根到类别的路径。

我们假设以下模型定义,其中每个类别都有一个外键指向其直接父类别

class Category(Model):
    name = TextField()
    parent = ForeignKeyField('self', backref='children', null=True)

要列出所有类别及其深度和父类别,我们可以使用递归 CTE

# Define the base case of our recursive CTE. This will be categories that
# have a null parent foreign-key.
Base = Category.alias()
level = Value(1).alias('level')
path = Base.name.alias('path')
base_case = (Base
             .select(Base.id, Base.name, Base.parent, level, path)
             .where(Base.parent.is_null())
             .cte('base', recursive=True))

# Define the recursive terms.
RTerm = Category.alias()
rlevel = (base_case.c.level + 1).alias('level')
rpath = base_case.c.path.concat('->').concat(RTerm.name).alias('path')
recursive = (RTerm
             .select(RTerm.id, RTerm.name, RTerm.parent, rlevel, rpath)
             .join(base_case, on=(RTerm.parent == base_case.c.id)))

# The recursive CTE is created by taking the base case and UNION ALL with
# the recursive term.
cte = base_case.union_all(recursive)

# We will now query from the CTE to get the categories, their levels,  and
# their paths.
query = (cte
         .select_from(cte.c.name, cte.c.level, cte.c.path)
         .order_by(cte.c.path))

# We can now iterate over a list of all categories and print their names,
# absolute levels, and path from root -> category.
for category in query:
    print(category.name, category.level, category.path)

# Example output:
# root, 1, root
# p1, 2, root->p1
# c1-1, 3, root->p1->c1-1
# c1-2, 3, root->p1->c1-2
# p2, 2, root->p2
# c2-1, 3, root->p2->c2-1

数据修改 CTE

Peewee 支持数据修改 CTE。

使用数据修改 CTE 将数据从一个表移动到存档表(使用单个查询)的示例

class Event(Model):
    name = CharField()
    timestamp = DateTimeField()

class Archive(Model):
    name = CharField()
    timestamp = DateTimeField()

# Move rows older than 24 hours from the Event table to the Archive.
cte = (Event
       .delete()
       .where(Event.timestamp < (datetime.now() - timedelta(days=1)))
       .returning(Event)
       .cte('moved_rows'))

# Create a simple SELECT to get the resulting rows from the CTE.
src = Select((cte,), (cte.c.id, cte.c.name, cte.c.timestamp))

# Insert into the archive table whatever data was returned by the DELETE.
res = (Archive
       .insert_from(src, (Archive.id, Archive.name, Archive.timestamp))
       .with_cte(cte)
       .execute())

上述内容大致对应于以下 SQL

WITH "moved_rows" AS (
    DELETE FROM "event" WHERE ("timestamp" < XXXX-XX-XXTXX:XX:XX)
    RETURNING "id", "name", "timestamp")
INSERT INTO "archive" ("id", "name", "timestamp")
SELECT "moved_rows"."id", "moved_rows"."name", "moved_rows"."timestamp"
FROM "moved_rows";

有关其他示例,请参阅 models.pysql.py 中的测试

外键和联接

此部分已移至其自己的文档中:关系和联接