Sqlalchemy异步操作不完全指北 #
不完全抄袭自:米洛丶 - 博客园 cnblogs.com
异步SQLAlchemy #
SQLAlchemy作为一款通用的Python ORM工具,在最近的版本也支持了异步操作。但网上很多资料都不是很齐全,API也不是很好查询的情况下,我便有了整理一份基础文档的想法。文章主要会以CRUD为入口,解决大家最基本的需求。
engine的区别 #
在普通的SQLAlchemy中,建立engine对象,我们会采用下面的方式:
# Synchronous engine: the classic create_engine factory.
from sqlalchemy import create_engine
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_recycle=1500)
而异步的方式如下:
# Asynchronous engine: same call shape, but from the asyncio extension.
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(ASYNC_SQLALCHEMY_URI, pool_recycle=1500)
链接参数 #
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(
SQLALCHEMY_DATABASE_URL,
pool_size=10, # maximum connection-pool size
max_overflow=20, # maximum number of overflow connections beyond the pool
pool_timeout=30, # timeout (seconds) when waiting for a connection
pool_recycle=1500, # recycle connections after this many seconds
echo=True # True for debugging: logs the emitted SQL
)
session的区别 #
我们一般用sessionmaker来建立session,不过异步的有点区别:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
# Synchronous session factory
Session = sessionmaker(engine)
# Async session factory — the only difference is passing class_=AsyncSession
async_session = sessionmaker(async_engine, class_=AsyncSession)
建立会话 #
我们还是以代码的形式展示:
# 同步
with Session() as session:
# 里面是具体的sql操作
pass
# 异步
async with Session() as session:
# 里面是异步的操作,区别就是从with变成了async with 也就意味着方法必须是async修饰的
pass
以上是关于建立连接,处理会话的一些区别,接着我们讲对应的CRUD操作。
查询 #
结果处理: https://docs.sqlalchemy.org/en/21/core/connections.html#sqlalchemy.engine.Result.scalars
这里依旧会给出新老版本的对比:
# NOTE: `Session` is the synchronous factory; `async_session` the async one.
# `model` stands for a concrete declarative Model class.

# Async query style
from sqlalchemy import select


async def query():
    async with async_session() as session:
        stmt = select(model).where(model.id == 1)
        print(stmt)  # printing a select renders it as SQL
        result = await session.execute(stmt)
        # first matching row (or None)
        data = result.scalars().first()
        # result.scalars().one_or_none() also avoids raising when absent
        # all rows:
        # data = result.scalars().all()
# Sync query, style one: legacy Query API with filter_by (keyword equality)
def query():
    with Session() as session:
        # first row with id == 1; the value is a model instance, or None
        result = session.query(model).filter_by(id=1).first()
        # all matching rows come back as List[model]:
        # result = session.query(model).filter_by(name="zhangsan").all()
# Sync query, style two: filter() with a column expression
def query():
    with Session() as session:
        # first row with id == 1; the value is a model instance, or None
        result = session.query(model).filter(model.id == 1).first()
        # all matching rows come back as List[model]:
        # result = session.query(model).filter(model.name == "zhangsan").all()
新增 #
这里开始就只讲异步的操作了。
注意:异步新增若直接套用同步写法会出现问题(见下文"同步方法"一节的说明),需要按下面的方式处理。
# A transient instance to be inserted
data = DataModel(name='', phone='')


async def insert(data):
    async with async_session() as session:
        async with session.begin():  # transaction: commits on clean exit
            session.add(data)
            # flush emits the INSERT so the autogenerated primary key is populated
            await session.flush()
            # detach the instance so it remains usable after the session closes
            session.expunge(data)
            return data
