본문 바로가기

기타

[FastAPI] BackgroundTasks

반응형

서버에서 요청을 처리할 때, 작업이 오래 걸리는 요청에 대해서는 응답을 먼저 보내주고 백그라운드에서 나머지 작업을 수행하도록 한다. 이러한 구조를 위해서 보통은 worker thread 를 돌리거나 worker queue 등을 사용하여 다른 쓰레드 또는 프로세스를 통해 백그라운드에서 작업을 수행하도록 합니다.

FastAPI 에서는 starlette 의 BackgroundTasks 를 사용하여 요청에 대한 백그라운드 작업을 실행하는 기능을 제공한다. BackgroundTasks 추가된 task 들 FastAPI 에서 asynchronous 하게 실행한다. 이를 통해서 오래 걸리는 작업은 Background Task 로 등록한 후 response 를 먼저 반환하도록 할 수 있다.

1. BackgroundTasks

BackgroundTasks 는 fastapi 모듈의 BackgroundTasks 클래스를 import 하여 사용할 수 있다. FastAPI app 에서 BackgroundTasks 를 사용할 때는 path operation function 에서 BackgroundTasks 타입으로 변수를 선언하여 사용할 수 있다.

 

FastAPI 의 path operation function 에서 BackgroundTasks 타입 변수를 선언하는 이유는 fastapi 에서 router 를 조회하여 arguments 를 파싱하는 과정에서 BackgroundTasks 객체를 주입해주기 때문인데 이는 아래에서 좀 더 자세하게 설명한다.

 

BackgroundTasks 를 사용하는 코드의 예제는 다음과 같다. 다음의 코드는 POST /send-notification/{email} 로 요청이 왔을때, {"message": "Notification sent in the background"} 를 response 로 먼저 반환하고 실제 notification 전송은 백그라운드에서 실행한다.

 

send_notification 함수는 BackgroundTasks 타입의 background_tasks 변수를 선언하고 이 변수에 write_notification 이라는 task 를 추가합니다. 이렇게 추가된 task 는 reponse 전달 이후 실행됩니다.

 

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

- task function

BackgroundTasks 에 추가되는 task function 은 parameter 를 입력받아 실행되는 일반적인 형태의 함수이다. BackgroundTasks 에서 실행시에 함수의 타입을 확인하기 때문에 async 와 sync 모두 가능하다.

 

def add_task(
        self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
    ) -> None:
        task = BackgroundTask(func, *args, **kwargs)
        self.tasks.append(task)

 

BackgroundTasks 로 추가하기 위해서는 백그라운드에서 실행할 task function 과 arguments 들을 add_task 함수로 입력해주면 된다. add_task 함수는 아래와 같이 task function 을 입력받는 func 와 task function 의 arguments 를 입력하는 *args, **kwargs 로 이루어져 있다.

 

def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)

...
    background_tasks.add_task(write_notification, email, message="some notification")
...

 

send_notification 의 예제에서는 위와 같이 func 로 write_notification 함수를 *args 에 email, **kwargs 에 message="some notification" 을 arguments 로 입력했다. 이렇게 입력된 값들은 백그라운드에서 다음과 같이 실행된다.

 

write_notification(email, message="some notification")

 

func 로 입력된 write_notification 에 email 과 "some notification" 이 arguments 로 입력되어 실행된다.

2. FastAPI 와 BackgroundTasks 구조

- BackgroundTask BackgroundTasks

BackgroundTasks 의 클래스 구조는 다음과 같다. FastAPI starlette 프레임워크에 구현된 background 모듈을 import 하여 사용한다.

 

BackgroundTasks add_task 추가된 task 함수와 인자들을 tasks 리스트에 저장하고 해당 객체가 호출되면 task 들을 하나씩 실행한다. task 들은 BackgroundTask 객체로 저장되는데, BackgroundTask 는 func, args, kwargs 저장하고 있다가 BackgroundTasks 에서 task 하나씩 await 하면 그때 func 으로 저장된 함수를 arguments 들 함께 실행한다.

 

