Nasza droga do Reactive Extensions – cz. 3. – Meta

W drugiej części serii opisałem jak zastosowanie wzorca Event Bus rozwiązało część naszych problemów, ale wprowadziła inne. W trzeciej i ostatniej części serii opiszę jak biblioteka Reactive Extensions  (RX) rozwiązała te problemy, oraz dlaczego postanowiliśmy pozostawić niektóre obszary kodu bez zmian.

Obserwator i obserwowany

Wzorzec projektowy obserwatora i obserwowanego nie jest zbyt często wykorzystywany w aplikacjach .Net. Wiąże to się z faktem iż C# wspiera zdarzenia (events),  które w większości przypadków są rozwiązaniem wystarczająco dobrym. Microsoft zauważył jednak, że istnieją bardziej rozbudowane scenariusze, takie jak nasze, w których nie zdają one egzaminu, dlatego dodał interfejsy IObserver<T> i IObservable<T>. Różnią się one od klasycznej definicji książkowej tego wzorca kilkoma szczegółami:

  • Są generyczne (parametryzowane typem) 
  • IObservable zawiera tylko metodę Subscribe akceptującą jako parametr obserwatora i zwracającą obiekt implementujący IDisposable, zamiast wywołania metody Unsubscribe należy wywołać metodę Dispose z wyniku metody Register
  • Obserwator posiada 3 metody OnNext, OnError i OnCompleted i jest bardziej skoncentrowany na obserwacji zmiany jednej wartości niż ogólnego stanu obserwowanego obiektu.

RX – Linq dla obserwatorów

LINQ jest bardzo potężnym fragmentem standardowej biblioteki .Net opartym o 2 proste interfejsy IEnumerable i IEnumerator. Sprawił że operacje na kolekcjach stały się tak proste, że większość programistów .Net prędzej zmieni pracę niż zrezygnuje z ich używania. LINQ jest zbiorem metod rozszerzających i wzorcu dekoratora: większość metod jako parametr przyjmuje instancję IEnumerable i ewentualnie inne parametry mające wpływ na zachowanie metody. Wewnątrz tej metody jest tworzona i zwracana instancja klasy implementującej IEnumerable, która akceptuje  parametry metody rozszerzającej jako parametry konstruktora. Metoda GetEnumerator tej klasy wywołuje metodę GetEnumerator oryginalnej kolekcji i przekazuje ją do konstruktora własnej klasy implementującej interfejs IEnumerator, wraz z parametrami swojego konstruktora. Dopiero ta klasa podczas wywoływania metody MoveNext analizuje parametry przekazane w konstruktorze i dokonuje decyzji czy wywoływać metodę MoveNext z oryginalnego Enumeratora, oraz jak przekształcić jej wynik, aby otrzymać docelową kolekcję.

Zarówno interfejs IEnumerable jak i IObservable opisują serię danych, pozwalają odczytywać kolejne elementy, oraz dostarczają informacji o tym czy seria zawiera więcej elementów. Podstawową różnicą jest fakt iż w przypadku IEnumerable to kod kliencki musi odczytywać kolejne elementy, a w przypadku IObservable to biblioteka wysyła kolejne elementy.

Na tym etapie utworzenie zestawu rozszerzeń do interfejsu IObservable wydaje się być dobrym pomysłem, tak właśnie powstała biblioteka Reactive Extensions. Pomysł okazał się tak dobry, że został zaadaptowany w większości języków programowania. Oprócz oryginalnej dokumentacji stworzonej przez Microsoft, powstała alternatywna wielo-platformowa dokumentacja pozwalająca lepiej zrozumieć koncepcje. Po zapoznaniu się z nią postanowiliśmy wykorzystać bibliotekę do implementacji jednej z funkcjonalności. Zaskoczył nas fakt iż pomimo, że integracja z RX nie była uwzględniona w oryginalnej wycenie zadania, udało się je dostarczyć na czas.

Dostarczanie wartości właściwości gdy zostaje podniesione zdarzenie

Jak wspominałem w pierwszej części artykułu API, z którego korzystamy składa się w większości z par właściwość – zdarzenie. Kluczowe więc dla nas było żeby istniał sposób na  dostarczenie wartości właściwości po podniesieniu zdarzenia. Na szczęście okazało się, że RX posiada odpowiednią metodę

var sizeObservable =
    Observable
        .FromEventPattern(
            handler =&gt; someObject.SizeChanged += handler,
            handler =&gt; someObject.SizeChanged -= handler)
        .Select(o =&gt; someObject.Size);

Ponieważ w większości przypadków nasz kod wymagał aby aktualna wartość właściwości natychmiastowo wyświetliła się na ekranie, stworzyliśmy własne rozszerzenie, które do powyższej metody dodawało dostarczenie tej wartości w momencie subskrypcji.

