SQLite 扩展

默认 SqliteDatabase 已包含许多特定于 SQLite 的功能

入门

要开始使用本文档中描述的功能,您需要使用 playhouse.sqlite_ext 模块中的 SqliteExtDatabase 类。此外,某些功能需要 playhouse._sqlite_ext C 扩展 - 这些功能将在文档中注明。

实例化 SqliteExtDatabase

from playhouse.sqlite_ext import SqliteExtDatabase

db = SqliteExtDatabase('my_app.db', pragmas=(
    ('cache_size', -1024 * 64),  # 64MB page-cache.
    ('journal_mode', 'wal'),  # Use WAL-mode (you should always use this!).
    ('foreign_keys', 1)))  # Enforce foreign-key constraints.

API

class SqliteExtDatabase(database[, pragmas=None[, timeout=5[, c_extensions=None[, rank_functions=True[, hash_functions=False[, regexp_function=False[, bloomfilter=False]]]]]]])
参数
  • pragmas (列表) - 包含 pragma 键和值的 2 元组列表,用于在每次打开连接时设置。

  • timeout - 设置 SQLite 驱动程序上的繁忙超时(以秒为单位)。

  • c_extensions (布尔值) - 声明必须/不得使用 C 扩展加速。如果设置为 True 且扩展模块不可用,将引发 ImproperlyConfigured 异常。

  • rank_functions (布尔值) - 使搜索结果排名函数可用。

  • hash_functions (布尔值) - 使哈希函数可用(md5、sha1 等)。

  • regexp_function (布尔值) - 使 REGEXP 函数可用。

  • bloomfilter (布尔值) - 使 布隆过滤器 可用。

扩展 SqliteDatabase 并继承声明用户定义函数、pragma 等的方法。

CSqliteExtDatabase(database[, pragmas=None[, timeout=5[, c_extensions=None[, rank_functions=True[, hash_functions=False[, regexp_function=False[, bloomfilter=False[, replace_busy_handler=False]]]]]]]])
参数
  • pragmas (列表) - 包含 pragma 键和值的 2 元组列表,用于在每次打开连接时设置。

  • timeout - 设置 SQLite 驱动程序上的繁忙超时(以秒为单位)。

  • c_extensions (布尔值) - 声明必须/不得使用 C 扩展加速。如果设置为 True 且扩展模块不可用,将引发 ImproperlyConfigured 异常。

  • rank_functions (布尔值) - 使搜索结果排名函数可用。

  • hash_functions (布尔值) - 使哈希函数可用(md5、sha1 等)。

  • regexp_function (布尔值) - 使 REGEXP 函数可用。

  • bloomfilter (布尔值) - 使 布隆过滤器 可用。

  • replace_busy_handler (布尔值) – 使用更智能的繁忙处理程序实现。

扩展 SqliteExtDatabase 并要求 playhouse._sqlite_ext 扩展模块可用。

on_commit(fn)

注册一个回调,以便在当前连接上提交事务时执行。回调不接受参数,忽略返回值。

但是,如果回调引发 ValueError,事务将中止并回滚。

示例

db = CSqliteExtDatabase(':memory:')

@db.on_commit
def on_commit():
    logger.info('COMMITing changes')
on_rollback(fn)

注册一个回调,以便在当前连接上回滚事务时执行。回调不接受参数,忽略返回值。

示例

@db.on_rollback
def on_rollback():
    logger.info('Rolling back changes')
on_update(fn)

注册一个回调,以便在写入数据库(通过 UPDATEINSERTDELETE 查询)时执行。回调应接受以下参数

  • query - 查询类型,INSERTUPDATEDELETE

  • 数据库名称 - 默认数据库名为 main

  • 表名称 - 正在修改的表的名称。

  • rowid - 正在修改的行中的 rowid。

忽略回调的返回值。

示例

db = CSqliteExtDatabase(':memory:')

@db.on_update
def on_update(query_type, db, table, rowid):
    # e.g. INSERT row 3 into table users.
    logger.info('%s row %s into table %s', query_type, rowid, table)
changes()

返回当前打开的事务中修改的行数。

autocommit

返回一个布尔值,指示是否启用自动提交的属性。默认情况下,此值将为 True,除非在事务中(或 atomic() 块中)。

示例

>>> db = CSqliteExtDatabase(':memory:')
>>> db.autocommit
True
>>> with db.atomic():
...     print(db.autocommit)
...
False
>>> db.autocommit
True
backup(destination[, pages=None, name=None, progress=None])
参数
  • destination (SqliteDatabase) – 用作备份目标的数据库对象。

  • pages (int) – 每次迭代的页面数。-1 的默认值表示所有页面应一步备份。

  • name (str) – 源数据库的名称(如果您使用 ATTACH DATABASE 加载多个数据库,则可能不同)。默认为“main”。

  • progress – 进度回调,使用三个参数调用:剩余页面数、总页面数以及备份是否完成。

示例

master = CSqliteExtDatabase('master.db')
replica = CSqliteExtDatabase('replica.db')

# Backup the contents of master to replica.
master.backup(replica)
backup_to_file(filename[, pages, name, progress])
参数
  • filename – 存储数据库备份的文件名。

  • pages (int) – 每次迭代的页面数。-1 的默认值表示所有页面应一步备份。

  • name (str) – 源数据库的名称(如果您使用 ATTACH DATABASE 加载多个数据库,则可能不同)。默认为“main”。

  • progress – 进度回调,使用三个参数调用:剩余页面数、总页面数以及备份是否完成。

将当前数据库备份到文件。备份的数据不是数据库转储,而是实际的 SQLite 数据库文件。

示例

db = CSqliteExtDatabase('app.db')

def nightly_backup():
    filename = 'backup-%s.db' % (datetime.date.today())
    db.backup_to_file(filename)
blob_open(table, column, rowid[, read_only=False])
参数
  • table (str) – 包含数据的表名。

  • column (str) – 包含数据的列名。

  • rowid (int) – 要检索的行 ID。

  • read_only (bool) – 仅以只读方式打开 blob。

返回

Blob 实例,可高效访问底层二进制数据。

返回类型

Blob

有关详细信息,请参见 BlobZeroBlob

示例

class Image(Model):
    filename = TextField()
    data = BlobField()

buf_size = 1024 * 1024 * 8  # Allocate 8MB for storing file.
rowid = Image.insert({Image.filename: 'thefile.jpg',
                      Image.data: ZeroBlob(buf_size)}).execute()

# Open the blob, returning a file-like object.
blob = db.blob_open('image', 'data', rowid)

# Write some data to the blob.
blob.write(image_data)
img_size = blob.tell()

# Read the data back out of the blob.
blob.seek(0)
image_data = blob.read(img_size)
RowIDField

对应于 SQLite rowid 字段的主键字段。有关更多信息,请参阅 SQLite 文档中的 rowid 表

示例

class Note(Model):
    rowid = RowIDField()  # Will be primary key.
    content = TextField()
    timestamp = TimestampField()
DocIDField

用于虚拟表(专门使用 docid 作为主键的约定)的 RowIDField 子类。据我所知,这仅适用于使用 FTS3 和 FTS4 全文搜索扩展的表。

