《asyncio 系列》5. 基于 asyncio + SQLAlchemy 并发请求数据库

发布时间 2023-05-04 14:40:12作者: 古明地盆

楔子

上一篇文章探讨了使用 aiohttp 库发出非阻塞 Web 请求,还讨论了使用几种不同的异步 API 来同时运行这些请求。通过结合 asyncio API 和 aiohttp 库,可以同时运行多个耗时的 Web 请求,从而提高应用程序的运行速度。但我们上一章学到的概念不仅适用于 Web 请求,也适用于运行 SOL 查询,且可提高数据库密集型应用程序的性能。

与 Web 请求非常相似,我们需要使用对异步友好的库,因为典型的 SOL 库会阻塞主线程,因此会阻塞事件循环,直到检索到结果为止。在本篇文章中,我们将学习如何异步操作数据库,而想要做到这一点,我们就必须将同步驱动换成异步驱动。

那么异步驱动都有哪些呢?

  • aiosqlite:用于连接 SQLite;
  • asyncmy、aiomysql:用于连接 MySQL;
  • asyncpg、aiopg:用于连接 PostgreSQL;
  • cx_Oracle_async:用于连接 Oracle;
  • aioredis:用于连接 Redis;

现如今 Python 已经进化到 3.11 了,适配不同数据库的异步驱动也已经非常成熟了。但这里我要介绍的不是这些驱动,而是 ORM。不同的驱动使用起来会有一些差异,而 ORM 提供了一个统一的上层接口,屏蔽了不同驱动之间的差异。

Python 里面最有名的 ORM 莫过于 SQLAlchemy,在早期它是一个同步的 ORM,只能适配一些同步驱动。不过从 1.4 版本的时候引入了协程,支持了异步功能,并且在使用上和之前没有太大区别。下面我们来看一下它的用法,并介绍一些最佳实践。

安装 MySQL、PostgreSQL

关于数据库的异步操作就以这两个数据库为例,我们先安装它们,这里我使用的操作系统是 CentOS。

安装 MySQL

1)我们需要去官网下载 MySQL 所需要的安装包。

默认显示的是最新版,当然你也可以点击 Archives 来指定下载的版本,这里我选择的版本是 8.0.30。

然后是选择 Operating System,页面默认显示的便是你当前的操作系统,但这里我们下载安装在 CentOS 7 上的 MySQL,所以将操作系统选择为:Red Hat Enterprise Linux / Oracle Linux。

注意:当你选择操作系统之后,下面会多出来一个下拉菜单:选择 OS Version,也就是指定 Linux 内核版本,这里我们使用的是 CentOS 7,所以选择 Red Hat Enterprise Linux 7 / Oracle Linux 7 (x86, 64-bit)。

然后我们就可以安装了,下面会提供很多种安装包,我们找到 RPM Bundle(一般是第一个),然后点击 Download 下载即可。

2)下载完毕之后是一个 tar 包,我们将其丢到服务器上。

上传成功之后先不着急安装,我们先查看当前是否安装了 MySQL 或者 MariaDB,如果安装了那么将其卸载掉(如果你执行安装 MySQL 8.x 版本的话),具体方式如下:

[root@satori ~]# rpm -qa | grep mariadb
mariadb-libs-5.5.68-1.el7.x86_64
# 卸载掉 MariaDB
[root@matsuri ~]# rpm -e mariadb-libs-5.5.68-1.el7.x86_64 --nodeps
# 再次查看
[root@matsuri ~]# rpm -qa | grep mariadb

然后 cd 到 /usr/local 目录,这个目录是存放一些本地的共享资源的,我们在里面创建一个名叫 mysql 的目录,然后将之前的 MySQL 安装包解压到该目录下面。

tar -zxvf mysql-8.0.30-1.el7.x86_64.rpm-bundle.tar -C /usr/local/mysql/
cd /usr/local/mysql/

3)使用 rpm 进行安装。

rpm -ivh mysql-community-common-8.0.30-1.el7.x86_64.rpm --nodeps --force
rpm -ivh mysql-community-libs-8.0.30-1.el7.x86_64.rpm --nodeps --force
rpm -ivh mysql-community-client-8.0.30-1.el7.x86_64.rpm --nodeps --force
rpm -ivh mysql-community-server-8.0.30-1.el7.x86_64.rpm --nodeps --force