先说一下session.begin,这个你可以理解为一个事务操作,当采用session的begin方法后,你可以发现我们不需要调用commit方法也能把修改存入数据库。
expunge方法,是用例释放这个实例,SQLAlchemy有个特点,当你的session会话结束以后,它会销毁你插入的这种临时数据,你再想访问这个data就访问不了了。所以我们可以释放这个数据。(expunge的作用)
同步方法 #
# Synchronous insert: add, then commit explicitly.
session.add(_data)
session.commit()
同步这样写即可;但异步直接照搬不行:虽然能拿到自增ID,数据库里却查不到记录——原因是异步会话需要显式提交(await session.commit()),或者改用上文的 session.begin() 事务写法由上下文管理器自动提交。
编辑 #
一般编辑有2种方式:
- 查询出对应的数据,在数据上修改
- 根据key-value的形式,修改对应数据的字段
from sqlalchemy import select, update


# Way one: load the row, mutate it in Python, flush the change.
async def update_record(model):
    async with async_session() as session:
        async with session.begin():
            # BUG FIX: the original used `.where(id=1)` — Select.where() takes
            # SQL expressions, not keyword arguments (that spelling raises a
            # TypeError at runtime). Use a column comparison instead.
            result = await session.execute(select(model).where(model.id == 1))
            now = result.scalars().first()
            if now is None:
                raise Exception("记录不存在")
            now.name = "李四"
            now.age = 23
            # verified by the author: without flush() the UPDATE is not emitted
            await session.flush()
            session.expunge(now)
            return now


# Way two: emit a bulk UPDATE statement from key/value pairs.
async def update_by_map():
    async with async_session() as session:
        async with session.begin():
            # update the row with id == 1: name -> 李四, age -> 23
            sql = update(model).where(model.id == 1).values(name="李四", age=23)
            await session.execute(sql)
删除 #
删除的话,软删除大家都是update,所以不需要多说,物理删除的话,也有两种方式:
- 查到以后删除之
- 直接根据条件删除(这种我没有仔细研究,我选的是第一种方式,容错率高点)
async def delete_by_id():
    """Physically delete the row with id == 2; raises if it does not exist."""
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(select(model).where(model.id == 2))
            original = result.scalars().first()
            if original is None:
                raise Exception("记录不存在")
            # BUG FIX: the original left the actual delete commented out, so
            # the function never removed anything. Note AsyncSession.delete()
            # is a coroutine and must be awaited.
            await session.delete(original)
            # for multiple rows, delete each one:
            # for item in result.scalars().all():
            #     await session.delete(item)
压缩 #
并不支持 mysql
--compress
连接参数,以下是个伪实现
# Pseudo-implementation: compress the value as it is assigned.
# FIX: the import statement was broken across two lines in the original.
from sqlalchemy.orm import validates


class MyClass(Base):
    # ...
    data = Column(BLOB)

    @validates("data")
    def _set_data(self, key, value):
        # NOTE(review): func.compress renders a SQL COMPRESS() expression,
        # not in-Python compression — confirm this matches the intent.
        return func.compress(value)
查询 #
逻辑操作符 #
1. and or #
# Combine conditions with explicit and_/or_ conjunctions.
from sqlalchemy import and_, or_
print(
select(Address.email_address).where(
and_(
or_(User.name == "squidward", User.name == "sandy"),
Address.user_id == User.id,
)
)
)
2. null #
# IS NOT NULL
sql = select(DeviceData).where(DeviceData.device_json_new.is_not(None))
# IS NULL (NOTE: this reassignment overwrites the statement above)
sql = select(DeviceData).where(DeviceData.device_json_new.is_(None))
自适应查询 #
async with async_session_install() as session:
# 使用 DeviceData 模型直接构建查询
sql = select(DeviceData).where(DeviceData.device_json_new.is_not(None))
sql_max_min = select(func.max(DeviceData.id).label("max_id"),func.min(DeviceData.id).label("min_id")).where(DeviceData.device_json_new.is_not(None),DeviceData.brand_name == ) for key, value in filters.items():
if not hasattr(DeviceData, key):
continue
sql = sql.where(getattr(DeviceData, key) == value)
if limit != 0:
sql = sql.limit(limit)
if offset:
sql = sql.offset(offset)
logger.debug(f"fetch sql is: {sql}")
exec = await session.execute(sql)
results = exec.scalars().all()
异步关系 #
class AppPackage(Base):
    """An application package plus its per-platform rows."""

    __tablename__ = 'app_packages'

    id = Column(Integer, primary_key=True, autoincrement=True)
    app_id = Column(String(64), unique=True, nullable=False)
    description = Column(Text)
    icon = Column(String(128))
    categories = Column(JSON)
    tags = Column(JSON)
    created_at = Column(TIMESTAMP, nullable=True, server_default=text('CURRENT_TIMESTAMP'))
    updated_at = Column(TIMESTAMP, nullable=True, onupdate=text('CURRENT_TIMESTAMP'))

    # one-to-many link to AppPlatform; deleting a package cascades to its platform rows
    platforms = relationship('AppPlatform', back_populates='app_package', cascade='all, delete')
