Backend

Dramatiq jako kolejka zadań. Jak usprawnić skalowanie systemu

dramatiq exatel

W jednym z naszych projektów pojawiła się potrzeba jednoczesnego uruchamiania setek lub tysięcy zadań. Były to najczęściej idempotentne, niezależne od siebie funkcje uruchamiane w reakcji na zdarzenie w systemie, na których wynik możemy trochę poczekać. Przykładami takich zadań było generowanie i wysyłanie raportów czy ekstrakcja danych z pliku i zapis do bazy danych.

Większość kodu niewymagającego najwyższej wydajności napisaliśmy w Pythonie. W związku z tym szukaliśmy projektu w łatwy sposób pozwalającego na integrację właśnie z tym językiem. O tym w artykule.

[starbox=”Szymon Nogieć”]

Wymagania, które stawialiśmy kolejce zadań to:

  • wysoka niezawodność i wydajność,
  • prostota skalowania,
  • łatwość użycia,
  • integracja z Pythonem w wersji 3.5+.

Mile widziane były cechy, takie jak:

  • możliwość wykorzystania Redisa jako backend dla zadań i wyników,
  • priorytetyzacja zadań,
  • automatyczne ponawianie,
  • pipelining,
  • prostota implementacji.

Rozwiązania jakie braliśmy pod uwagę to Celery, rq, dramatiq, huey. Które wybraliśmy i dlaczego?

Celery odrzuciliśmy z powodu wielkości, skomplikowania i problemów z używaniem. Nie spełniał kilku z naszych wymagań (prostota implementacji, priorytetyzacja zadań).

rq wykorzystujemy w TAMA i także sprawiała trochę problemów. Jest także dosyć prosta i nie spełnia kilku z cech nice-to-have (niezawodność, automatyczne ponawianie). Dodatkowo brakuje możliwości równoległego przetwarzania bez uruchamiania wielu instancji rq. Chcieliśmy także spróbować innego rozwiązania niż rq – potencjalnie lepszego.

Wybieraliśmy pomiędzy huey a dramatiq. Zdecydowaliśmy się na to drugie, ponieważ jako jedno z głównych wymagań twórcy postawili sobie “high reliability and performance”. Dodatkowo twórca dramatiq zapewnia niezawodność dostarczania zadań do workerów i auto przeładowywanie kodu. Całe porównanie między rozwiązaniami można znaleźć na tej stronie. Jako backend do kolejkowania zadań pozwala na użycie m.in. Redis czy RabbitMQ. Ze względu na dużą znajomość Redisa w zespole oraz fakt, iż już używaliśmy go w projekcie, to właśnie tej bazy użyliśmy jako backend komunikacyjny.

Użycie

Zaprezentowane przykłady uruchamiam z developerskich kontenerów i docker-compose zdefiniowanymi w następujący sposób.

docker-compose.yml

version: "3.5"

services:
 redis:
   image: redis:6.2.6
 worker:
   build:
     context: .
   volumes:
     - './:/code'
   depends_on:
     - redis
   command: dramatiq tasks
 task_sender:
   build:
     context: .
   volumes:
     - './:/code'
   depends_on:
     - worker
   command: python ./main.py

Dockerfile

FROM python:3.10.0-bullseye

COPY ./Pipfile ./Pipfile.lock ./
RUN apt-get update && apt-get install --no-install-recommends -y 
pipenv && pipenv install --system --dev --deploy

WORKDIR /code

Instancje aplikacji workera włączam przy użyciu aplikacji dramatiq. Uruchamia ona wiele równoległych procesów workera, które pobierają z backendu zakolejkowane zadania i je realizuje. Jako parametry przyjmowane są nazwy modułów, w których znajdują się definicje zadań do realizacji w ramach workera. Dramatiq pozwala także na parametryzację liczby używanych procesów/wątków (domyślnie procesów jest tyle, ile rdzeni procesora i 8 wątków na każdy proces).

Funkcję do wykonania w ramach kolejki definiujemy w dramatiq jako aktora. Jest to cienki wrapper dla callable’i zawierający metadane dotyczącego tego, w jaki sposób powinny zostać one asynchronicznie uruchamiane w ramach kolejki.

Podstawowy przykład wygląda w ten sposób:

tasks.py

@dramatiq.actor
def hello_queue():
    print("Hello world!")

Tak udekorowaną funkcję możemy wywoływać w normalny, synchroniczny sposób:

>>> import tasks
>>> tasks.hello_queue()
Hello world!

