数据库

Peewee Database 对象表示与数据库的连接。 Database 类使用打开与数据库连接所需的所有信息进行实例化,然后可用于

  • 打开和关闭连接。

  • 执行查询。

  • 管理事务(和保存点)。

  • 内省表、列、索引和约束。

Peewee 提供对 SQLite、MySQL、MariaDB 和 Postgres 的支持。每个数据库类都提供一些基本的、特定于数据库的配置选项。

from peewee import *

# SQLite database using WAL journal mode and 64MB cache.
sqlite_db = SqliteDatabase('/path/to/app.db', pragmas={
    'journal_mode': 'wal',
    'cache_size': -1024 * 64})

# Connect to a MySQL database on network.
mysql_db = MySQLDatabase('my_app', user='app', password='db_password',
                         host='10.1.0.8', port=3306)

# Connect to a Postgres database.
pg_db = PostgresqlDatabase('my_app', user='postgres', password='secret',
                           host='10.1.0.9', port=5432)

Peewee 通过特定于数据库的扩展模块为 SQLite、Postgres 和 CockroachDB 提供高级支持。要使用扩展功能,请导入相应的特定于数据库的模块并使用提供的数据库类

from playhouse.sqlite_ext import SqliteExtDatabase

# Use SQLite (will register a REGEXP function and set busy timeout to 3s).
db = SqliteExtDatabase('/path/to/app.db', regexp_function=True, timeout=3,
                       pragmas={'journal_mode': 'wal'})


from playhouse.postgres_ext import PostgresqlExtDatabase

# Use Postgres (and register hstore extension).
db = PostgresqlExtDatabase('my_app', user='postgres', register_hstore=True)


from playhouse.cockroachdb import CockroachDatabase

# Use CockroachDB.
db = CockroachDatabase('my_app', user='root', port=26257, host='10.1.0.8')

# CockroachDB connections may require a number of parameters, which can
# alternatively be specified using a connection-string.
db = CockroachDatabase('postgresql://...')

有关数据库扩展的更多信息,请参见

初始化数据库

Database 初始化方法将数据库的名称作为第一个参数。在建立连接时,后续关键字参数将传递给底层数据库驱动程序,从而使你可以轻松传递特定于供应商的参数。

例如,使用 Postgresql 时,在创建连接时通常需要指定 hostuserpassword。这些不是标准 Peewee Database 参数,因此在创建连接时,它们将直接传递回 psycopg2

db = PostgresqlDatabase(
    'database_name',  # Required by Peewee.
    user='postgres',  # Will be passed directly to psycopg2.
    password='secret',  # Ditto.
    host='db.mysite.com')  # Ditto.

另一个示例是,pymysql 驱动程序接受 charset 参数,该参数不是标准 Peewee Database 参数。要设置此值,只需将 charset 与其他值一起传递即可

db = MySQLDatabase('database_name', user='www-data', charset='utf8mb4')

查阅数据库驱动程序的文档以了解可用参数

使用 Postgresql

要连接到 Postgresql 数据库,我们将使用 PostgresqlDatabase。第一个参数始终是数据库的名称,之后您可以指定任意 psycopg2 参数

psql_db = PostgresqlDatabase('my_database', user='postgres')

class BaseModel(Model):
    """A base model that will use our Postgresql database"""
    class Meta:
        database = psql_db

class User(BaseModel):
    username = CharField()

Playhouse(Peewee 的扩展)包含一个 Postgresql 扩展模块,该模块提供许多特定于 postgres 的功能,例如

如果您想使用这些超棒的功能,请使用 playhouse.postgres_ext 模块中的 PostgresqlExtDatabase

from playhouse.postgres_ext import PostgresqlExtDatabase

psql_db = PostgresqlExtDatabase('my_database', user='postgres')

隔离级别

从 Peewee 3.9.7 开始,可以使用 psycopg2.extensions 中的符号常量将隔离级别指定为初始化参数

from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE

db = PostgresqlDatabase('my_app', user='postgres', host='db-host',
                        isolation_level=ISOLATION_LEVEL_SERIALIZABLE)

注意

在较旧版本中,您可以在底层 psycopg2 连接上手动设置隔离级别。这可以一次性完成

db = PostgresqlDatabase(...)
conn = db.connection()  # returns current connection.

from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

要每次创建连接时都运行此操作,请创建子类并实现 _initialize_database() 钩子,该钩子专为此目的而设计

class SerializedPostgresqlDatabase(PostgresqlDatabase):
    def _initialize_connection(self, conn):
        conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

使用 CockroachDB

使用在 playhouse.cockroachdb 中定义的 CockroachDatabase 数据库类连接到 CockroachDB (CRDB)

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app', user='root', port=26257, host='localhost')

如果您使用的是 Cockroach Cloud,您可能会发现使用连接字符串指定连接参数更容易

db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')

注意

CockroachDB 需要 psycopg2 (postgres) Python 驱动程序。

注意

CockroachDB 安装和入门指南可在以下位置找到:https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html

CRDB 提供客户端事务重试,可通过特殊 CockroachDatabase.run_transaction() 帮助器方法使用。此方法接受一个可调用对象,该对象负责执行可能需要重试的任何事务性语句。

最简单的 run_transaction() 示例

def create_user(email):
    # Callable that accepts a single argument (the database instance) and
    # which is responsible for executing the transactional SQL.
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

huey = create_user('[email protected]')

注意

如果在给定的尝试次数后无法提交事务,将引发 cockroachdb.ExceededMaxAttempts 异常。如果 SQL 格式错误、违反约束等,则该函数将向调用者引发异常。

有关更多信息,请参阅

使用 SQLite