通过 rpm -qa | grep mysql 命令查看 mysql 的安装包。

4)MySQL 数据库的初始化和相关配置。

mysqld --initialize
# 安装之后会自动创建一个 mysql 组、该组下面有一个 mysql 用户
# 我们赋予其操作 /var/lib/mysql 目录的权限
chown mysql:mysql /var/lib/mysql -R
# 开启服务
systemctl start mysqld.service
# 设置开启自启
systemctl enable mysqld

5)查看数据库的初始密码并进行修改。

cat /var/log/mysqld.log | grep password

命令执行完之后会显示 root 用户的登陆密码,这个密码是随机生成的,然后我们把它改掉。使用初始密码登入到数据库中,然后使用如下命令对密码进行修改:

ALTER USER 'root'@'localhost' IDENTIFIED WITH MYSQL_NATIVE_PASSWORD BY '你的密码';

然后退出,使用新密码进行登录,发现登录成功。

6)通过以下命令,进行远程访问的授权。

# 创建一个用户叫 root, 这个是从外界连接的时候所使用的用户
# 然后 % 表示接收任意的 IP, 也就是任何一台计算机都可以通过 root 用户来连接
# 同理密码也是从外界连接的时候所使用的密码 
create user 'root'@'%' identified with mysql_native_password by '你的密码';
# 当然你也可以创建其它用户, 比如: create user 'abc'@'%' identified with mysql_native_password by '123456';
# 那么外界便可以通过 用户名: "abc", 密码: "123456" 连接到当前服务器上的 MySQL

# 进行授权
grant all privileges on *.* to 'root'@'%' with grant option;
# 刷新权限
flush privileges;

到此我们就配置完成了,至于 MySQL 的一些配置可以通过配置文件 /etc/my.cnf 进行修改。

在外界连接时,不要忘记开放 3306 端口。

安装 PostgreSQL

安装 PostgreSQL 比较简单,我们直接去 https://www.postgresql.org/ 页面,点击 Download,然后会进入如下页面:

直接点击 Linux,会弹出 Linux 发行版种类,让你选择,我们选择 Red Hat/CentOS。之后会进入新的页面,让你选择版本,操作系统种类以及架构。

根据当前的系统进行选择,完毕之后直接显示安装命令,我们按照上面的提示进行操作即可。安装完成之后,默认会有一个 postgres 用户,密码为空。我们切换到 postgres 用户,然后输入 psql 即可进入控制台,输入 \password postgres 即可设置用户 postgres 的密码。

下面我们就可以在外界通过 postgres 用户进行连接了,密码就是新设置的密码。但是如果现在就连接的话其实是连接不上的,因为此时 PostgreSQL 数据库还不接受外界的任何请求,我们需要修改配置文件,首先在目录 /var/lib/pgsql/11/data/ 中有两个重要的文件:pg_hba.conf、postgresql.conf。

  • pg_hba.conf: 配置对数据库的访问权限
  • postgresql.conf: 配置PostgreSQL数据库服务器的相应的参数

我们需要对这两个文件进行修改:

# 修改 pg_hba.conf, 在文件下方加入如下内容
# 表示允许所有主机使用所有合法的用户名访问数据库,并提供加密的密码验证
host    all             all             0.0.0.0/0               trust

# 修改 postgresql.conf, 里面会有一个 #listen_addresses = "localhost", 这被注释掉了
# 凡是被注释掉的, 都是默认值, 我们将注释打开, 然后把 localhost 改成 *, 表示允许数据库监听来自任何主机的连接请求
listen_addresses = '*'
# 在这个文件里面我们可以修改很多配置, 比如监听的端口等等

修改之后重启数据库服务,直接 systemctl restart postgresql-11 即可,然后将 5432 端口打开,连接就没有任何问题了。

创建一个异步引擎