注意

在 FTS3 和 FTS4 中,“docid”只是“rowid”的别名。为了减少混淆,最好始终使用 RowIDField,而不要使用 DocIDField

class NoteIndex(FTSModel):
    docid = DocIDField()  # "docid" is used as an alias for "rowid".
    content = SearchField()

    class Meta:
        database = db
AutoIncrementField

默认情况下,SQLite 可能会在删除行后重复使用主键值。为了确保主键始终单调递增(无论是否删除),您应该使用 AutoIncrementField。此功能会带来一些小的性能开销。有关更多信息,请参阅 SQLite 文档中的 自动递增

JSONField(json_dumps=None, json_loads=None, ...)

适合存储 JSON 数据的字段类,具有专门设计用于配合 json1 扩展 的特殊方法。

SQLite 3.9.0 以扩展库的形式添加了 JSON 支持。SQLite json1 扩展提供了一些用于处理 JSON 数据的帮助器函数。这些 API 以特殊字段类型 JSONField 的方法公开。

要访问或修改 JSON 结构中的特定对象键或数组索引,您可以将 JSONField 视为字典/列表。

参数
  • json_dumps –(可选)用于将数据序列化为 JSON 字符串的函数。如果未提供,将使用 stdlib json.dumps

  • json_loads –(可选)用于将 JSON 反序列化为 Python 对象的函数。如果未提供,将使用 stdlib json.loads

注意

要自定义 JSON 序列化或反序列化,您可以指定自定义 json_dumpsjson_loads 可调用对象。这些函数应分别接受一个参数:要序列化的对象和 JSON 字符串。要修改 stdlib JSON 函数的参数,您可以使用 functools.partial

# Do not escape unicode code-points.
my_json_dumps = functools.partial(json.dumps, ensure_ascii=False)

class SomeModel(Model):
    # Specify our custom serialization function.
    json_data = JSONField(json_dumps=my_json_dumps)

我们来看一些使用 Peewee 中的 SQLite json1 扩展的示例。这里,我们将准备一个数据库和一个简单的模型来测试 json1 扩展

>>> from playhouse.sqlite_ext import *
>>> db = SqliteExtDatabase(':memory:')
>>> class KV(Model):
...     key = TextField()
...     value = JSONField()
...     class Meta:
...         database = db
...

>>> KV.create_table()

存储数据的方式可能符合您的预期。无需将字典或列表序列化为 JSON,因为 Peewee 会自动执行此操作

>>> KV.create(key='a', value={'k1': 'v1'})
<KV: 1>
>>> KV.get(KV.key == 'a').value
{'k1': 'v1'}

我们可以使用字典查找访问 JSON 数据的特定部分

>>> KV.get(KV.value['k1'] == 'v1').key
'a'

可以使用 update() 方法就地更新 JSON 值。请注意“k1=v1”已保留

>>> KV.update(value=KV.value.update({'k2': 'v2', 'k3': 'v3'})).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1', 'k2': 'v2', 'k3': 'v3'}

我们还可以原子更新现有数据,或通过将键值设置为 None 来移除键。在以下示例中,我们将更新“k1”的值并移除“k3”(“k2”将不会被修改)

>>> KV.update(value=KV.value.update({'k1': 'v1-x', 'k3': None})).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1-x', 'k2': 'v2'}

我们还可以使用 set() 方法设置 JSON 数据的各个部分

>>> KV.update(value=KV.value['k1'].set('v1')).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1', 'k2': 'v2'}

除了标量值之外,set() 方法还可以与对象一起使用

>>> KV.update(value=KV.value['k2'].set({'x2': 'y2'})).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1', 'k2': {'x2': 'y2'}}

还可以使用 remove() 以原子方式移除 JSON 数据的各个部分

>>> KV.update(value=KV.value['k2'].remove()).execute()
1
>>> KV.get(KV.key == 'a').value
{'k1': 'v1'}

我们还可以使用 json_type() 方法获取存储在 JSON 数据特定位置的值的类型

>>> KV.select(KV.value.json_type(), KV.value['k1'].json_type()).tuples()[:]
[('object', 'text')]

我们来添加一个嵌套值,然后使用 tree() 方法查看如何递归遍历其内容

>>> KV.create(key='b', value={'x1': {'y1': 'z1', 'y2': 'z2'}, 'x2': [1, 2]})
<KV: 2>
>>> tree = KV.value.tree().alias('tree')
>>> query = KV.select(KV.key, tree.c.fullkey, tree.c.value).from_(KV, tree)
>>> query.tuples()[:]
[('a', '$', {'k1': 'v1'}),
 ('a', '$.k1', 'v1'),
 ('b', '$', {'x1': {'y1': 'z1', 'y2': 'z2'}, 'x2': [1, 2]}),
 ('b', '$.x2', [1, 2]),
 ('b', '$.x2[0]', 1),
 ('b', '$.x2[1]', 2),
 ('b', '$.x1', {'y1': 'z1', 'y2': 'z2'}),
 ('b', '$.x1.y1', 'z1'),
 ('b', '$.x1.y2', 'z2')]

tree()children() 方法功能强大。有关如何使用它们的更多信息,请参阅 json1 扩展文档

另请注意,JSONField 查找可以链接

>>> query = KV.select().where(KV.value['x1']['y1'] == 'z1')
>>> for obj in query:
...     print(obj.key, obj.value)
...

'b', {'x1': {'y1': 'z1', 'y2': 'z2'}, 'x2': [1, 2]}

有关更多信息,请参阅 sqlite json1 文档

__getitem__(item)
参数

item – 访问 JSON 数据中的特定键或数组索引。

返回

一个特殊对象,公开对 JSON 数据的访问。

返回类型

JSONPath

访问 JSON 数据中的特定键或数组索引。返回一个 JSONPath 对象,它公开方便的方法,用于读取或修改 JSON 对象的特定部分。

示例

# If metadata contains {"tags": ["list", "of", "tags"]}, we can
# extract the first tag in this way:
Post.select(Post, Post.metadata['tags'][0].alias('first_tag'))

更多示例,请参见 JSONPath API 文档。

extract(*paths)
参数

paths – 要提取的一个或多个 JSON 路径。

提取指定 JSON 路径中的值。如果提供了多个路径,则 Sqlite 将以 list 的形式返回这些值。

extract_json(path)
参数

path (str) – JSON 路径

将指定路径中的值作为 JSON 数据类型提取。这对应于 Sqlite 3.38 中添加的 -> 运算符。

extract_text(path)
参数

path (str) – JSON 路径

将指定路径中的值作为 SQL 数据类型提取。这对应于 Sqlite 3.38 中添加的 ->> 运算符。

set(value[, as_json=None])
参数
  • value – 标量值、列表或字典。

  • as_json (bool) – 强制将值视为 JSON,在这种情况下,它将在 Python 中预先序列化为 JSON。默认情况下,列表和字典被视为要序列化的 JSON,而字符串和整数按原样传递。

设置存储在 JSONField 中的值。

使用 json1 扩展中的 json_set() 函数。