Taką funkcję możemy także wysłać na worker do wykonania asynchronicznego – to znaczy zlecenie realizacji zadania, gdy instancja workera będzie miała taką możliwość. Najpierw jednak należy ustawić konfigurację backendu dla kolejek jako Redis, gdyż domyślnie dramatiq wykorzystuje RabbitMQ:

tasks.py

from dramatiq.brokers.redis import RedisBroker

redis_broker = RedisBroker(host="redis")
dramatiq.set_broker(redis_broker)

Następnie możemy wysłać trzy zadania do realizacji. Wywołanie metody send() natychmiast kolejkuje je do wykonania, wykorzystując wcześniej skonfigurowany backend redisowy.

main.py

import tasks

def example1():
    for i in range(3):
        print(f"Sending task nr {i}")
        tasks.hello_queue.send()

def main():
    example1()

if __name__ == '__main__':
    main()

Następnie uruchamiamy całość z użyciem docker-compose:

➜ docker-compose up  
Creating art_dramatiq_redis_1 ... done
Creating art_dramatiq_worker_1 ... done
Creating art_dramatiq_task_sender_1 ... done
(...)
task_sender_1  | Sending task nr 0
task_sender_1  | Sending task nr 1
task_sender_1  | Sending task nr 2
worker_1       | Hello world!
worker_1       | Hello world!
worker_1       | Hello world!
art_dramatiq_task_sender_1 exited with code 0

Oczywiście do aktorów możemy przekazywać parametry. Jedynym warunkiem, który muszą spełnić jest możliwość serializacji do JSONa, ponieważ w ten sposób zostają przesłane i zapisane w backendzie. Serializator JSON można zmienić, na przykład na pickle.

Innym sposobem definicji aktorów jest używanie struktury klas. Aby zdefiniować zadania dla kolejki w ten sposób, należy zdefiniować klasę dziedziczącą po GenericActor i implementując metodę perform(self).

tasks.py

class ClassBasedActor(dramatiq.GenericActor):
    def perform(self):
        print("Hello from class based actor!")

main.py

def class_based_actor():
    tasks.ClassBasedActor.send()
(...)
def main():
    class_based_actor()

Ustawienia dla aktorów

Każdemu z aktorów możemy ustawiać parametry ich wykonywania. W przypadku dekoratora jest to realizowane przez przekazywanie do niego argumentów, dla aktorów opartych o hierarchię klas jest to realizowane w postaci parametrów wewnętrznej klasy Meta. Poniżej spojrzymy na kilka z nich.

Powtórzenia

Dla każdego aktora możemy zdefiniować parametry określające w jaki sposób workery mają realizować jego powtarzanie w przypadku niepowodzenia (na przykład rzucenie wyjątkiem). Możemy zdefiniować np. maksymalną liczbę powtórzeń, warunek kiedy zadanie ma być uruchamiane ponownie, czas po jakim ma zostać powtórzone, wyjątki które nie rozpoczną powtórnego uruchomienia, etc.

Poniżej mamy przykład aktora, który zostanie uruchomiony ponownie maksymalnie dwa razy:

tasks.py

@dramatiq.actor(max_retries=2)
def task_to_retry():
    print("Running task")
    raise RuntimeError("Error occurred")

i uruchamiamy go raz. Po starcie widzimy:

worker_1       | Running task
worker_1       | [2021-10-13 11:16:26,971] [PID 9] [Thread-4] [dramatiq.worker.WorkerThread] [ERROR] Failed to process message task_to_retry() with unhandled exception.
worker_1       | Traceback (most recent call last):
(...)
worker_1       | RuntimeError: Error occurred
worker_1       | [2021-10-13 11:16:26,972] [PID 9] [Thread-4] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '61f77357-57ed-4b52-b3e5-b6872f0e5135' in 10844 milliseconds.
art_dramatiq_task_sender_1 exited with code 0
worker_1       | Running task
worker_1       | [2021-10-13 11:16:37,956] [PID 13] [Thread-4] [dramatiq.worker.WorkerThread] [ERROR] Failed to process message task_to_retry() with unhandled exception.
worker_1       | Traceback (most recent call last):
(...)
worker_1       | RuntimeError: Error occurred
worker_1       | [2021-10-13 11:16:37,960] [PID 13] [Thread-4] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '61f77357-57ed-4b52-b3e5-b6872f0e5135' in 18275 milliseconds.
worker_1       | Running task
worker_1       | [2021-10-13 11:16:56,834] [PID 13] [Thread-4] [dramatiq.worker.WorkerThread] [ERROR] Failed to process message task_to_retry() with unhandled exception.
worker_1       | Traceback (most recent call last):
(...)
worker_1       | RuntimeError: Error occurred
worker_1       | [2021-10-13 11:16:56,836] [PID 13] [Thread-4] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '61f77357-57ed-4b52-b3e5-b6872f0e5135' in 58764 milliseconds.