BackgroundTask is_async 라는 변수를 통해서 func sync 인지 async 인지 저장해놓고 실행할 해당 변수의 값을 확인하여 함수 타입에 맞는 방법으로 실행한다. 이때문에 task function 으로는 sync async 모두 가능한 것이다.

 

class BackgroundTask:
    def __init__(
        self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
    ) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = is_async_callable(func)

    async def __call__(self) -> None:
        if self.is_async:
            await self.func(*self.args, **self.kwargs)
        else:
            await run_in_threadpool(self.func, *self.args, **self.kwargs)


class BackgroundTasks(BackgroundTask):
    def __init__(self, tasks: typing.Optional[typing.Sequence[BackgroundTask]] = None):
        self.tasks = list(tasks) if tasks else []

    def add_task(
        self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
    ) -> None:
        task = BackgroundTask(func, *args, **kwargs)
        self.tasks.append(task)

    async def __call__(self) -> None:
        for task in self.tasks:
            await task()

- BackgroundTasks injection

FastAPI 문서에 따르면 BackgroundTasks 사용할때는 path operation function 에서 사용하라고 한다.  이유는 FastAPI 에서 request endpoint path 에 맞는 handler 를 찾고 request 의 값들을 파싱하여 해당 handler function arguments 로 입력하는 과정에서 BackgroundTasks 객체를 주입해주기 때문이다.

 

...
if dependant.background_tasks_param_name:
        if background_tasks is None:
            background_tasks = BackgroundTasks()
        values[dependant.background_tasks_param_name] = background_tasks
...

 

위의 코드는 FastAPI 에서 함수의 인자로 BackgroundTasks 객체를 생성하여 입력해주는 코드의 일부를 가져온 것이다. 코드에서 values path operation function, request handler 함수의 keyword arguments 입력되는 dict 변수이다.

 

dependant request handler 함수를 분석하여 해당 함수의 parameter 등에 대한 정보를 가지고 있다. 만약 parameter 중에 BackgroundTasks 타입으로 선언된 parameter 있다면 해당 parameter 이름을 background_tasks_param_name 으로 가지고 있는다.

 

이러한 정보를 바탕으로 코드를 해석하면, request handler 함수의 parameter 중에 BackgroundTasks 타입으로 선언된 parameter 있다면, BackgroundTasks 객체를 해당 함수의 인자로 추가해준다는 것으로 이해할 있다.

 

FastAPI request path body 등의 값을 request handler parameter 형식에 맞게 파싱할 , 재귀를 통해서 위의 로직을 반복한다. 그렇기 때문에 BackgroundTasks 타입을 path operation function  parameter 또는 Depends 사용하여 dependency, sub-dependency 선언해도 동일하게 사용할 있게 된다.

3. BackgroundTasks Celery

FastAPI Background tasks 문서의 마지막 절에는 BackgroundTasks 사용에 대한 주의사항이 있다. 글에서는 보다 무겁고 같은 프로세스 안에서 동작해야할 필요가 없는 작업에 대해서는 Celery 사용하는 것을 추천한다.

 

FastAPI BackgroundTasks FastAPI application 프로세스 내부에서 실행되기 때문에 무거운 작업의 경우 application 다른 request 처리할 부하가 발생하여 성능적인 문제를 야기할 있다. 반면에 Celery message queue 통신하여 아예 다른 worker process 에서 작업을 수행하기 때문에 성능적으로 안정적으로 application 운영할 있다.

 

하지만 Celery message queue 여러 설정들을 추가로 해주어야 하고, 다른 프로세스에서 실행되는 만큼 변수와 메모리 등을 공유할 없기 때문에 상황에 잘맞는 방식으로 백그라운드 구조를 선택해야 한다.

[References]

- https://fastapi.tiangolo.com/tutorial/background-tasks/#background-tasks

반응형

'기타' 카테고리의 다른 글

[SQLite] SQLite 설명 및 예제  (1) 2023.10.17
[Celery] Celery 란?  (0) 2023.06.01
[NGINX] NGINX 란?  (1) 2023.05.11
[MongoDB] Aggregation  (0) 2023.05.02
[Gradle] Gradle 사용법 - 설치, 초기화 및  (1) 2023.01.17