SQLAlchemy 不具备连接数据库的能力,它连接数据库还是使用了驱动,所以在使用之前我们必须先下载一个驱动才行。这里我以 MySQL 和 PostgreSQL 为例,使用的异步驱动为 asyncmy 和 asyncpg,直接 pip install asyncmy asyncpg 安装即可。

版本信息:SQLAlchemy 版本是 2.0.3,asncmy 版本是 0.2.5,asyncpg 版本是 0.25.0。

首先我们在 MySQL 和 PostgreSQL 的默认数据库下创建一张表:

-- MySQL 建表语句
CREATE TABLE girls (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    age INT,
    address VARCHAR(255)
);

-- Postgres 建表语句
CREATE TABLE girls (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    age INT,
    address VARCHAR(255)
);

-- 然后再往里面插入几条数据
INSERT INTO girls (name, age, address)
VALUES ('古明地觉', 17, '地灵殿'),
('古明地恋', 16, '地灵殿'),
('雾雨魔理沙', 19, '魔法森林'),
('琪露诺', 60, '雾之湖');

然后我们来看看如何创建异步引擎去连接 MySQL 和 PostgreSQL。

"""
使用 create_engine 创建同步引擎
使用 create_async_engine 创建异步引擎

同步引擎搭配同步驱动
异步引擎搭配异步驱动
"""
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.engine import URL

# 也可以直接传递一个字符串,参数和 create_engine 是一样的
# create_async_engine("mysql+asyncmy://...")
mysql_engine = create_async_engine(
    URL.create("mysql+asyncmy",
               username="root",
               password="123456",
               host="82.157.146.194",
               port=3306,
               database="mysql")
)

# 如果是 Postgres,那么把驱动换一下即可
postgres_engine = create_async_engine(
    URL.create("postgresql+asyncpg",
               username="root",
               password="123456",
               host="82.157.146.194",
               port=5432,
               database="postgres")
)
# MySQL 的默认数据库为 mysql
# PostgreSQL 的默认数据库为 postgres

以上我们就创建了一个异步引擎,创建方式和同步引擎没什么区别,它们的参数也都是一样的。create_engien 会返回一个 Engine 对象,而 create_async_engien 则返回一个 AsyncEngine 对象,引擎会负责将 pool 和 dialect 封装在一起,从而实现对数据库的操作。并且创建引擎的时候还可以指定其它参数:

  • pool_size:数据库连接池的初始容量;
  • max_overflow:数据库连接池的最大溢出量,该容量加上初始容量就是最大容量,超出之后再创建连接就会阻塞等待。而等待时长由参数 timeout 控制,默认为 30;
  • pool_recycle:重连周期;

既然引擎有了,那么如何用该引擎操作数据库呢?

操作 MySQL

先来看看 MySQL。

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text

engine = create_async_engine(
    "mysql+asyncmy://root:123456@82.157.146.194/mysql")

# 需要定义一个协程函数
async def get_data():
    # 引擎内部维护了一个连接池,engine.connect() 会从池子里取出一个连接
    async with engine.connect() as conn:
        # 调用 conn.execute() 执行 SQL 语句,SQL 语句需要传到 text 方法中
        query = text("SELECT * FROM girls")
        result = await conn.execute(query)

    # 返回的 result 是一个 CursorResult 对象,通过 keys 方法可以拿到选择的字段
    columns = result.keys()
    print(columns)
    """
    RMKeyView(['id', 'name', 'age', 'address'])
    """
    # 调用 result.fetchone() 拿到单条数据
    data = result.fetchone()
    print(data)
    """
    (1, '古明地觉', 17, '地灵殿')
    """
    # 虽然显示的是一个元组,但它其实是一个 Row 对象,不过可以当成元组来用
    # 我们将它转成字典
    print(dict(zip(columns, data)))
    """
    {'id': 1, 'name': '古明地觉', 'age': 17, 'address': '地灵殿'}
    """

    # result 内部有一个游标
    # 再调用 result.fetchone() 会返回下一条数据
    print(result.fetchone())
    print(result.fetchone())
    print(result.fetchone())
    """
    (2, '古明地恋', 16, '地灵殿')
    (3, '雾雨魔理沙', 19, '魔法森林')
    (4, '琪露诺', 60, '雾之湖')
    """
    # 库里面总共就 4 条数据
    # 所以当没有数据时,就会返回 None
    print(result.fetchone())
    """
    None
    """

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

