查询生成器
Peewee 的高级 Model
和 Field
API 构建在底层的 Table
和 Column
对应项之上。虽然这些底层 API 的文档不如其高级对应项那么详细,但本文档将提供一个概述,并提供一些示例,希望能够让你进行尝试。
我们将使用以下架构
CREATE TABLE "person" (
"id" INTEGER NOT NULL PRIMARY KEY,
"first" TEXT NOT NULL,
"last" TEXT NOT NULL);
CREATE TABLE "note" (
"id" INTEGER NOT NULL PRIMARY KEY,
"person_id" INTEGER NOT NULL,
"content" TEXT NOT NULL,
"timestamp" DATETIME NOT NULL,
FOREIGN KEY ("person_id") REFERENCES "person" ("id"));
CREATE TABLE "reminder" (
"id" INTEGER NOT NULL PRIMARY KEY,
"note_id" INTEGER NOT NULL,
"alarm" DATETIME NOT NULL,
FOREIGN KEY ("note_id") REFERENCES "note" ("id"));
声明表
我们可以通过两种方式声明 Table
对象来处理这些表
# Explicitly declare columns
Person = Table('person', ('id', 'first', 'last'))
Note = Table('note', ('id', 'person_id', 'content', 'timestamp'))
# Do not declare columns, they will be accessed using magic ".c" attribute
Reminder = Table('reminder')
通常,我们会希望将我们的表 bind()
到数据库。这可以让我们不必在每次希望对表执行查询时都显式传递数据库
db = SqliteDatabase('my_app.db')
Person = Person.bind(db)
Note = Note.bind(db)
Reminder = Reminder.bind(db)
选择查询
要选择前三个笔记并打印其内容,我们可以编写
query = Note.select().order_by(Note.timestamp).limit(3)
for note_dict in query:
print(note_dict['content'])
注意
默认情况下,行将作为字典返回。如果你愿意,可以使用 tuples()
、namedtuples()
或 objects()
方法为行数据指定不同的容器。
因为我们没有指定任何列,所以我们将在笔记的 Table
构造函数中定义的所有列都将被选中。这对于 Reminder 不起作用,因为我们根本没有指定任何列。
要选择 2018 年发布的所有笔记以及创建者的姓名,我们将使用 join()
。我们还将请求将行作为 namedtuple 对象返回
query = (Note
.select(Note.content, Note.timestamp, Person.first, Person.last)
.join(Person, on=(Note.person_id == Person.id))
.where(Note.timestamp >= datetime.date(2018, 1, 1))
.order_by(Note.timestamp)
.namedtuples())
for row in query:
print(row.timestamp, '-', row.content, '-', row.first, row.last)
让我们查询最多产的人,即获取创建笔记最多的人。这引入了调用 SQL 函数(COUNT),这是使用 fn
对象完成的
name = Person.first.concat(' ').concat(Person.last)
query = (Person
.select(name.alias('name'), fn.COUNT(Note.id).alias('count'))
.join(Note, JOIN.LEFT_OUTER, on=(Note.person_id == Person.id))
.group_by(name)
.order_by(fn.COUNT(Note.id).desc()))
for row in query:
print(row['name'], row['count'])
上述查询中需要注意以下几点
我们把一个表达式存储在一个变量中(
name
),然后在查询中使用它。我们使用
fn.<function>(...)
调用 SQL 函数,传递参数就像它是一个普通的 Python 函数一样。使用
alias()
方法来指定用于列或计算的名称。
作为一个更复杂的示例,我们将生成一个包含所有人员以及他们最近发布的笔记的内容和时间戳的列表。为此,我们最终将在同一个查询的不同上下文中两次使用 Note 表,这将要求我们使用表别名。
# Start with the query that calculates the timestamp of the most recent
# note for each person.
NA = Note.alias('na')
max_note = (NA
.select(NA.person_id, fn.MAX(NA.timestamp).alias('max_ts'))
.group_by(NA.person_id)
.alias('max_note'))
# Now we'll select from the note table, joining on both the subquery and
# on the person table to construct the result set.
query = (Note
.select(Note.content, Note.timestamp, Person.first, Person.last)
.join(max_note, on=((max_note.c.person_id == Note.person_id) &
(max_note.c.max_ts == Note.timestamp)))
.join(Person, on=(Note.person_id == Person.id))
.order_by(Person.first, Person.last))
for row in query.namedtuples():
print(row.first, row.last, ':', row.timestamp, '-', row.content)
在对 max_note 子查询进行连接的连接谓词中,我们可以使用神奇的 “.c” 属性来引用子查询中的列。因此,max_note.c.max_ts 被翻译为“max_note 子查询中的 max_ts 列值”。
我们还可以使用 “.c” 魔术属性来访问未明确定义其列的表上的列,就像我们对 Reminder 表所做的那样。这是一个简单的查询,用于获取今天的全部提醒以及它们关联的笔记内容
today = datetime.date.today()
tomorrow = today + datetime.timedelta(days=1)
query = (Reminder
.select(Reminder.c.alarm, Note.content)
.join(Note, on=(Reminder.c.note_id == Note.id))
.where(Reminder.c.alarm.between(today, tomorrow))
.order_by(Reminder.c.alarm))
for row in query:
print(row['alarm'], row['content'])
注意
为了防止混淆,“.c” 属性将不会在明确定义其列的表上起作用。
插入查询
插入数据很简单。我们可以通过两种不同的方式在 insert()
中指定数据(在这两种情况下,都会返回新行的 ID)
# Using keyword arguments:
zaizee_id = Person.insert(first='zaizee', last='cat').execute()
# Using column: value mappings:
Note.insert({
Note.person_id: zaizee_id,
Note.content: 'meeeeowwww',
Note.timestamp: datetime.datetime.now()}).execute()
批量插入数据很容易,只需传入以下内容
字典列表(所有字典必须具有相同的键/列)。
元组列表(如果明确指定了列)。
示例
people = [
{'first': 'Bob', 'last': 'Foo'},
{'first': 'Herb', 'last': 'Bar'},
{'first': 'Nuggie', 'last': 'Bar'}]
# Inserting multiple rows returns the ID of the last-inserted row.
last_id = Person.insert(people).execute()
# We can also specify row tuples, so long as we tell Peewee which
# columns the tuple values correspond to:
people = [
('Bob', 'Foo'),
('Herb', 'Bar'),
('Nuggie', 'Bar')]
Person.insert(people, columns=[Person.first, Person.last]).execute()
更新查询
update()
查询接受关键字参数或将列映射到值的字典,就像 insert()
一样。
示例
# "Bob" changed his last name from "Foo" to "Baze".
nrows = (Person
.update(last='Baze')
.where((Person.first == 'Bob') &
(Person.last == 'Foo'))
.execute())
# Use dictionary mapping column to value.
nrows = (Person
.update({Person.last: 'Baze'})
.where((Person.first == 'Bob') &
(Person.last == 'Foo'))
.execute())
你还可以使用表达式作为值来执行原子更新。想象一下我们有一个 PageView 表,我们需要对某个 URL 原子地增加页面浏览次数
# Do an atomic update:
(PageView
.update({PageView.count: PageView.count + 1})
.where(PageView.url == some_url)
.execute())
删除查询
delete()
查询是最简单的,因为它们不接受任何参数
# Delete all notes created before 2018, returning number deleted.
n = Note.delete().where(Note.timestamp < datetime.date(2018, 1, 1)).execute()
由于 DELETE(和 UPDATE)查询不支持连接,因此我们可以使用子查询根据相关表中的值删除行。例如,以下是如何删除姓氏为“Foo”的任何人的所有笔记
# Get the id of all people whose last name is "Foo".
foo_people = Person.select(Person.id).where(Person.last == 'Foo')
# Delete all notes by any person whose ID is in the previous query.
Note.delete().where(Note.person_id.in_(foo_people)).execute()
查询对象
Peewee 2.x 提供的抽象的一个基本限制是不存在一个表示与给定模型类无关的结构化查询的类。
这方面的一个示例可能是计算子查询的聚合值。例如,count()
方法返回任意查询中的行数,其实现方式是包装查询
SELECT COUNT(1) FROM (...)
要使用 Peewee 完成此操作,实现方式如下
def count(query):
# Select([source1, ... sourcen], [column1, ...columnn])
wrapped = Select(from_list=[query], columns=[fn.COUNT(SQL('1'))])
curs = wrapped.tuples().execute(db)
return curs[0][0] # Return first column from first row of result.
实际上,我们可以使用 scalar()
方法更简洁地表达此内容,该方法适用于从聚合查询返回值
def count(query):
wrapped = Select(from_list=[query], columns=[fn.COUNT(SQL('1'))])
return wrapped.scalar(db)
查询示例 文档中有一个更复杂的示例,其中我们为预订了最多可用时段的设施编写查询
我们希望表达的 SQL 如下
SELECT facid, total FROM (
SELECT facid, SUM(slots) AS total,
rank() OVER (order by SUM(slots) DESC) AS rank
FROM bookings
GROUP BY facid
) AS ranked
WHERE rank = 1
我们可以使用简单的 Select
作为外部查询,以相当优雅的方式表达此内容
# Store rank expression in variable for readability.
rank_expr = fn.rank().over(order_by=[fn.SUM(Booking.slots).desc()])
subq = (Booking
.select(Booking.facility, fn.SUM(Booking.slots).alias('total'),
rank_expr.alias('rank'))
.group_by(Booking.facility))
# Use a plain "Select" to create outer query.
query = (Select(columns=[subq.c.facid, subq.c.total])
.from_(subq)
.where(subq.c.rank == 1)
.tuples())
# Iterate over the resulting facility ID(s) and total(s):
for facid, total in query.execute(db):
print(facid, total)
再举一个示例,让我们创建一个递归公用表表达式来计算前 10 个斐波那契数
base = Select(columns=(
Value(1).alias('n'),
Value(0).alias('fib_n'),
Value(1).alias('next_fib_n'))).cte('fibonacci', recursive=True)
n = (base.c.n + 1).alias('n')
recursive_term = Select(columns=(
n,
base.c.next_fib_n,
base.c.fib_n + base.c.next_fib_n)).from_(base).where(n < 10)
fibonacci = base.union_all(recursive_term)
query = fibonacci.select_from(fibonacci.c.n, fibonacci.c.fib_n)
results = list(query.execute(db))
# Generates the following result list:
[{'fib_n': 0, 'n': 1},
{'fib_n': 1, 'n': 2},
{'fib_n': 1, 'n': 3},
{'fib_n': 2, 'n': 4},
{'fib_n': 3, 'n': 5},
{'fib_n': 5, 'n': 6},
{'fib_n': 8, 'n': 7},
{'fib_n': 13, 'n': 8},
{'fib_n': 21, 'n': 9},
{'fib_n': 34, 'n': 10}]
更多
有关用于描述 SQL AST 的各种类的说明,请参阅 查询生成器 API 文档。
如果您有兴趣了解更多信息,还可以查看 项目源代码。