Celery

💡 PyCon Korea 2019, 이지훈

Celery?

An asynchronous task queue built on message passing.

  • Client: requests work
  • Worker: executes the work
  • Broker: delivers messages

  • AMQP (Advanced Message Queuing Protocol)
    • The core protocol behind Celery

    • The Producer creates a message and sends it to the Broker
    • The Broker confirms receipt and delivers the message to a Consumer
    • The Consumer sends an acknowledgement (ack) back to the Broker, which is what makes message delivery reliable
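
The publish, deliver, and ack steps above can be sketched with a toy in-memory broker. This is only an illustration of the message flow, not Celery or a real AMQP client; `ToyBroker` and its method names are hypothetical.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a message broker (hypothetical, for illustration)."""

    def __init__(self):
        self.queue = deque()   # messages waiting for delivery
        self.unacked = {}      # delivery tag -> message handed out but not yet acked
        self._next_tag = 0

    def publish(self, message):
        """Producer side: store the message and confirm receipt."""
        self.queue.append(message)
        return True

    def deliver(self):
        """Hand the next message to a consumer and remember it until it is acked."""
        message = self.queue.popleft()
        self._next_tag += 1
        self.unacked[self._next_tag] = message
        return self._next_tag, message

    def ack(self, tag):
        """Consumer side: acknowledge, so the broker can forget the message."""
        del self.unacked[tag]

    def requeue_unacked(self):
        """When a consumer disappears, put its unacked messages back on the queue."""
        for tag in sorted(self.unacked):
            self.queue.append(self.unacked.pop(tag))


broker = ToyBroker()
confirmed = broker.publish("send welcome mail")
tag, message = broker.deliver()
pending_before_ack = len(broker.unacked)
broker.ack(tag)
pending_after_ack = len(broker.unacked)
```

Until `ack` is called the broker keeps the message in `unacked`, which is what lets it redeliver the message if the consumer goes away.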

Completing tasks reliably

Late ACK

When a Celery worker sends its ack

  • Default
    • The worker acks just before it starts executing the task
    • Even if the task never actually runs to completion, the Broker has already received the ack and removes the task from the queue
  • acks_late=True
    • The worker acks only once the task has finished executing
    • If the task does not complete, the message is still in the queue and can be run again, which makes processing more reliable
    • However, the task may be executed more than once!
    
      @app.task(acks_late=True)
      def my_task(x):
          result = x * 2
          return result
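
The difference between the two ack timings can be sketched without Celery. The simulation below is a simplified model that only covers a worker dying mid-task; `run_worker`, `make_flaky_handler`, and `WorkerCrashed` are hypothetical names.

```python
from collections import deque


class WorkerCrashed(Exception):
    """Stands in for a worker process dying in the middle of a task."""


def run_worker(queue, handler, acks_late):
    """Take one message and process it; return True if the task finished.

    With early ack the message leaves the queue before the handler runs.
    With late ack it leaves the queue only after the handler returns.
    """
    message = queue[0]
    if not acks_late:
        queue.popleft()          # ack just before execution (the default)
    try:
        handler(message)
    except WorkerCrashed:
        return False             # with late ack the message is still queued
    if acks_late:
        queue.popleft()          # ack after successful completion
    return True


def make_flaky_handler(log):
    """Handler that records every attempt and crashes on the first one only."""
    def handler(message):
        log.append(message)
        if len(log) == 1:
            raise WorkerCrashed()
    return handler


early_queue, early_log = deque(["job-1"]), []
run_worker(early_queue, make_flaky_handler(early_log), acks_late=False)

late_queue, late_log = deque(["job-1"]), []
handler = make_flaky_handler(late_log)
run_worker(late_queue, handler, acks_late=True)   # crashes, message stays queued
run_worker(late_queue, handler, acks_late=True)   # second attempt succeeds
```

With early ack the job is gone after a single failed attempt. With late ack it survives the crash but ends up in the log twice, which is why tasks that use `acks_late=True` should be safe to run more than once.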
    

Task Retry

  • Task retry options
    • Atomicity: a retry runs the whole task again, so the task should be safe to re-run from the start
    • Apply retries only to exceptions you expect
    
      @app.task(
          autoretry_for=(ConnectionError,),  # exception classes that trigger an automatic retry
          retry_backoff=2,                   # exponential backoff factor in seconds (2s, 4s, 8s, ...)
          retry_kwargs={'max_retries': 3},   # maximum number of retries
      )
      def send_mail(*args, **kwargs):  # parameters elided in the original
          ...
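
To see what a numeric `retry_backoff` means for timing, here is a small sketch of an exponential backoff schedule. It follows Celery's documented behaviour as I understand it (delay factor doubled on each retry, capped by `retry_backoff_max`, which defaults to 600 seconds, with random jitter enabled by default), but `backoff_delay` is a hypothetical helper, not Celery's own function.

```python
import random


def backoff_delay(factor, retries, maximum=600, jitter=False):
    """Delay in seconds before retry number `retries` (0-based).

    Grows as factor * 2**retries and is capped at `maximum`. With jitter,
    a random value between 0 and that delay is used to spread retries out.
    """
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        return random.randrange(delay + 1)
    return delay


schedule = [backoff_delay(2, n) for n in range(3)]   # three retries with factor 2
capped = backoff_delay(2, 20)                        # far past the cap
jittered = backoff_delay(2, 2, jitter=True)          # somewhere between 0 and 8
```

Without jitter the three retries wait 2, 4, and 8 seconds. With jitter the waits are randomized so that many failing tasks do not all retry at the same moment.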
    

Visibility Timeout

How long a message stays hidden from other workers after it has been delivered to one worker. If the message has not been acknowledged when this time runs out, the task is handed to another worker.

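# visibility_timeout is a broker transport option (Redis, SQS), not a per-task option
app.conf.broker_transport_options = {'visibility_timeout': 10}  # seconds

@app.task(bind=True)
def my_task_with_visibility_timeout(self, x):
    result = x * 2
    return result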

For 10 seconds after one worker receives the message, no other worker can process the same message.

If the worker has not acknowledged the message within those 10 seconds, the message is treated as available again.

So for long-running tasks, set visibility_timeout longer than the task's running time to prevent duplicate execution.
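
The redelivery behaviour described above can be modelled with a toy queue and an explicit clock. This is a sketch of the idea only, not how Redis or SQS implement it; `VisibilityQueue` is a hypothetical class.

```python
class VisibilityQueue:
    """Toy queue with a visibility timeout, driven by an explicit clock value."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.messages = {}   # message id -> time at which it becomes visible again

    def put(self, message_id):
        self.messages[message_id] = 0   # visible immediately

    def receive(self, now):
        """Return a visible message id and hide it for the timeout, else None."""
        for message_id, visible_at in self.messages.items():
            if visible_at <= now:
                self.messages[message_id] = now + self.visibility_timeout
                return message_id
        return None

    def ack(self, message_id):
        """Acknowledge: the message is removed for good."""
        self.messages.pop(message_id, None)


queue = VisibilityQueue(visibility_timeout=10)
queue.put("task-1")

first = queue.receive(now=0)     # worker A takes the task
hidden = queue.receive(now=5)    # worker B sees nothing yet
again = queue.receive(now=12)    # A has not acked within 10s, so B gets the same task
```

If worker A is simply slow rather than dead, both workers now run `task-1`, which is the duplicate execution a longer timeout is meant to avoid.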

Processing efficiently

  • Handle tasks differently according to their characteristics
    • I/O-bound vs. CPU-bound
    • Importance
    • Execution time
    • Execution frequency

Limits

  • Time Limit
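    • Terminates a task that has been running longer than a set time
    • soft_time_limit=60: SoftTimeLimitExceeded is raised inside the task after 60 seconds, so the task can catch it and clean up
    • time_limit=60 (hard limit): the worker process running the task is killed after 60 seconds
    • Can also be set globally (app.conf.task_soft_time_limit = 60, app.conf.task_time_limit = 60)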
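
A minimal sketch of how the two limits are typically combined, assuming Celery is installed and a Redis broker at the URL shown. The task `process_file` and the helper `release_resources` are hypothetical, and the 70-second hard limit is an arbitrary choice that leaves the task a few seconds to clean up.

```python
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery("tasks", broker="redis://localhost:6379/0")

# Global defaults; the per-task options below override them.
app.conf.task_soft_time_limit = 60
app.conf.task_time_limit = 70


def release_resources(path):
    """Hypothetical cleanup hook; replace with real cleanup logic."""


@app.task(soft_time_limit=60, time_limit=70)
def process_file(path):
    try:
        with open(path, "rb") as handle:
            return len(handle.read())
    except SoftTimeLimitExceeded:
        # Raised inside the task after 60 seconds. About 10 seconds remain
        # before the hard limit terminates the process running the task.
        release_resources(path)
        raise
```

Setting the hard limit slightly above the soft limit gives the task a chance to clean up while still guaranteeing that a stuck task is eventually stopped.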

Prefetch

  • worker_prefetch_multiplier
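
As a rough illustration of what this setting controls: per the Celery documentation, a worker reserves up to `worker_prefetch_multiplier` times its number of concurrent processes, and the default multiplier is 4. The helper `prefetch_limit` below is hypothetical and only does that arithmetic.

```python
def prefetch_limit(multiplier, concurrency):
    """Messages one worker may reserve in advance: multiplier x concurrency."""
    return multiplier * concurrency


# Default multiplier of 4 with 8 worker processes.
default_limit = prefetch_limit(multiplier=4, concurrency=8)

# A multiplier of 1 keeps one reserved message per process, which is the
# usual choice for long-running tasks so that work is not held by a busy worker.
long_task_limit = prefetch_limit(multiplier=1, concurrency=8)
```

The setting itself would be applied with `app.conf.worker_prefetch_multiplier = 1`.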

Gevent / Eventlet

  • Gevent

    
      from gevent import monkey

      # Patch the standard library so that blocking I/O cooperates with gevent.
      # Done before the other imports so they pick up the patched modules.
      monkey.patch_all()

      from celery import Celery

      app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')

      @app.task
      def my_task(x):
          # Perform an I/O operation (perform_io_operation is a placeholder)
          perform_io_operation(x)
          return x
    
  • Eventlet

    
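      import eventlet

      # Patch the standard library before the other imports so they use green I/O.
      eventlet.monkey_patch()

      from celery import Celery

      app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')

      @app.task
      def my_task(x):
          # Perform an I/O operation (perform_io_operation is a placeholder)
          perform_io_operation(x)
          return x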
    

Prefork

Use Multi Queue

  • Queue configuration
import os

from celery import Celery
from kombu import Queue

# Create the Celery app object, named "test"
app = Celery('test')
# Set the DJANGO_SETTINGS_MODULE environment variable to "config.settings"
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
# Load the settings in the CELERY namespace from the Django settings file
app.config_from_object('django.conf:settings', namespace='CELERY')
# Queue used by default when a task does not specify one
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('slow_tasks', routing_key='slow.#'),
    Queue('quick_tasks', routing_key='quick.#'),
)
  • task
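import time

@app.task
def slow_task(x):
    # Simulate a long-running task by sleeping for x seconds
    time.sleep(x)
    return x

@app.task
def quick_task(x):
    return x

# Send 90 slow tasks (sleeping 10 to 99 seconds) to the slow_tasks queue
for i in range(10, 100):
    slow_task.apply_async(args=[i], queue='slow_tasks')

# The quick task goes to its own queue, so it is not stuck behind the slow ones
quick_task.apply_async(args=[100], queue='quick_tasks')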
  • Running the workers
    • celery -A tasks worker -Q slow_tasks --loglevel=info
    • celery -A tasks worker -Q quick_tasks --loglevel=info
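
The routing keys `slow.#` and `quick.#` in the queue configuration above use AMQP topic syntax, where `*` stands for exactly one dot-separated word and `#` for zero or more. These wildcards only take effect when the queue is bound to a topic exchange. The configuration as written does not name an exchange, so with Celery's default direct exchange the key is compared literally, and the example sends tasks by queue name anyway. The sketch below shows how topic matching works; `topic_matches` is a hypothetical helper, not part of Celery or kombu.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP-style topic pattern matches a routing key."""

    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


slow_match = topic_matches("slow.#", "slow.report.daily")
bare_match = topic_matches("slow.#", "slow")
cross_match = topic_matches("quick.#", "slow.report.daily")
one_word_only = topic_matches("slow.*", "slow.report.daily")
```

Here `slow.#` accepts both `slow.report.daily` and the bare key `slow`, while `slow.*` rejects `slow.report.daily` because `*` covers only a single word.
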
This post is licensed under CC BY 4.0 by the author.