W logach widzimy, że zadanie zostało uruchomione trzykrotnie. Pierwszy raz było to normalne wykonanie, a po wyrzuceniu wyjątku – dwie próby ponownego wykonania. Po przekroczeniu limitu nastąpiło porzucenie zadania.

Czas

Dramatiq pozwala także na konfigurację maksymalnego czasu dla zadania i maksymalny czas jaki dane zadanie może oczekiwać w kolejce. Poniżej mamy funkcję, której zadania mogą czekać w kolejce maksymalnie 1 sekundę, a jej maksymalny czas wykonania to 5 sekund:

tasks.py

@dramatiq.actor(max_age=1000, time_limit=5000)
def time_limited_task():
    print("I must hurry")
    try:
        time.sleep(6.0)
    except dramatiq.middleware.time_limit.TimeLimitExceeded as e:
        print("Ugh, I didn't get it done in time")

W przypadku przekroczenia maksymalnego czasu wykonania (jak w powyższym przypadku) zostaje rzucony wyjątek TimeLimitExceeded, a zadanie zostanie zatrzymane.

Poszczególne zadania możemy także uruchamiać z opóźnieniem. Realizowane jest to poprzez wywołanie udekorowanej funkcji z metodą send_with_options zamiast zwykłego send. Przykładowo, time_limited_task zostanie wywołane z półsekundowym opóźnieniem:

main.py

def send_with_delay():
    print("Sending task with delay")
    tasks.time_limited_task.send_with_options(delay=500)

Priorytetyzacja

Dramatiq pozwala na definiowanie priorytetów dla poszczególnych zadań. Podobnie jak z innymi opcjami, definiowane są jako parametr dekoratora. Może to być przydatne w przypadku dużego obciążenia workerów i potrzeby wykonywania pilnych zadań w pierwszej kolejności.

tasks.py

@dramatiq.actor(priority=0)
def high_priority_task(index):
    print(f"I'm a very important {index}")
    time.sleep(1.0)

@dramatiq.actor(priority=100)
def low_priority_task(index):
    print(f"I'm not so important {index}")
    time.sleep(2.0)

main.py

def priorities():
    print("Sending 20 low prio tasks")
    for i in range(20):
        tasks.low_priority_task.send(i)
    print("Sending 5 high prio tasks")
    for i in range(5):
        tasks.high_priority_task.send(i)

W wyniku zdefiniowania priorytetów, wcześniej kolejkowane zadania z niskim priorytetem zostają odłożone na później, na rzecz zadań wysoko priorytetowych:

task_sender_1  | Sending 20 low prio tasks
task_sender_1  | Sending 5 high prio tasks
worker_1       | I'm not so important 15
worker_1       | I'm not so important 0
worker_1       | I'm not so important 11
worker_1       | I'm not so important 4
worker_1       | I'm not so important 5
worker_1       | I'm not so important 1
worker_1       | I'm not so important 7
worker_1       | I'm not so important 2
worker_1       | I'm a very important 4
worker_1       | I'm a very important 3
worker_1       | I'm a very important 2
worker_1       | I'm a very important 0
worker_1       | I'm a very important 1
worker_1       | I'm not so important 18
worker_1       | I'm not so important 19
worker_1       | I'm not so important 17
art_dramatiq_task_sender_1 exited with code 0
worker_1       | I'm not so important 16
worker_1       | I'm not so important 3
worker_1       | I'm not so important 6
worker_1       | I'm not so important 12
worker_1       | I'm not so important 10
worker_1       | I'm not so important 8
worker_1       | I'm not so important 9
worker_1       | I'm not so important 13
worker_1       | I'm not so important 14

Wyniki działań aktorów

Domyślnie dramatiq nie przechowuje wyników zwracanych przez zadania. Aby mieć taką możliwość należy zdefiniować backend dla wyników i dodać do naszego brokera. Użyjemy ponownie Redisa:

tasks.py

from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results

redis_broker = RedisBroker(host="redis")
results_backend = RedisBackend(host="redis")
redis_broker.add_middleware(Results(backend=results_backend))
dramatiq.set_broker(redis_broker)

Następnie definiujemy zadanie, które musi w parametrach posiadać opcję store_results na true:

tasks.py

@dramatiq.actor(store_results=True)
def sleep_and_add(x, y):
    print("Got adding task, calculating")
    time.sleep(2.0)
    return x + y

Następnie kolejkujemy zadanie:

main.py