class AppPlatform(Base):
    """One platform entry belonging to an AppPackage."""

    __tablename__ = 'app_platforms'

    id = Column(Integer, primary_key=True, autoincrement=True)
    app_id = Column(String(64), ForeignKey('app_packages.app_id', ondelete='CASCADE'), nullable=False)
    platform = Column(String(16), nullable=False)

    # many-to-one link back to the owning AppPackage
    app_package = relationship('AppPackage', back_populates='platforms')
# Read
async def get_package_by(app_id, platform) -> AppInfo:
    """Return an AppInfo for *app_id* on *platform*, or None when not found."""
    async with async_session_install() as session:
        appinfo = AppInfo(pkg_name=app_id)
        # FIX: filtering on AppPlatform.platform without joining AppPlatform
        # yields a cartesian product; join explicitly on the FK column.
        sql = (
            select(AppPackage)
            .join(AppPlatform, AppPackage.app_id == AppPlatform.app_id)
            .options(joinedload(AppPackage.platforms))
            .where(AppPackage.app_id == app_id, AppPlatform.platform == platform)
        )
        rst = await session.execute(sql)
        # FIX: removed a leftover `import ipdb; ipdb.set_trace()` breakpoint.
        # Joined eager loading of a collection requires unique() on the result.
        info = rst.unique().scalars().first()
        if info:
            appinfo.description = info.description
            appinfo.tags = info.tags
            appinfo.categories = info.categories
            logger.info(f"app_id: {app_id}, len platforms: {len(info.platforms)}")
            return appinfo
        else:
            return None
同步用就可
sql = select(AppPackage).join(AppPackage).where(AppPackage.app_id == app_id, AppPlatform.platform == platform)
like #
sql_total = sql_total.where(or_(AppPackage.app_id.like(f'%{package}%') for package in params.data.package))
方法(动态表) #
创建表 #
async def create_table_if_not_exists(package_name: str) -> bool:
    """
    Create the package→account usage-count mapping table if it is missing.

    Returns True on success, False when any step raised.
    """
    # BUG FIX: compute table_name before the try block — in the original it was
    # assigned inside `try`, so an early failure made the except clause's log
    # line itself raise NameError (table_name unbound).
    table_name = TableNamePrefix.format(package_name=package_name)
    try:
        if table_name not in await get_installer_table_names():
            await SavingsPool._add_support_package(package_name)
            async with async_engine.begin() as conn:
                # define the table
                table = Table(
                    table_name, metadata,
                    Column('id', Integer, primary_key=True),
                    Column('account_id', Integer, comment="账号ID, 账号表中的主键id"),
                    Column('count', SmallInteger, comment="该账号被使用的次数"),
                    Column('created_at', DateTime, server_default=func.now(), nullable=False, comment="UTC time"),
                    Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), nullable=False, comment="UTC time")
                )
                # DDL must run on the sync side of the async connection
                await conn.run_sync(metadata.create_all, tables=[table])
                logger.info(f"Table '{table_name}' created successfully.")
        return True
    except Exception as e:
        logger.error(f"Failed to create table '{table_name}': {e}, details: {traceback.format_exc()}")
        return False