replace(value[, as_json=None])
参数
  • value – 标量值、列表或字典。

  • as_json (bool) – 强制将值视为 JSON,在这种情况下,它将在 Python 中预先序列化为 JSON。默认情况下,列表和字典被视为要序列化的 JSON,而字符串和整数按原样传递。

替换存储在 JSONField 中的现有值。

使用 json1 扩展中的 json_replace() 函数。

insert(value[, as_json=None])
参数
  • value – 标量值、列表或字典。

  • as_json (bool) – 强制将值视为 JSON,在这种情况下,它将在 Python 中预先序列化为 JSON。默认情况下,列表和字典被视为要序列化的 JSON,而字符串和整数按原样传递。

将值插入 JSONField

使用 json1 扩展中的 json_insert() 函数。

append(value[, as_json=None])
参数
  • value – 标量值、列表或字典。

  • as_json (bool) – 强制将值视为 JSON,在这种情况下,它将在 Python 中预先序列化为 JSON。默认情况下,列表和字典被视为要序列化的 JSON,而字符串和整数按原样传递。

追加到存储在 JSONField 中的数组。

使用 json1 扩展中的 json_set() 函数。

update(data)
参数

data – 标量值、列表或字典,与当前存储在 JSONField 中的数据合并。要移除特定键,请将该键设置为更新数据中的 None

使用 RFC-7396 MergePatch 算法将新数据合并到 JSON 值中,以针对列数据应用修补程序(data 参数)。MergePatch 可以添加、修改或删除 JSON 对象的元素,这意味着 update()set()remove() 的通用替换。MergePatch 将 JSON 数组对象视为原子对象,因此 update() 无法追加到数组,也无法修改数组的各个元素。

有关详细信息以及示例,请参阅 SQLite json_patch() 函数文档。

remove()

移除存储在 JSONField 中的数据。

使用 json1 扩展中的 json_remove 函数。

json_type()

返回一个字符串,用于标识存储在列中的值类型。

返回的类型将是下列之一

  • 对象

  • 数组

  • 整数

  • 实数

  • 文本

  • null <– 字符串“null”表示一个实际的 NULL 值

  • NULL <– 一个实际的 NULL 值表示未找到路径

使用 json1 扩展中的 json_type 函数。

length()

返回存储在列中的数组的长度。

使用 json1 扩展中的 json_array_length 函数。

children()

children 函数对应于 json_each,这是一个表值函数,用于遍历提供的 JSON 值并返回顶级数组或对象的直接子项。如果指定了路径,则该路径将被视为最顶层元素。

children() 进行调用所返回的行具有以下属性

  • key:当前元素相对于其父元素的键。

  • value:当前元素的值。

  • type:其中一种数据类型(请参阅 json_type())。

  • atom:基本类型的标量值,数组和对象的 NULL

  • id:一个唯一 ID,用于引用树中的当前节点。

  • parent:包含节点的 ID。

  • fullkey:描述当前元素的完整路径。

  • path:当前行的容器的路径。

此方法在内部使用 json1 扩展中的 json_each(文档链接)函数。

示例用法(与 tree() 方法进行比较)

class KeyData(Model):
    key = TextField()
    data = JSONField()

KeyData.create(key='a', data={'k1': 'v1', 'x1': {'y1': 'z1'}})
KeyData.create(key='b', data={'x1': {'y1': 'z1', 'y2': 'z2'}})

# We will query the KeyData model for the key and all the
# top-level keys and values in it's data field.
kd = KeyData.data.children().alias('children')
query = (KeyData
         .select(kd.c.key, kd.c.value, kd.c.fullkey)
         .from_(KeyData, kd)
         .order_by(kd.c.key)
         .tuples())
print(query[:])

# PRINTS:
[('a', 'k1', 'v1',                    '$.k1'),
 ('a', 'x1', '{"y1":"z1"}',           '$.x1'),
 ('b', 'x1', '{"y1":"z1","y2":"z2"}', '$.x1')]
tree()

tree 函数对应于 json_tree,这是一个表值函数,用于递归遍历提供的 JSON 值并返回每个级别的键的信息。如果指定了路径,则该路径将被视为最顶层元素。

tree() 进行调用所返回的行具有与对 children() 进行调用所返回的行相同的属性

  • key:当前元素相对于其父元素的键。

  • value:当前元素的值。

  • type:其中一种数据类型(请参阅 json_type())。

  • atom:基本类型的标量值,数组和对象的 NULL

  • id:一个唯一 ID,用于引用树中的当前节点。

  • parent:包含节点的 ID。

  • fullkey:描述当前元素的完整路径。

  • path:当前行的容器的路径。

此方法在内部使用 json1 扩展中的 json_tree(文档链接)函数。

示例用法

class KeyData(Model):
    key = TextField()
    data = JSONField()

KeyData.create(key='a', data={'k1': 'v1', 'x1': {'y1': 'z1'}})
KeyData.create(key='b', data={'x1': {'y1': 'z1', 'y2': 'z2'}})

# We will query the KeyData model for the key and all the
# keys and values in it's data field, recursively.
kd = KeyData.data.tree().alias('tree')
query = (KeyData
         .select(kd.c.key, kd.c.value, kd.c.fullkey)
         .from_(KeyData, kd)
         .order_by(kd.c.key)
         .tuples())
print(query[:])

# PRINTS:
[('a',  None,  '{"k1":"v1","x1":{"y1":"z1"}}', '$'),
 ('b',  None,  '{"x1":{"y1":"z1","y2":"z2"}}', '$'),
 ('a',  'k1',  'v1',                           '$.k1'),
 ('a',  'x1',  '{"y1":"z1"}',                  '$.x1'),
 ('b',  'x1',  '{"y1":"z1","y2":"z2"}',        '$.x1'),
 ('a',  'y1',  'z1',                           '$.x1.y1'),
 ('b',  'y1',  'z1',                           '$.x1.y1'),
 ('b',  'y2',  'z2',                           '$.x1.y2')]
class JSONPath(field[, path=None])
参数
  • field (JSONField) – 我们打算访问的字段对象。

  • path (tuple) – 组成 JSON 路径的组件。

一种方便的、Pythonic 的方法,用于表示 JSON 路径,以便与 JSONField 一起使用。

JSONPath 对象实现 __getitem__,积累路径组件,它可以将其转换为相应的 json 路径表达式。

__getitem__(item)
参数

item – 访问子键或数组索引。

返回

表示新路径的 JSONPath

访问 JSON 数据中的子键或数组索引。返回 JSONPath 对象,该对象公开方便的方法,用于读取或修改 JSON 对象的特定部分。

示例

# If metadata contains {"tags": ["list", "of", "tags"]}, we can
# extract the first tag in this way:
first_tag = Post.metadata['tags'][0]
query = (Post
         .select(Post, first_tag.alias('first_tag'))
         .order_by(first_tag))
set(value[, as_json=None])
参数
  • value – 标量值、列表或字典。

  • as_json (bool) – 强制将值视为 JSON,在这种情况下,它将在 Python 中预先序列化为 JSON。默认情况下,列表和字典被视为要序列化的 JSON,而字符串和整数按原样传递。

