Dramatiq jako kolejka zadań. Jak usprawnić skalowanie systemu
W jednym z naszych projektów pojawiła się potrzeba jednoczesnego uruchamiania setek lub tysięcy zadań. Były to najczęściej idempotentne, niezależne od siebie funkcje uruchamiane w reakcji na zdarzenie w systemie, na których wynik możemy trochę poczekać. Przykładami takich zadań było generowanie i wysyłanie raportów czy ekstrakcja danych z pliku i zapis do bazy danych.
Większość kodu niewymagającego najwyższej wydajności napisaliśmy w Pythonie. W związku z tym szukaliśmy projektu w łatwy sposób pozwalającego na integrację właśnie z tym językiem. O tym w artykule.
[starbox=”Szymon Nogieć”]
Wymagania, które stawialiśmy kolejce zadań to:
- wysoka niezawodność i wydajność,
- prostota skalowania,
- łatwość użycia,
- integracja z Pythonem w wersji 3.5+.
Mile widziane były cechy, takie jak:
- możliwość wykorzystania Redisa jako backend dla zadań i wyników,
- priorytetyzacja zadań,
- automatyczne ponawianie,
- pipelining,
- prostota implementacji.
Rozwiązania jakie braliśmy pod uwagę to Celery, rq, dramatiq, huey. Które wybraliśmy i dlaczego?
Celery odrzuciliśmy z powodu wielkości, skomplikowania i problemów z używaniem. Nie spełniał kilku z naszych wymagań (prostota implementacji, priorytetyzacja zadań).
rq wykorzystujemy w TAMA i także sprawiała trochę problemów. Jest także dosyć prosta i nie spełnia kilku z cech nice-to-have (niezawodność, automatyczne ponawianie). Dodatkowo brakuje możliwości równoległego przetwarzania bez uruchamiania wielu instancji rq. Chcieliśmy także spróbować innego rozwiązania niż rq – potencjalnie lepszego.
Wybieraliśmy pomiędzy huey a dramatiq. Zdecydowaliśmy się na to drugie, ponieważ jako jedno z głównych wymagań twórcy postawili sobie “high reliability and performance”. Dodatkowo twórca dramatiq zapewnia niezawodność dostarczania zadań do workerów i auto przeładowywanie kodu. Całe porównanie między rozwiązaniami można znaleźć na tej stronie. Jako backend do kolejkowania zadań pozwala na użycie m.in. Redis czy RabbitMQ. Ze względu na dużą znajomość Redisa w zespole oraz fakt, iż już używaliśmy go w projekcie, to właśnie tej bazy użyliśmy jako backend komunikacyjny.
Spis treści
Użycie
Zaprezentowane przykłady uruchamiam z developerskich kontenerów i docker-compose zdefiniowanymi w następujący sposób.
docker-compose.yml
version: "3.5" services: redis: image: redis:6.2.6 worker: build: context: . volumes: - './:/code' depends_on: - redis command: dramatiq tasks task_sender: build: context: . volumes: - './:/code' depends_on: - worker command: python ./main.py
Dockerfile
FROM python:3.10.0-bullseye COPY ./Pipfile ./Pipfile.lock ./ RUN apt-get update && apt-get install --no-install-recommends -y pipenv && pipenv install --system --dev --deploy WORKDIR /code
Instancje aplikacji workera włączam przy użyciu aplikacji dramatiq. Uruchamia ona wiele równoległych procesów workera, które pobierają z backendu zakolejkowane zadania i je realizuje. Jako parametry przyjmowane są nazwy modułów, w których znajdują się definicje zadań do realizacji w ramach workera. Dramatiq pozwala także na parametryzację liczby używanych procesów/wątków (domyślnie procesów jest tyle, ile rdzeni procesora i 8 wątków na każdy proces).
Funkcję do wykonania w ramach kolejki definiujemy w dramatiq jako aktora. Jest to cienki wrapper dla callable’i zawierający metadane dotyczącego tego, w jaki sposób powinny zostać one asynchronicznie uruchamiane w ramach kolejki.
Podstawowy przykład wygląda w ten sposób:
tasks.py
@dramatiq.actor def hello_queue(): print("Hello world!")
Tak udekorowaną funkcję możemy wywoływać w normalny, synchroniczny sposób:
>>> import tasks >>> tasks.hello_queue() Hello world!
Taką funkcję możemy także wysłać na worker do wykonania asynchronicznego – to znaczy zlecenie realizacji zadania, gdy instancja workera będzie miała taką możliwość. Najpierw jednak należy ustawić konfigurację backendu dla kolejek jako Redis, gdyż domyślnie dramatiq wykorzystuje RabbitMQ:
tasks.py
from dramatiq.brokers.redis import RedisBroker redis_broker = RedisBroker(host="redis") dramatiq.set_broker(redis_broker)
Następnie możemy wysłać trzy zadania do realizacji. Wywołanie metody send() natychmiast kolejkuje je do wykonania, wykorzystując wcześniej skonfigurowany backend redisowy.
main.py
import tasks def example1(): for i in range(3): print(f"Sending task nr {i}") tasks.hello_queue.send() def main(): example1() if __name__ == '__main__': main()
Następnie uruchamiamy całość z użyciem docker-compose:
➜ docker-compose up Creating art_dramatiq_redis_1 ... done Creating art_dramatiq_worker_1 ... done Creating art_dramatiq_task_sender_1 ... done (...) task_sender_1 | Sending task nr 0 task_sender_1 | Sending task nr 1 task_sender_1 | Sending task nr 2 worker_1 | Hello world! worker_1 | Hello world! worker_1 | Hello world! art_dramatiq_task_sender_1 exited with code 0
Oczywiście do aktorów możemy przekazywać parametry. Jedynym warunkiem, który muszą spełnić jest możliwość serializacji do JSONa, ponieważ w ten sposób zostają przesłane i zapisane w backendzie. Serializator JSON można zmienić, na przykład na pickle.
Innym sposobem definicji aktorów jest używanie struktury klas. Aby zdefiniować zadania dla kolejki w ten sposób, należy zdefiniować klasę dziedziczącą po GenericActor i implementując metodę perform(self).
tasks.py
class ClassBasedActor(dramatiq.GenericActor): def perform(self): print("Hello from class based actor!")
main.py
def class_based_actor(): tasks.ClassBasedActor.send() (...) def main(): class_based_actor()
Ustawienia dla aktorów
Każdemu z aktorów możemy ustawiać parametry ich wykonywania. W przypadku dekoratora jest to realizowane przez przekazywanie do niego argumentów, dla aktorów opartych o hierarchię klas jest to realizowane w postaci parametrów wewnętrznej klasy Meta. Poniżej spojrzymy na kilka z nich.
Powtórzenia
Dla każdego aktora możemy zdefiniować parametry określające w jaki sposób workery mają realizować jego powtarzanie w przypadku niepowodzenia (na przykład rzucenie wyjątkiem). Możemy zdefiniować np. maksymalną liczbę powtórzeń, warunek kiedy zadanie ma być uruchamiane ponownie, czas po jakim ma zostać powtórzone, wyjątki które nie rozpoczną powtórnego uruchomienia, etc.
Poniżej mamy przykład aktora, który zostanie uruchomiony ponownie maksymalnie dwa razy:
tasks.py
@dramatiq.actor(max_retries=2) def task_to_retry(): print("Running task") raise RuntimeError("Error occurred")
i uruchamiamy go raz. Po starcie widzimy:
worker_1 | Running task worker_1 | [2021-10-13 11:16:26,971] [PID 9] [Thread-4] [dramatiq.worker.WorkerThread] [ERROR] Failed to process message task_to_retry() with unhandled exception. worker_1 | Traceback (most recent call last): (...) worker_1 | RuntimeError: Error occurred worker_1 | [2021-10-13 11:16:26,972] [PID 9] [Thread-4] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '61f77357-57ed-4b52-b3e5-b6872f0e5135' in 10844 milliseconds. art_dramatiq_task_sender_1 exited with code 0 worker_1 | Running task worker_1 | [2021-10-13 11:16:37,956] [PID 13] [Thread-4] [dramatiq.worker.WorkerThread] [ERROR] Failed to process message task_to_retry() with unhandled exception. worker_1 | Traceback (most recent call last): (...) worker_1 | RuntimeError: Error occurred worker_1 | [2021-10-13 11:16:37,960] [PID 13] [Thread-4] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '61f77357-57ed-4b52-b3e5-b6872f0e5135' in 18275 milliseconds. worker_1 | Running task worker_1 | [2021-10-13 11:16:56,834] [PID 13] [Thread-4] [dramatiq.worker.WorkerThread] [ERROR] Failed to process message task_to_retry() with unhandled exception. worker_1 | Traceback (most recent call last): (...) worker_1 | RuntimeError: Error occurred worker_1 | [2021-10-13 11:16:56,836] [PID 13] [Thread-4] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '61f77357-57ed-4b52-b3e5-b6872f0e5135' in 58764 milliseconds.
W logach widzimy, że zadanie zostało uruchomione trzykrotnie. Pierwszy raz było to normalne wykonanie, a po wyrzuceniu wyjątku – dwie próby ponownego wykonania. Po przekroczeniu limitu nastąpiło porzucenie zadania.
Czas
Dramatiq pozwala także na konfigurację maksymalnego czasu dla zadania i maksymalny czas jaki dane zadanie może oczekiwać w kolejce. Poniżej mamy funkcję, której zadania mogą czekać w kolejce maksymalnie 1 sekundę, a jej maksymalny czas wykonania to 5 sekund:
tasks.py
@dramatiq.actor(max_age=1000, time_limit=5000) def time_limited_task(): print("I must hurry") try: time.sleep(6.0) except dramatiq.middleware.time_limit.TimeLimitExceeded as e: print("Ugh, I didn't get it done in time")
W przypadku przekroczenia maksymalnego czasu wykonania (jak w powyższym przypadku) zostaje rzucony wyjątek TimeLimitExceeded, a zadanie zostanie zatrzymane.
Poszczególne zadania możemy także uruchamiać z opóźnieniem. Realizowane jest to poprzez wywołanie udekorowanej funkcji z metodą send_with_options zamiast zwykłego send. Przykładowo, time_limited_task zostanie wywołane z półsekundowym opóźnieniem:
main.py
def send_with_delay(): print("Sending task with delay") tasks.time_limited_task.send_with_options(delay=500)
Priorytetyzacja
Dramatiq pozwala na definiowanie priorytetów dla poszczególnych zadań. Podobnie jak z innymi opcjami, definiowane są jako parametr dekoratora. Może to być przydatne w przypadku dużego obciążenia workerów i potrzeby wykonywania pilnych zadań w pierwszej kolejności.
tasks.py
@dramatiq.actor(priority=0) def high_priority_task(index): print(f"I'm a very important {index}") time.sleep(1.0) @dramatiq.actor(priority=100) def low_priority_task(index): print(f"I'm not so important {index}") time.sleep(2.0)
main.py
def priorities(): print("Sending 20 low prio tasks") for i in range(20): tasks.low_priority_task.send(i) print("Sending 5 high prio tasks") for i in range(5): tasks.high_priority_task.send(i)
W wyniku zdefiniowania priorytetów, wcześniej kolejkowane zadania z niskim priorytetem zostają odłożone na później, na rzecz zadań wysoko priorytetowych:
task_sender_1 | Sending 20 low prio tasks task_sender_1 | Sending 5 high prio tasks worker_1 | I'm not so important 15 worker_1 | I'm not so important 0 worker_1 | I'm not so important 11 worker_1 | I'm not so important 4 worker_1 | I'm not so important 5 worker_1 | I'm not so important 1 worker_1 | I'm not so important 7 worker_1 | I'm not so important 2 worker_1 | I'm a very important 4 worker_1 | I'm a very important 3 worker_1 | I'm a very important 2 worker_1 | I'm a very important 0 worker_1 | I'm a very important 1 worker_1 | I'm not so important 18 worker_1 | I'm not so important 19 worker_1 | I'm not so important 17 art_dramatiq_task_sender_1 exited with code 0 worker_1 | I'm not so important 16 worker_1 | I'm not so important 3 worker_1 | I'm not so important 6 worker_1 | I'm not so important 12 worker_1 | I'm not so important 10 worker_1 | I'm not so important 8 worker_1 | I'm not so important 9 worker_1 | I'm not so important 13 worker_1 | I'm not so important 14
Wyniki działań aktorów
Domyślnie dramatiq nie przechowuje wyników zwracanych przez zadania. Aby mieć taką możliwość należy zdefiniować backend dla wyników i dodać do naszego brokera. Użyjemy ponownie Redisa:
tasks.py
from dramatiq.brokers.redis import RedisBroker from dramatiq.results.backends import RedisBackend from dramatiq.results import Results redis_broker = RedisBroker(host="redis") results_backend = RedisBackend(host="redis") redis_broker.add_middleware(Results(backend=results_backend)) dramatiq.set_broker(redis_broker)
Następnie definiujemy zadanie, które musi w parametrach posiadać opcję store_results na true:
tasks.py
@dramatiq.actor(store_results=True) def sleep_and_add(x, y): print("Got adding task, calculating") time.sleep(2.0) return x + y
Następnie kolejkujemy zadanie:
main.py
def results(): msg = tasks.sleep_and_add.send(2, 2) try: result = msg.get_result(block=False) except dramatiq.results.errors.ResultMissing: print("Result is not ready yet...") result = msg.get_result(block=True) print(f"Result is {result}")
otrzymując wynik:
worker_1 | Got adding task, calculating task_sender_1 | Result is not ready yet... task_sender_1 | Result is 4
Testy jednostkowe
Dramatiq posiada klasy stub do wykorzystania w testach jednostkowych. Są to między innymi StubBroker zastępujący zwykły broker i StubBackend do użycia zamiast normalnego backendu dla wyników funkcji.
Moduł z zadaniami możemy dostosować do wykonywania testów jednostkowych, na przykład przy użyciu zmiennej środowiskowej:
tasks.py
import os # setup redis broker from dramatiq.brokers.redis import RedisBroker from dramatiq.brokers.stub import StubBroker from dramatiq.results.backends import RedisBackend, StubBackend from dramatiq.results import Results if os.getenv("UNIT_TESTS") == "1": broker = StubBroker() stub_backend = StubBackend() broker.add_middleware(Results(backend=stub_backend)) dramatiq.set_broker(broker) else: broker = RedisBroker(host="redis") results_backend = RedisBackend(host="redis") broker.add_middleware(Results(backend=results_backend)) dramatiq.set_broker(broker)
Następnie dla wygody możemy zdefiniować z użyciem pytest fixture’y dla stub_brokera oraz stub_workera i umieścić je w conftests.py:
tests/conftest.py
import pytest from dramatiq import Worker from tasks import broker @pytest.fixture() def stub_broker(): broker.flush_all() return broker @pytest.fixture() def stub_worker(): worker = Worker(broker, worker_timeout=100) worker.start() yield worker worker.stop()
I zacząć pisać testy:
tests/test_tasks.py
import tasks def test_adding(stub_broker, stub_worker): msg = tasks.sleep_and_add.send(20, 22) stub_broker.join(tasks.sleep_and_add.queue_name) stub_worker.join() assert msg.get_result(block=True) == 42
Uruchomić je możemy wewnątrz kontenera task_sender za pomocą skryptu:
run_unit_tests.sh
#!/bin/bash export UNIT_TESTS=1 export PYTHONPATH=$PYTHONPATH:/code pytest
I uruchomienie:
➜ docker-compose run --rm task_sender ./run_unit_tests.sh =============================================================================================================================================================================== test session starts ================================================================================================================================================================================ platform linux -- Python 3.10.0, pytest-6.2.5, py-1.10.0, pluggy-1.0.0 rootdir: /code collected 1 item tests/test_tasks.py . [100%]
Podsumowanie
Dramatiq dobrze sprawdził się w naszym zastosowaniu. Mocno uprościł skalowanie systemu – w przypadku dużego obciążenia workerów wystarczy uruchomić kilka kolejnych instancji wpiętych do tej samej bazy Redisa i to wszystko. W przypadku wymagania wysokiej dostępności i niezawodności istnieje możliwość konfiguracji także klastra Redisowego, aby wyeliminować potencjalne problemy z brakiem dostępu/awarią Redisowego backendu.
Dodatkowo, kod źródłowy jest nieskomplikowany, pozwala na wiele opcji konfiguracyjnych. Programista może na przykład dodawać własne middleware’y, zawierające informacje, które mają być współdzielone pomiędzy wątkami workera, takie jak np. uchwyty do baz danych czy konfiguracje.
Kod źródłowy wykorzystanych przykładów można znaleźć na GitHubie.
Zdjęcie główne artykułu pochodzi z unsplash.com.