要连接到 SQLite 数据库,我们将使用 SqliteDatabase。第一个参数是包含数据库的文件名,或字符串 ':memory:' 以创建内存数据库。在数据库文件名之后,您可以指定一个列表或 pragma 或任何其他任意 sqlite3 参数

sqlite_db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})

class BaseModel(Model):
    """A base model that will use our Sqlite database."""
    class Meta:
        database = sqlite_db

class User(BaseModel):
    username = TextField()
    # etc, etc

Peewee 包含一个 SQLite 扩展模块,它提供了许多特定于 SQLite 的功能,例如 全文搜索json 扩展支持 以及更多内容。如果您想使用这些很棒的功能,请使用 playhouse.sqlite_ext 模块中的 SqliteExtDatabase

from playhouse.sqlite_ext import SqliteExtDatabase

sqlite_db = SqliteExtDatabase('my_app.db', pragmas={
    'journal_mode': 'wal',  # WAL-mode.
    'cache_size': -64 * 1000,  # 64MB cache.
    'synchronous': 0})  # Let the OS manage syncing.

PRAGMA 语句

SQLite 允许通过 PRAGMA 语句(SQLite 文档)对许多参数进行运行时配置。这些语句通常在创建新的数据库连接时运行。要针对新连接运行一个或多个 PRAGMA 语句,您可以将它们指定为字典或包含 pragma 名称和值的 2 元组列表

db = SqliteDatabase('my_app.db', pragmas={
    'journal_mode': 'wal',
    'cache_size': 10000,  # 10000 pages, or ~40MB
    'foreign_keys': 1,  # Enforce foreign-key constraints
})

也可以使用 pragma() 方法或 SqliteDatabase 对象上公开的特殊属性动态配置 PRAGMA

# Set cache size to 64MB for *current connection*.
db.pragma('cache_size', -1024 * 64)

# Same as above.
db.cache_size = -1024 * 64

# Read the value of several pragmas:
print('cache_size:', db.cache_size)
print('foreign_keys:', db.foreign_keys)
print('journal_mode:', db.journal_mode)
print('page_size:', db.page_size)

# Set foreign_keys pragma on current connection *AND* on all
# connections opened subsequently.
db.pragma('foreign_keys', 1, permanent=True)

注意

使用 pragma() 方法设置的 Pragmas 在默认情况下,在连接关闭后不会保留。要配置一个 pragma 在每次打开连接时运行,请指定 permanent=True

注意

可以在 SQLite 文档中找到 PRAGMA 设置、其含义和可接受值的完整列表:https://sqlite.ac.cn/pragma.html

用户定义函数

SQLite 可以通过用户定义的 Python 代码进行扩展。 SqliteDatabase 类支持三种类型用户定义扩展

  • 函数 - 接受任意数量的参数并返回一个值。

  • 聚合 - 聚合来自多行的参数并返回一个值。

  • 排序规则 - 描述如何对某个值进行排序。

注意

有关更多扩展支持,请参阅 SqliteExtDatabase,它位于 playhouse.sqlite_ext 模块中。

示例用户定义函数

db = SqliteDatabase('analytics.db')

from urllib.parse import urlparse

@db.func('hostname')
def hostname(url):
    if url is not None:
        return urlparse(url).netloc

# Call this function in our code:
# The following finds the most common hostnames of referrers by count:
query = (PageView
         .select(fn.hostname(PageView.referrer), fn.COUNT(PageView.id))
         .group_by(fn.hostname(PageView.referrer))
         .order_by(fn.COUNT(PageView.id).desc()))

示例用户定义聚合

from hashlib import md5

@db.aggregate('md5')
class MD5Checksum(object):
    def __init__(self):
        self.checksum = md5()

    def step(self, value):
        self.checksum.update(value.encode('utf-8'))

    def finalize(self):
        return self.checksum.hexdigest()

# Usage:
# The following computes an aggregate MD5 checksum for files broken
# up into chunks and stored in the database.
query = (FileChunk
         .select(FileChunk.filename, fn.MD5(FileChunk.data))
         .group_by(FileChunk.filename)
         .order_by(FileChunk.filename, FileChunk.sequence))

示例排序规则

@db.collation('ireverse')
def collate_reverse(s1, s2):
    # Case-insensitive reverse.
    s1, s2 = s1.lower(), s2.lower()
    return (s1 < s2) - (s1 > s2)  # Equivalent to -cmp(s1, s2)

# To use this collation to sort books in reverse order...
Book.select().order_by(collate_reverse.collation(Book.title))

# Or...
Book.select().order_by(Book.title.asc(collation='reverse'))

示例用户定义表值函数(有关更多详细信息,请参阅 TableFunctiontable_function

from playhouse.sqlite_ext import TableFunction

db = SqliteDatabase('my_app.db')

@db.table_function('series')
class Series(TableFunction):
    columns = ['value']
    params = ['start', 'stop', 'step']

    def initialize(self, start=0, stop=None, step=1):
        """
        Table-functions declare an initialize() method, which is
        called with whatever arguments the user has called the
        function with.
        """
        self.start = self.current = start
        self.stop = stop or float('Inf')
        self.step = step

    def iterate(self, idx):
        """
        Iterate is called repeatedly by the SQLite database engine
        until the required number of rows has been read **or** the
        function raises a `StopIteration` signalling no more rows
        are available.
        """
        if self.current > self.stop:
            raise StopIteration

        ret, self.current = self.current, self.current + self.step
        return (ret,)

# Usage:
cursor = db.execute_sql('SELECT * FROM series(?, ?, ?)', (0, 5, 2))
for value, in cursor:
    print(value)

# Prints:
# 0
# 2
# 4

有关更多信息,请参阅

设置事务的锁定模式

SQLite 事务可以在三种不同模式下打开

  • 延迟默认) - 仅在执行读取或写入时获取锁。第一次读取会创建一个 共享锁,第一次写入会创建一个 保留锁。由于锁的获取被推迟到实际需要时,因此另一个线程或进程有可能在当前线程上的 BEGIN 执行后创建单独的事务并写入数据库。

  • 立即 - 立即获取保留锁。在此模式下,其他数据库不能写入数据库或打开立即独占事务。但是,其他进程可以继续从数据库中读取。

  • 独占 - 打开独占锁,该锁可阻止所有(读取未提交除外)连接在事务完成之前访问数据库。