用法很简单,通过 engine.connect() 可以从池子里面取出一个连接,再调用连接的 execute 方法执行 SQL 语句即可。但需要注意:字符串格式的 SQL 语句不能直接传递,需要先调用 SQLAlchemy 提供的 text 方法。

执行完毕之后,会返回一个 CursorResult 对象,调用它的 fetchone 方法会逐条返回结果集的数据。当然除了 fetchone,还有 fetchmany 和 fetchall,我们来看一下。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girls")
        result = await conn.execute(query)
    # 从结果集取三条数据
    data = result.fetchmany(3)
    print(data)
    """
    [(1, '古明地觉', 17, '地灵殿'), 
     (2, '古明地恋', 16, '地灵殿'), 
     (3, '雾雨魔理沙', 19, '魔法森林')]
    """
    # 再取两条数据,但显然此时只剩下一条了
    data = result.fetchmany(2)
    print(data)
    """
    [(4, '琪露诺', 60, '雾之湖')]
    """
    # 如果没有数据了,fetchmany 会返回空列表
    data = result.fetchmany(1)
    print(data)
    """
    []
    """

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

所以 fetchmany 接收一个整数,就是获取指定数量的数据。而 fetchall 就简单了,显然它是获取结果集的全部数据。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girls")
        result = await conn.execute(query)
    data = result.fetchall()
    print(data)
    """
    [(1, '古明地觉', 17, '地灵殿'), 
     (2, '古明地恋', 16, '地灵殿'), 
     (3, '雾雨魔理沙', 19, '魔法森林'), 
     (4, '琪露诺', 60, '雾之湖')]
    """
    # 列表里面的 Row 对象都转成字典
    columns = result.keys()
    print([dict(zip(columns, d)) for d in data])
    """
    [{'id': 1, 'name': '古明地觉', 'age': 17, 'address': '地灵殿'}, 
     {'id': 2, 'name': '古明地恋', 'age': 16, 'address': '地灵殿'}, 
     {'id': 3, 'name': '雾雨魔理沙', 'age': 19, 'address': '魔法森林'}, 
     {'id': 4, 'name': '琪露诺', 'age': 60, 'address': '雾之湖'}]
    """

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

还是比较简单的,通过 CursorResult 的这三个方法,便可以获取想要的数据。然后再补充一点,我们说 SQL 语句需要放在 text 方法中,然后才能传给连接的 execute 方法。虽然这个过程稍微有点麻烦,但好处就是我们可以使用 SQLAlchemy 提供的占位符功能。

async def get_data():
    async with engine.connect() as conn:
        # :id 就是一个占位符,那么它等于多少呢?
        # 再调用 bindparams 指定即可
        # 并且占位符的数量没有限制
        query = text(
            "SELECT * FROM girls WHERE id > :id"
        ).bindparams(id=1)
        result = await conn.execute(query)
    data = result.fetchall()
    # 此时只返回了两条数据
    print([dict(zip(result.keys(), d)) for d in data])
    """
    [{'id': 2, 'name': '古明地恋', 'age': 16, 'address': '地灵殿'}, 
     {'id': 3, 'name': '雾雨魔理沙', 'age': 19, 'address': '魔法森林'}, 
     {'id': 4, 'name': '琪露诺', 'age': 60, 'address': '雾之湖'}]
    """

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

以后执行 SQL 语句的时候,就通过这种方式去执行即可。

操作 PostgreSQL

再来看看如何操作 PostgreSQL,方式是一样的,只是将驱动换一下即可。

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text

