Tworzenie aplikacji Event-Driven w Go
Jeśli zdarzyło Ci się pracować z systemami rozproszonymi, czy mikroserwisami, prawdopodobnie znasz różne sposoby na komunikację. Jednym z najbardziej powszechnych są na przykład połączenia HTTP, czasem też jakaś inna forma RPC. Aplikacje w tym modelu komunikują się ze sobą przy pomocy synchronicznych, blokujących wywołań. Zupełnie inną kategorią jest komunikacja przy pomocy zdarzeń (event-driven) lub wiadomości (message-driven).
Miłosz Smółka. Współzałożyciel Three Dots Labs. Zwykle pracuje z systemami rozproszonymi, od kilku lat głównie w Go. Na przekór stosuje Domain-Driven Design w tym „systemowym” języku. Entuzjasta kultury DevOps, świetnie zgranych zespołów i podejścia Continuous Delivery.
W tej architekturze serwisy polegają na asynchronicznej komunikacji, co ułatwia ich rozbudowę wraz ze wzrostem skali i pomaga zachować dobrą izolację każdej z aplikacji.
Choć to podejście ma wiele zalet, nie jest pozbawione też wad. Systemy tego typu często wymagają użycia innych, mniej powszechnych wzorców i dużo łatwiej jest popełnić w nich błąd. Postaram się przybliżyć budowanie tego typu aplikacji w Go z użyciem biblioteki Watermill. Jeśli dopiero zaczynasz pracę z Go, gotowy projekt pomoże Ci zrozumieć i uruchomić przykład.
Spis treści
Przykład: Integracja repozytorium
Wyobraźmy sobie taką sytuację: pracujemy w zespole, który rozwija kilka serwisów. Każdy z nich ma własne repozytorium i jest niezależnie wdrażany. Zgodnie z podejściem Continous Delivery, nowe wersje instalujemy często, zwykle kilkanaście razy dziennie.
Choć dbamy o jakość aplikacji, czasem na produkcji pojawiają się problemy i ciężko nam od razu stwierdzić, który z serwisów jest odpowiedzialny za błąd. Musimy przekopywać się przez logi, by odnaleźć winowajcę i wykonać jego rollback, co jest dość uciążliwe.
Ktoś podsuwa pomysł na rozwiązanie problemu — potrzebujemy zwizualizować, które wersje są wdrażane i w jakim terminie. Dzięki temu wystarczy rzut oka, by namierzyć problematyczny serwis. Dodatkowo wszyscy w zespole będą na bieżąco z wdrożeniami (np. będą wiedzieć, że ich zmiany są już na produkcji).
Od razu nasuwa się myśl, że będziemy potrzebować pewnego rodzaju integracji pomiędzy naszym repozytoriami, a jakimś zewnętrznym narzędziem, które pozwoli nam prezentować dane. Załóżmy, że nasz kod hostowany jest na GitHubie, który udostępnia webhooki na wypadek różnych zdarzeń. Możemy wykorzystać ten mechanizm do powiadomień o nowych commitach.
Wygodnie byłoby otrzymywać podobne webhooki w momencie deploymentu aplikacji, najlepiej w takim samym formacie. Oczywiście, rzeczywistość zwykle nie jest tak różowa i będziemy mieć szczęście, jeśli będzie istniał jakikolwiek sposób powiadomień. Na potrzeby przykładu przyjmijmy, że instalacją naszych aplikacji zajmuje się inny zespół, który bardzo lubi pracować z RabbitMQ i informacje o wdrożeniach wysyła jako wiadomości na jego kolejce.
Co będzie odbiorcą naszych wiadomości? Możemy wybrać narzędzia, których nasz zespół już używa: Grafana i Slack. Oba udostępniają RESTowe API, więc integracja w tę stronę nie powinna sprawiać problemów.
Projekt
Najprostszym sposobem byłoby napisanie krótkiego skryptu, który nasłuchuje na przychodzące webhooki przez HTTP i odbiera informacje o wdrożeniach, a następnie informuje o nich zewnętrzne aplikacje. Jest to dość ograniczone rozwiązanie, choć faktycznie będzie działać prawidłowo. Zaproponuję bardziej rozbudowany projekt i przedstawię jego zalety.
Schemat poniżej przedstawia pomysł. Dane ze strumieni wejściowych agregujemy w jednym formacie na jakiejś kolejce (przyjmijmy, że będzie to Kafka, choć to tylko szczegół implementacyjny). Druga część aplikacji ściąga z niej dane i wysyła je do odbiorców.
Jeśli w tej chwili pomyśleliście, że to doskonały przykład overengineeringu, to całkiem słusznie. Prosty przykład pozwoli jednak zaprezentować rozwiązania, które równie dobrze sprawdzą się w bardziej krytycznych systemach z dużo większym ruchem.
Watermill
Wydaje się, że tak w zasadzie nie ma tutaj nic odkrywczego do napisania — ot, znaleźć bibliotekę do RabbitMQ i Kafki, uzupełnić dane połączenia i jesteśmy gotowi, żeby odbierać i wysyłać wiadomości. Poprawna konfiguracja tych bibliotek może jednak chwilę zająć, ze względu na różne interfejsy i stosowane podejścia. Pomyłki łatwo jest przeoczyć, co może prowadzić do długich sesji szukania błędu w aplikacji. Żeby uprościć sobie życie, użyjemy biblioteki Watermill, która udostępnia spójny interfejs do pracy z kilkoma kolejkami.
Watermill w zamyśle jest odpowiednikiem routera HTTP dla komunikacji z użyciem wiadomości. Podobnie do bibliotek HTTP, ukrywa za prostym API szczegóły złożonych protokołów.
Koncepcja oparta jest o dwa interfejsy: Publisher i Subscriber. Jak można się domyślić, pierwszy z nich określa kanał, który pozwala publikować wiadomości, a drugi umożliwia ich odbieranie.
type Publisher interface { Publish(topic string, messages ...*Message) error Close() error } type Subscriber interface { Subscribe(ctx context.Context, topic string) (<-chan *Message, error) Close() error }
Implementację, która spełnia oba interfejsy, określamy jako Pub/Sub. Watermill wspiera kilka popularnych Pub/Subów i pozwala dodawać własne.
Do naszego przykładu będziemy potrzebować:
- Subscriber HTTP do odczytu webhooków nadchodzących z GitHuba,
- Subscriber RabbitMQ (AMQP) do odczytu wiadomości o wdrożeniach,
- Publisher i Subscriber Kafki do zapisu i odczytu eventów,
- Publisher HTTP do wywołań REST API zewnętrznych serwisów.
Na początku wspomniałem, że HTTP używa się zwykle do synchronicznych połączeń, a mimo to traktujemy go jako kolejkę. Jak to w końcu jest?
Choć HTTP kojarzy się zwykle z synchronicznym API, a nie eventami, można go też traktować jako pewien rodzaj mechanizmu Pub/Sub. W przypadku Watermilla, jako kolejka użyte może zostać cokolwiek, co spełnia wspomniane interfejsy. Dużą zaletą traktowania HTTP w ten sposób jest ilość aplikacji, które wykorzystują REST API.
Implementacja
Pełne źródła przykładu znaleźć można na GitHubie. Zacznijmy od stworzenia elementów, z których składa się aplikacja.
Message
Message
to podstawowy typ danych Watermilla, który reprezentuje jedną wiadomość (lub event). Oprócz właściwego payloadu, przechowuje też UUID wiadomości i jej metadane. Udostępnia też metody Ack
i Nack
, które pozwalają kolejno zaakceptować i odrzucić wiadomość.
HTTP Subscriber
Pierwszy z subscriberów, który uruchamia serwer HTTP pod wskazanym adresem.
UnmarshalMessageFunc
to funkcja, która przetłumaczy http.Request
na Watermillowy Message
. Przykład poniżej jest najprostszym wariantem, w którym używamy domyślnego unmarshallera. Użycie własnej funkcji w tym miejscu pozwoliłoby np. odczytać pewne nagłówki i zapisać je jako metadane wiadomości.
config := http.SubscriberConfig{ UnmarshalMessageFunc: http.DefaultUnmarshalMessageFunc, } httpSubscriber, err := http.NewSubscriber(":8080", config, logger)
AMQP Subscriber
W ramach przykładu zadowolimy się podstawową konfiguracją subscribera AQMP, więc jego definicja jest trywialna.
config := amqp.NewDurableQueueConfig("amqp://guest:guest@rabbitmq:5672/") amqpSubscriber, err := amqp.NewSubscriber(config, logger)
Kafka Publisher / Subscriber
W przypadku Kafki używamy wbudowanego kafka.DefaultMarshaler
, który przetłumaczy wiadomość z []byte na Message
. Do konstruktorów można też przekazać [sarama.Config](https://godoc.org/gopkg.in/Shopify/sarama.v1#Config)
z dodatkowymi ustawieniami. Nie potrzebujemy ich tutaj, więc zostaniemy przy wartości nil
.
kafkaPublisher, err := kafka.NewPublisher( []string{"kafka:9092"}, kafka.DefaultMarshaler{}, nil, // opcjonalny sarama.Config logger, ) // ... config := kafka.SubscriberConfig{ Brokers: []string{"kafka:9092"}, } kafkaSubscriber, err := kafka.NewSubscriber( config, nil, // opcjonalny sarama.Config kafka.DefaultMarshaler{}, logger, )
HTTP Publisher
Ten element jest o tyle specyficzny, że zdecydowaliśmy się na komunikację z dwoma zewnętrznymi API. Musimy więc zdefiniować dwa publishery.
grafanaPublisher, err := http.NewPublisher( http.PublisherConfig{ MarshalMessageFunc: pkg.GrafanaMarshaller("admin:secret"), }, logger, ) // ... slackPublisher, err := http.NewPublisher( http.PublisherConfig{ MarshalMessageFunc: pkg.SlackMarshaller, }, logger, )
W przypadku publishera HTTP, MarshalMessageFunc
odpowiada za przetłumaczenie Message
na http.Request
. Co istotne, to jest też miejsce, w którym ustawimy nagłówki specyficzne dla konkretnego API. Zwykle będą to np. Content-Type
i Authorization
.
func SlackMarshaller(url string, msg *message.Message) (*http.Request, error) {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(msg.Payload)) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") return req, nil }
Handlers
Przygotowaliśmy już wszystkie elementy związane z komunikacją. Powinniśmy zająć się teraz zachowaniem aplikacji.
Całą logikę związaną z przetwarzaniem wiadomości zamieścić musimy w handlerach. Są to po prostu funkcje, które otrzymują wiadomość i w reakcji na nią wykonują pewne akcje. Mogą to być np. operacje na jakiejś bazie danych (wstawienie rekordu), ale może to być też modyfikacja odebranej wiadomości, odrzucenie jej, lub wysłanie zupełnie nowych wiadomości na kanał zwrotny.
Handler można przedstawić sygnaturą funkcji:
func(*Message) ([]*Message, error)
W naszym przypadku handlery nie mają zbyt wiele logiki. Ich głównym zadaniem jest odczyt wiadomości w jednym formacie i zwrócenie wiadomości w drugim.
Poniższy przykład demonstruje handler dla webhooków z GitHuba. Zwróć uwagę, że publikujemy wiele wiadomości jednocześnie (zwracamy slice). Ustawiamy też metadaną event_type
. Dzięki temu odbiorca będzie wiedział do jakiego typu unmarshallować wiadomość.
Jeśli handler nie zwróci błędu, Router automatycznie zaakceptuje wiadomość (ack). W przeciwnym wypadku zostanie odrzucona (nack).
func GithubWebhookHandler(msg *message.Message) ([]*message.Message, error) { pushEvent := githubPushEvent{} err := json.Unmarshal(msg.Payload, &pushEvent) if err != nil { return nil, err } var messages []*message.Message for _, commit := range pushEvent.Commits { event := commitPushed{ ID: commit.ID, Message: commit.Message, Author: commit.Author.Name, OccurredOn: commit.Timestamp, } eventJSON, err := json.Marshal(event) if err != nil { return nil, err } m := message.NewMessage(watermill.NewUUID(), eventJSON) m.Metadata.Set("event_type", "commitPushed") messages = append(messages, m) } return messages, nil }
Router
Zakładając, że mamy już gotowe elementy powyżej, pozostaje tylko połączyć je w całość.
Router to komponent, który udostępnia API zbliżone do routera HTTP. Podobnie jak w pierwowzorze, dodaje się do niego handlery, które obsługują wiadomości. Chociaż można używać publisherów i subscriberów bezpośrednio, Router znacznie ułatwia pracę z wieloma kanałami równocześnie.
router, err := message.NewRouter(message.RouterConfig{}, logger)
Przykładowe wpięcie handlera może wyglądać tak:
router.AddHandler( "http-to-kafka", // nazwa handlera - powinna być unikalna "/", // topic wejściowy, w przypadku http - endpoint httpSubscriber, // subscriber kafkaTopic, // topic wyjściowy kafkaPublisher, // publisher githubWebhookHandler, // handler )
Po wpięciu wszystkich handlerów, pozostaje uruchomić Router:
err = router.Run()
W router przykładu wpięte są też pomocnicze handlery, które symulują wdrożenia w losowym czasie po wypchnięciu commita. Służy to tylko temu, by można było zaobserwować działanie całego przykładu. Przy okazji można zobaczyć, jak działa publisher AMQP.
Środowisko testowe
Choć cały przykład jest dość zwięzły, fragmenty wszystkich handlerów zajęłyby za dużo miejsca, więc zachęcam do zajrzenia w źródła. Zobaczmy teraz, jak przetestować poprawne działanie na własnym sprzęcie.
Projekt posiada definicję Docker Compose, która zawiera w pełni działające środowisko. Jedyne, czego brakuje, to URL webhooka do Slacka. Można go opcjonalnie podać w pliku .env
, wzorując się na .env-example
. Compose zawiera obrazy wszystkich zależności (Kafka z ZooKeeperem, Grafana, RabbitMQ) oraz serwis z aplikacją.
Całe środowisko można uruchomić przez:
docker-compose up
Pozostaje wystawić serwis na świat i skierować na niego webhooki z repozytorium na GitHubie. Alternatywnie, możesz przetestować działanie środowiska, wysyłając spreparowany JSON. Służy do tego skrypt w repozytorium:
./scripts/send-stub-webhook.sh
Grafana będzie dostępna lokalnie pod localhost:3000 razem z gotowymi dashboardami. Do logowania użyj: admin / secret
.
Na wykresie w Grafanie zostanie naniesiona adnotacja z informacją o wypchnięciu commita. Jeśli poczekamy kilka minut, pojawią się dwie kolejne, z wdrożeniem na środowisko przedprodukcyjne i na produkcję.
Po co ta kolejka?
Czy nie moglibyśmy bezpośrednio odwoływać się do zewnętrznych serwisów, zamiast komplikować stack? Oczywiście możemy tak zrobić, choć wprowadzenie kolejki jako pośrednika ma kilka zalet. Warto zaznaczyć, że w tym przypadku użyliśmy Kafki, ale może to być cokolwiek innego. Watermill pozwala wpiąć w to miejsce dowolny Pub/Sub.
Służy jako storage
Wyobraźmy sobie, że mija trochę czasu od kiedy stworzyliśmy integrację. Dowiadujemy się, że ktoś chce przeanalizować, jak długo zajmuje nam przejście od wypchnięcia kodu do wdrożenia na produkcji. Do naszego zespołu wpada nowe zadanie: zebrać raport wdrożeń z ostatnich kilku miesięcy.
Gdybyśmy napisali prosty skrypt przerzucający dane, pozostałoby nam napisanie nowego narzędzia, które w jakiś sposób skoreluje commity z wszystkich repozytoriów z wdrożeniami. A co, jeśli wiadomości na Rabbicie mają tygodniowy czas życia? Ups.
Jeśli używamy kolejki jako pośrednika, możemy skonfigurować ją tak, by wiadomości miały długi czas życia i przetrzymywać je przez dowolny czas. W takiej sytuacji wystarczy prosty skrypt, który odczyta wszystkie eventy od wybranej daty. Ponieważ mają jednolity format, szybko zbierzemy wymagane dane.
Alternatywnie, zamiast wprowadzać dodatkowy element infrastruktury, możemy też użyć handlera, który zapisze wiadomości w jakimś storage, który już utrzymujemy (np. Elasticsearch, ale może to być równie dobrze baza SQL).
Działa jako bufor
Co się stanie, jeśli sieć przestanie działać i nie dobijemy się do API Slacka? Albo jeśli będziemy aktualizować instancję Grafany i akurat wtedy zostanie wysłany event? Użycie kolejki jako bufora pozwala nam ponowić próbę później w takich przypadkach.
Oczywiście w opisywanym przykładzie takie zdarzenia nie są krytyczne; nic się w końcu nie stanie, jeśli jednorazowo zgubimy kosmetyczną informację o wdrożeniu. Warto jednak mieć to na uwadze przy projektowaniu systemów, które z natury powinny być bardziej odporne na awarie.
Middleware
Router Watermilla wzorowany jest na routerach znanych z bibliotek do obsługi HTTP (go-chi/chi, gorilla/mux, itp.). Podobnie jak one, Router posiada też możliwość wpięcia middleware, czyli funkcji uruchamianych przed każdym requestem, które są w stanie podjąć jakąś akcję.
Istnieje też kilka wbudowanych middleware, np. do obsługi throttlingu, poison queue, czy dodający correlation ID (identyfikator, który pozwala odnaleźć wszystkie wiadomości w ramach jednego requestu). Możemy też łatwo dodać własny, implementując funkcję, która przyjmuje i zwraca handler:
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
Gdzie handler to wspomniany wcześniej interfejs:
type HandlerFunc func(msg *Message) ([]*Message, error)
Gdybyśmy na przykład chcieli dodać proste logowanie do każdej przetworzonej wiadomości, możemy zrobić to w ten sposób:
func LoggingMiddleware(h message.HandlerFunc) message.HandlerFunc { return func(message *message.Message) ([]*message.Message, error) { log.Println("Processing message", message.UUID) producedMessages, err := h(message) if err == nil { log.Println("Message", message.UUID, "processed successfully") } return producedMessages, err } } func main() { /// ... router.AddMiddleware(LoggingMiddleware) /// ... }
Metryki
Nasz przykład obsługuje teraz już dość strumieni, by logi stały się nieczytelne. Jeśli chcemy szybko sprawdzić stan aplikacji, lepszym sposobem jest zbieranie metryk w czasie.
Watermill udostępnia komponent do metryk, który wystawia endpoint w formacie Prometheusa. Jeśli używamy Routera, wpięcie komponentu sprowadza się do kilku linijek:
registry, _ := metrics.CreateRegistryAndServeHTTP(":8081") builder := metrics.NewPrometheusMetricsBuilder(registry, "", "") builder.AddPrometheusRouterMetrics(router)
Dodając gotowy dashboard do Grafany, możemy łatwo podejrzeć status aplikacji. Prezentowany przykład ma już wszystko, co potrzebne – dashboard jest dostępny pod localhost:3000/d/watermill.
Więcej informacji o metrykach znajduje się w dokumentacji.
Co dalej?
Choć przedstawiony przykład jest w zasadzie kompletną aplikacją, zahaczyliśmy tylko o najprostsze przypadki użycia architektury opartej o wiadomości. Są to jednak solidne fundamenty, na których można się opierać w bardziej zaawansowanych wzorcach. Warto np. zapoznać się z komponentem do CQRS. Zachęcam do dalszego zgłębiania tematu.