서버에서 요청을 처리할 때, 작업이 오래 걸리는 요청에 대해서는 응답을 먼저 보내주고 백그라운드에서 나머지 작업을 수행하도록 한다. 이러한 구조를 위해서 보통은 worker thread 를 돌리거나 worker queue 등을 사용하여 다른 쓰레드 또는 프로세스를 통해 백그라운드에서 작업을 수행하도록 합니다.
FastAPI 에서는 starlette 의 BackgroundTasks 를 사용하여 요청에 대한 백그라운드 작업을 실행하는 기능을 제공한다. BackgroundTasks 에 추가된 task 들은 FastAPI 에서 asynchronous 하게 실행한다. 이를 통해서 오래 걸리는 작업은 Background Task 로 등록한 후 response 를 먼저 반환하도록 할 수 있다.
1. BackgroundTasks
BackgroundTasks 는 fastapi 모듈의 BackgroundTasks 클래스를 import 하여 사용할 수 있다. FastAPI app 에서 BackgroundTasks 를 사용할 때는 path operation function 에서 BackgroundTasks 타입으로 변수를 선언하여 사용할 수 있다.
FastAPI 의 path operation function 에서 BackgroundTasks 타입 변수를 선언하는 이유는 fastapi 에서 router 를 조회하여 arguments 를 파싱하는 과정에서 BackgroundTasks 객체를 주입해주기 때문인데 이는 아래에서 좀 더 자세하게 설명한다.
BackgroundTasks 를 사용하는 코드의 예제는 다음과 같다. 다음의 코드는 POST /send-notification/{email} 로 요청이 왔을때, {"message": "Notification sent in the background"} 를 response 로 먼저 반환하고 실제 notification 전송은 백그라운드에서 실행한다.
send_notification 함수는 BackgroundTasks 타입의 background_tasks 변수를 선언하고 이 변수에 write_notification 이라는 task 를 추가합니다. 이렇게 추가된 task 는 reponse 전달 이후 실행됩니다.
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
- task function
BackgroundTasks 에 추가되는 task function 은 parameter 를 입력받아 실행되는 일반적인 형태의 함수이다. BackgroundTasks 에서 실행시에 함수의 타입을 확인하기 때문에 async 와 sync 모두 가능하다.
def add_task(
self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
) -> None:
task = BackgroundTask(func, *args, **kwargs)
self.tasks.append(task)
BackgroundTasks 로 추가하기 위해서는 백그라운드에서 실행할 task function 과 arguments 들을 add_task 함수로 입력해주면 된다. add_task 함수는 아래와 같이 task function 을 입력받는 func 와 task function 의 arguments 를 입력하는 *args, **kwargs 로 이루어져 있다.
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
...
background_tasks.add_task(write_notification, email, message="some notification")
...
send_notification 의 예제에서는 위와 같이 func 로 write_notification 함수를 *args 에 email, **kwargs 에 message="some notification" 을 arguments 로 입력했다. 이렇게 입력된 값들은 백그라운드에서 다음과 같이 실행된다.
write_notification(email, message="some notification")
func 로 입력된 write_notification 에 email 과 "some notification" 이 arguments 로 입력되어 실행된다.
2. FastAPI 와 BackgroundTasks 구조
- BackgroundTask 와 BackgroundTasks
BackgroundTasks 의 클래스 구조는 다음과 같다. FastAPI 는 starlette 프레임워크에 구현된 background 모듈을 import 하여 사용한다.
BackgroundTasks 는 add_task 로 추가된 task 함수와 인자들을 tasks 리스트에 저장하고 해당 객체가 호출되면 task 들을 하나씩 실행한다. 각 task 들은 BackgroundTask 객체로 저장되는데, BackgroundTask 는 func, args, kwargs 를 저장하고 있다가 BackgroundTasks 에서 task 를 하나씩 await 하면 그때 func 으로 저장된 함수를 arguments 들과 함께 실행한다.
BackgroundTask 는 is_async 라는 변수를 통해서 func 가 sync 인지 async 인지 저장해놓고 실행할 때 해당 변수의 값을 확인하여 함수 타입에 맞는 방법으로 실행한다. 이때문에 task function 으로는 sync 와 async 가 모두 가능한 것이다.
class BackgroundTask:
def __init__(
self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
) -> None:
self.func = func
self.args = args
self.kwargs = kwargs
self.is_async = is_async_callable(func)
async def __call__(self) -> None:
if self.is_async:
await self.func(*self.args, **self.kwargs)
else:
await run_in_threadpool(self.func, *self.args, **self.kwargs)
class BackgroundTasks(BackgroundTask):
def __init__(self, tasks: typing.Optional[typing.Sequence[BackgroundTask]] = None):
self.tasks = list(tasks) if tasks else []
def add_task(
self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
) -> None:
task = BackgroundTask(func, *args, **kwargs)
self.tasks.append(task)
async def __call__(self) -> None:
for task in self.tasks:
await task()
- BackgroundTasks injection
FastAPI 문서에 따르면 BackgroundTasks 를 사용할때는 path operation function 에서 사용하라고 한다. 그 이유는 FastAPI 에서 request 의 endpoint path 에 맞는 handler 를 찾고 request 의 값들을 파싱하여 해당 handler function 의 arguments 로 입력하는 과정에서 BackgroundTasks 객체를 주입해주기 때문이다.
...
if dependant.background_tasks_param_name:
if background_tasks is None:
background_tasks = BackgroundTasks()
values[dependant.background_tasks_param_name] = background_tasks
...
위의 코드는 FastAPI 에서 함수의 인자로 BackgroundTasks 객체를 생성하여 입력해주는 코드의 일부를 가져온 것이다. 이 코드에서 values 가 path operation function, 즉 request handler 함수의 keyword arguments 로 입력되는 dict 변수이다.
dependant 는 request handler 함수를 분석하여 해당 함수의 parameter 등에 대한 정보를 가지고 있다. 만약 parameter 중에 BackgroundTasks 타입으로 선언된 parameter 가 있다면 해당 parameter 의 이름을 background_tasks_param_name 으로 가지고 있는다.
이러한 정보를 바탕으로 코드를 해석하면, request handler 함수의 parameter 중에 BackgroundTasks 타입으로 선언된 parameter 가 있다면, BackgroundTasks 객체를 해당 함수의 인자로 추가해준다는 것으로 이해할 수 있다.
FastAPI 는 request 의 path 와 body 등의 값을 request handler 의 parameter 형식에 맞게 파싱할 때, 재귀를 통해서 위의 로직을 반복한다. 그렇기 때문에 BackgroundTasks 타입을 path operation function 의 parameter 또는 Depends 를 사용하여 dependency, sub-dependency 로 선언해도 동일하게 사용할 수 있게 된다.
3. BackgroundTasks 와 Celery
FastAPI 의 Background tasks 문서의 마지막 절에는 BackgroundTasks 사용에 대한 주의사항이 있다. 이 글에서는 보다 더 무겁고 같은 프로세스 안에서 동작해야할 필요가 없는 작업에 대해서는 Celery 를 사용하는 것을 추천한다.
FastAPI 의 BackgroundTasks 는 FastAPI application 의 프로세스 내부에서 실행되기 때문에 무거운 작업의 경우 application 이 다른 request 를 처리할 때 부하가 발생하여 성능적인 문제를 야기할 수 있다. 반면에 Celery 는 message queue 로 통신하여 아예 다른 worker process 에서 작업을 수행하기 때문에 성능적으로 더 안정적으로 application 을 운영할 수 있다.
하지만 Celery 는 message queue 와 여러 설정들을 추가로 해주어야 하고, 다른 프로세스에서 실행되는 만큼 변수와 메모리 등을 공유할 수 없기 때문에 상황에 잘맞는 방식으로 백그라운드 구조를 선택해야 한다.
[References]
- https://fastapi.tiangolo.com/tutorial/background-tasks/#background-tasks
'기타' 카테고리의 다른 글
[SQLite] SQLite 설명 및 예제 (1) | 2023.10.17 |
---|---|
[Celery] Celery 란? (0) | 2023.06.01 |
[NGINX] NGINX 란? (1) | 2023.05.11 |
[MongoDB] Aggregation (0) | 2023.05.02 |
[Gradle] Gradle 사용법 - 설치, 초기화 및 (1) | 2023.01.17 |