# 连接 PostgreSQL
engine = create_async_engine(
    "postgresql+asyncpg://postgres:123456@82.157.146.194/postgres")

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girls")
        result = await conn.execute(query)

    columns = result.keys()
    print(columns)
    """
    RMKeyView(['id', 'name', 'age', 'address'])
    """
    data = result.fetchone()
    print(data)
    """
    (1, '古明地觉', 17, '地灵殿')
    """
    # 转成字典
    print(dict(zip(columns, data)))
    """
    {'id': 1, 'name': '古明地觉', 'age': 17, 'address': '地灵殿'}
    """
    # 获取两条数据
    print(result.fetchmany(2))
    """
    [(2, '古明地恋', 16, '地灵殿'), (3, '雾雨魔理沙', 19, '魔法森林')]
    """
    # 获取剩余的全部数据,还剩下一条
    print(result.fetchall())
    """
    [(4, '琪露诺', 60, '雾之湖')]
    """

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_data())

我们看到操作方式和 MySQL 是一模一样的,所以这就是 ORM 的好处,将细节方面的差异帮我们屏蔽掉了。

目前这里只介绍了查询,增删改还没有说,下面来看看它在面对增删改时的表现。

添加数据

先来看看 MySQL 的添加数据:

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import Table, MetaData, Column
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR

engine = create_async_engine(
    "mysql+asyncmy://root:123456@82.157.146.194/mysql")

async def insert_data():
    # 构建数据库表
    table = Table(
        "girls",  # 表名
        MetaData(),  # MetaData() 实例
        # 表里面的列
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("age", INTEGER),
        Column("address", VARCHAR)
    )
    async with engine.connect() as conn:
        query = table.insert().values(
            {"name": "芙兰朵露", "age": 400, "address": "红魔馆"})
        result = await conn.execute(query)
        # 返回受影响的行数
        print(result.rowcount)  # 1
        # 返回数据在插入之后的主键
        print(result.inserted_primary_key)  # (5,)
        # 对于增删改而言,还必须调用一次 commit
        # 否则数据不会写入到库中
        await conn.commit()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(insert_data())

以上是插入单条数据,我们也可以同时插入多条数据。而方法也很简单,插入单条数据是往 values 里面传一个字典,而插入多条数据只需要传一个列表即可。

async def insert_data():
    # 构建数据库表
    table = Table(
        "girls", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("age", INTEGER),
        Column("address", VARCHAR)
    )
    async with engine.connect() as conn:
        query = table.insert().values(
            [{"name": "八意永琳", "age": 20, "address": "永远亭"},
             {"name": "十六夜咲夜", "age": 33, "address": "红魔乡"}])
        await conn.execute(query)
        await conn.commit()

我们看一下数据库,看看数据有没有变化。

数据成功地写入到库中了,以上是 MySQL 的数据写入,然后再来看看 PostgreSQL。

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import Table, MetaData, Column
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR

engine = create_async_engine(
    "postgresql+asyncpg://postgres:123456@82.157.146.194/postgres")

async def insert_data():
    # 构建数据库表
    table = Table(
        "girls", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("age", INTEGER),
        Column("address", VARCHAR)
    )
    async with engine.connect() as conn:
        query = table.insert().values(
            {"name": "芙兰朵露", "age": 400, "address": "红魔馆"})
        await conn.execute(query)
        query = table.insert().values(
            [{"name": "八意永琳", "age": 20, "address": "永远亭"},
             {"name": "十六夜咲夜", "age": 33, "address": "红魔乡"}])
        await conn.execute(query)
        # 插入数据,然后提交
        await conn.commit()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(insert_data())

方式一模一样,没区别,只需要将引擎换一下即可。我们查看数据库,看数据有没有添加进去。

依旧没问题,增加数据我们就介绍完了。当然啦,我们这里是先创建一个 Table 对象,然后借助于 Table 对象自动帮我们拼接 SQL 语句。你也可以手动编写 SQL 语句,但是不要忘记使用 text 函数包装一下。

修改数据

然后是修改数据,先来看 MySQL。

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import Table, MetaData, Column
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR

engine = create_async_engine(
    "mysql+asyncmy://root:123456@82.157.146.194/mysql")

async def modify_data():
    # 构建数据库表
    table = Table(
        "girls", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("age", INTEGER),
        Column("address", VARCHAR)
    )
    async with engine.connect() as conn:
        # 修改 id = 1 的 name 字段
        query = table.update().where(
            table.c.id == 1).values({"name": "satori"})
        result = await conn.execute(query)
        print(result.rowcount)  # 1

        # 少女们都长大了 10 岁
        # 不调用 where,则修改所有行
        query = table.update().values(
            {"age": Column("age") + 10}
        )
        result = await conn.execute(query)
        # 受影响的行数为 7
        print(result.rowcount)  # 7
        # 别忘了提交
        await conn.commit()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(modify_data())