设置 JSON 数据中给定位置的值。

使用 json1 扩展中的 json_set() 函数。

replace(value[, as_json=None])
参数
  • value – 标量值、列表或字典。

  • as_json (bool) – 强制将值视为 JSON,在这种情况下,它将在 Python 中预先序列化为 JSON。默认情况下,列表和字典被视为要序列化的 JSON,而字符串和整数按原样传递。

替换 JSON 数据中给定位置的现有值。

使用 json1 扩展中的 json_replace() 函数。

insert(value[, as_json=None])
参数
  • value – 标量值、列表或字典。

  • as_json (bool) – 强制将值视为 JSON,在这种情况下,它将在 Python 中预先序列化为 JSON。默认情况下,列表和字典被视为要序列化的 JSON,而字符串和整数按原样传递。

在 JSON 数据中给定位置插入一个新值。

使用 json1 扩展中的 json_insert() 函数。

append(value[, as_json=None])
参数
  • value – 标量值、列表或字典。

  • as_json (bool) – 强制将值视为 JSON,在这种情况下,它将在 Python 中预先序列化为 JSON。默认情况下,列表和字典被视为要序列化的 JSON,而字符串和整数按原样传递。

附加到 JSON 数据中给定位置存储的数组。

使用 json1 扩展中的 json_set() 函数。

更新(数据)
参数

数据 – 标量值、列表或字典,用于与 JSON 数据中给定位置的数据合并。若要删除特定键,请在更新的数据中将该键设置为

使用 RFC-7396 MergePatch 算法将新数据合并到 JSON 值中,以针对列数据应用修补程序(数据 参数)。MergePatch 可以添加、修改或删除 JSON 对象的元素,这意味着 update()set()remove() 的通用替换。MergePatch 将 JSON 数组对象视为原子,因此 update() 无法追加到数组,也无法修改数组的各个元素。

有关详细信息以及示例,请参阅 SQLite json_patch() 函数文档。

删除()

删除存储在 JSON 数据中给定位置的数据。

使用 json1 扩展中的 json_type 函数。

json_type()

返回一个字符串,标识存储在 JSON 数据中给定位置的值的类型。

返回的类型将是下列之一

  • 对象

  • 数组

  • 整数

  • 实数

  • 文本

  • null <– 字符串“null”表示一个实际的 NULL 值

  • NULL <– 一个实际的 NULL 值表示未找到路径

使用 json1 扩展中的 json_type 函数。

长度()

返回存储在 JSON 数据中给定位置的数组的长度。

使用 json1 扩展中的 json_array_length 函数。

子级()

表值函数,用于公开给定位置的 JSON 对象的直接后代。另请参阅 JSONField.children()

()

表值函数,用于递归公开给定位置的 JSON 对象的所有后代。另请参阅 JSONField.tree()

JSONBField(json_dumps=无, json_loads=无, ...)

适用于存储在 jsonb 格式的磁盘数据(从 Sqlite 3.45.0 开始可用)的字段类。应谨慎使用此字段类,因为数据可能会以其编码格式返回,具体取决于查询方式。例如

>>> KV.create(key='a', value={'k1': 'v1'})
<KV: 1>
>>> KV.get(KV.key == 'a').value
b"l'k1'v1"

要获取 JSON 值,需要使用 fn.json() 或辅助 JSONBField.json() 方法

>>> kv = KV.select(KV.value.json()).get()
>>> kv.value
{'k1': 'v1'}
class JSONBPath(field[, path=None])

JSONPath 的子类,用于处理 jsonb 数据。

class SearchField([unindexed=False[, column_name=None]])

字段类,用于表示全文搜索虚拟表的模型上的列。全文搜索扩展禁止对列指定任何类型或约束。此行为由 SearchField 强制执行,如果尝试任何与全文搜索扩展不兼容的配置,它会引发异常。

文档搜索索引的示例模型(时间戳存储在表中,但其数据不可搜索)

class DocumentIndex(FTSModel):
    title = SearchField()
    content = SearchField()
    tags = SearchField()
    timestamp = SearchField(unindexed=True)
match(term)
参数

term (str) – 全文搜索查询/术语

返回

MATCH 运算符对应的 Expression

Sqlite 的全文搜索支持搜索整个表(包括所有已编制索引的列)搜索各个列。 match() 方法可用于将搜索限制到单个列

class SearchIndex(FTSModel):
    title = SearchField()
    body = SearchField()

# Search *only* the title field and return results ordered by
# relevance, using bm25.
query = (SearchIndex
         .select(SearchIndex, SearchIndex.bm25().alias('score'))
         .where(SearchIndex.title.match('python'))
         .order_by(SearchIndex.bm25()))

要搜索所有已编制索引的列,请使用 FTSModel.match() 方法

# Searches *both* the title and body and return results ordered by
# relevance, using bm25.
query = (SearchIndex
         .select(SearchIndex, SearchIndex.bm25().alias('score'))
         .where(SearchIndex.match('python'))
         .order_by(SearchIndex.bm25()))
highlight(left, right)
参数
  • left (str) – 突出显示的开始标签,例如 '<b>'

  • right (str) – 突出显示的结束标签,例如 '</b>'

使用 MATCH 运算符执行搜索时,FTS5 可以返回给定列中的文本突出显示匹配项。

# Search for items matching string 'python' and return the title
# highlighted with square brackets.
query = (SearchIndex
         .search('python')
         .select(SearchIndex.title.highlight('[', ']').alias('hi')))

for result in query:
    print(result.hi)

# For example, might print:
# Learn [python] the hard way
snippet(left, right, over_length='...', max_tokens=16)
参数
  • left (str) – 突出显示的开始标签,例如 '<b>'

  • right (str) – 突出显示的结束标签,例如 '</b>'

  • over_length (str) – 当片段超出最大标记数时要添加或附加的文本。

  • max_tokens (int) – 返回的最大标记数,必须为 1 - 64

使用 MATCH 运算符执行搜索时,FTS5 可以返回文本,其中包含给定列中突出显示匹配项的片段。

# Search for items matching string 'python' and return the title
# highlighted with square brackets.
query = (SearchIndex
         .search('python')
         .select(SearchIndex.title.snippet('[', ']').alias('snip')))

for result in query:
    print(result.snip)
class VirtualModel

旨在用于表示虚拟表的模型类。默认元数据设置略有不同,以匹配虚拟表经常使用的设置。

元数据选项

  • arguments - 传递给虚拟表构造函数的参数。

  • extension_module - 用于虚拟表的扩展的名称。

  • options - 要在虚拟表中应用的设置的字典

    构造函数。

  • primary_key - 默认为 False,表示没有主键。

所有这些都以以下方式组合

CREATE VIRTUAL TABLE <table_name>
USING <extension_module>
([prefix_arguments, ...] fields, ... [arguments, ...], [options...])
class FTSModel

VirtualModel 的子类,用于与 FTS3 和 FTS4 全文搜索扩展一起使用。