指定锁定模式的示例

db = SqliteDatabase('app.db')

with db.atomic('EXCLUSIVE'):
    do_something()


@db.atomic('IMMEDIATE')
def some_other_function():
    # This function is wrapped in an "IMMEDIATE" transaction.
    do_something_else()

有关详细信息,请参阅 SQLite 锁定文档。要详细了解 Peewee 中的事务,请参阅管理事务文档。

APSW,高级 SQLite 驱动程序

Peewee 还附带使用apsw,高级 sqlite 驱动程序的备用 SQLite 数据库。可以在APSW 项目网站上获取有关 APSW 的更多信息。APSW 提供以下特殊功能:

  • 虚拟表、虚拟文件系统、Blob I/O、备份和文件控制。

  • 可以在线程之间共享连接,而无需任何其他锁定。

  • 事务由您的代码显式管理。

  • Unicode 以正确的方式处理。

  • APSW 比标准库 sqlite3 模块更快。

  • 向您的 Python 应用程序公开几乎整个 SQLite C API。

如果您想使用 APSW,请使用apsw_ext模块中的APSWDatabase

from playhouse.apsw_ext import APSWDatabase

apsw_db = APSWDatabase('my_app.db')

使用 MariaDB

Peewee 支持 MariaDB。要使用 MariaDB,请使用 MySQL 后端,该后端在两者之间共享。有关详细信息,请参阅“使用 MySQL”

使用 MySQL

要连接到 MySQL 数据库,我们将使用MySQLDatabase。在数据库名称之后,您可以指定将传递回驱动程序的任意连接参数(例如 pymysqlmysqlclient)。

mysql_db = MySQLDatabase('my_database')

class BaseModel(Model):
    """A base model that will use our MySQL database"""
    class Meta:
        database = mysql_db

class User(BaseModel):
    username = CharField()
    # etc, etc

驱动程序信息

  • pymysql 是一个纯 Python mysql 客户端,适用于 python 2 和 3。Peewee 将尝试首先使用 pymysql。

  • mysqlclient 使用 c 扩展并支持 python 3。它公开了一个MySQLdb模块。如果未安装 pymysql,Peewee 将尝试使用此模块。

  • mysql-python也称为MySQLdb1,它已过时,不应使用。由于它与 mysqlclient 共享相同的模块名称,因此适用相同的规则。

  • mysql-connector python 纯 Python(我认为??)支持 python 3。要使用此驱动程序,您可以从 playhouse.mysql_ext 扩展中使用 MySQLConnectorDatabase

错误 2006:MySQL 服务器已断开

当 MySQL 终止空闲数据库连接时,可能会发生此特定错误。这通常发生在未明确管理数据库连接的 Web 应用程序中。发生的情况是,您的应用程序启动,打开一个连接来处理执行的第一个查询,并且由于该连接从未关闭,因此它保持打开状态,等待更多查询。

要解决此问题,请确保在需要执行查询时明确连接到数据库,并在完成后关闭连接。在 Web 应用程序中,这通常意味着您将在收到请求时打开连接,并在返回响应时关闭连接。

请参阅 框架集成 部分,了解有关配置常见 Web 框架以管理数据库连接的示例。

使用数据库 URL 连接

playhouse 模块 数据库 URL 提供了一个帮助函数 connect(),该函数接受一个数据库 URL 并返回一个 Database 实例。

示例代码

import os

from peewee import *
from playhouse.db_url import connect

# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')

class BaseModel(Model):
    class Meta:
        database = db

示例数据库 URL

  • sqlite:///my_database.db 将为当前目录中的文件 my_database.db 创建一个 SqliteDatabase 实例。

  • sqlite:///:memory: 将创建一个内存中的 SqliteDatabase 实例。

  • postgresql://postgres:my_password@localhost:5432/my_database 将创建一个 PostgresqlDatabase 实例。提供了用户名和密码,以及要连接的主机和端口。

  • mysql://user:passwd@ip:port/my_db 将为本地 MySQL 数据库 my_db 创建一个 MySQLDatabase 实例。

  • db_url 文档中还有更多示例.

运行时数据库配置

有时,直到运行时才知道数据库连接设置,此时这些值可能会从配置文件或环境中加载。在这些情况下,您可以通过将 None 指定为 database_name 来推迟数据库的初始化。

database = PostgresqlDatabase(None)  # Un-initialized database.

class SomeModel(Model):
    class Meta:
        database = database

如果在数据库未初始化时尝试连接或发出任何查询,您将收到一个异常

>>> database.connect()
Exception: Error, database not properly initialized before opening connection

要初始化数据库,请使用 init() 方法,其中包含数据库名称和任何其他关键字参数

database_name = input('What is the name of the db? ')
database.init(database_name, host='localhost', user='postgres')

要对数据库初始化进行更严格的控制,请参阅下一部分,动态定义数据库

动态定义数据库

要对数据库的定义/初始化进行更严格的控制,可以使用 DatabaseProxy 帮助器。 DatabaseProxy 对象充当占位符,然后在运行时可以将其换成不同的对象。在下面的示例中,我们将根据应用程序的配置方式更换数据库

