Backend

WebSockety w mikroserwisach. Jak zmniejszyć ruch w sieci i czas reakcji klientów na zmianę

mikroserwisy websocket

Dział Technologii, w którym pracujemy składa się z wielu małych zespołów deweloperskich. Nasz zespół – Alpha Centauri – niedawno otrzymał zadanie rozbudowania pewnego systemu złożonego z wielu mikroserwisów i aplikacji webowej. Większość komunikacji między elementami systemu odbywało się po starym, dobrym REST. W ramach rozbudowy postanowiliśmy wykorzystać poza RESTem także protokół WebSockets (WS). W niniejszym artykule opiszemy, na podstawie naszych doświadczeń, jak można wykorzystać WebSockets do komunikacji między serwisami a UI, a także przyjrzymy się testowaniu mikroserwisów komunikujących się po WS.

Grzegorz Hołdys

Grzegorz Hołdys. Full-stack developer w BNY Mellon. Programista z wieloletnim doświadczeniem w różnych technologiach: od serwerów po mikrokontrolery. Pracował dla wielu firm z różnych branż. Oryginalnie pochodzi ze świata Javy, ale coraz bardziej wciąga go uniwersum JavaScript. W BNY Mellon zajmuje się także mentoringiem i przeprowadza rozmowy rekrutacyjne z kandydatami. Ciągle szuka nowych wyzwań i poznaje nowe technologie. W czasie wolnym, lata prywatnym samolotem.

Jakub Madej. Senior Specialist Developer w BNY Mellon. Java back-end developer od 2011. Doświadczenie zdobywał współpracując z ekspertami w projektach greenfieldowych i legacy. W wolnych chwilach lubi wspinaczkę i pływanie.


System

W naszym przykładzie posłużymy się systemem składającym się z wielu mikroserwisów, które przetwarzają dane. System ten jest dość złożony. Jest w nim wiele funkcjonalności, które można włączać i wyłączać w trakcie działania systemu, w zależności od potrzeb. Nazwijmy je „przełącznikami”. Są też procesy, które można uruchomić, a które trwają po kilkanaście minut. Posługując się analogią, możemy powiedzieć, że proces ten to coś na wzór przewijania bardzo długiej taśmy VHS, co w zależności od tego, do którego momentu chcemy taśmę przewinąć, zajmuje mniej lub więcej czasu.

