SqlAlchemy - 数据库Orm

Sqlalchemy异步操作不完全指北 #

不完全抄袭自: 米洛丶 - 博客园 cnblogs.com

异步SQLAlchemy #

SQLAlchemy作为一款通用的Python Orm工具,在最近的版本也支持了异步操作。但网上很多资料都不是很齐全,API也不是很好查询的情况下,我便有了整理一份基础文档的想法。文章主要会以CRUD为入口,解决大家最基本的需求。

engine的区别 #

在普通的SQLAlchemy中,建立engine对象,我们会采用下面的方式:

from sqlalchemy import create_engine
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_recycle=1500)

而异步的方式如下:

from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(ASYNC_SQLALCHEMY_URI, pool_recycle=1500)

链接参数 #

from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_size=10,          # 设置最大连接池大小
    max_overflow=20,       # 设置最大溢出连接数
    pool_timeout=30,       # 设置获取连接的超时时间(单位:秒)
    pool_recycle=1500,     # 设置连接回收时间(单位:秒)
    echo=True              # 设置为True以便于调试,显示SQL日志
)

session的区别 #

我们一般用sessionmaker来建立session,不过异步的有点区别:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker

# 同步session
Session = sessionmaker(engine)

# 异步session 区别在于需要指定对应的class_
async_session = sessionmaker(async_engine, class_=AsyncSession)

建立会话 #

我们还是以代码的形式展示:

# 同步
with Session() as session:
  # 里面是具体的sql操作
  pass
  
# 异步
async with Session() as session:
    # 里面是异步的操作,区别就是从with变成了async with 也就意味着方法必须是async修饰的
    pass

以上是关于建立连接,处理会话的一些区别,接着我们讲对应的CRUD操作。

查询 #

结果处理: https://docs.sqlalchemy.org/en/21/core/connections.html#sqlalchemy.engine.Result.scalars

这里依旧会给出新老版本的对比:

# 注意Session为同步Session,为了区分,异步session为async_session
# model则为具体的Model类

# 异步查询方式
from sqlalchemy import select

async def query():
    async with async_session() as session:
        sql = select(model).where(model.id == 1)
        print(sql) # 这里可以打印出sql
        result = await session.execute(sql)
        # 第一条数据
        data = result.scalars().first()
        # result.scalars().one_or_none() 不报错的获取方式
        # 所有数据
        # data = result.scalars().all()

# 同步查询方式一
def query():
    with Session() as session:
        # 查询id=1的第一条数据 result对应的就是model的实例 如果没有则是None
        result = session.query(model).filter_by(id=1).first()
        # 查询所有数据 result对应的数据为List[model],即model数组
        # result = session.query(model).filter_by(name="zhangsan").all()

# 同步查询方式二
def query():
    with Session() as session:
        # 查询id=1的第一条数据 result对应的就是model的实例 如果没有则是None
        result = session.query(model).filter(model.id == 1).first()
        # 查询所有数据 result对应的数据为List[model],即model数组
        # result = session.query(model).filter(model.name == "zhangsan").all()

新增 #

这里开始就只讲异步的操作了。

async的新增不行啊, 好奇怪

data = DataModel(name='', phone='')
async def insert(data):
    async with async_session() as session:
        async with session.begin():
            session.add(data)
            # 刷新自带的主键
            await session.flush()
            # 释放这个data数据
            session.expunge(data)
            return data

先说一下session.begin,这个你可以理解为一个事务操作,当采用session的begin方法后,你可以发现我们不需要调用commit方法也能把修改存入数据库。

expunge方法,是用例释放这个实例,SQLAlchemy有个特点,当你的session会话结束以后,它会销毁你插入的这种临时数据,你再想访问这个data就访问不了了。所以我们可以释放这个数据。(expunge的作用)

同步方法 #

session.add(_data)
session.commit()

这样就行, 异步的不行… ID都拿到了, 但是数据库里查不到. 异步用上面的方法也不得行.

编辑 #

一般编辑有2种方式:

  • 查询出对应的数据,在数据上修改
  • 根据key-value的形式,修改对应数据的字段
from sqlalchemy import select, update