database_proxy = DatabaseProxy()  # Create a proxy for our db.

class BaseModel(Model):
    class Meta:
        database = database_proxy  # Use proxy for our DB.

class User(BaseModel):
    username = CharField()

# Based on configuration, use a different database.
if app.config['DEBUG']:
    database = SqliteDatabase('local.db')
elif app.config['TESTING']:
    database = SqliteDatabase(':memory:')
else:
    database = PostgresqlDatabase('mega_production_db')

# Configure our proxy to use the db we specified in config.
database_proxy.initialize(database)

警告

仅当实际数据库驱动程序在运行时发生变化时才使用此方法。例如,如果您的测试和本地开发环境在 SQLite 上运行,但已部署的应用程序使用 PostgreSQL,则可以使用 DatabaseProxy 在运行时更换引擎。

但是,如果只有连接值在运行时发生变化,例如数据库文件路径或数据库主机,则应该改用 Database.init()。有关更多详细信息,请参阅 运行时数据库配置

注意

避免使用 DatabaseProxy 可能更容易,而改用 Database.bind() 和相关方法来设置或更改数据库。有关详细信息,请参阅 在运行时设置数据库

在运行时设置数据库

我们已经看到了 Peewee 可以配置数据库的三种方法

# The usual way:
db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})


# Specify the details at run-time:
db = SqliteDatabase(None)
...
db.init(db_filename, pragmas={'journal_mode': 'wal'})