Jak przebiega krok po kroku?

  • Użytkownik dokonuje interakcji z front endem (start the process, oznaczona #1).
  • Front end woła końcówkę (endpoint) RESTową aplikacji; metoda skojarzona z mapowaniem endpointu to startProcess() oznaczona #2.
    • Opcjonalnie kontroler może dokonać prostych walidacji (#3) i zwrócić odpowiednie kody HTTP – 202 w przypadku zaakceptowania żądania (#4) lub 409.
    • Kod 409 użyjemy w sytuacji, kiedy żądanie nie może zostać przyjęte, ponieważ inne jest już w toku a nie obsługujemy kolejkowania zadań. Tę sytuację na diagramie oznaczamy w #5.
  • Ta „czasochłonna operacja”, o której wspomnieliśmy w pierwszym akapicie, oznaczona jest #6 i nazwana po prostu process().
  • Metoda ta może wykonywać się w pętli i każdorazowo wysyłać aktualizację o postępach do tzw notifier service. Notifier service może być prostym serwisem Springowym, który jest świadom Web Socketów i to konkretnie do nich przekieruje aktualizacje (sendUpdateToWebSockets(), #7).
  • Front-end oprócz #2 ma odpowiedzialność by podłączyć się do WebSocketów (websocketConnect(), #8) – jest to konieczne by móc czytać wiadomości z tego kanału (readWebSocketUpdates).

Diagram sekwencji wygląda następująco:

diagram sekwencji

Wracając do wspomnianych „przełączników”, do sterowania nimi wystawione zostały endpointy REST. Poza nimi, są też endpointy pozwalające sprawdzić stan danej funkcji albo postęp wykonywania długiej operacji. Wszystko po REST. Jedna z tych funkcji to zapis wiadomości z kolejki JMS do bazy danych w celu późniejszej analizy. Nie jest to konieczne do działania całego systemu, ale może się czasem przydać więc mamy endpoint, który pozwala nam uruchomić ten zapis:

PUT /api/events/persistance/enabled

Mamy też endpoint, która pozwala nam wyłączyć zapis:

PUT /api/events/persistance/disabled

Oraz oczywiście sprawdzanie czy zapis jest włączony czy wyłączony:

GET /api/events/persistance/status

Tu dostajemy odpowiedź “enabled” lub “disabled” (w rzeczywistości dostajemy więcej informacji, ale dla uproszczenia przyjmijmy, że tylko tyle).

Całość wygląda mniej więcej tak jak na poniższym schemacie. Strzałki przedstawiają wywołania endpointów REST. Komunikacja jest w pełni synchroniczna.

postman

Problem

Endpointy możemy wołać z narzędzia typu Postman i początkowo rozwiązanie to było wystarczające. Jednak z czasem, takich endpointów zaczęło przybywać i oczywistym stało się, że potrzebujemy bardziej wyspecjalizowanego narzędzie z graficznym interfejsem, aby sterować wszystkimi funkcjami. Stworzyliśmy zatem aplikację w Angular 11 oraz mikroserwis w SpringBoot, który pełnił funkcję bramki (gateway) do reszty systemu.

Ta aplikacja to swoista deska rozdzielcza z przełącznikami, dzięki której można szybko zorientować się, która funkcja systemu w jakim jest stanie i ewentualnie ten stan zmienić. Tu pojawił się dość specyficzny problem. Założyliśmy, że aplikacja powinna śledzić stan poszczególnych “przełączników” i informować użytkownika o ewentualnych zmianach dokonanych przez innych użytkowników, ale było to trudne zadania, mając do dyspozycji tylko endpointy REST?

Rozwiązanie idealne

Idealnym rozwiązaniem byłaby refaktoryzacja aplikacji i zmiana wszystkich endpointów używanych do śledzenia stanu na mechanizm typu publish-subscribe. Pierwsze rozwiązanie, które nasuwa się na myśl, zakłada “messaging” (RabbitMQ albo inny MQ). Każdy serwis, dotąd wystawiający tylko endpoint REST do sprawdzania stanu, musiałby publikować zmiany stanu na określonym temacie (topicu).

W kolejce (niebieskie strzałki na diagramie poniżej). Bramka mogłaby śledzić te zmiany poprzez subskrypcję na tzw. “brokerze” (MQ Broker). Następnie UI byłby połączony z bramką poprzez WebSockety (WS – czerwone strzałki na diagramie), które pozwoliłyby serwisowi wysyłać do UI wiadomości o zmianach stanu. Rozwiązanie wydaje się być proste i eleganckie.

gateway websocket

Rozwiązanie realne

Problem z rozwiązaniem powyżej był jednak taki, że nie było możliwości wprowadzenia zmian w istniejących serwisach, np. dodania wsparcia dla MQ. W tym przypadku, gdy do dyspozycji jest REST, pozostało nam jedynie wykorzystać stary, dobry polling, czyli regularne odpytywanie endpointów o aktualny status. Poniekąd przypominało to długą podróż i powtarzające się pytanie “daleko jeszcze?”… Polling nie jest rozwiązaniem bez wad – generuje niepotrzebny ruch w sieci i ma spory narzut, szczególnie gdy komunikacja odbywa się z wykorzystaniem standardu TLS. Ma jednak jedną, choć niewielką, zaletę. Pozwala mianowicie sprawdzać, czy odpytywany serwis w ogóle działa.

Jeśli zapytania zaczynają zwracać “404”, albo inny kod błędu, to wiemy, że mamy do czynienia z sytuacją potrzebującą ingerencji (serwis nieoczekiwanie przestał działać lub mamy do czynienia z problemem sieciowym). Jako że chcieliśmy stworzyć aplikację do monitorowania stanu różnych serwisów i ich funkcji, informacja o tym, że któryś z nich przestał działać była dla nas dość istotna.

Mimo że musieliśmy użyć pollingu między serwisami a bramką, to między bramką a UI oparliśmy komunikację o WebSockety.

ui przeglądarka websocket

Mamy zatem w naszej bramce mieszankę pollingu i komunikacji po WebSocketach. Polling oparliśmy o prosty task scheduler z jednym wątkiem i stałym interwałem (czytanym z pliku konfiguracyjnego). Uruchamiamy go zaraz po stworzeniu beana dzięki adnotacji @PostConstruct.

  @PostConstruct
  public void startPolling() {
    taskScheduler = new ThreadPoolTaskScheduler();
    taskScheduler.setPoolSize(1);
    taskScheduler.setThreadNamePrefix("status-polling");
    taskScheduler.initialize();
    taskScheduler.scheduleWithFixedDelay(
this::updateStatus, 
pollingIntervalMillis);
  }

Task scheduler wywołuje okresowo metodę updateStatus, w której wykorzystujemy standardowy, springowy RestTemplate do odpytywania zdalnego serwisu oraz instancji SimpMessagingTemplate do wysyłania powiadomień o zmianie statusu po WebSocketach do UI. Status jest enumem o trzech możliwych wartościach: ENABLED, DISABLED, UNKNOWN. Przechowujemy go w polu o nazwie “currentStatus”. Powiadomienia publikowane są w topicu “/topic/status” (wartość stałej STATUS_TOPIC widocznej w kodzie poniżej).

  private void updateStatus() {
    var prevStatus = currentStatus;
    var newStatus = UNKNOWN;
    
    try {
      var statusResponse = restTemplate.getForEntity(statusUrl, String.class);
      var body = statusResponse.getBody();
      newStatus = parseStatus(body);
    } catch (RestClientException rce) {
      log.warn("Exception occurred when checking status. Message: " 
+ rce.getMessage());
    }
    currentStatus = newStatus;
 
    if (prevStatus != newStatus) {
      brokerMessagingTemplate.convertAndSend(
STATUS_TOPIC, new StatusMessage(newStatus.toString()));
    }
  }

Po stronie UI, mamy angularowy serwis, który nasłuchuje zmian statusu a następnie ogłasza je komponentom, wykorzystując EventEmitter.

@Injectable()
export class StatusService {
private _currentStatus = "unknown"

@Output() statusChanged = new EventEmitter<string>();

constructor(private http: HttpClient, private wsService: WebSocketService) {
  this.http.get(this.BASE_URL + "/status").subscribe( (value: any) => {
    this.updateStatus(value)
  })
  wsService.connect('status').subscribe(value => {
    this.updateStatus(value)
  })
}
(…)
private updateStatus(value: any) {
  this._currentStatus = value.toLowerCase()
  this.statusChanged.emit(this._currentStatus)
}
}

Zależnością tego serwisu jest standardowy klient HTTP (zmienna http) i specjalnie utworzony WebSocketService (wsService), który zawiera logikę inicjującą połączenie po WebSocketach. Klient HTTP wykorzystywany jest tylko w momencie uruchomienia UI do pobrania początkowego statusu. Wszystkie aktualizacje przychodzą już po WebSocketach.

@Injectable()
export class WebSocketService {
  connect(topic: string): Observable<any> {
    let rxStomp;

    const stompConfig = {
        brokerURL: protocol + '//' + host + '/notifications',
        reconnectDelay: 1000,
    };

    rxStomp = new RxStomp();
    rxStomp.configure(stompConfig);
    rxStomp.activate();

    return rxStomp.watch('/topic/' + topic).pipe(map(function (message) {
      return JSON.parse(message["body"]);
    }));
}
}

Powyższy kod jest oczywiście uproszczony – usunęliśmy z niego logikę niezwiązaną wprost z komunikacją. Pokazuje jednak prosty scenariusz wykorzystania WebSocketów.

Jest jedna rzecz, która może wymagać tu wyjaśnienia. Dlaczego “strzelamy” najpierw po REST do bramki, aby sprawdzić obecny status, zamiast wykorzystać w tym celu WebSockety, skoro reszta komunikacji i tak jest po WS? Faktycznie, moglibyśmy wystawić na bramce endpointy WS, aby odbierać takie zapytania z UI. Odpowiedź musiałaby jednak zostać wysłana w temacie “/topic/status”, co znaczałoby, że wyjdzie do wszystkich podłączonych klientów. Wydawałoby się, że nie ma w tym nic złego, ale naszym celem było wysyłanie na tym topicu powiadomienia o zmianie statusu, a w tym przypadku zaczną pojawiać się informacje o aktualnym statusie tylko dlatego, że podłączył się kolejny klient UI.

Bardziej eleganckim rozwiązaniem są kolejki WS. Dzięki nim bramka może wysłać wiadomość tylko do pytającego klienta. Dobre rozwiązanie, ale nas interesuje super-prosta, synchroniczna komunikacja typu “pytanie-odpowiedź”. Zadając jedno pytanie o status i oczekujemy pojedynczej odpowiedzi. Czekamy asynchronicznie po stronie UI, więc z punktu widzenia użytkownika nic się nie blokuje. Do tego REST nadaje się idealnie.

Testy „na froncie”

Testy po stronie UI sprowadzają się do „zamokowania” obiektu klasy WebSocketService i zwróceniu z metody connect obiektu klasy Subject, który pozwoli nam dowolnie wysyłać wiadomości tak, jakby pochodziły z prawdziwego WebSocketa.

describe('StatusService', () => {
 let injector: TestBed
  let service: StatusService
  let httpMock: HttpTestingController
  let wsMock: jasmine.SpyObj<WebSocketService>
  let mockStatusSubject: Subject<SystemCheckMessage>

  const ENABLED = 'enabled'
  const DISABLED = 'disabled'
  const UNKNOWN = 'unknown'

  beforeEach(() => {
    // We will use this subject to simulate messages coming from the Web Socket
  mockStatusSubject = new Subject()


  // Create a mock WebSocketService and provide a status subject to clients
  wsMock = jasmine.createSpyObj('WebSocketService', ['connect'])
  wsMock.connect.and.returnValue(mockStatusSubject)

  TestBed.configureTestingModule({
    imports: [HttpClientTestingModule],
    providers: [
      MongoControlService,
      {provide: WebSocketService, useValue: wsMock}
    ]
  });

  injector = getTestBed();
  service = injector.inject(MongoControlService);
  httpMock = injector.inject(HttpTestingController);
 })

 it('should emit statusChanged when status changes', (done) => {
 // Verify that the initial status is UNKNOWN:
 expect(service.currentStatus).toBe(UNKNOWN)

 // Subscribe to statusChanged event:
 service.statusChanged.subscribe((value) => {
  expect(value).toBe(DISABLED)
  done()
  })


  // Send the status update via mocked WebSocketService
  mockStatusSubject.next({status: DISABLED})
})
})

Fragment kodu powyżej to prosty test “Jasmine”. Sekcja beforeEach zawiera kod inicjalizujący środowisko testowe, a w nim “zamokowany” WebSocketService i Subject, dzięki któremu możemy symulować odbieranie powiadomień po WebSocketach. Sam test znajduje się jak zwykle w sekcji it. Zaczyna się od sprawdzenia, czy StatusService ma ustawiony status UNKNOWN, czyli wartość domyślną. Dalej subskrybujemy na wydarzenie statusChanged, aby móc wykryć zmianę statusu w StatusService. Lambda obsługująca to wydarzenie wykonuje asercję expect(value).toBe(DISABLED), a następnie powiadamia Jasmine, że test się zakończył, wywołując funkcję done. Ostatni krok to wysłanie statusu DISABLED, który chcemy odebrać w StatusService. Do tego służy nam mockStatusSubject stworzony w sekcji beforeEach.

Testy integracyjne „na back-endzie”

Testy „na froncie” są stosunkowo proste. Przyjrzymy się teraz zatem testom po stronie back-endu. Jesteśmy w środowisku SpringBoot, Java 8+. O ile już nie zostały dołączone w projekcie, to jedyne zależności, których potrzebujemy to spring-boot-starter-test (dostarcza m.in. Junita5 oraz bibliotekę AssertJ). Plan, według którego postępujemy, jest dość prosty i składa się z trzech punktów:

  1. Startujemy klasę testową, do której ładujemy kontekst aplikacji webowej (uruchomić musi się także klasa konfigurująca nasze WebSockety).
  2. W metodzie testowej subskrybujemy się na konkretnym end-poincie, na którym oczekiwać będziemy wiadomości.
  3. W tej samej metodzie testowej musimy mieć asercję, która zaczeka na pojawienie się oczekiwanej wiadomości.

It’s not rocket science.

Test powinien być adnotowany @SpringBootTest – będziemy ładować kontekst aplikacji webowej. @LocalServerPort to z kolei przydatna adnotacja, dzięki której możemy wstrzyknąć w zmienną prywatną wylosowany numer portu.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class WebSocketsTest {

  @LocalServerPort
  private Integer port;
  …
}

By zasubskrybować się pod endpoint, potrzebujemy stworzyć odpowiednio klienta oraz sesję:

@BeforeEach
void setUp() {
  this.webSocketStompClient = new WebSocketStompClient(new SockJsClient(
      Lists.newArrayList(new WebSocketTransport(new StandardWebSocketClient()))));
}

@DisplayName("An update message should be sent to topic")
@Test
void shouldPublishUpdates() throws InterruptedException, ExecutionException, TimeoutException {

  StompSession session = webSocketStompClient
      .connect(String.format("ws://localhost:%d/endpoint", port), new StompSessionHandlerAdapter() {})
      .get(1, SECONDS);

Teraz czas na samą subskrypcję:

session.subscribe("/topic/updates", new StompFrameHandler() {

  @Override
  public Type getPayloadType(StompHeaders headers) {
    return UpdateDTO.class;
  }

  @Override
  public void handleFrame(StompHeaders headers, Object payload) {
    UpdateDTO update = (UpdateDTO)payload;
    latch.countDown();
  }
});

Zauważmy, że po odebraniu wiadomości (reprezentowanej przez klasę UpdateDTO) jedyne co robimy, to zliczamy w dół na obiekcie CountDownLatch [2]. Naszym celem jest, aby ten przykład pozostał prosty, nie będziemy więc rozszerzać testu zanadto. Rozsądek nakazuje sprawdzić, czy aby na pewno w wiadomości znajdują się jakieś sensowne informacje (niepuste Stringi itd) zanim zawołamy countDown() – powiedzmy, że będzie to praca domowa.

Ostatecznie, wywołać trzeba asercję. Na potrzeby artykułu zadowolimy się chociażby jedną odebraną wiadomością na topicu websocketowym. Sprawdzamy jedynie wartość boolean zwróconą przez metodę await(). W naszym prostym przykładzie nie potrzebujemy nic więcej:

assertThat(latch.await(5, SECONDS)).isTrue();

Problemy podczas testowania i znajdowanie ich przyczyn

Problem nr 1 – test kończy się niepowodzeniem, metoda handleFrame() nie zostaje wywołana…

Warto użyć metody handleExceptions(), postawić w niej breakpoint i podejrzeć co się dzieje:

StompSession session = webSocketStompClient
    .connect(String.format("ws://localhost:%d/endpoint", port),
        new StompSessionHandlerAdapter() {

          @Override
          public void handleException(StompSession session, StompCommand command,
              StompHeaders headers, byte[] payload, Throwable exception) {
            super.handleException(session, command, headers, payload, exception);
          }
        })
    .get(1, SECONDS);

Debugger pomoże dostrzec następujący wyjątek:

org.springframework.messaging.converter.MessageConversionException: No suitable converter for payload type [class xyz…] from handler type…

Jest on związany z brakującą konfiguracją mappera/konwertera, którą dodamy w następujący sposób:

@Autowired
private ObjectMapper objectMapper;
private WebSocketStompClient webSocketStompClient;
private static final MappingJackson2MessageConverter mappingJackson2MessageConverter = new MappingJackson2MessageConverter();

@BeforeEach
void setUp() {
  this.webSocketStompClient = new WebSocketStompClient(new SockJsClient(
      Lists.newArrayList(new WebSocketTransport(new StandardWebSocketClient()))));

  objectMapper.registerModule(new JavaTimeModule());
  mappingJackson2MessageConverter.setObjectMapper(objectMapper);
  webSocketStompClient.setMessageConverter(mappingJackson2MessageConverter);
}

Od nas zależy, czy zaimplementujemy handleExceptions() oraz jak to zrobimy. Można odłożyć wyjątek, który został rzucony. Można też od razu zliczać w dół na countDownLatch’u – jest to opcjonalny krok, ale może skrócić czas wykonania testu.

@Override
public void handleException(StompSession session, StompCommand command,
    StompHeaders headers, byte[] payload, Throwable exception) {
       thrown.add(exception);
       latch.countDown();
}

Następnie wywołać należy taką oto prostą asercję:

assertThat(thrown).isEmpty();

Problem nr 2 – 404 not found.

Jeśli borykamy się z wyjątkiem:

java.util.concurrent.ExecutionException: org.springframework.web.client.HttpClientErrorException$NotFound: 404

to w pierwszej kolejności upewnijmy się, że klient i endpoint są kompatybilnie skonfigurowane. W naszym przypadku korzystamy z klienta SockJS (endpoint powinien mieć włączone Sock JS fallback options).

Podsumowanie

WebSockety rozwiązują problem rozsyłania powiadomień. Dzięki nim można uniknąć ciągłego odpytywania serwera po REST, a co za tym idzie zmniejszyć ruch w sieci, a także zmniejszyć czas reakcji klientów na zmianę. Testowanie nie należy do super-prostych, ale jest zdecydowanie wykonalne. Wymaga dodania kolejnego testu integracyjnego do naszego projektu. To niewielka cena za pewność, że implementacja poprawnie wysyła wiadomości/aktualizacje na WebSocketowy topic.

Warto pamiętać, że testy integracyjne nie uruchamiają się tak szybko, jak jednostkowe, dlatego zazwyczaj lepiej przestrzegać separacji jednych od drugich, aczkolwiek powinna być to decyzja projektowa. Jak pewnie zauważyliście, asercje pisaliśmy korzystając z klas biblioteki AssertJ. Jeśli jej nie znacie, zachęcamy do sprawdzenia jej możliwości – szczególnie, że jest dostarczana razem ze spring-boot-starter-test (z której i tak korzystamy). Przypomnijmy też o klasie CountDownLatch [2], która w bardzo elegancki sposób pomaga rozwiązać kwestie testowania asynchronicznego kodu.

Wsparcie dla WS zapewniają już wszystkie główne przeglądarki więc bez problemów można korzystać z tego rozwiązania w komunikacji UI – serwisy. Właściwie nie ma powodów, aby nie stosować WebSocketów. Jedyny scenariusz, w którym ten sposób komunikacji sprawia problemy, to skalowanie serwera. Ale to już temat na zupełnie inny artykuł. W naszym przypadku używaliśmy po jednej instancji mikroserwisu.

Zachęcamy do zadawania pytań i dzielenia się opinią na temat artykułu.


[1] spring.io

[2] docs.oracle.com

Przykładowy snippet:

package com.bny.demo;
 
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Lists;
import java.lang.reflect.Type;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
 
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class WebSocketsTest {
 
  @Autowired
  private ObjectMapper objectMapper;
 
  @LocalServerPort
  private Integer port;
 
  private static final Instant now = Instant.parse("2021-01-07T10:29:14.917Z");
  private WebSocketStompClient webSocketStompClient;
  private static final MappingJackson2MessageConverter mappingJackson2MessageConverter = new MappingJackson2MessageConverter();
 
  @BeforeEach
  void setUp() {
    this.webSocketStompClient = new WebSocketStompClient(new SockJsClient(
        Lists.newArrayList(new WebSocketTransport(new StandardWebSocketClient()))));
 
    objectMapper.registerModule(new JavaTimeModule());
    mappingJackson2MessageConverter.setObjectMapper(objectMapper);
    webSocketStompClient.setMessageConverter(mappingJackson2MessageConverter);
  }
 
  @Test
  void shouldPublishUpdates() throws InterruptedException, ExecutionException, TimeoutException {
    // given:
    insertTestData();
    CountDownLatch latch = new CountDownLatch(1);
    final List<Throwable> thrown = new ArrayList<>();
 
    StompSession session = webSocketStompClient
        .connect(String.format("ws://localhost:%d/endpoint", port), new StompSessionHandlerAdapter() {
 
          @Override
          public void handleTransportError(StompSession session, Throwable exception) {
            super.handleTransportError(session, exception);
          }
 
          @Override
          public void handleException(StompSession session, StompCommand command,
              StompHeaders headers, byte[] payload, Throwable exception) {
            thrown.add(exception);
            latch.countDown();
          }
        })
        .get(1, SECONDS);
 
    session.subscribe("/topic/updates", new StompFrameHandler() {
 
      @Override
      public Type getPayloadType(StompHeaders headers) {
        return UpdateDTO.class;
      }
 
      @Override
      public void handleFrame(StompHeaders headers, Object payload) {
        latch.countDown();
      }
    });
 
    // when:
    callLogic();
 
    // then:
    assertThat(latch.await(5, SECONDS)).isTrue();
    assertThat(thrown).isEmpty();
  }
 
}


Zdjęcie główne artykułu pochodzi z unsplash.com.

Podobne artykuły

[wpdevart_facebook_comment curent_url="https://justjoin.it/blog/websockety-w-mikroserwisach-jak-zmniejszyc-ruch-w-sieci-i-czas-reakcji-klientow-na-zmiane" order_type="social" width="100%" count_of_comments="8" ]