본문 바로가기

기타

[Celery] Celery 란?

반응형

FastAPI 에서 백그라운드 작업을 실행하는 방법을 찾으면서 FastAPI BackgroundTasks Celery 를 비교하는 글을 본 적이 있다. 수행에 오랜 시간과 메모리가 필요한 작업을 따로 실행하고 그러한 작업들의 스케줄링을 하기 위해서 Celery 에 대해서 알아본다.

1. Celery ?

Celery 는 python application 에서 많은 양의 작업들을 나눠서 처리할 수 있도록 해주는 분산 시스템이다. Python application 에서 생서된 작업들의 실시간 처리와 작업 스케줄링 등을 제공하는 task queue 이다.

- Task Queue 란?

Task queue 멀티 쓰레드, 또는 멀티 디바이스를 통한 분산 처리에서 사용되는 개념이다.

 

Task queue 에는 처리해야할 작업들, task 들이 저장되고 이 task 들이 순차적으로 worker process 에 의해 처리된다. Worker process 들은 task 를 처리하는 프로세스로 task queue 에 수행해야할 task 가 있는지 모니터링한다.

 

Celery 에서 client worker message broker 통해서 소통한다. Client 에서 처리해야할 새로운 task 를 생성하면 celery 는 broker 를 통해 worker 에게 메시지를 전달하여 task 를 처리하도록 한다.

 

Celery 시스템은 고가용성과 수평적 확장성을 위해서 여러개의 worker broker 들로 구성될 있다.

 

Celery 시스템을 구축하기 위해서는 메시지들을 전달할 있는 broker 필요한데, 보통 RabbitMQ 또는 Redis broker 사용한다.

- Celery 특징

Celery 는 따로 설정 파일이 필요하지 않고, 운영하기 간단하다.

 

Worker 와 client 간의 통신 문제에 대하여 자동으로 재시도하고, broker 들을 primary/primary 또는 primary/replica 등의 구조로 사용하여 고가용성을 보장할 수 있다.

 

단일 Celery 는 1분에 수백만건의 task 를 처리할 수 있는 속도를 제공한다. 그리고 Custom pool 구현, 직렬변화기, 압축 구조, 로깅, 스케줄러, consumer, producer 등등 다양한 기능을 사용하거나 확장할 수 있다.

2. Celery 사용법

- 설치

Celery 는 pip 를 사용하여 간단하게 설치할 수 있다.

 

pip install celery

- broker 설정

Celery 를 사용하기 위해서는 task 메시지들을 주고받기 위한 message broker 가 필요하다. Celery 는 RabbirtMQ, Amazon SQS 등의 다양한 message broker 를 사용할 수 있는데, 이 예제에서는 redis 를 사용할 것이다.

 

아래의 명령어를 통해서 redis 컨테이너를 실행한다. 이때 redis 는 6379 포트를 사용하도록 설정하였다.

 

docker run -d -p 6379:6379 redis

- celery application

Celery 를 사용하기 위해서는 먼저 celery 인스턴스를 생성해야 한다. Celery application 또는 app 이라고 부르는데, 인스턴스는 celery 에서 task 를 생성하거나 worker 를 관리하는 등의 작업을 하기위한 entry-point 역할을 한다. 다른 모듈에서는 해당 인스턴스를 import 하여 사용한다.

 

# tasks.py
from celery import Celery

app = Celery('tasks', backend='redis://localhost:6379', broker='redis://localhost:6379')
 
@app.task
def add(x, y):
  return x + y

 

위의 예제는 Celery 인스턴스를 생성하고 간단한 task 를 등록한 코드 예제이다.

Celery 인스턴스의 첫번째 매개변수는 해당 모듈의 이름 (파일명) 이고 그 다음에 오는 backend 와 broker 는 각각 worker 의 작업 결과를 저장할 백엔드와 message broker 에 대한 정보이다. 인자값으로는 해당 서버들의 url 을 입력한다.

- celery worker server 실행

Celery 명령어의 worker 인자를 사용하여 celery server 를 실행한다.

 

celery –A tasks worker –loglevel=INFO

 

위 명령어에서 -A 는 application 을 의미하는데, tasks 모듈을 읽어서 해당 모듈의 application 을 실행한다는 의미이다. worker 는 worker 인스턴스를 실행한다는 의미이고 loglevel 은 celery 서버에서 출력될 로그의 레벨을 지정하는 인자이다.

 