FTSModel 子类应按正常方式定义,但有一些注意事项

  • 不支持唯一约束、非空约束、检查约束和外键。

  • 完全忽略字段索引和多列索引

  • Sqlite 会将所有列类型都视为 TEXT(尽管你可以存储其他数据类型,但 Sqlite 会将它们视为文本)。

  • FTS 模型包含一个 rowid 字段,该字段由 SQLite 自动创建和管理(除非您选择在模型创建期间显式设置它)。在此列上的查找快速且高效

鉴于这些约束,强烈建议在 FTSModel 子类上声明的所有字段都是 SearchField 的实例(但显式声明 RowIDField 时除外)。使用 SearchField 将有助于防止您意外创建无效的列约束。如果您希望在索引中存储元数据,但不希望将其包含在全文索引中,那么在实例化 SearchField 时指定 unindexed=True

上述内容的唯一例外是 rowid 主键,可以使用 RowIDField 声明。对 rowid 的查找非常高效。如果您使用 FTS4,您还可以使用 DocIDField,它是 rowid 的别名(尽管这样做没有任何好处)。

由于缺少辅助索引,因此通常使用 rowid 主键作为常规表中一行的指针是有意义的。例如

class Document(Model):
    # Canonical source of data, stored in a regular table.
    author = ForeignKeyField(User, backref='documents')
    title = TextField(null=False, unique=True)
    content = TextField(null=False)
    timestamp = DateTimeField()

    class Meta:
        database = db

class DocumentIndex(FTSModel):
    # Full-text search index.
    rowid = RowIDField()
    title = SearchField()
    content = SearchField()

    class Meta:
        database = db
        # Use the porter stemming algorithm to tokenize content.
        options = {'tokenize': 'porter'}

要将文档存储在文档索引中,我们将 INSERT 一行到 DocumentIndex 表中,手动设置 rowid 以使其与相应 Document 的主键匹配

def store_document(document):
    DocumentIndex.insert({
        DocumentIndex.rowid: document.id,
        DocumentIndex.title: document.title,
        DocumentIndex.content: document.content}).execute()

要执行搜索并返回排名结果,我们可以查询 Document 表并加入 DocumentIndex。此联接将非常高效,因为对 FTSModel 的 rowid 字段的查找很快

def search(phrase):
    # Query the search index and join the corresponding Document
    # object on each search result.
    return (Document
            .select()
            .join(
                DocumentIndex,
                on=(Document.id == DocumentIndex.rowid))
            .where(DocumentIndex.match(phrase))
            .order_by(DocumentIndex.bm25()))

警告

FTSModel 类的所有 SQL 查询将是全表扫描,除了全文搜索和 rowid 查找。

如果要索引的内容的主要来源存在于单独的表中,您可以通过指示 SQLite 不存储搜索索引内容的附加副本来节省一些磁盘空间。SQLite 仍将创建执行内容搜索所需元数据和数据结构,但内容本身不会存储在搜索索引中。

要实现此目的,您可以使用 content 选项指定表或列。FTS4 文档 有更多信息。

以下是一个简短的示例,说明如何使用 peewee 实现此功能

class Blog(Model):
    title = TextField()
    pub_date = DateTimeField(default=datetime.datetime.now)
    content = TextField()  # We want to search this.

    class Meta:
        database = db

class BlogIndex(FTSModel):
    content = SearchField()

    class Meta:
        database = db
        options = {'content': Blog.content}  # <-- specify data source.

db.create_tables([Blog, BlogIndex])

# Now, we can manage content in the BlogIndex. To populate the
# search index:
BlogIndex.rebuild()

# Optimize the index.
BlogIndex.optimize()

content 选项接受单个 FieldModel,并且可以减少数据库文件使用的存储量。但是,需要将内容手动移至关联的 FTSModel 或从中移出。

classmethod match(term)
参数

term – 搜索词或表达式。

生成一个 SQL 表达式,表示在表中搜索给定词或表达式的操作。SQLite 使用 MATCH 运算符表示全文搜索。

示例

# Search index for "search phrase" and return results ranked
# by relevancy using the BM25 algorithm.
query = (DocumentIndex
         .select()
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.bm25()))
for result in query:
    print('Result: %s' % result.title)
classmethod search(term[, weights=None[, with_score=False[, score_alias='score'[, explicit_ordering=False]]]]])
参数
  • term (str) – 要使用的搜索词。

  • weights – 列的权重列表,按照列在表中的位置排序。或者,以字段或字段名称为键,并映射到一个值的字典。

  • with_score – 是否将分数作为 SELECT 语句的一部分返回。

  • score_alias (str) – 用于计算的排名分数的别名。如果 with_score=True,则这是用于访问分数的属性。

  • explicit_ordering (bool) – 使用完整的 SQL 函数计算排名,而不是简单地在 ORDER BY 子句中引用分数别名。

搜索词和按匹配质量对结果进行排序的简便方法。

注意

此方法使用简化的算法来确定结果的相关性排名。对于更复杂的结果排名,请使用 search_bm25() 方法。

# Simple search.
docs = DocumentIndex.search('search term')
for result in docs:
    print(result.title)

# More complete example.
docs = DocumentIndex.search(
    'search term',
    weights={'title': 2.0, 'content': 1.0},
    with_score=True,
    score_alias='search_score')
for result in docs:
    print(result.title, result.search_score)
类方法 search_bm25(term[, weights=None[, with_score=False[, score_alias='score'[, explicit_ordering=False]]]])
参数
  • term (str) – 要使用的搜索词。

  • weights – 列的权重列表,按照列在表中的位置排序。或者,以字段或字段名称为键,并映射到一个值的字典。

  • with_score – 是否将分数作为 SELECT 语句的一部分返回。

  • score_alias (str) – 用于计算的排名分数的别名。如果 with_score=True,则这是用于访问分数的属性。

  • explicit_ordering (bool) – 使用完整的 SQL 函数计算排名,而不是简单地在 ORDER BY 子句中引用分数别名。

使用 BM25 算法搜索术语并按匹配质量对结果进行排序的简便方法。

注意

BM25 排名算法仅适用于 FTS4。如果您使用的是 FTS3,请改用 search() 方法。

类方法 search_bm25f(term[, weights=None[, with_score=False[, score_alias='score'[, explicit_ordering=False]]]])

FTSModel.search_bm25() 相同,但使用 BM25 排名算法的 BM25f 变体。

类方法 search_lucene(term[, weights=None[, with_score=False[, score_alias='score'[, explicit_ordering=False]]]])

FTSModel.search_bm25() 相同,但使用 Lucene 搜索引擎的结果排名算法。

类方法 rank([col1_weight, col2_weight...coln_weight])
参数

col_weight (float) – (可选) 赋予模型的第 ith 列的权重。默认情况下,所有列的权重均为 1.0

生成一个表达式,用于计算和返回搜索匹配的质量。此 rank 可用于对搜索结果进行排序。更高的排名分数表示更好的匹配。

rank 函数接受可选参数,允许您为各个列指定权重。如果未指定权重,则所有列都被视为同等重要。

注意

rank() 使用的算法简单且相对较快。对于更复杂的搜索结果排名,请使用

