이전에 SQLAlchemy의 세션 관리 포스트에서 SQLAlchemy 라이브러리 내부에서 세션의 작동 방식을 알아보았습니다. 하지만 해당 내용은 sync 환경에서만 유효하며, asyncio 기반으로 실행되는 경우에는 여러 side effect가 발생할 수 있습니다. SQLAlchemy 버전이 1.4로 올라가면서 asyncio와 호환될 수 있도록 업데이트되었는데, 해당 내용을 기반으로 asynchronous 환경에서 SQLAlchemy 세션을 사용하는 방법을 정리해 보았습니다.

아래 내용은 현 시점 (2023-03) 기준 최신 버전인 SQLAlchemy 2.0.4 기반으로 작셩하였습니다.


1. AsyncSession

세션 구현체로 sqlalchemy.ext.asyncio.AsyncSession 를 사용할 수 있습니다. 또한 엔진 연결 시에는 create_async_engine 메소드를 이용하여 생성해야 합니다.

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:password@localhost:5432/test_db")

async with AsyncSession(engine) as session:
    await session.execute(
        insert(User).values(user_name="myname", email="miintto.log@gmail.com")
    )
    await session.commit()
await engine.dispose()

create_async_engine에서 사용하는 인자들은 대부분 create_engine과 동일하지만, 엔진 연결 시에는 async가 지원되는 DBAPI 드라이버를 사용해야만 합니다. 사용 가능한 드라이버로는 asyncpg (PostgreSQL), aiomysql (MySQL), aioodbc (MSSQL) 등이 있습니다.

비동기 세션 구현체인 AsyncSession 또한 기존 Session의 기능을 대부분 지원하지만 2.0 문법 스타일로 작성해야 합니다. 데이터 조회의 경우 Query를 사용한 방식 대신 Session.execute() 메소드 내부에서 select()로 가져오는 방식으로 작성해야 합니다. 그 외에 add(), commit(), close() 등의 메소드들은 그대로 사용할 수 있습니다. 2.0 스타일에 대한 자세한 내용은 Glossary - 2.0-style를 참고하시면 됩니다.

또한 context manager 기능도 지원해서 async with 구문이 끝나는 시점에 자동으로 AsyncSession.close()가 실행되도록 프로그래밍할 수도 있습니다.

class AsyncSession(ReversibleProxy):
    ...
    async def __aenter__(self):
        return self

    async def __aexit__(self, type_, value, traceback):
        task = asyncio.get_event_loop().create_task(self.close())
        await asyncio.shield(task)

2. async_scoped_session

궁극적으로 어플리케이션에 SQLAlchemy를 적용하려고 한다면 scoped_session 도입을 고려할 텐데, 이를 그대로 async 환경에 적용한다면 여러가지 이슈가 발생할 수 있습니다.

scoped_session는 기본적으로 스레드별로 세션을 관리하도록 되어있지만, asyncio의 이벤트 루프는 단일 스레드 기반으로 작동합니다. 이때 만일 여러 request가 같은 프로세스에 할당된다면 중첩된 request들이 동일한 세션 객체를 할당받아 작업을 할 수 있습니다. 이런 상황은 매우 위험한데 의도치 않게 세션 내에 ORM 객체들이 서로 공유되거나 이미 롤백 된 세션을 다른 request에서 가져가서 다시 사용하는 등 개발자가 의도한 대로 동작하지 않을 수 있습니다. 따러서 이러한 async 환경에서의 이슈를 해결하기 위해 SQLAlchemy에서는 async_scoped_session 클래스를 지원하고 있습니다.

class async_scoped_session(ScopedSessionMixin):
    """Provides scoped management of :class:`.AsyncSession` objects."""

    def __init__(self, session_factory, scopefunc):
        self.session_factory = session_factory
        self.registry = ScopedRegistry(session_factory, scopefunc)

async_scoped_session 에서는 thread-local storage를 회피하기 위해 scopefunc를 강제하고 있습니다. 따라서 registry는 반드시 ScopedRegistry만 사용가능합니다. scopefunc를 적절하게 설정해주면 동일한 스레드에서도 클라이언트마다 알맞은 세션을 추적할 수 있습니다. 아래 대략적인 사용 예시를 작성했습니다.

import asyncio
from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)

engine = create_async_engine("postgresql+asyncpg://user:password@localhost:5432/test_db")
session_factory = async_sessionmaker(bind=engine)
Session = async_scoped_session(session_factory, scopefunc=asyncio.current_task)

async with Session() as session:
    # To Something
    await session.commit()
await Session.remove()
await engine.dispose()

위에서는 예시로 scopefunc 값으로 asyncio.current_task 메소드를 넣어 주었는데, 이러한 방식으로 동일한 스레드에서도 할당된 Task마다 세션을 구분할 수 있습니다. 다만 이런 식으로 세션을 관리하면 세션을 종료하여도 registry 내부 dictionary에 세션 객체가 남아서 계속 메모리를 차지하기 때문에 작업이 끝날 때마다 async_scoped_session.remove() 메소드를 호출하여 메모리에서 정리해야 합니다.

마지막에 실행되는 AsyncEngine.dispose() 메소드 실행도 필요한 작업인데, 해당 메소드는 현재 사용 중인 커넥션 풀들을 다시 반환하는 역할을 합니다. 일반적인 blocking IO와는 대조적으로 awaitable 구문을 한 번 빠져나와 버리면 다시 await 메소드를 호출할 수 없기 때문에 커넥션들이 올바르게 처리되지 않을 수 있습니다. 이런 식으로 엔진이 정상적으로 종료되지 않는다면 RuntimeError: Event loop is closed 와 같은 경고가 발생할 수도 있습니다.


References