Celery
๐ก pycon korea 19 ์ด์งํ
Celery?
๋ฉ์ธ์ง ์ ๋ฌ์ ๊ธฐ๋ฐ์ผ๋ก ํ ๋น๋๊ธฐ task Queue
- client : ์์ ์์ฒญ
- Worker : ์์ ์ํ
Broker : ๋ฉ์ธ์ง ์ ๋ฌ
- AMQP(Advanced Message Queueing Protocol)
์์ ์ ์ผ๋ก ์๋ฃํ๊ธฐ
Late ACK
Celery Worker๊ฐ ack ํ๋ ์์
- Default
- Worker๊ฐ task ์คํํ๊ธฐ ์ง์ ์ ack
- ์คํ๋์ง ๋ชปํ๋๋ผ๋ Broker๋ ack๋ฅผ ๋ฐ์๊ธฐ ๋๋ฌธ์ ํ์์ ํด๋น task๋ฅผ ์ ๊ฑฐ์ํด
acks_late=True
- task ์คํ์ด ๋ค ์๋ฃ๋๋ ์์ ์์ ack
- task๊ฐ ์คํ๋์ง ๋ชปํด๋ ์์ง ํ์ ๋จ์์์ด์ ๋ค์ ์คํ๊ฐ๋ฅํ ์์ ์ฑ ํ๋ณด
- ๊ทธ๋ฌ๋ ์ค๋ณต ์คํ ๋ ์๋ ์์!
1 2 3 4
@app.task(acks_late=True) def my_task(x): result = x * 2 return result
Task Retry
- task retry ์ต์
- Atomicity
- ์์๊ฐ๋ฅํ Exception์๋ง ์ ์ฉ
1 2 3 4 5
@app.task(autoretry_for=(ConnectionError,), # ์๋ ์ฌ์๋๋ฅผ ์ํํ ์์ธ ํด๋์ค retry_backoff=2, # ์ฌ์๋ ์๊ฐ ๊ฐ๊ฒฉ(secodns) retry_kwargs={'max_retries':3}) # ์ต๋ ์ฌ์๋ ์ ์ง์ def send_mail(..): ...
Visibility Timeout
๋ฉ์์ง๊ฐ ์์ ์์๊ฒ ์ ๋ฌ๋ ํ์ ์ผ๋ง ๋์ ๋ค๋ฅธ ์์ ์๊ฐ ํด๋น ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ ์ํ๋ก ์ ์งํ๋ ์๊ฐ ์ค์ , ์ฆ ํด๋น ์๊ฐ์ด ์ง๋๋ฉด ๋ค๋ฅธ worker์๊ฒ task๊ฐ ๋ฐฐ์ ๋๋ค.
1
2
3
4
@app.task(bind=True, visibility_timeout=10)
def my_task_with_visibility_timeout(self, x):
result = x * 2
return result
์์ ์๊ฐ ํด๋น ๋ฉ์์ง๋ฅผ ๋ฐ์ ํ 10์ด ๋์ ๋ค๋ฅธ ์์ ์๊ฐ ๋์ผํ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค.
๋ง์ฝ 10์ด ๋ด์ ์์ ์๊ฐ ํด๋น ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ์ง ์์ผ๋ฉด ๋ฉ์์ง๋ ๋ค์ ์ฌ์ฉ ๊ฐ๋ฅํ ์ํ๋ก ๊ฐ์ฃผ๋จ
๊ทธ๋์ ์ฅ์๊ฐ ์คํ๋๋ task๋ผ๋ฉด visibility_timeout
์ต์
์ ๊ธธ๊ฒ ์ค์ ํ๋ ๊ฒ์ด ์ค๋ณต ์คํ ๋ฐฉ์ง๋จ
ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ
- ์์
์ ์ฒ๋ฆฌ์ ๋ฐ๋ผ ๋ค๋ฅด๊ฒ ์ฒ๋ฆฌ
- IO / CPU
- ์ค์๋
- ์ํ์๊ฐ
- ์คํ ๋น๋
Limits
- Time Limit
- task๊ฐ ์ผ์ ์๊ฐ ์ด์ ์คํ๋๊ณ ์์ผ๋ฉด ์ข ๋ฃ
soft_time_limit=60
, 60์ด๊ฐ ์ง๋๋ฉดSoftTimeLimitExceeded
raisehard_time_limit=60
- ๊ธ๋ก๋ฒ ์ค์ ๋ ๊ฐ๋ฅ(
app.conf.worker_soft_time_limit = 60
)
Prefetch
- worker_prefetch_multiplier
Gevent/ Eventlet
Gevent
1 2 3 4 5 6 7 8 9 10 11 12 13
from celery import Celery from gevent import monkey # gevent๊ฐ I/O ์์ ์ ๋น๋๊ธฐ๋ก ์ฒ๋ฆฌํ ์ ์๋๋ก ํ์ด์ฌ ๋ด์ฅ ๋ชจ๋๋ค์ ๋์ฒดํ๊ฑฐ๋ ์์ ํ๋ ์ญํ monkey.patch_all() app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379') @app.task def my_task(x): # I/O ์์ ์ํ perform_io_operation(x) return x
Eventlet
1 2 3 4 5 6 7 8 9 10 11 12
from celery import Celery import eventlet eventlet.monkey_patch() app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379') @app.task def my_task(x): # I/O ์์ ์ํ perform_io_operation(x) return x
Prefork
Use Multi Queue
- Queue ์ค์
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from celery import Celery
from kombu import Queue
# Celery app ๊ฐ์ฒด ์์ฑํ๊ณ app์ด๋ฆ์ test
app = Celery('test')
# ํ๊ฒฝ ๋ณ์ DJANGO_SETTINGS_MODULE์ "config.settings"๋ก ์ค์
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
# Django์ ์ค์ ํ์ผ์์ CELERY ๋ค์์คํ์ด์ค์ ํด๋นํ๋ ์ค์ ๊ฐ์ ๋ก๋ํ๋ ์ญํ
app.config_from_object('django.conf:settings', namespace='CELERY')
# task์ ํ๋ฅผ ์ง์ ํ์ง ์์์ ๋ ๊ธฐ๋ณธ์ผ๋ก ์ฌ์ฉ๋ ํ ์ง์
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('slow_tasks', routing_key='slow.#'),
Queue('quick_tasks', routing_key='quick.#'),
)
- task
1
2
3
4
5
6
7
8
9
10
11
12
13
@app.task
def slow_task(x):
time.sleep(x)
return x
@app.task
def quick_task(x):
return x
for i in range(10, 100):
slow_task.apply_async(args=[i], queue='slow_tasks')
quick_task.apply_async(args=[100], queue='quick_tasks')
- worker ์คํ
celery -A tasks worker -Q slow_tasks --loglevel=info
celery -A tasks worker -Q quick_tasks --loglevel=info