query = (DocumentIndex
         .select(
             DocumentIndex,
             DocumentIndex.rank().alias('score'))
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.rank()))

for search_result in query:
    print(search_result.title, search_result.score)
classmethod bm25([col1_weight, col2_weight...coln_weight])
参数

col_weight (float) – (可选) 赋予模型的第 ith 列的权重。默认情况下,所有列的权重均为 1.0

生成一个表达式,用于使用 BM25 算法 计算和返回搜索匹配的质量。此值可用于对搜索结果进行排序,更高的分数对应于更好的匹配。

rank() 类似, bm25 函数接受可选参数,允许您为各个列指定权重。如果未指定权重,则所有列都被视为同等重要。

注意

BM25 结果排名算法需要 FTS4。如果您使用的是 FTS3,请改用 rank()

query = (DocumentIndex
         .select(
             DocumentIndex,
             DocumentIndex.bm25().alias('score'))
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.bm25()))

for search_result in query:
    print(search_result.title, search_result.score)

注意

上面的代码示例等效于调用 search_bm25() 方法

query = DocumentIndex.search_bm25('search phrase', with_score=True)
for search_result in query:
    print(search_result.title, search_result.score)
classmethod bm25f([col1_weight, col2_weight...coln_weight])

bm25() 相同,只是它使用 BM25 排名算法的 BM25f 变体。

classmethod lucene([col1_weight, col2_weight...coln_weight])

bm25() 相同,只是它使用 Lucene 搜索结果排名算法。

类方法 重建()

重建搜索索引 - 仅当在创建表期间指定了 content 选项时,此方法才有效。

类方法 优化()

优化搜索索引。

FTS5Model

VirtualModel 的子类,用于 FTS5 全文搜索扩展。

FTS5Model 子类应按正常方式定义,但有几个注意事项

  • FTS5 明确禁止指定任何约束、数据类型或列上的索引。因此,所有列必须SearchField 的实例。

  • FTS5 模型包含一个 rowid 字段,该字段由 SQLite 自动创建和管理(除非您选择在创建模型期间显式设置它)。对该列的查找快速且高效

  • 不支持字段索引和多列索引。

FTS5 扩展附带了 BM25 排名函数的内置实现。因此,searchsearch_bm25 方法已被覆盖,以使用内置排名函数,而不是用户定义的函数。

类方法 fts5_installed()

返回一个布尔值,指示是否已安装 FTS5 扩展。如果未安装,将尝试加载扩展。

类方法 search(term[, weights=None[, with_score=False[, score_alias='score']]])
参数
  • term (str) – 要使用的搜索词。

  • weights – 列的权重列表,按照列在表中的位置排序。或者,以字段或字段名称为键,并映射到一个值的字典。

  • with_score – 是否将分数作为 SELECT 语句的一部分返回。

  • score_alias (str) – 用于计算的排名分数的别名。如果 with_score=True,则这是用于访问分数的属性。

  • explicit_ordering (bool) – 使用完整的 SQL 函数计算排名,而不是简单地在 ORDER BY 子句中引用分数别名。

搜索术语并按匹配质量对结果进行排序的简便方法。FTS5 扩展提供了 BM25 算法的内置实现,该算法用于按相关性对结果进行排名。

较高的分数对应于更好的匹配。

# Simple search.
docs = DocumentIndex.search('search term')
for result in docs:
    print(result.title)

# More complete example.
docs = DocumentIndex.search(
    'search term',
    weights={'title': 2.0, 'content': 1.0},
    with_score=True,
    score_alias='search_score')
for result in docs:
    print(result.title, result.search_score)
类方法 search_bm25(term[, weights=None[, with_score=False[, score_alias='score']]])

使用 FTS5,search_bm25()search() 方法相同。

classmethod rank([col1_weight, col2_weight...coln_weight])
参数

col_weight (float) – (可选) 赋予模型的第 ith 列的权重。默认情况下,所有列的权重均为 1.0

生成一个表达式,用于使用 BM25 算法 计算和返回搜索匹配的质量。此值可用于对搜索结果进行排序,更高的分数对应于更好的匹配。

rank() 函数接受可选参数,允许您为各个列指定权重。如果未指定权重,则所有列都被视为同等重要。

query = (DocumentIndex
         .select(
             DocumentIndex,
             DocumentIndex.rank().alias('score'))
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.rank()))

for search_result in query:
    print(search_result.title, search_result.score)

注意

上面的代码示例等效于调用 search() 方法

query = DocumentIndex.search('search phrase', with_score=True)
for search_result in query:
    print(search_result.title, search_result.score)
classmethod bm25([col1_weight, col2_weight...coln_weight])

由于 FTS5 提供对 BM25 的内置支持,因此 bm25() 方法与 rank() 方法相同。

classmethod VocabModel([table_type='row'|'col'|'instance'[, table_name=None]])
参数
  • table_type (str) – “row”、“col”或“instance”。

  • table_name – 词汇表名称。如果未指定,将为“fts5tablename_v”。

生成一个模型类,适用于访问与 FTS5 搜索索引对应的 词汇表

class TableFunction

实现用户定义的表值函数。与返回单个标量值的简单 标量或聚合 函数不同,表值函数可以返回任意数量的表格数据行。

简单示例

from playhouse.sqlite_ext import TableFunction


class Series(TableFunction):
    # Name of columns in each row of generated data.
    columns = ['value']

    # Name of parameters the function may be called with.
    params = ['start', 'stop', 'step']

    def initialize(self, start=0, stop=None, step=1):
        """
        Table-functions declare an initialize() method, which is
        called with whatever arguments the user has called the
        function with.
        """
        self.start = self.current = start
        self.stop = stop or float('Inf')
        self.step = step

    def iterate(self, idx):
        """
        Iterate is called repeatedly by the SQLite database engine
        until the required number of rows has been read **or** the
        function raises a `StopIteration` signalling no more rows
        are available.
        """
        if self.current > self.stop:
            raise StopIteration

        ret, self.current = self.current, self.current + self.step
        return (ret,)

# Register the table-function with our database, which ensures it
# is declared whenever a connection is opened.
db.table_function('series')(Series)

# Usage:
cursor = db.execute_sql('SELECT * FROM series(?, ?, ?)', (0, 5, 2))
for value, in cursor:
    print(value)

注意

在使用 TableFunction 之前,必须使用数据库连接对其进行注册。为了确保表函数始终可用,可以使用 SqliteDatabase.table_function() 装饰器将函数注册到数据库中。

TableFunction 实现必须提供两个属性并实现两个方法,如下所述。

columns

包含函数返回的数据的列名称的列表。例如,用于按分隔符拆分字符串的函数可能指定 3 列:[substring, start_idx, end_idx]

params

函数可以调用的参数的名称。应列出所有参数,包括可选参数。例如,用于按分隔符拆分字符串的函数可能指定 2 个参数:[string, delimiter]

name

可选 - 指定表函数的名称。如果未提供,名称将取自类名。

print_tracebacks = True

打印表函数回调方法中发生的任何错误的完整回溯。当设置为 False 时,将仅显示通用 OperationalError。

initialize(**parameter_values)
参数