# 方式一
async def update_record(model):
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(select(model).where(id=1))
            now = result.scalars().first()
            if now is None:
                raise Exception("记录不存在")
            now.name = "李四"
            now.age = 23
            # 这里测试过,如果去掉flush会导致数据不更新
            await session.flush()
            session.expunge(now)
            return now

# 方式二
async def update_by_map():
    async with async_session() as session:
        async with session.begin():
            # 更新id为1的数据,并把name改为李四 age改为23
            sql = update(model).where(model.id == 1).values(name="李四", age=23)
            await session.execute(sql)

删除 #

删除的话,软删除大家都是update,所以不需要多说,物理删除的话,也有两种方式:

  • 查到以后删除之
  • 直接根据条件删除(这种我没有仔细研究,我选的是第一种方式,容错率高点)
async def delete_by_id():
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(select(model).where(model.id == 2))
            original = result.scalars().first()
            if original is None:
                raise Exception("记录不存在")
            # 如果是多条
            # session.delete(original)
            # for item in result:
            #     session.delete(item)

压缩 #

并不支持 mysql --compress 连接参数,以下是个伪实现

from sqlalchemy.orm
import validates
class MyClass(Base):
	# ... 
	data = Column(BLOB)
	@validates("data")
	def _set_data(self, key, value):
		return func.compress(value)

查询 #

逻辑操作符 #

1. and or #

from sqlalchemy import and_, or_
print(
    select(Address.email_address).where(
        and_(
            or_(User.name == "squidward", User.name == "sandy"),
            Address.user_id == User.id,
        )
    )
)

2. null #

sql = select(DeviceData).where(DeviceData.device_json_new.is_not(None))
sql = select(DeviceData).where(DeviceData.device_json_new.is_(None))

自适应查询 #

async with async_session_install() as session:
	# 使用 DeviceData 模型直接构建查询
	sql = select(DeviceData).where(DeviceData.device_json_new.is_not(None))
	sql_max_min = select(func.max(DeviceData.id).label("max_id"),func.min(DeviceData.id).label("min_id")).where(DeviceData.device_json_new.is_not(None),DeviceData.brand_name == ) for key, value in filters.items():
	if not hasattr(DeviceData, key):
		continue
	sql = sql.where(getattr(DeviceData, key) == value)
	if limit != 0:
		sql = sql.limit(limit)
	if offset:
		sql = sql.offset(offset)
	logger.debug(f"fetch sql is: {sql}")
	exec = await session.execute(sql)
	results = exec.scalars().all()

异步关系 #


class AppPackage(Base):
    __tablename__ = 'app_packages'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    app_id = Column(String(64), unique=True, nullable=False)
    description = Column(Text)
    icon = Column(String(128))
    categories = Column(JSON)
    tags = Column(JSON)
    created_at = Column(TIMESTAMP, nullable=True, server_default=text('CURRENT_TIMESTAMP'))
    updated_at = Column(TIMESTAMP, nullable=True, onupdate=text('CURRENT_TIMESTAMP'))

    # Relationship to AppPlatform
    platforms = relationship('AppPlatform', back_populates='app_package', cascade='all, delete')

class AppPlatform(Base):
    __tablename__ = 'app_platforms'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    app_id = Column(String(64), ForeignKey('app_packages.app_id', ondelete='CASCADE'), nullable=False)
    platform = Column(String(16), nullable=False)
    
    # Relationship to AppPackage
    app_package = relationship('AppPackage', back_populates='platforms')

# Read
async def get_package_by(app_id, platform) -> AppInfo:
    async with async_session_install() as session:
        appinfo = AppInfo(pkg_name=app_id)
        sql = select(AppPackage).options(joinedload(AppPackage.platforms)).where(AppPackage.app_id == app_id, AppPlatform.platform == platform)
        rst = await session.execute(sql)
        import ipdb; ipdb.set_trace()
        info = rst.scalars().first()
        if info:
            appinfo.description = info.description
            appinfo.tags = info.tags
            appinfo.categories = info.categories
            logger.info(f"app_id: {app_id}, len platforms: {len(info.platforms)}")
            return appinfo
        else:
            return None

同步用就可

sql = select(AppPackage).join(AppPackage).where(AppPackage.app_id == app_id, AppPlatform.platform == platform)

like #

sql_total = sql_total.where(or_(AppPackage.app_id.like(f'%{package}%') for package in params.data.package))