def results():
    msg = tasks.sleep_and_add.send(2, 2)
    try:
        result = msg.get_result(block=False)
    except dramatiq.results.errors.ResultMissing:
        print("Result is not ready yet...")
        result = msg.get_result(block=True)
        print(f"Result is {result}")

otrzymując wynik:

worker_1       | Got adding task, calculating
task_sender_1  | Result is not ready yet...
task_sender_1  | Result is 4

Testy jednostkowe

Dramatiq posiada klasy stub do wykorzystania w testach jednostkowych. Są to między innymi StubBroker zastępujący zwykły broker i StubBackend do użycia zamiast normalnego backendu dla wyników funkcji.

Moduł z zadaniami możemy dostosować do wykonywania testów jednostkowych, na przykład przy użyciu zmiennej środowiskowej:

tasks.py

import os

# setup redis broker
from dramatiq.brokers.redis import RedisBroker
from dramatiq.brokers.stub import StubBroker
from dramatiq.results.backends import RedisBackend, StubBackend
from dramatiq.results import Results

if os.getenv("UNIT_TESTS") == "1":
    broker = StubBroker()
    stub_backend = StubBackend()
    broker.add_middleware(Results(backend=stub_backend))
    dramatiq.set_broker(broker)
else:
    broker = RedisBroker(host="redis")
    results_backend = RedisBackend(host="redis")
    broker.add_middleware(Results(backend=results_backend))
    dramatiq.set_broker(broker)

Następnie dla wygody możemy zdefiniować z użyciem pytest fixture’y dla stub_brokera oraz stub_workera i umieścić je w conftests.py:

tests/conftest.py

import pytest
from dramatiq import Worker

from tasks import broker

@pytest.fixture()
def stub_broker():
    broker.flush_all()
    return broker

@pytest.fixture()
def stub_worker():
    worker = Worker(broker, worker_timeout=100)
    worker.start()
    yield worker
    worker.stop()

I zacząć pisać testy:

tests/test_tasks.py

import tasks

def test_adding(stub_broker, stub_worker):
    msg = tasks.sleep_and_add.send(20, 22)
    stub_broker.join(tasks.sleep_and_add.queue_name)
    stub_worker.join()
    assert msg.get_result(block=True) == 42

Uruchomić je możemy wewnątrz kontenera task_sender za pomocą skryptu:

run_unit_tests.sh

#!/bin/bash

export UNIT_TESTS=1
export PYTHONPATH=$PYTHONPATH:/code
pytest

I uruchomienie:

➜ docker-compose run --rm task_sender ./run_unit_tests.sh
=============================================================================================================================================================================== test session starts ================================================================================================================================================================================
platform linux -- Python 3.10.0, pytest-6.2.5, py-1.10.0, pluggy-1.0.0
rootdir: /code
collected 1 item                                                                                                                                                                                                                                                                                                                                                                   

tests/test_tasks.py .                                                                                                                                                                                                                                                                                                                                                        [100%]

Podsumowanie

Dramatiq dobrze sprawdził się w naszym zastosowaniu. Mocno uprościł skalowanie systemu – w przypadku dużego obciążenia workerów wystarczy uruchomić kilka kolejnych instancji wpiętych do tej samej bazy Redisa i to wszystko. W przypadku wymagania wysokiej dostępności i niezawodności istnieje możliwość konfiguracji także klastra Redisowego, aby wyeliminować potencjalne problemy z brakiem dostępu/awarią Redisowego backendu.

Dodatkowo, kod źródłowy jest nieskomplikowany, pozwala na wiele opcji konfiguracyjnych. Programista może na przykład dodawać własne middleware’y, zawierające informacje, które mają być współdzielone pomiędzy wątkami workera, takie jak np. uchwyty do baz danych czy konfiguracje.

Kod źródłowy wykorzystanych przykładów można znaleźć na GitHubie.

Zdjęcie główne artykułu pochodzi z unsplash.com.

Uwielbia rozwiązywanie nietypowych problemów i realizację projektów od początku do końca. Swoją przygodę zaczął od programowania robotów w ramach projektu B-Droid na Politechnice Warszawskiej. Później tworzył innowacyjne systemy wspomagania dowodzenia w PIT-Radwar. Aktualnie od ponad 3 lat realizuje projekty R&D w Dziale Nowych Rozwiązań w Exatelu związane z wydajnym przetwarzaniem ruchu sieciowego. Główne zainteresowania to programowanie w językach C/C++, Python, architektura systemów rozproszonych, devops oraz bezpieczeństwo. 

Podobne artykuły

[wpdevart_facebook_comment curent_url="https://justjoin.it/blog/dramatiq-jako-kolejka-zadan-jak-usprawnic-skalowanie-systemu" order_type="social" width="100%" count_of_comments="8" ]