parameter_values – 函数调用的参数。

返回

无返回值。

initialize 方法用于使用用户在调用函数时指定的参数初始化表函数。

iterate(idx)
参数

idx (int) – 当前迭代步骤

返回

columns 属性中命名的列相对应的行数据的元组。

引发

StopIteration – 表示没有更多行可用。

此函数被重复调用并返回连续的数据行。该函数可能在所有行都被使用之前终止(尤其是如果用户在结果中指定了 LIMIT)。或者,该函数可以通过引发 StopIteration 异常来表示没有更多数据可用。

classmethod register(conn)
参数

connsqlite3.Connection 对象。

使用 DB-API 2.0 sqlite3.Connection 对象注册表函数。表值函数必须在查询中使用之前进行注册。

示例

class MyTableFunction(TableFunction):
    name = 'my_func'
    # ... other attributes and methods ...

db = SqliteDatabase(':memory:')
db.connect()

MyTableFunction.register(db.connection())

为确保每次打开连接时都注册 TableFunction,请使用 table_function() 装饰器。

ClosureTable(model_class[, foreign_key=None[, referencing_class=None[, referencing_key=None]]])
参数
  • model_class – 包含树中节点的模型类。

  • foreign_key – 模型类上的自引用父节点字段。如果没有提供,peewee 将内省模型以查找合适的键。

  • referencing_class – 多对多关系的中间表。

  • referencing_key – 对于多对多关系,关系的原始方。

返回

返回 VirtualModel 以使用闭包表。

用于创建适合使用 传递闭包 表的模型类的工厂函数。闭包表是 VirtualModel 子类,可与传递闭包 SQLite 扩展一起使用。这些特殊表旨在使有效查询层次数据变得容易。SQLite 扩展在后台管理 AVL 树,在表更改时透明地更新树,并使对层次数据执行常见查询变得容易。

要在项目中使用闭包表扩展,你需要

  1. SQLite 扩展的副本。源代码可以在 SQLite 代码存储库 中找到,或通过克隆 此 gist 找到

    $ git clone https://gist.github.com/coleifer/7f3593c5c2a645913b92 closure
    $ cd closure/
    
  2. 将扩展编译为共享库,例如:

    $ gcc -g -fPIC -shared closure.c -o closure.so
    
  3. 为你的分层数据创建一个模型。这里唯一的需求是模型具有一个整数主键和一个自引用外键。任何附加字段都可以。

    class Category(Model):
        name = CharField()
        metadata = TextField()
        parent = ForeignKeyField('self', index=True, null=True)  # Required.
    
    # Generate a model for the closure virtual table.
    CategoryClosure = ClosureTable(Category)
    

    自引用性也可以通过中间表(对于多对多关系)实现。

    class User(Model):
        name = CharField()
    
    class UserRelations(Model):
        user = ForeignKeyField(User)
        knows = ForeignKeyField(User, backref='_known_by')
    
        class Meta:
            primary_key = CompositeKey('user', 'knows') # Alternatively, a unique index on both columns.
    
    # Generate a model for the closure virtual table, specifying the UserRelations as the referencing table
    UserClosure = ClosureTable(
        User,
        referencing_class=UserRelations,
        foreign_key=UserRelations.knows,
        referencing_key=UserRelations.user)
    
  4. 在你的应用程序代码中,确保当你实例化 Database 对象时加载扩展。这是通过将共享库的路径传递给 load_extension() 方法来完成的。

    db = SqliteExtDatabase('my_database.db')
    db.load_extension('/path/to/closure')
    

警告

在使用 transitive_closure 扩展时,你应该注意两个注意事项。首先,它要求你的源模型具有一个整数主键。其次,强烈建议你在自引用外键上创建一个索引。

示例

class Category(Model):
    name = CharField()
    metadata = TextField()
    parent = ForeignKeyField('self', index=True, null=True)  # Required.

# Generate a model for the closure virtual table.
CategoryClosure = ClosureTable(Category)

 # Create the tables if they do not exist.
 db.create_tables([Category, CategoryClosure], True)

现在可以使用闭包表中的数据执行有趣的查询

# Get all ancestors for a particular node.
laptops = Category.get(Category.name == 'Laptops')
for parent in Closure.ancestors(laptops):
    print(parent.name)

# Computer Hardware
# Computers
# Electronics
# All products

# Get all descendants for a particular node.
hardware = Category.get(Category.name == 'Computer Hardware')
for node in Closure.descendants(hardware):
    print(node.name)

# Laptops
# Desktops
# Hard-drives
# Monitors
# LCD Monitors
# LED Monitors

ClosureTable() 返回的 VirtualModel 的 API。

class BaseClosureTable
id

给定节点的主键字段。

depth

表示给定节点的相对深度的字段。

root

表示相对根节点的字段。

descendants(node[, depth=None[, include_node=False]])

检索给定节点的所有后代。如果指定了深度,则只会返回该深度(相对于给定节点)的节点。

node = Category.get(Category.name == 'Electronics')

# Direct child categories.
children = CategoryClosure.descendants(node, depth=1)

# Grand-child categories.
children = CategoryClosure.descendants(node, depth=2)

# Descendants at all depths.
all_descendants = CategoryClosure.descendants(node)
ancestors(node[, depth=None[, include_node=False]])

检索给定节点的所有祖先。如果指定了深度,则只会返回该深度(相对于给定节点)的节点。

node = Category.get(Category.name == 'Laptops')

# All ancestors.
all_ancestors = CategoryClosure.ancestors(node)

# Grand-parent category.
grandparent = CategoryClosure.ancestores(node, depth=2)
兄弟节点(节点[, include_node=False])

检索指定节点的父节点的所有子节点。

注意

有关 SQLite 传递闭包扩展的深入讨论,请查看这篇博文,使用 Python 和传递闭包扩展查询 SQLite 中的树结构

LSM 表

VirtualModel 子类适用于使用 lsm1 扩展 lsm1 扩展是一个虚拟表,提供 SQL 接口,用于 SQLite4 中的 lsm 键/值存储引擎

注意

LSM1 扩展尚未发布(撰写本文时为 SQLite 版本 3.22),因此请将此功能视为实验性功能,可能会在后续版本中发生更改。

LSM 表定义一个主键列和任意数量的附加值列(这些列在存储引擎中序列化并存储在单个值字段中)。主键必须全部为同一种类型,并使用以下字段类型之一

由于 LSM 存储引擎是键/值存储,因此主键(包括整数)必须由应用程序指定。

注意

LSM 引擎不支持二级索引,因此唯一高效的查询将是对主键进行查找(或范围查询)。可以对其他字段进行查询和筛选,但可能导致对整个表进行扫描。

示例模型声明

db = SqliteExtDatabase('my_app.db')
db.load_extension('lsm.so')  # Load shared library.

class EventLog(LSMTable):
    timestamp = IntegerField(primary_key=True)
    action = TextField()
    sender = TextField()
    target = TextField()

    class Meta:
        database = db
        filename = 'eventlog.ldb'  # LSM data is stored in separate db.

# Declare virtual table.
EventLog.create_table()

示例查询