方法(动态表) #

创建表 #

async def create_table_if_not_exists(package_name: str) -> bool:
    """
    创建包-账号-使用次数对照表
    """
    try:
        table_name = TableNamePrefix.format(package_name=package_name)
        if table_name not in await get_installer_table_names():
            await SavingsPool._add_support_package(package_name)
            async with async_engine.begin() as conn:
                # 创建表
                table = Table(
                    table_name, metadata,
                    Column('id', Integer, primary_key=True),
                    Column('account_id', Integer, comment="账号ID, 账号表中的主键id"),
                    Column('count', SmallInteger, comment="该账号被使用的次数"),
                    Column('created_at', DateTime, server_default=func.now(), nullable=False, comment="UTC time"),
                    Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), nullable=False, comment="UTC time")
                )
                await conn.run_sync(metadata.create_all, tables=[table])
                logger.info(f"Table '{table_name}' created successfully.")
        return True
    except Exception as e:
        logger.error(f"Failed to create table '{table_name}': {e}, details: {traceback.format_exc()}")
        return False

1. 有外键&有unsigned的时候(只一个) #

仔细对比与 2 中差异

async def create_table_if_not_exists(table_name: str) -> bool:
    """
    创建包-账号-使用次数对照表
    """
    try:
        if table_name not in await get_installer_table_names():
            async with async_engine.begin() as conn:
                # 加载进已经存在的表
                await conn.run_sync(lambda conn: metadata.reflect(bind=conn, only=['google_accounts']))
                logger.info("Existing tables loaded into metadata.")
                # 创建表
                table = Table(
                    table_name, metadata,
                    Column('id', Integer, primary_key=True),
                    Column('account_id', INTEGER(unsigned=True), unique=True, nullable=False, comment="账号ID, 账号表中的主键id"),
                    Column('count', SmallInteger, comment="该账号被使用的次数"),
                    Column('created_at', DateTime, server_default=func.now(), nullable=False, comment="UTC time"),
                    Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), nullable=False, comment="UTC time"),
                    ForeignKeyConstraint(['account_id'], ['google_accounts.id'], name='fk_account_id'),
                    mysql_engine='InnoDB',
                    extend_existing=True
                )
                await conn.run_sync(lambda conn: metadata.create_all(bind=conn, tables=[table]))
                logger.info(f"Table '{table_name}' created successfully.")
        else:
            logger.info(f"Table '{table_name}' already exists.")
        return True
    except Exception as e:
        logger.error(f"Failed to create table '{table_name}': {e}, details: {traceback.format_exc()}")
        return False

2. 有外键&有unsigned的时候(可多个) #

async with async_engine.begin() as conn:
	# 加载进已经存在的表
	metadata.clear() # 没有这行(unique(`account_id`)会随着调用次数变多在建表语句中越来越多)
	await conn.run_sync(lambda conn: metadata.reflect(bind=conn, only=['google_accounts']))
	logger.info("Existing tables loaded into metadata.")

	# 创建表
	fk_name = package_name.replace('.', '_')
	table = Table(
		table_name, metadata,
		Column('id', Integer, primary_key=True),
		Column('account_id', INTEGER(unsigned=True), unique=True, nullable=False, comment="账号ID, 账号表中的主键id"),
		Column('count', SmallInteger, comment="该账号被使用的次数"),
		Column('created_at', DateTime, server_default=func.now(), nullable=False, comment="UTC time"),
		Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), nullable=False, comment="UTC time"),
		ForeignKeyConstraint(['account_id'], ['google_accounts.id'], name=f'{fk_name}_account_id'),  # 外键不能一样
		mysql_engine='InnoDB',
		# extend_existing=True # 会修改-metadata 中缓存的建表语句
	)
	await conn.run_sync(lambda conn: metadata.create_all(bind=conn, tables=[table]))

异步获取表名 #

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import AsyncConnection
import asyncio

# Configure your async engine
DATABASE_URL = "mysql+aiomysql://user:password@host/dbname"
async_engine = create_async_engine(DATABASE_URL, echo=True)

# Define a synchronous function to get table names
def get_table_names_sync(sync_conn):
    inspector = inspect(sync_conn)
    return inspector.get_table_names()

