Skip to content

Celery

셀러리(Celery)는 분산 메시지 전달에 기반을 둔 오픈 소스 비동기 태스크 큐, 잡 큐이다. 스케줄링을 지원하지만 실시간 운영에 초점을 두고 있다.

Categories

Celery 설치

pip install celery

Example

Broker 선택하기

Celery 는 메시지로 작업(Task)을 주고 받는 시스템이기 때문에 중간에 브로커(Broker) 역할을 하는 분리된 중계 시스템이 필요하다. 다양한 Broker 들과 호환 되지만 오랜 역사와 안정성을 자랑하는 RabbitMQ를 사용하기로 했다.

RabbitMQ는 AMQP (Advanced message queuing protocol)을 지원하는 메시지 브로커이다.

아래 명령어는 로컬 개발환경에서 쉽고 빠르게 RabbitMQ 서버를 실행하기 위해 도커를 사용했다.

docker run --hostname my-rabbit \
    -p 5672:5672 \
    -p 8080:15672 \
    -e RABBITMQ_DEFAULT_USER=guest \
    -e RABBITMQ_DEFAULT_PASS=guest \
    --name some-rabbit \
    rabbitmq:3-management

Task 만들기

가장 먼저 할 일은 Celery 인스턴스를 정의하는 것이다. 보통 이것을 Celery App이라고 부르며, Task를 만들고 Worker를 관리하는 등의 Celery가 수행하는 모든 것에 대한 시작점이 된다.

Celery Class의 첫번째 위치 인자로 모듈의 이름을 전달하고, broker 키워드 인자로 Broker 연결 정보를 넘긴다.

# tasks.py
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest:guest@localhost//')

다음 간단한 Task 함수를 작성 해본다. 간단하게 연습하는 단계라서 같은 파이썬 모듈(tasks.py) 안에 Celery 인스턴스와 Task 함수를 함께 정의했다.

# tasks.py

@app.task
def add(x, y):
    return x + y

Celery Woker 실행하기

Celery 를 설치하면 자동으로 celery 커맨드를 사용할 수 있게 된다. 그래서 Celery Worker 를 구동하기 위해 $ celery worker 같은 형태로 기본 커맨드를 구성하고, -A {모듈명} 또는 --app={모듈명} 파라미터로 위에서 정의한 Celery 인스턴스가 포함된 파이썬 모듈을 지정한다.

celery -A tasks worker --loglevel=info

Task 호출하기

Celery Task 도 결국 파이썬 함수이다. 보통 파이썬 함수를 호출할 때 add() 같이 하면 되지만, Celery Task 로써 호출하려면 add.delay() 같은 형태로 delay() 메소드를 호출한다. (이 메소드는 apply_async() 메소드의 축소 버전이다.)

from tasks import add
add.delay(1, 2)

Task를 호출한 결과로 #AsyncResult 인스턴스가 반환된다. 이 객체를 이용해서 Task가 완료 되었는지 결과값을 반환했는지 같은 상태를 확인할 수 있다.

기본적으로 Task 실행 결과값은 저장되지 않는다. 결과를 DB에 저장하거나 RPC 호출을 위해서는 result backend 설정을 추가해야 한다.

작업 결과 보관하기 (Result Backend)

Celery는 Task 작업 결과를 저장하기 위한 여러가지 result backend 를 내장하고 있다. (예: SQLAlchemy, Django, Memcached, Redis, RPC 등등)

result backend 를 지정하려면 Celery 인스턴스에 backend 키워드 인자를 추가한다.

rpc 를 result backend 로 지정할 경우 작업 결과를 임시 AMQP 메시지로 다시 돌려보내는 방식으로 동작하고, 작업 결과로 반환된 AsyncResult 인스턴스를 유지할 수 있게 된다.

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

Task의 완료 상태를 확인하기 위해 ready() 메소드를 사용한다.

result = add.delay(4, 4)

result.ready()
# True

get() 메소드로 완료된 작업의 결과값을 가져올 수 있다. 만약 호출하는 시점에 작업이 완료되지 않은 경우 완료될때까지 기다리게 된다. (보통 잘 사용하지 않는다고 한다)

result.get()

Task 실행 중 예외가 발생한 경우 get() 메소드를 통해 예외가 전달될 수 있다. propagate 인자를 False로 전달할 경우에는 이 예외가 전달되지 않는다.

result.get(propagate=False)

Task 함수에 task(ignore_result=True) 데코레이터를 적용하면 개별 Task 단위로 result backend 사용을 중지할 수 있다.

@app.task(ignore_result=True)
def subtraction(x, y):
    return x - y

Celery 설정을 분리해서 관리하기

Celery를 운영하는데 많은 설정이 필요하지 않지만, Broker 연결은 필수이며, result backend 는 선택 사항이다. 위 예제에서 기본 설정들을 Celery 인스턴스에 직접 전달했지만, 별도 전용 모듈(py)로 따로 정의하는 것이 하드 큰 프로젝트에서는 설정을 제어하기 더 수월하다. 더 많은 설정은 Configuration and defaults 에서 확인할 수 있다.

설정을 전용 모듈로 분리 했다면 app.config_from_object() 메소드를 이용해 설정 모듈을 Celery 인스턴스에 전달 할 수 있다.

app.config_from_object('celeryconfig')

celeryconfig.py 파일:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Seoul'
enable_utc = True

# 오작동 한 작업을 전용 대기열로 라우팅하는 설정
task_routes = {
    'tasks.add': 'low-priority'
}

# 작업 속도를 제한하는 설정
task_annotations = {
    'tasks.add': {'rate_limit': '10/m'
}

특정 워커를 선택하는 방법

간단히 요약하면, Message Queue 를 분할 하면 된다.

Using Amazon SQS

AsyncResult

See also

Favorite site