Backend/FastAPI

FastAPI PostgreSQL DB 트랜잭션 동기/비동기 처리

지미닝 2024. 5. 11. 11:40

🕊️ FastAPI 데이터베이스 처리

이번 게시물에서는 FastAPI에서 데이터베이스 처리를 위해 사용하는 두 개의 라이브러리에 대해서 다루어보겠다.

 

지난 게시물에 이어서 이번에는 ASGI인 FastAPI에서 비동기적으로 수행할 수 있는 Asyncpg에 대해서 설명하고자 하는데, 이를 위해서 동기식으로 동작하는 psycopg2에 대해 다루어볼 것이다.

 

아래서 내가 제시하는 get_db(), get_db_session()함수 두개는 각자 두개의 라이브러리를 활용하여 작업할 때 필요한 데이터베이스 접근 방식이다. 각자 동기/비동기적으로 접근한다.


 

✨ psycopg2

해당 라이브러리는 "동기식"으로 작동한다.

따라서, 요청을 처리하는 동안 다른 작업들은 수행하지 못하고 대기 상태에 있게 된다. 

 

그러나, Django와 같은 대부분의 Python 웹 프레임워크에서 기본적으로 사용한다. 특히, PostgreSQL 데이터베이스 어댑터 중에 가장 인기있는 것이기도 하다.

 

아마 호환성이 뛰어나서 그런 것 같다. (SQLAlchemy와 같은 ORM과 호환성이 좋다.)

 

이를 활용하기 위해서는 DB에 Synchronous하게 접근해야하는데 해당 코드는 아래와 같다.

def get_db():
    db = None
    try:
        db = SessionLocal()
        yield db
    finally:
        db.close()

세션

기본적으로, 해당 코드에서 활용하는 Session은 SQLAlchemy의 컨텍스트를 의미한다. 

데이터베이스와의 모든 대화를 캡슐화하는 컨테이너로, 객체와 데이터베이스 세션 사이의 모든 통신을 관리하게 된다.

 

이 세션은 아래 세가지 기능을 한다.

  1. 트랜젝션을 관리한다.
  2. 작업을 버퍼링하고 특정 시점에 커밋한다.
  3. 식별성을 보장하게 된다.
  4. 데이터 베이스 변경 후, 변경 사항을 데이터베이스에 플러시할 시기를 결정한다.

따라서, 위 코드에서 SessionLocal()을 통해 생성된 세션 인스턴스는 데이터베이스 연결을 나타내며, 이 세션을 활용하여 데이터베이스 트랜젝션을 시작하고 관리한다. 

 

yield 키워드는 이 함수를 제너레이터로 만들며, FastAPI 경로 작업에서 이 함수를 종속성으로 사용할 때마다 새로운 세션을 얻을 수 있게된다. 경로 작업이 완료된 후 finally 블록이 실행되어 세션을 안전하게 닫고, 모든 리소스를 정리한다.

 

주로 위와 같은 방식이 FastAPI에서 건장하는 데이터베이스 세션 관리 방법이다. 각 요청이 자체의 데이터베이스 세션을 가지고, 요청 처리 후에는 그 세션을 닫는다.

 

이를 통해서 각 요청이 독립적으로 데이터베이스 작업을 수행하게 된다.

 

 

asyncpg

아래 get_db_session()코드는 데이터베이스 엔진과 세션을 생성한다.
get_db_session이 성공할 경우 세션이 커밋되고 에러가 발생하면 롤백한다.

데이터베이스와 비동기적으로 상호작용하기 위해 SQLAlchemy의 async_sessionmaker를 사용한다.

참고로 asyncpg를 사용하는 URL은 일반적으로 postgresql+asyncpg와 같은 형식을취한다. 

 

TEST_DATABASE_URL = "postgresql+asyncpg://localhost:5432/test"
Base = declarative_base()


async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    engine = create_async_engine(TEST_DATABASE_URL)
    factory = sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False
    )

    async with factory() as session:
        try:
            yield session
        except exc.SQLAlchemyError as error:
            await session.rollback()
            raise error
        finally:
            await session.close()

 

위 코드에서 트렌젝션 관리를 하고 싶다면 아래와 같이 작성하면 된다.

 

async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    engine = create_async_engine(TEST_DATABASE_URL)
    factory = sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False
    )

    async with factory() as session:
        try:
            yield session
            await session.commit()
        except exc.SQLAlchemyError as error:
            await session.rollback()
            raise error
        finally:
            await session.close()

 

이전의 get_db_session()의 경우에는 commit시점이 명시적으로 없기 때문에 따로 처리해주어야하는 번거로움이 있었다.

따라서, 두번째처럼 작성한다면 세션이 종료되는 시점에 commit을 하도록 설정하면 트랜젝션의 커밋 시점을 딱히 고민할 필요가 없다.

 

물론 롤백이나 트랜젝션이 더 유연하게 처리해야할 때는 좋지 않은 방법일 수 있지만, 당장은 더 유연할 필요가 없다고 판단하여 나는 후자의 코드를 활용하였다.

 


⚒️ 두 접근법 비교

두 개의 함수 get_dbget_db_session은 데이터베이스 세션을 관리하는 방법에 있어 핵심적인 차이가 있다.

기본적으로 하나는 동기 방식으로 세션을 관리하고 다른 하나는 비동기 방식을 사용한다.

 

1. get_db 함수 (동기 방식)

동기적 실행

이 함수는 SQLAlchemy의 기본 동기적 세션을 생성하고 관리한다.

SessionLocal()은 sessionmaker를 통해 생성된 동기 세션 팩토리에서 세션 인스턴스를 가져온다.

 

자원 정리

함수가 yield를 사용하여 세션 객체를 반환한 후, 호출자가 세션 사용을 완료하면 finally 블록이 실행되어 세션을 안전하게 닫는다.

 

용도

이 방식은 Flask와 같은 전통적인 WSGI 기반의 웹 프레임워크에서 주로 사용된다. FastAPI에서도 동기 방식의 코드를 실행할 때 사용할 수 있으나, 비동기 실행이 권장된다.

 

2. get_db_session 함수 (비동기 방식)

비동기적 실행

이 함수는 SQLAlchemy의 비동기 엔진(create_async_engine)을 사용하여 비동기적으로 데이터베이스 세션을 생성하고 관리한다.

async with 구문을 통해 세션의 생명주기를 비동기적으로 처리한다.

 

예외 처리와 롤백

세션 사용 중 발생하는 SQLAlchemyError는 즉시 롤백을 수행하고 에러를 다시 발생시킨다.

 

용도

방식은 FastAPI 같은 ASGI 기반의 비동기 프레임워크에서 사용되어야 한다. 비동기 데이터베이스 작업을 위해 설계되어, 데이터베이스 I/O 작업 중에 다른 작업으로 쉽게 컨텍스트 전환을 있어 효율적이다.

 


요약

  • 동기 함수 (get_db): 동기 데이터베이스 작업에 적합하며, 레거시 시스템이나 동기 방식의 코드베이스에 주로 사용된다.
  • 비동기 함수 (get_db_session): FastAPI 같은 비동기 애플리케이션에서 사용하기에 적합하며, 성능 향상 리소스 사용 최적화를 위해 권장된다.