# Use dictionary operators to get, set and delete rows from the LSM
# table. Slices may be passed to represent a range of key values.
def get_timestamp():
    # Return time as integer expressing time in microseconds.
    return int(time.time() * 1000000)

# Create a new row, at current timestamp.
ts = get_timestamp()
EventLog[ts] = ('pageview', 'search', '/blog/some-post/')

# Retrieve row from event log.
log = EventLog[ts]
print(log.action, log.sender, log.target)
# Prints ("pageview", "search", "/blog/some-post/")

# Delete the row.
del EventLog[ts]

# We can also use the "create()" method.
EventLog.create(
    timestamp=get_timestamp(),
    action='signup',
    sender='newsletter',
    target='sqlite-news')

简单键/值模型声明

class KV(LSMTable):
    key = TextField(primary_key=True)
    value = TextField()

    class Meta:
        database = db
        filename = 'kv.ldb'

db.create_tables([KV])

对于由单个值字段组成的表,Peewee 在获取单个项目时将直接返回该值。您还可以请求行切片,在这种情况下,Peewee 返回相应的 Select 查询,可以对其进行迭代。以下是一些示例

>>> KV['k0'] = 'v0'
>>> print(KV['k0'])
'v0'

>>> data = [{'key': 'k%d' % i, 'value': 'v%d' % i} for i in range(20)]
>>> KV.insert_many(data).execute()

>>> KV.select().count()
20

>>> KV['k8']
'v8'

>>> list(KV['k4.1':'k7.x']
[Row(key='k5', value='v5'),
 Row(key='k6', value='v6'),
 Row(key='k7', value='v7')]

>>> list(KV['k6xxx':])
[Row(key='k7', value='v7'),
 Row(key='k8', value='v8'),
 Row(key='k9', value='v9')]

您还可以使用表达式对 LSMTable 进行索引

>>> list(KV[KV.key > 'k6'])
[Row(key='k7', value='v7'),
 Row(key='k8', value='v8'),
 Row(key='k9', value='v9')]

>>> list(KV[(KV.key > 'k6') & (KV.value != 'v8')])
[Row(key='k7', value='v7'),
 Row(key='k9', value='v9')]

您可以使用 del 删除单行,或使用切片或表达式删除多行

>>> del KV['k1']
>>> del KV['k3x':'k8']
>>> del KV[KV.key.between('k10', 'k18')]

>>> list(KV[:])
[Row(key='k0', value='v0'),
 Row(key='k19', value='v19'),
 Row(key='k2', value='v2'),
 Row(key='k3', value='v3'),
 Row(key='k9', value='v9')]

尝试获取单个不存在的键将导致 DoesNotExist,但切片不会引发异常

>>> KV['k1']
...
KV.DoesNotExist: <Model:KV> instance matching query does not exist: ...

>>> list(KV['k1':'k1'])
[]
ZeroBlob(长度)
参数

长度 (int) – 以字节为单位的 blob 大小。

ZeroBlob 仅用于保留存储支持增量 I/O 的 BLOB 的空间。要使用 SQLite BLOB 存储,首先需要将所需大小的 ZeroBlob 插入到要与增量 I/O 一起使用的行中。

例如,请参见 Blob

class Blob(database, table, column, rowid[, read_only=False])
参数
  • databaseSqliteExtDatabase 实例。

  • table (str) – 正在访问的表的名称。

  • column (str) – 正在访问的列的名称。

  • rowid (int) – 正在访问的行的主键。

  • read_only (bool) – 阻止对 blob 数据进行任何修改。

打开存储在给定表/列/行中的 blob 以进行增量 I/O。要为新数据分配存储空间,可以使用 ZeroBlob,它非常高效。

class RawData(Model):
    data = BlobField()

# Allocate 100MB of space for writing a large file incrementally:
query = RawData.insert({'data': ZeroBlob(1024 * 1024 * 100)})
rowid = query.execute()

# Now we can open the row for incremental I/O:
blob = Blob(db, 'rawdata', 'data', rowid)

# Read from the file and write to the blob in chunks of 4096 bytes.
while True:
    data = file_handle.read(4096)
    if not data:
        break
    blob.write(data)

bytes_written = blob.tell()
blob.close()
read([n=None])
参数

n (int) – 仅从文件中当前位置读取最多 n 个字节。

从 blob 文件中当前位置读取最多 n 个字节。如果未指定 n,则将读取整个 blob。

seek(offset[, whence=0])
参数
  • offset (int) – 查找文件中的给定偏移量。

  • whence (int) – 相对于指定的参照系查找。

whence 的值

  • 0:文件开头

  • 1:当前位置

  • 2:文件结尾

tell()

返回文件中的当前偏移量。

write(data)
参数

data (bytes) – 要写入的数据

从文件中当前位置开始写入给定数据。

close()

关闭文件并释放关联的资源。

reopen(rowid)
参数

rowid (int) – 要打开的行的主键。

如果已为给定的表/列打开 Blob,则可以使用 reopen() 方法来重新使用相同的 Blob 对象,以访问表中的多行。

其他功能

SqliteExtDatabase 接受初始化选项来注册对简单 布隆过滤器的 支持。布隆过滤器一旦初始化,就可以用于对大型数据集进行高效的成员资格查询。

以下是一个示例

db = CSqliteExtDatabase(':memory:', bloomfilter=True)

# Create and define a table to store some data.
db.execute_sql('CREATE TABLE "register" ("data" TEXT)')
Register = Table('register', ('data',)).bind(db)

# Populate the database with a bunch of text.
with db.atomic():
    for i in 'abcdefghijklmnopqrstuvwxyz':
        keys = [i * j for j in range(1, 10)]  # a, aa, aaa, ... aaaaaaaaa
        Register.insert([{'data': key} for key in keys]).execute()

# Collect data into a 16KB bloomfilter.
query = Register.select(fn.bloomfilter(Register.data, 16 * 1024).alias('buf'))
row = query.get()
buf = row['buf']

# Use bloomfilter buf to test whether other keys are members.
test_keys = (
    ('aaaa', True),
    ('abc', False),
    ('zzzzzzz', True),
    ('zyxwvut', False))
for key, is_present in test_keys:
    query = Register.select(fn.bloomfilter_contains(key, buf).alias('is_member'))
    answer = query.get()['is_member']
    assert answer == is_present

SqliteExtDatabase 还可以注册其他有用的函数

  • rank_functions(默认启用):注册用于对搜索结果进行排名的函数,例如 bm25lucene

  • hash_functions:注册 md5、sha1、sha256、adler32、crc32 和 murmurhash 函数。

  • regexp_function:注册一个 regexp 函数。

示例

def create_new_user(username, password):
    # DO NOT DO THIS IN REAL LIFE. PLEASE.
    query = User.insert({'username': username, 'password': fn.sha1(password)})
    new_user_id = query.execute()

可以使用 murmurhash 函数将字节哈希为整数,以进行紧凑存储

>>> db = SqliteExtDatabase(':memory:', hash_functions=True)
>>> db.execute_sql('SELECT murmurhash(?)', ('abcdefg',)).fetchone()
(4188131059,)