看一下表数据有没有变:

数据成功被修改。另外这里的 where 只有单个条件,如果是多个条件,那么彼此之间使用 & 或 | 进行连接,代表 and 和 or。

然后是 PostgreSQL,操作方式没有任何改变,只需要将引擎换掉即可。

engine = create_async_engine(
    "postgresql+asyncpg://postgres:123456@82.157.146.194/postgres")

async def modify_data():
    table = Table(
        "girls", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("age", INTEGER),
        Column("address", VARCHAR)
    )
    async with engine.connect() as conn:
        query = table.update().where(
            table.c.id == 1).values({"name": "satori"})
        result = await conn.execute(query)
        print(result.rowcount)  # 1

        query = table.update().values(
            {"age": Column("age") + 10}
        )
        result = await conn.execute(query)
        print(result.rowcount)  # 7
        await conn.commit()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(modify_data())

看看数据有没有变化:

没有任何问题。

删除数据

最后是删除数据。

engine = create_async_engine(
    "mysql+asyncmy://root:123456@82.157.146.194/mysql")

async def delete_data():
    table = Table(
        "girls", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("age", INTEGER),
        Column("address", VARCHAR)
    )
    async with engine.connect() as conn:
        # 删除 id = 1 的数据
        query = table.delete().where(table.c.id == 1)
        result = await conn.execute(query)
        print(result.rowcount)  # 1

        # 删除 id 为 2、3 的数据
        query = table.delete().where(table.c.id.in_([2, 3]))
        result = await conn.execute(query)
        print(result.rowcount)  # 2
        await conn.commit()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(delete_data())

数据有没有变化呢?

成功将数据删掉了,再来看看 PostgreSQL,代码不变,只需要将引擎换掉即可。

engine = create_async_engine(
    "postgresql+asyncpg://postgres:123456@82.157.146.194/postgres")

async def delete_data():
    table = Table(
        "girls", MetaData(),
        Column("id", INTEGER, primary_key=True,
               autoincrement=True),
        Column("name", VARCHAR),
        Column("age", INTEGER),
        Column("address", VARCHAR)
    )
    async with engine.connect() as conn:
        # 删除 id = 1 的数据
        query = table.delete().where(table.c.id == 1)
        result = await conn.execute(query)
        print(result.rowcount)  # 1

        # 删除 id 为 2、3 的数据
        query = table.delete().where(table.c.id.in_([2, 3]))
        result = await conn.execute(query)
        print(result.rowcount)  # 2
        await conn.commit()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(delete_data())

查看数据:

一切正常。

以上就是数据的增删改查,还是很简单的,在使用了 ORM 之后,即使是不同的数据库,操作方式也没有任何变化。

异步引擎的性能提升

必须要说明的是,如果只是单次的数据库请求,那么同步引擎和异步引擎之间没什么差异,耗时是差不多的。但如果是多个请求,那么异步引擎可以实现并发访问,我们举个例子。这里为了更好地观察到现象,我往表里写了 100w 条数据。

async def get_data():
    async with engine.connect() as conn:
        query = text("SELECT * FROM girl")
        await conn.execute(query)

async def main():
    start = time.perf_counter()
    await get_data()
    end = time.perf_counter()
    print(f"单次请求耗时: {end - start}s")
    """
    单次请求耗时: 26.8164807s
    """

    start = time.perf_counter()
    await asyncio.gather(*[get_data()] * 20)
    end = time.perf_counter()
    print(f"二十次请求耗时: {end - start}s")
    """
    二十次请求耗时: 27.2821891s
    """

    start = time.perf_counter()
    await asyncio.gather(*[get_data()] * 50)
    end = time.perf_counter()
    print(f"五十次请求耗时: {end - start}s")
    """
    五十次请求耗时: 27.480469s
    """

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