Komunikacja między wątkami

Ponieważ nasza aplikacja wykorzystuje wiele wątków, wsparcie przesyłania wiadomości pomiędzy wątkami była czymś czego oczekiwaliśmy. RX posiada metodę ObserveOn, która jako parametr przyjmuje obiekt typu Dispatcher. Niestety nie mogliśmy jej wykorzystać ponieważ inna biblioteka, którą wykorzystujemy, ukrywa ten obiekt pod własną abstrakcją. Na szczęście okazało się, że napisanie takiego rozszerzenia (Dispatch) własnoręcznie nie jest szczególnie trudne: 1 metoda rozszerzająca, 2 klasy mniej niż 100 linijek kodu.

Obserwowanie wielu źródeł na raz


Często to co użytkownik widzi na ekranie jest wyliczane na podstawie kilku wartości dostarczanych przez bibliotekę. Kolejną fukcjonalnością jakiej oczekiwaliśmy od RX była możliwość łączenia wartości z różnych źródeł. Rozwiązaniem jest metoda CombineLatest przyjmująca jako parametr kilka obiektów IObservable i funkcję która jako parametry akceptuje ostatnie znane wartości dostarczone przez obiekty Observable, a jako wynik zwraca wartość dostarczoną do obserwatora.

Observable
    .CombineLatest(
        firstObservable,
        secondObservable,
        thirdObservable,
        (firstValue, secondValue, thirdValue) =&gt; firstValue * secondValue * thirdValue);

Poprawa wydajności

Filtrowanie wartości

Podobnie jak LINQ, RX posiada metodę Where, pozwalającą usunąć z sekwencji wartości nie spełniających określonych warunków, odciążając w ten sposób obserwatora. Dodam że inne implementacje RX nazywają tą metodę Filter.

Usuwanie duplikatów

RX posiada metodę Distinct, która gwarantuje, że obserwator otrzyma każdą wartość tylko raz.W naszej aplikacji jednak okazało się, że znacznie bardziej użyteczna jest metoda DistinctUntilChanged, która usuwa duplikaty następujące po sobie. Ustawienie CombineLatest, DistinctUntilChanged i Dispatch w odpowiedniej kolejności pozwoliło wyeliminować większość nadmiarowej komunikacji między wątkami.

Debouncing

RX zwiera metodę Debounce. Która ogranicza częstotliwość dostarczania kolejnych wiadomości. Jest ona użyteczna jeżeli nowe wartości mogą przychodzą tak często, że użytkownik nie jest w stanie ich konsumować, a utrata części komunikatów nie jest problemem. Dobrym przykładem może tu być czujnik poziomu płynu chłodniczego, w trakcie jazdy po wyboistej drodze, gdy znajdujemy się w okolicach krytycznie niskiego poziomu. W takim przypadku czujnik co chwila wysyła informację, że wszystko jest OK albo że należy dolać płynu, to sprawia, że kontrolka na desce cały czas by mrugała denerwując użytkownika. Po zastosowaniu metody Debounce kontrolka zapalałaby się na kilka – kilkanaście sekund i gasła.

Testy jednostkowe

W naszym projekcie cały czas staramy się podnosić poziom pokrycia kodu testami. Jeżeli to ułatwia pracę stosujemy biblioteki Moq i NSubstitute. Niestety obie te biblioteki domyślnie zwracają null dla właściwości typu IObservable, co zmusza nas do rozszerzenia kodu inicjującego test. Nie jest to jednak tak uciążliwe na jakie wygląda.

Analiza problemów z produkcji

Podobnie jak LINQ i metody asynchroniczne, generuje bardzo długie i nieczytelne stosy wywołań. Czasami zdarza się, że w całym stosie nie ma żadnej linijki kodu należącej do naszej aplikacji.

Nie rezygnujemy z event-agregatora

Event agregator ma miejsce w naszym projekcie i nie planujemy go usuwać. Istnieją zdarzenia o których informujemy przy pomocy obydwóch mechanizmów.

Nasza podróż przez różne rozwiązania, nauczyła nas, że jeżeli istnieją różne rozwiązania jednego problemu, to każde z nich ma swoje wady i zalety, i nie należy się kurczowo trzymać jednego rozwiązania.

3 odpowiedzi do “Nasza droga do Reactive Extensions – cz. 3. – Meta”

  1. Hej bardzo ciekawy artykuł 😉 czy w opisywanym przez Ciebie Reactive Extension poza debouncing jest jeszcze jakaś inna obsługa Backpresure? Pytam bo np. w Java, RxJava cała odpowiedzialność za backpresure była w Observable. Teraz w RXJava 2 mamy możliwość konwersji Observable do Flowable a to daje dostęp do kliku strategii do obsługi Backpresure. A to zmienia jakość życia 😉

  2. Pingback: dotnetomaniak.pl

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *