1. celery beat
celery beat 는 스케줄러이다. celery beat 가 주기적으로 작업을 시작하면, 가용한 워커에서 작업을 수행한다.
celery beat 를 사용하기 위해서는 작업 일정에 대한 entry 를 등록해야 한다. 기본적으로 beat_schedule 설정에서 entry 들을 가져오지만, SQL DB 등과 같이 커스텀 저장소에서 entry 들을 가져올수도 있다.
celery beat 를 사용할 때는 한번에 하나의 스케줄러만 동작하도록 해야한다. 만약 여러개의 스케줄러가 동작하는 경우 작업들이 중복으로 실행되어서 동기화와 락 문제가 발생할 수 있다. 이를 방지하기 위해서 하나의 스케줄러를 통한 중앙집중식으로 구성해야 한다.
2. celery beat 설정
1) Time Zone
스케줄러에서는 기본적으로 UTC 시간대를 사용한다. 시간대를 변경하려면 timezone 설정을 통해서 변경할 수 있다.
celery app 의 설정은 두가지 방법이 있는데, 직접 app 객체에 설정을 해주거나, configuration module 을 추가하여 설정할 수 있다.
# 1. celery app 에 직접 설정
app.conf.timezone = "Europe/London"
# 2. configuration module 을 추가한 설정
app.config_from_object("celeryconfig")
2) Entry
작업을 스케줄링하기 위해서는 beat schedule 목록에 entry 를 추가해야 한다.
entry 를 등록하는 방법으로는 add_periodic_task() 함수를 이용한 방법과 beat_schedule 에 직접 등록하는 방법이 있다.
# 1. add_periodic_task() 함수를 이용한 등록
def setup_periodic_tasks(sender: Celery, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# Calls test('hello') every 30 seconds.
sender.add_periodic_task(30.0, test.s('hello'), name='add every 30')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
@app.task
def add(x, y):
z = x + y
print(z)
# 2. beat_schedule 에 직접 등록
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
entry 설정시에 사용하는 필드들은 다음과 같다.
- task
실행할 작업의 이름이다. 작업의 이름은 app 에 등록시에 task() 데코레이터에서 명시적으로 선언해 줄 수 있다. 만약 따로 선언하지 않았다면 작업이 정의된 모듈과 작업 함수의 이름의 조합으로 작업의 이름이 생성된다.
- schedule
작업을 실행할 주기를 정의하는 값이다. 초단위의 정수값이나 timedelta, 또는 crontab 객체를 입력할 수있다. 또는 schedule 인터페이스를 상속받아 커스텀 스케줄 타입을 정의하여 사용할 수도 있다.
from celery.schedules import crontab
app.conf.beat_schedule = {
# integer 값으로 스케줄링 - 30초마다 작업을 실행
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
# crontab 을 이용한 작업 스케줄링 - 매일 오전 7시 30분에 작업 실행
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
- args, kwargs
함수에 사용될 매개변수들의 값으로 args 는 list 또는 tuple 형식을 입력할 수 있고, kwargs 는 dict 형식을 입력할 수 있다.
- options
작업 실행 옵션으로 작업 함수를 실행하는 apply_async() 함수에서 사용되는 exchange, routing_key, expires 등의 매개변수들을 사용할 수 있다.
- relative
relative 는 schedule 옵션은 boolean 값으로 설정하여 작업 수행 주기를 상대값으로 적용할지 절대값으로 적용할지를 제어하는 값이다. 기본값은 False 로 매 정각, 매 분 등 절대 시간을 기준으로 실행된다. 반면에 True 인 경우에는 celery beat 가 시작된 시간을 기준으로 상대적인 주기로 작업이 실행된다.
예를들어 "schedule=30.0" 으로 작업을 예약하였다면, relative 가 False 일 때는 매 분 30초마다 (00:00:30. 00:01:30, ...) 작업이 실행된다. 반면에 False 인 경우에는 celery beat 가 실행된 시간을 기준으로 30초마다 작업이 실행된다. (00:00:15 에 시작시, 00:00:45, 00:01:15, 00:02:45 ...)
3. celery beat 실행
celery beat 를 실행하기 위해서는 아래와 같이 명령어를 입력하면 된다. 여기서 proj 는 celery app 이 정의되어 있는 파이썬 모듈을 의미한다.
$ celery -A proj beat
beat 는 작업의 마지막 실행시간을 로컬 DB 파일에 저장해야 한다. 기본값으로는 celery-beat-schdule 이라는 파일명으로 현재 디렉토리에 저장된다. 파일의 경로를 변경하고 싶은 경우 아래와 같이 명령어에 커스텀 경로를 입력하여 변경할 수 있다.
$ celery -A proj beat -s /home/celery/var/run/celery-beat-schdule
4. 실행예제
문자열 "hello" 를 출력하고 반환하는 작업을 10초마다 수행하도록 celery beat 로 구현한 예제이다. 먼저 celery app 을 생성하고 config_from_object() 함수를 통해 beat.conf 모듈로 celery app 을 수행한다.
# app/main.py
from celery import Celery
app = Celery("app", broker="pyamqp://guest@localhost//")
# config_from_object() 함수로 celery beat 설정파일
app.config_from_object("beat.conf")
실제 작업 함수가 정의되어있는 beat.tasks 모듈을 include 하고, beat_schedule 에 수행할 entry 를 정의하였다. tasks_hello 라는 이름으로 say-hello 작업이 10초마다 수행되도록 entry 에 등록하였다.
# beat/conf.py
include = ["beat.tasks"]
beat_schedule={
"tasks_hello": {
"task": "say-hello",
"schedule": 10.0,
"args": ()
},
}
작업 함수인 say_hello() 는 beat/tasks.py 파일에 정의하였다. say_hello() 는 문자열 "hello" 를 sysout 에 출력하고 return 값으로 반환한다.
# beat/tasks.py
from app.main import app
@app.task(name="say-hello")
def say_hello():
hello = "hello"
print(hello)
return hello
설정한 celery app 을 실행하기 위해서는 worker 와 beat 를 모두 동작시켜야 한다. beat 는 스케줄러로 작업을 설정된 주기에 맞춰 시작시키고, worker 는 스케줄러에 의해 시작된 작업을 수행한다.
아래에 worker 와 beat 를 각각 실행하고 그 결과를 캡쳐하였다.
- celery worker 실행 명령어 및 로그
# celery worker 실행 명령어
$ celery -A app.main worker --loglevel=info -P solo
- celery beat 실행 명령어 및 로그
# celery beat 실행 명령어
$ celery -A app.main beat --loglevel=info
celery 실행 결과를 통해 worker 와 beat 가 각각 동작되고 10초마다 beat 에서 say-hello 작업을 sending 하고 worker 에서는 say-hello 작업을 실행하여 hello 가 출력 및 반환되는 것을 확인할 수 있다.
[Reference]
- https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
- https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html#configuration
- https://docs.celeryq.dev/en/stable/userguide/tasks.html#task-names
- https://blog.naver.com/PostView.nhn?blogId=c_ist82&logNo=220777624611
'기타 > Celery' 카테고리의 다른 글
[Celery] Celery 란? (0) | 2023.06.01 |
---|