위의 명령어를 실행하면 celery 서버가 foreground 로 실행되어 화면에 서버 로그들이 출력되게 된다.

- task 실행

Celery 인스턴스에 등록된 task 를 실행하기 위해서는 deley() 메서드를 사용한다. delay() 메서드를 사용하면 task 수행 요청이 celery 서버로 전달되어 worker 에서 수행된다.

 

# client.py
from tasks import add, app

if __name__ == "__main__":
  result = add.delay(4, 4)

  print(result.ready())
  print(result.ready())
  print(result.ready())
  print(result.ready())
  print(result.get(timeout=1))
  print(result.get(timeout=1))
  print(result.get(timeout=1))

 

위의 예제는 이전에 tasks 의 add task 를 실행하고 그 결과를 출력하는 예제이다. add.delay(4, 4) 를 실행하면 delay() 메서드의 인자들이 task 에 입력되어 add(4, 4) 가 실행되고 celery server 의 로그에 결과가 출력된다.

 

delay() 메서드는 AsyncResult 객체를 반환한다. 이 객체는 task 의 상태와 결과를 확인할 수 있다. 이를 확인하기 위해서는 화면에서와 같이 ready(), get() 메서드를 사용하면 된다. ready() 는 task 가 완료되었는지 여부를 반환하고, get() 은 task 의 결과를 반환한다.

 

이때 결과를 확인하기 위해서는 celery 의 result_backend 가 설정이 되어있어야 한다. 앞의 celery application 항목에서 celery 인스턴스를 생성할 때 redis 를 backend 로 설정을 해주었다. 이 설정을 통해서 celery 의 작업 결과들이 redis 에 저장되고 이를 통해 task 의 상태와 결과를 조회할 수 있게 된다. 만약 result_backend 를 설정해주지 않으면 ready(), get() 메서드를 호출하면 에러가 발생한다.

- configuration

Celery 는 운영을 많은 설정이 필요없지만 input 과 output 에 대한 설정이 필요한 경우가 있다. 우선 input 은 broker 를 통해서 입력받기 때문에 무조건 연결 설정이 필요하다. Output 의 경우 result_backend 에 task 결과 저장이 필요하다면 result_backend 를 설정해주어야 한다.

 

Celery 의 기본 설정을 사용해도 되지만 요구사항에 맞는 사용을 위해서 더 자세한 설정이 필요한 경우가 있다. Celery 에서 할 수 있는 설정들은 Configuration and defaults  에서 확인할 수 있다.

 

Celery 의 설정을 변경하위 해서는 아래의 예제와 같이 celery 인스턴스에 직접 접근하여 설정을 해줄 수 있다. app.conf 에서 해당 설정을 직접 변경해주거나 여러건의 설정을 변경하는 경우 update() 메서드를 사용할 수 있다.

 

app.conf.task_serializer = 'json'

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

 

만약에 큰 프로젝트에서 많은 설정을 해야하는 경우에는 이를 모듈로 분리하여 설정에 사용할 수 있다. 따로 모듈에 celery 인스턴스의 설정들을 기술하고 config_from_object() 메서드를 사용하여 해당 설정 모듈을 celery 인스턴스에 적용할 수 있다.

 

아래는 config_from_object 예제이다. celeryconfig.py 모듈에 celery 설정을 기술하고 이를 config_from_object() 메서드를 통해 celery 인스턴스에 적용한다.

 

# celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'
 
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

 

# celery application code
...
app.config_from_object('celeryconfig')
...

[Reference]

- https://docs.celeryq.dev/en/stable/

 

Celery - Distributed Task Queue — Celery 5.2.7 documentation

This document describes the current stable version of Celery (5.2). For development docs, go here.

docs.celeryq.dev

- https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html

 

First Steps with Celery — Celery 5.2.7 documentation

This document describes the current stable version of Celery (5.2). For development docs, go here. First Steps with Celery Celery is a task queue with batteries included. It’s easy to use so that you can get started without learning the full complexities

docs.celeryq.dev

 

반응형

'기타' 카테고리의 다른 글

[gRPC] gRPC 와 Protocol Buffer  (0) 2024.05.09
[SQLite] SQLite 설명 및 예제  (1) 2023.10.17
[FastAPI] BackgroundTasks  (0) 2023.05.26
[NGINX] NGINX 란?  (1) 2023.05.11
[MongoDB] Aggregation  (0) 2023.05.02