可以看到耗时是差不多的,如果你写了一个服务,请求过来的时候需要从数据库读数据(假设耗时 2s),然后返回。那么无论是来一个请求,还是同时来十个请求,耗时都是差不多的,大概 2s。可能同时处理十个请求的耗时会多一些,但不会多太多,因为请求数据库这个过程是并发进行的。

当然啦,并发处理的请求数肯定也是有上限的,不可能无限大,因为数据库连接池内部的连接数量是有限的。所以任何一个由多个组件构成的系统,随着并发数的提高,总会出现瓶颈。可能一开始的瓶颈是服务访问数据库的连接数量不够,但随着连接的增多,瓶颈又会转移到数据库上。这个时候可以搭建一个 MySQL 集群,以及引入 Redis 缓存,进一步提升并发量。

所以服务到底选择什么样的架构,取决于你的业务量,随着业务量的增大,一开始行之有效的架构设计就会变得力不从心,总会在某个地方出现瓶颈。我们只能根据实际情况进行调整,使得服务的处理能力尽可能地延展下去。

引擎的反射

在使用同步引擎的时候,我们应该都用过它的反射功能,举个例子。

from pprint import pprint
from sqlalchemy import create_engine
from sqlalchemy import inspect

# 此处为同步引擎
engine = create_engine(
    "mysql+pymysql://root:123456@82.157.146.194/mysql")
inspector = inspect(engine)

# 返回当前数据库下都有哪些表
pprint(inspector.get_table_names())
"""
['columns_priv',
 'component',
 'db',
 'default_roles',
 ......
"""
# 返回默认的数据库
pprint(inspector.default_schema_name)
"""
'mysql'
"""
# 返回所有的数据库
# 如果是 PostgreSQL,则返回 schema
pprint(inspector.get_schema_names())
"""
['information_schema', 'mysql', 
 'performance_schema', 'sys']
"""
# 返回当前数据库下都有哪些视图
pprint(inspector.get_view_names())
"""
[]
"""
# 查看一张表都有哪些列
# 里面包含了列名、类型、默认值、注释等信息
pprint(inspector.get_columns("girl"))
"""
[{'autoincrement': True,
  'comment': None,
  'default': None,
  'name': 'id',
  'nullable': False,
  'type': INTEGER()},
 {'comment': None,
  'default': None,
  'name': 'name',
  'nullable': True,
  'type': VARCHAR(length=255)},
 {'autoincrement': False,
  'comment': None,
  'default': None,
  'name': 'height',
  'nullable': True,
  'type': INTEGER()}]
"""
# 返回一张表的主键约束
pprint(inspector.get_pk_constraint("girl"))
# 返回一张表的所有外键
pprint(inspector.get_foreign_keys("girl"))
# 返回一张表的索引
pprint(inspector.get_indexes("girl"))
# 返回一张表的唯一性约束
pprint(inspector.get_unique_constraints("girl"))
# 返回一张表的注释
pprint(inspector.get_table_comment("girl"))

通过反射引擎,我们可以拿到很多的元信息。当然,也能将一张表反射出来。但这是同步引擎才具有的功能,异步引擎目前还不支持反射。当然这些信息本质上也是执行了相关查询才获取到的,我们也可以使用异步引擎手动执行,比如查看表字段信息:

async def main():
    async with engine.connect() as conn:
        query = text("SELECT COLUMN_NAME, DATA_TYPE "
                     "FROM INFORMATION_SCHEMA.COLUMNS "
                     "WHERE TABLE_NAME='girl'")
        data = (await conn.execute(query)).fetchall()
        print(list(map(dict, data)))
        """
        [{'COLUMN_NAME': 'height', 'DATA_TYPE': 'int'}, 
         {'COLUMN_NAME': 'id', 'DATA_TYPE': 'int'}, 
         {'COLUMN_NAME': 'name', 'DATA_TYPE': 'varchar'}]
        """

其它的一些元信息也可以通过查询的方式获取。

小结

以上就是 SQLAlchemy + 协程相关的内容,如果你使用的是 FastAPI、Sanic 之类的框架,那么也应该要搭配一个异步的 ORM 才能发挥出威力。