Cockroach 数据库

CockroachDB (CRDB) 得到 peewee 的良好支持。

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app', user='root', host='10.1.0.8')

如果你正在使用 Cockroach Cloud,你可能会发现使用连接字符串指定连接参数更简单

db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')

注意

CockroachDB 需要 psycopg2 (postgres) Python 驱动程序。

注意

CockroachDB 安装和入门指南可在以下位置找到:https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html

SSL 配置

在运行 Cockroach 集群时强烈建议使用 SSL 证书。Psycopg2 开箱即用地支持 SSL,但你可能需要在初始化数据库时指定一些其他选项

db = CockroachDatabase(
    'my_app',
    user='root',
    host='10.1.0.8',
    sslmode='verify-full',  # Verify the cert common-name.
    sslrootcert='/path/to/root.crt')


# Or, alternatively, specified as part of a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/dbname'
                       '?sslmode=verify-full&sslrootcert=/path/to/root.crt'
                       '&options=--cluster=my-cluster-xyz')

有关客户端验证的更多详细信息,请参阅 libpq 文档

Cockroach 扩展 API

playhouse.cockroachdb 扩展模块提供以下类和帮助程序

在使用 CRDB 时可能派上用场的特殊字段类型

  • UUIDKeyField - 一个主键字段实现,它使用 CRDB 的 UUID 类型,并带有默认随机生成的 UUID。

  • RowIDField - 使用 CRDB 的 INT 类型和默认 unique_rowid() 的主键字段实现。

  • JSONField - 与 Postgres 的 BinaryJSONField 相同,因为 CRDB 将 JSON 视为 JSONB。

  • ArrayField - 与 Postgres 扩展相同(但不支持多维数组)。

CRDB 与 Postgres 的线协议兼容,并公开非常类似的 SQL 接口,因此可以使用 PostgresqlDatabase 与 CRDB(不推荐这样做)。

  1. CRDB 不支持嵌套事务(保存点),因此 atomic() 方法已实现为在使用 CockroachDatabase 时强制执行此操作。有关更多信息,请参阅 CRDB 事务

  2. CRDB 在字段类型、日期函数和 Postgres 的自省方面可能存在细微差别。

  3. CRDB 特定功能由 CockroachDatabase 公开,例如指定事务优先级或 AS OF SYSTEM TIME 子句。

CRDB 事务

CRDB 不支持嵌套事务(保存点),因此 atomic() 方法在 CockroachDatabase 上已修改为在遇到无效嵌套时引发异常。如果您希望能够嵌套事务代码,可以使用 transaction() 方法,该方法将确保最外层块管理事务(例如,退出嵌套块不会导致提前提交)。

示例

@db.transaction()
def create_user(username):
    return User.create(username=username)

def some_other_function():
    with db.transaction() as txn:
        # do some stuff...

        # This function is wrapped in a transaction, but the nested
        # transaction will be ignored and folded into the outer
        # transaction, as we are already in a wrapped-block (via the
        # context manager).
        create_user('[email protected]')

        # do other stuff.

    # At this point we have exited the outer-most block and the transaction
    # will be committed.
    return

CRDB 提供客户端事务重试,可以使用特殊的 run_transaction() 帮助程序。此帮助程序方法接受一个可调用对象,负责执行可能需要重试的任何事务性语句。

run_transaction() 最简单的示例

def create_user(email):
    # Callable that accepts a single argument (the database instance) and
    # which is responsible for executing the transactional SQL.
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

huey = create_user('[email protected]')

注意

如果在给定的尝试次数后无法提交事务,将引发 cockroachdb.ExceededMaxAttempts 异常。如果 SQL 格式错误、违反约束等,则该函数将向调用方引发异常。

使用 run_transaction() 实现客户端重试的示例,该重试用于将一笔金额从一个帐户转到另一个帐户的事务

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app')


def transfer_funds(from_id, to_id, amt):
    """
    Returns a 3-tuple of (success?, from balance, to balance). If there are
    not sufficient funds, then the original balances are returned.
    """
    def thunk(db_ref):
        src, dest = (Account
                     .select()
                     .where(Account.id.in_([from_id, to_id])))
        if src.id != from_id:
            src, dest = dest, src  # Swap order.

        # Cannot perform transfer, insufficient funds!
        if src.balance < amt:
            return False, src.balance, dest.balance

        # Update each account, returning the new balance.
        src, = (Account
                .update(balance=Account.balance - amt)
                .where(Account.id == from_id)
                .returning(Account.balance)
                .execute())
        dest, = (Account
                 .update(balance=Account.balance + amt)
                 .where(Account.id == to_id)
                 .returning(Account.balance)
                 .execute())
        return True, src.balance, dest.balance

    # Perform the queries that comprise a logical transaction. In the
    # event the transaction fails due to contention, it will be auto-
    # matically retried (up to 10 times).
    return db.run_transaction(thunk, max_attempts=10)

CRDB API

class CockroachDatabase(database[, **kwargs])

基于 PostgresqlDatabase 并使用 psycopg2 驱动程序的 CockroachDB 实现。

其他关键字参数会传递给 psycopg2 连接构造函数,可用于指定数据库 userport 等。

或者,可以在 URL 形式中指定连接详细信息。

run_transaction(callback[, max_attempts=None[, system_time=None[, priority=None]]])
参数
  • callback – 可调用对象,接受一个 db 参数(该参数将是调用此方法的数据库实例)。

  • max_attempts (int) – 放弃之前尝试的最大次数。

  • system_time (datetime) – 根据给定值执行事务 AS OF SYSTEM TIME

  • priority (str) – “low”、“normal” 或 “high”。

返回值

返回回调返回的值。

引发

ExceededMaxAttempts 如果 max_attempts 超出。

在具有自动客户端重试的事务中运行 SQL。

用户提供的 callback

  • 必须接受一个参数,即 db 实例,表示正在运行事务的连接。

  • 必须不尝试提交、回滚或以其他方式管理事务。

  • 可以被调用多次。

  • 应该理想情况下仅包含 SQL 操作。

此外,在调用此函数时,数据库不能有任何打开的事务,因为 CRDB 不支持嵌套事务。尝试这样做将引发 NotImplementedError

最简单的示例

def create_user(email):
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

user = create_user('[email protected]')
class PooledCockroachDatabase(database[, **kwargs])

基于 PooledPostgresqlDatabase 的 CockroachDB 连接池实现。实现与 CockroachDatabase 相同的 API,但将执行客户端连接池。

run_transaction(db, callback[, max_attempts=None[, system_time=None[, priority=None]]])

在具有自动客户端重试的事务中运行 SQL。有关详细信息,请参见 CockroachDatabase.run_transaction()

参数
  • db (CockroachDatabase) – 数据库实例。

  • callback – 可接受单个 db 参数(将与上面传递的值相同)的可调用对象。

注意

此函数等同于 CockroachDatabase 类中同名方法。

class UUIDKeyField

UUID 主键字段,使用 CRDB gen_random_uuid() 函数自动填充初始值。

class RowIDField

使用 CRDB unique_rowid() 函数自动填充初始值的自动增量整数主键字段。

另请参阅

  • BinaryJSONField 来自 Postgresql 扩展(在 cockroachdb 扩展模块中可用,并别名为 JSONField)。

  • ArrayField 来自 Postgresql 扩展。