1. 有外键&有unsigned的时候(只一个) #
仔细对比与方式 2 中的差异。
async def create_table_if_not_exists(table_name: str) -> bool:
    """
    Create the package→account usage-count mapping table if it is missing.

    Returns True on success (including when the table already exists),
    False when any step raised.
    """
    try:
        if table_name in await get_installer_table_names():
            logger.info(f"Table '{table_name}' already exists.")
        else:
            async with async_engine.begin() as conn:
                # reflect the pre-existing table that the FK points at
                await conn.run_sync(lambda conn: metadata.reflect(bind=conn, only=['google_accounts']))
                logger.info("Existing tables loaded into metadata.")
                # define and create the table
                table = Table(
                    table_name, metadata,
                    Column('id', Integer, primary_key=True),
                    Column('account_id', INTEGER(unsigned=True), unique=True, nullable=False, comment="账号ID, 账号表中的主键id"),
                    Column('count', SmallInteger, comment="该账号被使用的次数"),
                    Column('created_at', DateTime, server_default=func.now(), nullable=False, comment="UTC time"),
                    Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), nullable=False, comment="UTC time"),
                    ForeignKeyConstraint(['account_id'], ['google_accounts.id'], name='fk_account_id'),
                    mysql_engine='InnoDB',
                    extend_existing=True
                )
                await conn.run_sync(lambda conn: metadata.create_all(bind=conn, tables=[table]))
                logger.info(f"Table '{table_name}' created successfully.")
        return True
    except Exception as e:
        logger.error(f"Failed to create table '{table_name}': {e}, details: {traceback.format_exc()}")
        return False
2. 有外键&有unsigned的时候(可多个) #
async with async_engine.begin() as conn:
# Load the already-existing tables into metadata.
metadata.clear() # without this line, the unique(`account_id`) constraint accumulates in the DDL on every repeated call
await conn.run_sync(lambda conn: metadata.reflect(bind=conn, only=['google_accounts']))
logger.info("Existing tables loaded into metadata.")
# Create the table.
fk_name = package_name.replace('.', '_')
table = Table(
table_name, metadata,
Column('id', Integer, primary_key=True),
Column('account_id', INTEGER(unsigned=True), unique=True, nullable=False, comment="账号ID, 账号表中的主键id"),
Column('count', SmallInteger, comment="该账号被使用的次数"),
Column('created_at', DateTime, server_default=func.now(), nullable=False, comment="UTC time"),
Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), nullable=False, comment="UTC time"),
ForeignKeyConstraint(['account_id'], ['google_accounts.id'], name=f'{fk_name}_account_id'), # FK names must be unique per database
mysql_engine='InnoDB',
# extend_existing=True # would mutate the CREATE TABLE statement cached in metadata
)
await conn.run_sync(lambda conn: metadata.create_all(bind=conn, tables=[table]))
异步获取表名 #
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import AsyncConnection
import asyncio
# Configure the async engine (MySQL via the aiomysql driver).
DATABASE_URL = "mysql+aiomysql://user:password@host/dbname"
async_engine = create_async_engine(DATABASE_URL, echo=True)


def get_table_names_sync(sync_conn):
    """Synchronous helper: list table names through an inspector."""
    return inspect(sync_conn).get_table_names()


async def get_installer_table_names():
    """Fetch all table names by bridging to the sync inspector via run_sync."""
    async with async_engine.begin() as conn:
        return await conn.run_sync(get_table_names_sync)


async def main():
    print(await get_installer_table_names())


asyncio.run(main())
异步加载表查询数据 #
from sqlalchemy import Table, MetaData
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.future import select
import asyncio
# Configure the async engine (MySQL via the aiomysql driver).
DATABASE_URL = "mysql+aiomysql://user:password@host/dbname"
async_engine = create_async_engine(DATABASE_URL, echo=True)


def load_table_sync(table_name, engine):
    """Synchronous helper: reflect *table_name* into a fresh MetaData."""
    return Table(table_name, MetaData(), autoload_with=engine)
