FastAPI 에서 백그라운드 작업을 실행하는 방법을 찾으면서 FastAPI BackgroundTasks 와 Celery 를 비교하는 글을 본 적이 있다. 수행에 오랜 시간과 메모리가 필요한 작업을 따로 실행하고 그러한 작업들의 스케줄링을 하기 위해서 Celery 에 대해서 알아본다.
1. Celery 란?
Celery 는 python application 에서 많은 양의 작업들을 나눠서 처리할 수 있도록 해주는 분산 시스템이다. Python application 에서 생서된 작업들의 실시간 처리와 작업 스케줄링 등을 제공하는 task queue 이다.
- Task Queue 란?
Task queue 는 멀티 쓰레드, 또는 멀티 디바이스를 통한 분산 처리에서 사용되는 개념이다.
Task queue 에는 처리해야할 작업들, task 들이 저장되고 이 task 들이 순차적으로 worker process 에 의해 처리된다. Worker process 들은 task 를 처리하는 프로세스로 task queue 에 수행해야할 task 가 있는지 모니터링한다.
Celery 에서 client 와 worker 는 message broker 를 통해서 소통한다. Client 에서 처리해야할 새로운 task 를 생성하면 celery 는 broker 를 통해 worker 에게 메시지를 전달하여 task 를 처리하도록 한다.
Celery 시스템은 고가용성과 수평적 확장성을 위해서 여러개의 worker 와 broker 들로 구성될 수 있다.
Celery 시스템을 구축하기 위해서는 메시지들을 전달할 수 있는 broker 가 필요한데, 보통 RabbitMQ 또는 Redis broker 를 사용한다.
- Celery 특징
Celery 는 따로 설정 파일이 필요하지 않고, 운영하기 간단하다.
Worker 와 client 간의 통신 문제에 대하여 자동으로 재시도하고, broker 들을 primary/primary 또는 primary/replica 등의 구조로 사용하여 고가용성을 보장할 수 있다.
단일 Celery 는 1분에 수백만건의 task 를 처리할 수 있는 속도를 제공한다. 그리고 Custom pool 구현, 직렬변화기, 압축 구조, 로깅, 스케줄러, consumer, producer 등등 다양한 기능을 사용하거나 확장할 수 있다.
2. Celery 사용법
- 설치
Celery 는 pip 를 사용하여 간단하게 설치할 수 있다.
pip install celery
- broker 설정
Celery 를 사용하기 위해서는 task 메시지들을 주고받기 위한 message broker 가 필요하다. Celery 는 RabbirtMQ, Amazon SQS 등의 다양한 message broker 를 사용할 수 있는데, 이 예제에서는 redis 를 사용할 것이다.
아래의 명령어를 통해서 redis 컨테이너를 실행한다. 이때 redis 는 6379 포트를 사용하도록 설정하였다.
docker run -d -p 6379:6379 redis
- celery application
Celery 를 사용하기 위해서는 먼저 celery 인스턴스를 생성해야 한다. Celery application 또는 app 이라고 부르는데, 인스턴스는 celery 에서 task 를 생성하거나 worker 를 관리하는 등의 작업을 하기위한 entry-point 역할을 한다. 다른 모듈에서는 해당 인스턴스를 import 하여 사용한다.
# tasks.py
from celery import Celery
app = Celery('tasks', backend='redis://localhost:6379', broker='redis://localhost:6379')
@app.task
def add(x, y):
return x + y
위의 예제는 Celery 인스턴스를 생성하고 간단한 task 를 등록한 코드 예제이다.
Celery 인스턴스의 첫번째 매개변수는 해당 모듈의 이름 (파일명) 이고 그 다음에 오는 backend 와 broker 는 각각 worker 의 작업 결과를 저장할 백엔드와 message broker 에 대한 정보이다. 인자값으로는 해당 서버들의 url 을 입력한다.
- celery worker server 실행
Celery 명령어의 worker 인자를 사용하여 celery server 를 실행한다.
celery –A tasks worker –loglevel=INFO
위 명령어에서 -A 는 application 을 의미하는데, tasks 모듈을 읽어서 해당 모듈의 application 을 실행한다는 의미이다. worker 는 worker 인스턴스를 실행한다는 의미이고 loglevel 은 celery 서버에서 출력될 로그의 레벨을 지정하는 인자이다.
위의 명령어를 실행하면 celery 서버가 foreground 로 실행되어 화면에 서버 로그들이 출력되게 된다.
- task 실행
Celery 인스턴스에 등록된 task 를 실행하기 위해서는 deley() 메서드를 사용한다. delay() 메서드를 사용하면 task 수행 요청이 celery 서버로 전달되어 worker 에서 수행된다.
# client.py
from tasks import add, app
if __name__ == "__main__":
result = add.delay(4, 4)
print(result.ready())
print(result.ready())
print(result.ready())
print(result.ready())
print(result.get(timeout=1))
print(result.get(timeout=1))
print(result.get(timeout=1))
위의 예제는 이전에 tasks 의 add task 를 실행하고 그 결과를 출력하는 예제이다. add.delay(4, 4) 를 실행하면 delay() 메서드의 인자들이 task 에 입력되어 add(4, 4) 가 실행되고 celery server 의 로그에 결과가 출력된다.
delay() 메서드는 AsyncResult 객체를 반환한다. 이 객체는 task 의 상태와 결과를 확인할 수 있다. 이를 확인하기 위해서는 화면에서와 같이 ready(), get() 메서드를 사용하면 된다. ready() 는 task 가 완료되었는지 여부를 반환하고, get() 은 task 의 결과를 반환한다.
이때 결과를 확인하기 위해서는 celery 의 result_backend 가 설정이 되어있어야 한다. 앞의 celery application 항목에서 celery 인스턴스를 생성할 때 redis 를 backend 로 설정을 해주었다. 이 설정을 통해서 celery 의 작업 결과들이 redis 에 저장되고 이를 통해 task 의 상태와 결과를 조회할 수 있게 된다. 만약 result_backend 를 설정해주지 않으면 ready(), get() 메서드를 호출하면 에러가 발생한다.
- configuration
Celery 는 운영을 많은 설정이 필요없지만 input 과 output 에 대한 설정이 필요한 경우가 있다. 우선 input 은 broker 를 통해서 입력받기 때문에 무조건 연결 설정이 필요하다. Output 의 경우 result_backend 에 task 결과 저장이 필요하다면 result_backend 를 설정해주어야 한다.
Celery 의 기본 설정을 사용해도 되지만 요구사항에 맞는 사용을 위해서 더 자세한 설정이 필요한 경우가 있다. Celery 에서 할 수 있는 설정들은 Configuration and defaults 에서 확인할 수 있다.
Celery 의 설정을 변경하위 해서는 아래의 예제와 같이 celery 인스턴스에 직접 접근하여 설정을 해줄 수 있다. app.conf 에서 해당 설정을 직접 변경해주거나 여러건의 설정을 변경하는 경우 update() 메서드를 사용할 수 있다.
app.conf.task_serializer = 'json'
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
만약에 큰 프로젝트에서 많은 설정을 해야하는 경우에는 이를 모듈로 분리하여 설정에 사용할 수 있다. 따로 모듈에 celery 인스턴스의 설정들을 기술하고 config_from_object() 메서드를 사용하여 해당 설정 모듈을 celery 인스턴스에 적용할 수 있다.
아래는 config_from_object 예제이다. celeryconfig.py 모듈에 celery 설정을 기술하고 이를 config_from_object() 메서드를 통해 celery 인스턴스에 적용한다.
# celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
# celery application code
...
app.config_from_object('celeryconfig')
...
[Reference]
- https://docs.celeryq.dev/en/stable/
- https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html
'기타' 카테고리의 다른 글
[SQLite] SQLite 설명 및 예제 (1) | 2023.10.17 |
---|---|
[FastAPI] BackgroundTasks (0) | 2023.05.26 |
[NGINX] NGINX 란? (1) | 2023.05.11 |
[MongoDB] Aggregation (0) | 2023.05.02 |
[Gradle] Gradle 사용법 - 설치, 초기화 및 (1) | 2023.01.17 |