# Define an async function to use the synchronous function
async def get_installer_table_names():
    async with async_engine.begin() as conn:
        # Use `run_sync` to call the synchronous function
        table_names = await conn.run_sync(get_table_names_sync)
        return table_names

# Example usage
async def main():
    table_names = await get_installer_table_names()
    print(table_names)

asyncio.run(main())

异步加载表查询数据 #

from sqlalchemy import Table, MetaData
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.future import select
import asyncio

# 配置异步引擎
DATABASE_URL = "mysql+aiomysql://user:password@host/dbname"
async_engine = create_async_engine(DATABASE_URL, echo=True)

# 创建同步函数来加载表
def load_table_sync(table_name, engine):
    metadata = MetaData()
    table = Table(table_name, metadata, autoload_with=engine)
    return table

# 异步函数使用 `run_sync` 调用同步函数
async def query_data_and_add_counter(table_name, account_id):
    async with async_engine.connect() as conn:
        # 使用 `run_sync` 执行同步操作
        table = await conn.run_sync(lambda sync_conn: load_table_sync(table_name, sync_conn))
        
        # 构建查询并执行
        query = table.select().where(table.c.account_id == account_id)
        result = await conn.execute(query)
        row = result.scalars().one_or_none()
        return row

# 异步函数来插入数据
async def insert_data_counter(table_name, account_id, count) -> bool:
    async with async_engine.connect() as conn:
        async with conn.begin():
            # 使用 `run_sync` 调用同步函数
            table = await conn.run_sync(lambda sync_conn: load_table_sync(table_name, sync_conn))
            # 构建插入语句
            new_user = table.insert().values(account_id=account_id, count=count)
            await conn.execute(new_user)
        # 提交事务
        await conn.commit()
        logger.debug(f"Data inserted into table '{table_name}' for account_id {account_id}.")
        return True

# 异步函数来查询和更新数据-事务
async def _query_data_and_add_counter(table_name, account_id: int) -> bool:
    async with async_engine.connect() as conn:
        async with conn.begin():  # 开始事务
            # 使用 `run_sync` 调用同步函数
            table = await conn.run_sync(lambda sync_conn: __load_table_sync(table_name, sync_conn))
            # 构建查询
            query = select(table.c.account_id, table.c.count, table.c.created_at).where(table.c.account_id == account_id)
            result = await conn.execute(query)
            account_id, count, create_at = result.fetchone() or (None, None, None)
            if account_id:
                logger.info(f"Update account_id {account_id} in table '{table_name}': count+1 {count+1}")
                count += 1
                # 构建更新语句
                update_stmt = table.update().where(table.c.account_id == account_id).values(count=count)
                await conn.execute(update_stmt)
                # 提交事务
                await conn.commit()
                return True
            else:
                # 事务将在 `conn.begin()` 的上下文管理器退出时自动回滚
                return False

# 示例使用
async def main():
    result = await query_data_and_add_counter("your_table_name", 12345)
    print(result)

# 运行示例
asyncio.run(main())

混用错误 #

class GoogleAccounts(Base):
	pass

table_account = await conn.run_sync(lambda sync_conn: __load_table_sync("google_accounts", sync_conn))

query = ( select(GoogleAccounts).select_from( table.join(table_account, table.c.account_id == table_account.c.id) ).where( and_( table.c.count >= 10, table_account.c.login_status == LoginStatus.Login.value ) ) )

# 报错
Not unique table/alias: 'google_accounts' 
## 原因
这个错误 `(pymysql.err.OperationalError) (1066, "Not unique table/alias: 'google_accounts'")` 表示 SQL 查询中的表别名 `google_accounts` 不是唯一的这通常发生在你尝试在同一个查询中多次引用同一个表并且没有为每次引用提供唯一的别名

# 都使用table_account 就好啦
select(table_account).select_from(             table.join(table_account, table.c.account_id == table_account.c.id)         ).where(             and_(                 table.c.count >= 10,                 table_account.c.login_status == LoginStatus.Login.value             )         )

# 或者都用GoogleAccounts
query = (select(GoogleAccounts).select_from(table.join(GoogleAccounts, table.c.account_id==GoogleAccounts.id)).where(and_(table.c.count >= 10,GoogleAccounts.login_status == LoginStatus.Login.value)))