# Async function that bridges to the sync reflection helper via run_sync.
async def query_data_and_add_counter(table_name, account_id):
    """Return the row for *account_id* from *table_name*, or None."""
    async with async_engine.connect() as conn:
        # reflect the table on the sync side of the connection
        table = await conn.run_sync(lambda sync_conn: load_table_sync(table_name, sync_conn))
        # build and execute the query
        query = table.select().where(table.c.account_id == account_id)
        result = await conn.execute(query)
        # BUG FIX: the original used result.scalars().one_or_none(), which
        # collapses each row to its FIRST column only (the id) instead of
        # returning the whole row.
        row = result.one_or_none()
        return row
# Async function to insert a counter row.
async def insert_data_counter(table_name, account_id, count) -> bool:
    """Insert an (account_id, count) row into *table_name*; returns True."""
    async with async_engine.connect() as conn:
        async with conn.begin():  # transaction commits on clean exit
            # bridge to the sync reflection helper
            table = await conn.run_sync(lambda sync_conn: load_table_sync(table_name, sync_conn))
            # build and execute the insert
            new_user = table.insert().values(account_id=account_id, count=count)
            await conn.execute(new_user)
            # FIX: removed the explicit `await conn.commit()` — inside a
            # `conn.begin()` block the context manager performs the commit,
            # and committing manually invalidates the managed transaction.
        logger.debug(f"Data inserted into table '{table_name}' for account_id {account_id}.")
        return True
# Async function that queries and updates inside one transaction.
async def _query_data_and_add_counter(table_name, account_id: int) -> bool:
    """Increment the usage count for *account_id*; True if a row was updated."""
    async with async_engine.connect() as conn:
        async with conn.begin():  # transaction commits on clean exit
            # bridge to the sync reflection helper
            table = await conn.run_sync(lambda sync_conn: __load_table_sync(table_name, sync_conn))
            query = select(table.c.account_id, table.c.count, table.c.created_at).where(table.c.account_id == account_id)
            result = await conn.execute(query)
            account_id, count, create_at = result.fetchone() or (None, None, None)
            # BUG FIX: `if account_id:` treated a legitimate id of 0 as
            # "missing"; compare against None explicitly.
            if account_id is not None:
                logger.info(f"Update account_id {account_id} in table '{table_name}': count+1 {count+1}")
                count += 1
                update_stmt = table.update().where(table.c.account_id == account_id).values(count=count)
                await conn.execute(update_stmt)
                # FIX: removed the redundant `await conn.commit()` — the
                # begin() context manager handles the commit itself.
                return True
            else:
                # nothing matched; the transaction rolls back automatically
                # when the begin() context exits
                return False
# Example usage
async def main():
result = await query_data_and_add_counter("your_table_name", 12345)
print(result)
# Run the example (creates an event loop and executes main()).
asyncio.run(main())
混用错误 #
# Declarative model mapped to the same physical table that is also
# reflected below — mixing the two in one query causes the error shown next.
class GoogleAccounts(Base):
pass
# Reflect the raw `google_accounts` table a second time.
table_account = await conn.run_sync(lambda sync_conn: __load_table_sync("google_accounts", sync_conn))
# Selecting the model while joining the reflected table references
# `google_accounts` twice without an alias.
query = ( select(GoogleAccounts).select_from( table.join(table_account, table.c.account_id == table_account.c.id) ).where( and_( table.c.count >= 10, table_account.c.login_status == LoginStatus.Login.value ) ) )
# 报错
Not unique table/alias: 'google_accounts'
## 原因
这个错误 `(pymysql.err.OperationalError) (1066, "Not unique table/alias: 'google_accounts'")` 表示 SQL 查询中的表别名 `google_accounts` 不是唯一的。这通常发生在你尝试在同一个查询中多次引用同一个表,并且没有为每次引用提供唯一的别名。
# Fix: reference the reflected table consistently (table_account everywhere)
select(table_account).select_from( table.join(table_account, table.c.account_id == table_account.c.id) ).where( and_( table.c.count >= 10, table_account.c.login_status == LoginStatus.Login.value ) )
# Alternatively, use the GoogleAccounts model everywhere
query = (select(GoogleAccounts).select_from(table.join(GoogleAccounts, table.c.account_id==GoogleAccounts.id)).where(and_(table.c.count >= 10,GoogleAccounts.login_status == LoginStatus.Login.value)))