# Or use a placeholder:
db = DatabaseProxy()
...
db.initialize(SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'}))

Peewee 还可以为您的模型类设置或更改数据库。Peewee 测试套件使用此技术在运行测试时将测试模型类绑定到各种数据库实例。

有两组互补的方法

作为示例,我们将声明两个模型,指定任何数据库

class User(Model):
    username = TextField()

class Tweet(Model):
    user = ForeignKeyField(User, backref='tweets')
    content = TextField()
    timestamp = TimestampField()

在运行时将模型绑定到数据库

postgres_db = PostgresqlDatabase('my_app', user='postgres')
sqlite_db = SqliteDatabase('my_app.db')

# At this point, the User and Tweet models are NOT bound to any database.

# Let's bind them to the Postgres database:
postgres_db.bind([User, Tweet])

# Now we will temporarily bind them to the sqlite database:
with sqlite_db.bind_ctx([User, Tweet]):
    # User and Tweet are now bound to the sqlite database.
    assert User._meta.database is sqlite_db

# User and Tweet are once again bound to the Postgres database.
assert User._meta.database is postgres_db

对于绑定给定的模型类,Model.bind()Model.bind_ctx() 方法的工作方式相同

# Bind the user model to the sqlite db. By default, Peewee will also
# bind any models that are related to User via foreign-key as well.
User.bind(sqlite_db)

assert User._meta.database is sqlite_db
assert Tweet._meta.database is sqlite_db  # Related models bound too.

# Here we will temporarily bind *just* the User model to the postgres db.
with User.bind_ctx(postgres_db, bind_backrefs=False):
    assert User._meta.database is postgres_db
    assert Tweet._meta.database is sqlite_db  # Has not changed.

# And now User is back to being bound to the sqlite_db.
assert User._meta.database is sqlite_db

测试 Peewee 应用程序 部分也包含一些使用 bind() 方法的示例。

线程安全和多数据库

如果您计划在多线程应用程序中在运行时更改数据库,则将模型的数据库存储在 thread-local 中将防止竞争条件。这可以通过自定义模型 Metadata 类实现(请参见 ThreadSafeDatabaseMetadata,包含在 playhouse.shortcuts 中)

from peewee import *
from playhouse.shortcuts import ThreadSafeDatabaseMetadata

class BaseModel(Model):
    class Meta:
        # Instruct peewee to use our thread-safe metadata implementation.
        model_metadata_class = ThreadSafeDatabaseMetadata

现在可以使用熟悉的 Database.bind()Database.bind_ctx() 方法在多线程环境中运行时安全地交换数据库。

连接管理

要打开与数据库的连接,请使用 Database.connect() 方法

>>> db = SqliteDatabase(':memory:')  # In-memory SQLite database.
>>> db.connect()
True

如果我们尝试对已打开的数据库调用 connect(),则会收到 OperationalError

>>> db.connect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/charles/pypath/peewee.py", line 2390, in connect
    raise OperationalError('Connection already opened.')
peewee.OperationalError: Connection already opened.

为防止引发此异常,我们可以使用附加参数 reuse_if_open 调用 connect()

>>> db.close()  # Close connection.
True
>>> db.connect()
True
>>> db.connect(reuse_if_open=True)
False

请注意,如果数据库连接已打开,则对 connect() 的调用将返回 False

要关闭连接,请使用 Database.close() 方法

>>> db.close()
True

对已关闭的连接调用 close() 不会导致异常,但会返回 False

>>> db.connect()  # Open connection.
True
>>> db.close()  # Close connection.
True
>>> db.close()  # Connection already closed, returns False.
False

您可以使用 Database.is_closed() 方法测试数据库是否已关闭

>>> db.is_closed()
True

使用 autoconnect

如果使用 autoconnect=True(默认值)初始化数据库,则在使用数据库之前不必显式连接到数据库。显式管理连接被认为是最佳实践,因此您可能考虑禁用 autoconnect 行为。

明确连接生命周期非常有帮助。例如,如果连接失败,异常将在打开连接时捕获,而不是在稍后的任意时间执行查询时捕获。此外,如果使用连接池,则必须调用connect()close()以确保正确回收连接。

为了获得最佳正确性保证,请禁用autoconnect

db = PostgresqlDatabase('my_app', user='postgres', autoconnect=False)

线程安全性

Peewee 使用线程局部存储跟踪连接状态,使 Peewee Database 对象可以安全地与多个线程一起使用。每个线程都有自己的连接,因此任何给定的线程在给定时间只会有一个连接处于打开状态。

上下文管理器

数据库对象本身可以用作上下文管理器,它将在包装的代码块持续期间打开连接。此外,在包装的代码块开始时打开一个事务,并在连接关闭之前提交该事务(除非发生错误,在这种情况下将回滚该事务)。

>>> db.is_closed()
True
>>> with db:
...     print(db.is_closed())  # db is open inside context manager.
...
False
>>> db.is_closed()  # db is closed.
True

如果你想单独管理事务,可以使用Database.connection_context()上下文管理器。

>>> with db.connection_context():
...     # db connection is open.
...     pass
...
>>> db.is_closed()  # db connection is closed.
True

connection_context()方法也可以用作装饰器

@db.connection_context()
def prepare_database():
    # DB connection will be managed by the decorator, which opens
    # a connection, calls function, and closes upon returning.
    db.create_tables(MODELS)  # Create schema.
    load_fixture_data(db)

DB-API 连接对象

要获取对底层 DB-API 2.0 连接的引用,请使用Database.connection()方法。此方法将返回当前打开的连接对象(如果存在),否则将打开一个新连接。

>>> db.connection()
<sqlite3.Connection object at 0x7f94e9362f10>

连接池

连接池由pool 模块提供,该模块包含在playhouse扩展库中。该池支持

  • 超时,之后连接将被回收。

  • 打开连接数的上限。

from playhouse.pool import PooledPostgresqlExtDatabase

db = PooledPostgresqlExtDatabase(
    'my_database',
    max_connections=8,
    stale_timeout=300,
    user='postgres')

class BaseModel(Model):
    class Meta:
        database = db

以下池化数据库类可用

有关 peewee 连接池的深入讨论,请参阅连接池部分playhouse文档。

测试 Peewee 应用程序

在为使用 Peewee 的应用程序编写测试时,可能需要为测试使用一个特殊数据库。另一种常见做法是针对干净的数据库运行测试,这意味着确保在每次测试开始时表都是空的。

要在运行时将模型绑定到数据库,可以使用以下方法

  • Database.bind_ctx(),它返回一个上下文管理器,该管理器将在包装块的持续时间内将给定模型绑定到数据库实例。

  • Model.bind_ctx(),它同样返回一个上下文管理器,该管理器将在包装块的持续时间内将模型(以及可选的依赖项)绑定到给定数据库。

  • Database.bind(),这是一个一次性操作,它将模型(以及可选的依赖项)绑定到给定数据库。

  • Model.bind(),这是一个一次性操作,它将模型(以及可选的依赖项)绑定到给定数据库。

根据你的用例,其中一个选项可能更有意义。对于以下示例,我将使用 Model.bind()

示例测试用例设置

# tests.py
import unittest
from my_app.models import EventLog, Relationship, Tweet, User

MODELS = [User, Tweet, EventLog, Relationship]

# use an in-memory SQLite for tests.
test_db = SqliteDatabase(':memory:')

class BaseTestCase(unittest.TestCase):
    def setUp(self):
        # Bind model classes to test db. Since we have a complete list of
        # all models, we do not need to recursively bind dependencies.
        test_db.bind(MODELS, bind_refs=False, bind_backrefs=False)

        test_db.connect()
        test_db.create_tables(MODELS)

    def tearDown(self):
        # Not strictly necessary since SQLite in-memory databases only live
        # for the duration of the connection, and in the next step we close
        # the connection...but a good practice all the same.
        test_db.drop_tables(MODELS)

        # Close connection to db.
        test_db.close()

        # If we wanted, we could re-bind the models to their original
        # database here. But for tests this is probably not necessary.

顺便说一句,根据经验,我建议使用你在生产中使用的相同数据库后端来测试你的应用程序,以避免任何潜在的兼容性问题。

如果你想看到更多有关如何使用 Peewee 运行测试的示例,请查看 Peewee 自有的 测试套件

使用 Gevent 的异步

gevent 建议用于对 Postgresql 或 MySQL 执行异步 I/O。我更喜欢 gevent 的原因

  • 不需要对所有内容进行特殊用途的“循环感知”重新实现。使用 asyncio 的第三方库通常还必须重新实现多层代码以及重新实现协议本身。

  • Gevent 允许你使用正常、干净、惯用的 Python 编写应用程序。无需在每一行中都填充“async”、“await”和其他噪音。没有回调、期货、任务、承诺。没有杂物。

  • Gevent 适用于 Python 2 Python 3。

  • Gevent 是Pythonic。Asyncio 是一个非 Pythonic 的可憎之物。

除了猴子补丁套接字外,如果你使用的是带有纯 Python 驱动程序(如 pymysql)的MySQL或在纯 Python 模式下使用 mysql-connector,则无需执行任何特殊步骤。用 C 编写的 MySQL 驱动程序需要特殊配置,这超出了本文档的范围。

对于Postgrespsycopg2(这是一个 C 扩展),可以使用以下代码段来注册事件挂钩,这将使你的连接变为异步

from gevent.socket import wait_read, wait_write
from psycopg2 import extensions

# Call this function after monkey-patching socket (etc).
def patch_psycopg2():
    extensions.set_wait_callback(_psycopg2_gevent_callback)

def _psycopg2_gevent_callback(conn, timeout=None):
    while True:
        state = conn.poll()
        if state == extensions.POLL_OK:
            break
        elif state == extensions.POLL_READ:
            wait_read(conn.fileno(), timeout=timeout)
        elif state == extensions.POLL_WRITE:
            wait_write(conn.fileno(), timeout=timeout)
        else:
            raise ValueError('poll() returned unexpected result')

SQLite,因为它嵌入在 Python 应用程序本身中,不执行任何套接字操作,这将成为非阻塞的候选者。Async 对 SQLite 数据库没有影响。

框架集成

对于 Web 应用程序,在收到请求时打开连接,并在发送响应时关闭连接很常见。在本节中,我将描述如何向你的 Web 应用程序添加挂钩,以确保正确处理数据库连接。

无论你使用的是简单的 SQLite 数据库还是多个 Postgres 连接池,这些步骤都将确保 peewee 正确处理连接。

注意

接收大量流量的应用程序可能会受益于使用连接池来降低每次请求建立和拆除连接的成本。

Flask

Flask 和 peewee 是一个很好的组合,也是我用于任何规模项目的首选。Flask 提供了两个挂钩,我们将使用它们来打开和关闭我们的数据库连接。当收到请求时,我们将打开连接,然后在返回响应时关闭连接。

from flask import Flask
from peewee import *

database = SqliteDatabase('my_app.db')
app = Flask(__name__)

# This hook ensures that a connection is opened to handle any queries
# generated by the request.
@app.before_request
def _db_connect():
    database.connect()

# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.teardown_request
def _db_close(exc):
    if not database.is_closed():
        database.close()

Django

虽然将 peewee 与 Django 一起使用的情况较少见,但实际上使用这两种方法非常简单。在我看来,使用 Django 管理你的 peewee 数据库连接的最简单方法是向你的应用程序添加中间件。中间件应该是中间件列表中的第一个,以确保它在处理请求时首先运行,并在返回响应时最后运行。

如果你有一个名为my_blog 的 django 项目,并且你的 peewee 数据库在模块 my_blog.db 中定义,则可以添加以下中间件类

# middleware.py
from my_blog.db import database  # Import the peewee database instance.


def PeeweeConnectionMiddleware(get_response):
    def middleware(request):
        database.connect()
        try:
            response = get_response(request)
        finally:
            if not database.is_closed():
                database.close()
        return response
    return middleware


# Older Django < 1.10 middleware.
class PeeweeConnectionMiddleware(object):
    def process_request(self, request):
        database.connect()

    def process_response(self, request, response):
        if not database.is_closed():
            database.close()
        return response

要确保执行此中间件,请将其添加到你的 settings 模块

# settings.py
MIDDLEWARE_CLASSES = (
    # Our custom middleware appears first in the list.
    'my_blog.middleware.PeeweeConnectionMiddleware',

    # These are the default Django 1.7 middlewares. Yours may differ,
    # but the important this is that our Peewee middleware comes first.
    'django.middleware.common.CommonMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
)

# ... other Django settings ...

Bottle

我本人没有使用 bottle,但查看文档后,我相信以下代码应该可以确保正确管理数据库连接

# app.py
from bottle import hook  #, route, etc, etc.
from peewee import *

db = SqliteDatabase('my-bottle-app.db')

@hook('before_request')
def _connect_db():
    db.connect()

@hook('after_request')
def _close_db():
    if not db.is_closed():
        db.close()

# Rest of your bottle app goes here.

Web.py

请参阅应用程序处理器的文档。

db = SqliteDatabase('my_webpy_app.db')

def connection_processor(handler):
    db.connect()
    try:
        return handler()
    finally:
        if not db.is_closed():
            db.close()

app.add_processor(connection_processor)

Tornado

Tornado 的 RequestHandler 类似乎实现了两个挂钩,可用于在处理请求时打开和关闭连接。

from tornado.web import RequestHandler

db = SqliteDatabase('my_db.db')

class PeeweeRequestHandler(RequestHandler):
    def prepare(self):
        db.connect()
        return super(PeeweeRequestHandler, self).prepare()

    def on_finish(self):
        if not db.is_closed():
            db.close()
        return super(PeeweeRequestHandler, self).on_finish()

在你的应用程序中,现在你可以扩展 PeeweeRequestHandler,而不是扩展默认的 RequestHandler

请注意,这并未解决如何将 peewee 与 Tornado 或其他事件循环一起异步使用的问题。

Wheezy.web

连接处理代码可以放在中间件中。

def peewee_middleware(request, following):
    db.connect()
    try:
        response = following(request)
    finally:
        if not db.is_closed():
            db.close()
    return response

app = WSGIApplication(middleware=[
    lambda x: peewee_middleware,
    # ... other middlewares ...
])

感谢 GitHub 用户@tuukkamustonen提交此代码。

Falcon

连接处理代码可以放在 中间件组件 中。

import falcon
from peewee import *

database = SqliteDatabase('my_app.db')

class PeeweeConnectionMiddleware(object):
    def process_request(self, req, resp):
        database.connect()

    def process_response(self, req, resp, resource, req_succeeded):
        if not database.is_closed():
            database.close()

application = falcon.API(middleware=[
    PeeweeConnectionMiddleware(),
    # ... other middlewares ...
])

Pyramid

设置一个请求工厂,如下处理数据库连接生命周期

from pyramid.request import Request

db = SqliteDatabase('pyramidapp.db')

class MyRequest(Request):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        db.connect()
        self.add_finished_callback(self.finish)

    def finish(self, request):
        if not db.is_closed():
            db.close()

在应用程序 main() 中,确保 MyRequest 用作 request_factory

def main(global_settings, **settings):
    config = Configurator(settings=settings, ...)
    config.set_request_factory(MyRequest)

CherryPy

请参阅 发布/订阅模式

def _db_connect():
    db.connect()

def _db_close():
    if not db.is_closed():
        db.close()

cherrypy.engine.subscribe('before_request', _db_connect)
cherrypy.engine.subscribe('after_request', _db_close)

Sanic

在 Sanic 中,连接处理代码可以放在请求和响应中间件 Sanic 中间件 中。

# app.py
@app.middleware('request')
async def handle_request(request):
    db.connect()

@app.middleware('response')
async def handle_response(request, response):
    if not db.is_closed():
        db.close()

FastAPI

FastAPI 是一个与 asyncio 兼容的框架。Peewee 依赖于线程局部变量(也与 gevent 兼容)来跨请求管理连接状态。对于与 asyncio 一起使用,需要进行一些覆盖,以用与 asyncio 兼容的上下文局部变量替换线程局部变量行为。

有关将 Peewee 与 FastAPI 配合使用的完整处理,请在此处查阅 FastAPI 文档

https://fastapi.org.cn/advanced/sql-databases-peewee/

上述文档涵盖

  • 添加与 asyncio 上下文相关的连接状态跟踪

  • 按请求处理连接

其他框架

在此处看不到您的框架?请 打开 GitHub 工单,我将考虑添加一个部分,或者更好的是,提交一个文档拉取请求。

执行查询

通常通过对使用查询构建器 API(或在 Select 查询的情况下简单地迭代查询对象)构造的查询调用 execute() 来执行 SQL 查询。

db = SqliteDatabase('my_app.db')
db.connect()

# Example of executing a simple query and ignoring the results.
db.execute_sql("ATTACH DATABASE ':memory:' AS cache;")

# Example of iterating over the results of a query using the cursor.
cursor = db.execute_sql('SELECT * FROM users WHERE status = ?', (ACTIVE,))
for row in cursor.fetchall():
    # Do something with row, which is a tuple containing column data.
    pass

对于您希望直接执行 SQL 的情况,可以使用 Database.execute_sql() 方法。

管理事务

Peewee 提供了多个用于处理事务的接口。最通用的接口是 Database.atomic() 方法,它还支持嵌套事务。 atomic() 块将在事务或保存点中运行,具体取决于嵌套级别。

如果在包装块中发生未处理的异常,当前事务/保存点将回滚。否则,语句将在包装块的末尾提交。

# Transaction will commit automatically at the end of the "with" block:
with db.atomic() as txn:
    User.create(username='u1')

# Unhandled exceptions will cause transaction to be rolled-back:
with db.atomic() as txn:
    User.create(username='huey')
    # User has been INSERTed into the database but the transaction is not
    # yet committed because we haven't left the scope of the "with" block.

    raise ValueError('uh-oh')
    # This exception is unhandled - the transaction will be rolled-back and
    # the ValueError will be raised.

注意

示例

with db.atomic() as transaction:  # Opens new transaction.
    try:
        save_some_objects()
    except ErrorSavingData:
        # Because this block of code is wrapped with "atomic", a
        # new transaction will begin automatically after the call
        # to rollback().
        transaction.rollback()
        error_saving = True

    create_report(error_saving=error_saving)
    # Note: no need to call commit. Since this marks the end of the
    # wrapped block of code, the `atomic` context manager will
    # automatically call commit for us.

注意

在由 atomic() 上下文管理器包装的块内,您可以通过调用 Transaction.rollback()Transaction.commit() 来随时显式回滚或提交。当您在包装的代码块内执行此操作时,将自动启动一个新事务。

注意

atomic() 可以用作 上下文管理器装饰器

上下文管理器

atomic 用作上下文管理器

db = SqliteDatabase(':memory:')

with db.atomic() as txn:
    # This is the outer-most level, so this block corresponds to
    # a transaction.
    User.create(username='charlie')

    with db.atomic() as nested_txn:
        # This block corresponds to a savepoint.
        User.create(username='huey')

        # This will roll back the above create() query.
        nested_txn.rollback()

    User.create(username='mickey')

# When the block ends, the transaction is committed (assuming no error
# occurs). At that point there will be two users, "charlie" and "mickey".

您还可以使用 atomic 方法来执行获取或创建操作

try:
    with db.atomic():
        user = User.create(username=username)
    return 'Success'
except peewee.IntegrityError:
    return 'Failure: %s is already in use.' % username

装饰器

atomic 用作装饰器

@db.atomic()
def create_user(username):
    # This statement will run in a transaction. If the caller is already
    # running in an `atomic` block, then a savepoint will be used instead.
    return User.create(username=username)

create_user('charlie')

嵌套事务

atomic() 提供事务的透明嵌套。使用 atomic() 时,最外层的调用将被包装在一个事务中,任何嵌套调用都将使用保存点。

with db.atomic() as txn:
    perform_operation()

    with db.atomic() as nested_txn:
        perform_another_operation()

Peewee 通过使用保存点来支持嵌套事务(有关更多信息,请参见 savepoint())。

显式事务

如果您希望在事务中显式运行代码,可以使用 transaction()。与 atomic() 类似,transaction() 可用作上下文管理器或装饰器。

如果在包装块中发生异常,事务将回滚。否则,语句将在包装块的末尾提交。

db = SqliteDatabase(':memory:')

with db.transaction() as txn:
    # Delete the user and their associated tweets.
    user.delete_instance(recursive=True)

可以在包装块中显式提交或回滚事务。发生这种情况时,将启动一个新事务。

with db.transaction() as txn:
    User.create(username='mickey')
    txn.commit()  # Changes are saved and a new transaction begins.
    User.create(username='huey')

    # Roll back. "huey" will not be saved, but since "mickey" was already
    # committed, that row will remain in the database.
    txn.rollback()

with db.transaction() as txn:
    User.create(username='whiskers')
    # Roll back changes, which removes "whiskers".
    txn.rollback()

    # Create a new row for "mr. whiskers" which will be implicitly committed
    # at the end of the `with` block.
    User.create(username='mr. whiskers')

注意

如果您尝试使用 transaction() 上下文管理器使用 peewee 嵌套事务,则只会使用最外层事务。如果在嵌套块中发生异常,事务将不会回滚——只有冒泡到最外层事务的异常才会触发回滚。

由于这可能会导致不可预测的行为,因此建议您使用 atomic()

显式保存点

就像您可以显式创建事务一样,您还可以使用 savepoint() 方法显式创建保存点。保存点必须在事务中发生,但可以任意深度嵌套。

with db.transaction() as txn:
    with db.savepoint() as sp:
        User.create(username='mickey')

    with db.savepoint() as sp2:
        User.create(username='zaizee')
        sp2.rollback()  # "zaizee" will not be saved, but "mickey" will be.

警告

如果您手动提交或回滚保存点,不会自动创建一个新的保存点。这与 transaction 的行为不同,后者会在手动提交/回滚后自动打开一个新事务。

自动提交模式

默认情况下,Peewee 在自动提交模式下运行,因此在事务外部执行的任何语句都在其自己的事务中运行。为了将多个语句分组到一个事务中,Peewee 提供了 atomic() 上下文管理器/装饰器。这应该涵盖所有用例,但如果您不太可能希望完全禁用 Peewee 的事务管理,则可以使用 Database.manual_commit() 上下文管理器/装饰器。

以下是如何模拟 transaction() 上下文管理器的行为

with db.manual_commit():
    db.begin()  # Have to begin transaction explicitly.
    try:
        user.delete_instance(recursive=True)
    except:
        db.rollback()  # Rollback! An error occurred.
        raise
    else:
        try:
            db.commit()  # Commit changes.
        except:
            db.rollback()
            raise

再次强调——我不认为有人需要这样做,但它就在这里以防万一。

数据库错误

Python DB-API 2.0 规范描述了几种类型的异常。由于大多数数据库驱动程序都有自己的异常实现,因此 Peewee 通过提供对任何特定于实现的异常类的自己的包装器来简化事情。这样,您不必担心导入任何特殊的异常类,您只需使用来自 peewee 的异常类即可

  • DatabaseError

  • DataError

  • IntegrityError

  • InterfaceError

  • InternalError

  • NotSupportedError

  • OperationalError

  • ProgrammingError

注意

所有这些错误类都扩展了 PeeweeException

记录查询

所有查询都使用标准库 logging 模块记录到peewee 命名空间。查询使用DEBUG 级别记录。如果您有兴趣对查询进行一些操作,则只需注册一个处理程序即可。

# Print all queries to stderr.
import logging
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

添加新的数据库驱动程序

Peewee 内置了对 Postgres、MySQL、MariaDB 和 SQLite 的支持。这些数据库非常流行,从快速的可嵌入式数据库到适合大规模部署的重量级服务器,不一而足。话虽如此,那里有很多很酷的数据库,只要驱动程序支持 DB-API 2.0 规范,为您的首选数据库添加支持应该非常容易。

警告

Peewee 要求数据库连接进入自动提交模式。

如果您使用过标准库 sqlite3 驱动程序、psycopg2 或类似的驱动程序,那么您应该熟悉 DB-API 2.0 规范。Peewee 目前依赖于一些部分

  • Connection.commit

  • Connection.execute

  • Connection.rollback

  • Cursor.description

  • Cursor.fetchone

这些方法通常封装在更高级别的抽象中,并由 Database 暴露,因此即使您的驱动程序没有完全执行这些操作,您仍然可以从 peewee 中获得很多好处。一个示例是“playhouse”模块中的 apsw sqlite 驱动程序

首先要提供 Database 的一个子类,该子类将打开一个连接,并确保连接处于自动提交模式(从而禁用所有 DB-API 事务语义)

from peewee import Database
import foodb  # Our fictional DB-API 2.0 driver.


class FooDatabase(Database):
    def _connect(self, database):
        return foodb.connect(self.database, autocommit=True, **self.connect_params)

Database 提供了一个更高级别的 API,负责执行查询、创建表和索引,以及内省数据库以获取表列表。上述实现是绝对需要的最小值,尽管某些功能将不起作用——为了获得最佳结果,您将需要另外添加一个方法,用于从数据库中提取表的表和索引列表。我们假装 FooDB 很像 MySQL,并有特殊的“SHOW”语句

class FooDatabase(Database):
    def _connect(self):
        return foodb.connect(self.database, autocommit=True, **self.connect_params)

    def get_tables(self):
        res = self.execute('SHOW TABLES;')
        return [r[0] for r in res.fetchall()]

数据库处理的其他未在此处涵盖的内容包括

  • last_insert_id()rows_affected()

  • paramquote,它们告诉 SQL 生成代码如何添加参数占位符和引用实体名称。

  • field_types 用于将 INT 或 TEXT 等数据类型映射到其供应商特定的类型名称。

  • operations 用于将“LIKE/ILIKE”等操作映射到其数据库等效项

请参阅 Database API 参考或 源代码。了解详情。

注意

如果您的驱动程序符合 DB-API 2.0 规范,则无需太多工作即可启动并运行。

我们的新数据库可以用作任何其他数据库子类

from peewee import *
from foodb_ext import FooDatabase

db = FooDatabase('my_database', user='foo', password='secret')

class BaseModel(Model):
    class Meta:
        database = db

class Blog(BaseModel):
    title = CharField()
    contents = TextField()
    pub_date = DateTimeField()