RX extensions w przykładach

RxExtension – to biblioteka od Microsoftu ułatwiająca programowanie asynchroniczne. Opiera się na istniejących interfejsach IObservable oraz IObserver.
W RX wiadomości są traktowane jako strumienie danych, do których należy się przypiąć i reagować na pojawienie się nowej wiadomości. Najnowsza wersja ma już cyferkę 2, ale nie jest jeszcze oznaczona jako stabilna. Ja do nauki wykorzystałem wersję 1 oraz książeczkę dostępną na stronie RX – Dev Labs Hands On. Na Channel9 są jakieś filmy na temat RX. Poniżej pokaże kilka przykładów i opis jak korzystać z RX, źródła pochodzą z książki, będą dostępne razem z książką i projektem MSVC. Drobna uwaga co do książki i źródeł: w książce podane są dwa pliki do których należny dodać referencję, aktualna wersja RX dostarcza tylko jednej biblioteki – System.Reactive, natomiast System.CoreEx jest już w systemie. Ja korzystałem z .net 4.0. Podobnie kilka metod wymienionych w książce nie jest już dostępne w związku z czym zostały zamienione na inne dostępne i spełniające te same wymagania.

Zaczynamy od poznania interfejsów:

.csharpcode, .csharpcode pre
{
font-size: small;
color: black;
font-family: Consolas, “Courier New”, Courier, Monospace;
background-color: #ffffff;
/*white-space: pre;*/
}

.csharpcode pre { margin: 0em; }

.csharpcode .rem { color: #008000; }

.csharpcode .kwrd { color: #0000ff; }

.csharpcode .str { color: #a31515; }

.csharpcode .op { color: #0000c0; }

.csharpcode .preproc { color: #cc6633; }

.csharpcode .asp { background-color: #ffff00; }

.csharpcode .html { color: #800000; }

.csharpcode .attr { color: #ff0000; }

.csharpcode .alt
{
background-color: #f4f4f4;
width: 100%;
margin: 0em;
}

.csharpcode .lnum { color: #606060; }

   1:  static void Main(string[] args)
   2:  {
   3:      IObservable<int> source = Observable.Empty<int>();
   4:      IObserver<int> handler = null;
   5:   
   6:      IDisposable subscription = source.Subscribe();
   7:      Console.WriteLine("Press ENTER to unsubscribe and dispose");
   8:      Console.ReadLine();
   9:   
  10:      subscription.Dispose();
  11:  }

RX opiera się o dwa interfejsy IObservable oraz IObserver. Klasa Observable pochodzi System.Reactive.Linq. Kod powyżej nie jest zbytnio porywający, ale pokazuje, że po subskrypcji do źródła danych otrzymujemy instancję IDisposable, którą trzeba będzie wywalić do kosza po zakończeniu pracy. Później będzie to zrobione przy użyciu mechanizmu using.

Interfejs IObserver definiuje trzy metody, które muszę zostać zdefiniowane:

   1:  public interface IObserver<in T>
   2:  {
   3:      void OnCompleted();
   4:      void OnError(Exception error);
   5:      void OnNext(T value);
   6:  }
  • OnCompleted: źródełko wyschło
  • OnNext:  nowa informacja
  • OnError: coś poszło nie tak

Podczas dopisywania się do obiektu, który ma dostarczać informacji można wykorzystać wyżej wymieniony interfejs lub skorzystać z wyrażeń lambda. To drugie podejście jest prostsze, dodatkowo nie wymagane jest definiowanie dla wszystkich metod z interfejsu.
Na początek pełna deklaracja:

   1:  static void Main(string[] args)
   2:  {
   3:      IObservable<int> source = Observable.Empty<int>();
   4:   
   5:      IDisposable subscription = source.Subscribe(
   6:          x=> Console.WriteLine("Has new value {0}", x),
   7:          ex=>Console.WriteLine("Exception caught {0}", ex.Message),
   8:          ()=>Console.WriteLine("No more items")
   9:          );
  10:   
  11:      Console.WriteLine("ENTER to dispose");
  12:      subscription.Dispose();
  13:  }

W tym przypadku od razu zostanie wywołana metoda OnCompleted, ponieważ źródło danych jest puste. Po zakończeniu pracy zwalniamy zasoby przez wywołanie Dispose na obiekcie zwróconym po subskrypcji. Co ważne, aby móc korzystać z rozszerzeń dla RX należy dodać referencję oraz using do System.Reactive.Linq.

Jednoelementowa kolekcja intów:

   1:  static void Main(string[] args)
   2:  {
   3:      IObservable<int> source = Observable.Return(42);
   4:   
   5:      var subscriber = source.Subscribe(
   6:          x=>Console.WriteLine("Value: {0}",x),
   7:          ex=>Console.WriteLine("Exception: {0}", ex.Message),
   8:          ()=>Console.WriteLine("end!")
   9:          );
  10:   
  11:      Console.WriteLine("ENTER to dispose");
  12:      subscriber.Dispose();
  13:  }

Kod powyżej zwróci raz wartość 42 (sens życia), a następnie zakończy poprzez wywołanie OnCompleted. Następnie zwolnienie zasobów. Ponownie wykorzystanie Reactive.Linq do stworzenia jednoelementowej kolekcji.

Aby zrobić coś ciekawszego można podmienić kod definiujący źródło na coś takiego:

   1:  IObservable<int> source = Observable.Range(5, 7);

Spowoduje do wygenerowanie cyfr od 5 do 11, następnie zakończy przez OnCompleted.

Aby nie zanudzać prostymi przykładami, RX umożliwia stworzenie obserwowanej pętli for, robi się to w taki sposób:

   1:  static void Main(string[] args)
   2:  {
   3:      IObservable<int> source = Observable
   4:          .Generate(0,          // initial state
   5:          i => i < 10,    // condition
   6:          i => i + 1,        // iteration step
   7:          i => i * i);       // iteration operation
   8:   
   9:      using (var s = source.Subscribe(
  10:          x => Console.WriteLine(x)    // only the working stuff will be handled
  11:                                          // no errors and no exceptions
  12:                                          // no information about sequence finish either
  13:          )) { };
  14:  }

Coraz ciekawiej co nie?
Zaczynamy od 0, następnie warunkiem jest i mniejsze od 10, w każdym kroku i będzie zwiększane co 1, a wynikiem operacji ma być i*i, wynik mnożenia nie jest zapisywany do i, tylko zwracany jako wynik. W przeciwnym wypadku pętla skończyła by się za wcześnie.
Podczas dopinania się do źródła definiujemy tylko metodę OnNext, która przyjmuje jeden parametr, w ten sposób nie zostaniemy powiadomieni o skończeniu się danych lub o wystąpieniu błędu. Dodatkowo wykorzystany zostanie mechanizm using, na który spadnie odpowiedzialność zwolnienia zasobów z IDisposable.

Jeśli zastanawialiście się jak to możliwe że pętla nie zakończy skoro w klamrach using nic nie ma, nie dziwcie się, dla mnie na początku to także było dziwne. Ale jeśli odpalić debuggera i sprawdzić wątki, to wszystkie wywołania są dokonywane z głównego wątku. Dopiero po skończeniu się zasobów w IObservable, zostanie wykonany kod z klamerek.

Żeby nie było nudno, teraz przykład z normalniejszym kodem, takim który jest w klamerkach:

   1:  static void Main(string[] args)
   2:  {
   3:      var source = Observable.Generate(
   4:          0,
   5:          i => i < 10,
   6:          i => i + 1,
   7:          i => i * i,
   8:          i => TimeSpan.FromSeconds(i)
   9:          );
  10:   
  11:      using (var s = source.Subscribe(
  12:          x => Console.WriteLine("next: {0}", x),
  13:          ex => Console.WriteLine("exception: {0}", ex.Message),
  14:          () => Console.WriteLine("no more")))
  15:      {
  16:          Console.WriteLine("ENTER");
  17:          Console.ReadLine();
  18:      }
  19:  }

Dwie zmiany zostały wprowadzone; dodałem 1 sekundowe opóźnienie w generowaniu wartości i, co powoduje że wywoływania zaczynają wreszcie być prawdziwie asynchroniczne. To powoduje, potrzebę umieszczenie Console.ReadLine w ciele using, w przeciwnym wypadku, nie zdążymy odczytać nawet pierwszej wartości. Walnięcie ENTER w trakcie działania dema, spowoduje jego zakończenie.
Okno wątków w debuggerze, pokazują że podczas wołania OnNext, wykorzystywany jest dodatkowy Worker Thread, stworzony przez RX.

Teraz drobna zmiana, do naszej konsoli dołożona zostanie prosta forma (WinForms – przykłady z książki są na tym oparte, więc i ja z tego skorzystałem). Będzie służyć za pośrednika do źródła danych, załóżmy sobie że jest taki kod:

   1:  static void Main(string[] args)
   2:  {
   3:      var label = new Label();
   4:      var form = new Form
   5:      {
   6:          Controls = { label }
   7:      };
   8:   
   9:      var moves = Observable.FromEventPattern<MouseEventArgs>(form, "MouseMove");
  10:   
  11:      using (moves
  12:          .Subscribe(
  13:              x => label.Text = x.EventArgs.Location.ToString(),
  14:              ex => label.Text = ex.Message,
  15:              () => label.Text = "Mouse is over?!" ) )
  16:      {
  17:          Application.Run(form);
  18:   
  19:      }
  20:  }

Pojawia się tutaj prosta forma z tekstem do którego będziemy zapisywać informacje o aktualnym położeniu myszy. To co interesujące znajduje się w linijce 9, gdzie tworzymy źródło danych na podstawie przychodzących zdarzeń z formy, następnie zapisujemy się do nich. Ponieważ w tym przykładnie nie stosowane są żadne specjalne opóźnienia (o nich poniżej), nie potrzeba tutaj wykorzystywać Dispatchera czy obiekty synchronizacji pomiędzy wątkami. W okienku wątków widać, że każde wywołanie metody OnNext realizowane jest w głównym wątku UI, dlatego bezpieczna jest modyfikacja labelki.

   1:  static void Main(string[] args)
   2:  {
   3:      var label = new Label();
   4:      var form = new Form
   5:      {
   6:          Controls = { label }
   7:      };
   8:   
   9:      var moves = Observable.FromEventPattern<MouseEventArgs>(form, "MouseMove");
  10:   
  11:      using (moves
  12:          .Subscribe(
  13:              x => label.Text = x.EventArgs.Location.ToString(),
  14:              ex => label.Text = ex.Message,
  15:              () => label.Text = "Mouse is over?!" ) )
  16:      {
  17:          Application.Run(form);
  18:      }
  19:  }

Najciekawsza jest linia 9, gdzie wydarzenia generowane przez formę/mysz są definiowane jako źródło danych, a następnie w linii 13 wartość ta zostaje wyłuskana z argumentów oraz zapisana w labelce na formatce.

Kolejny przykład będzie rozwinięciem tego powyżej:

   1:  static void Main(string[] args)
   2:  {
   3:      var textbox = new TextBox();
   4:      var form = new Form
   5:      {
   6:          Controls = { textbox }
   7:      };
   8:   
   9:      var moves = Observable
  10:          .FromEventPattern<MouseEventArgs>(form, "MouseMove")
  11:          .Select(e => e.EventArgs.Location);
  12:   
  13:      var texts = Observable
  14:          .FromEventPattern<EventArgs>(textbox, "TextChanged")
  15:          .Select(e => (e.Sender as TextBox).Text);
  16:   
  17:      var msubs = moves.Subscribe(x => Console.WriteLine("mouse position: {0}", x));
  18:      var ksubs = texts.Subscribe(x => Console.WriteLine("Textbox text: {0}", x));
  19:   
  20:      using (new CompositeDisposable(msubs, ksubs))
  21:      {
  22:          Application.Run(form);
  23:      }
  24:  }

Jak widać tutaj ponownie nasłuchujemy na wiadomości generowane przez ruch myszy, ale dodatkowo oczekujemy na wpisywany przez użytkownika tekst w formatce. Warto zauważyć, że w przypadku tekstu, źródłem danych nie jest formatka, ale sama kontrolka. W tym przykładzie pod uwagę bierzemy tylko pozytywne wywołania OnNext. Dodatkowo subskrypcja wykorzystuje projekcję (Select) i ostatecznej rozgrywce (17 i 18) otrzymujemy tylko takie wartości, które są dla nas najbardziej interesujące. Na sam koniec przykładu, wykorzystujemy klasę CompositeDisposable, która umożliwia wykorzystanie mechanizmu using na więcej niż jednym obiekcie IDisposable.

Kolejnym ciekawy rozszerzenie dla RX jest mechanizm Distinct, który wyśle powiadomienie tylko wtedy, gdy jest ono różne od poprzedniego.

   1:  var texts = Observable
   2:                .FromEventPattern<EventArgs>(textbox, "TextChanged")
   3:                .Select(e => (e.Sender as TextBox).Text)
   4:                .Do(e => Console.WriteLine("Before DistinctUntilChanged: {0}", e))
   5:                .DistinctUntilChanged();

Cały kod jest taki sam jak poprzednio, zmienia się tylko definicja źródła danych dla tekstu. Linia 4 pokazywać będzie, że wiadomości są przesyłane – metoda Do. Natomiast linia 5 zatrzyma ich dalszą propagację, gdy będę one takie same jak poprzednio otrzymane. Należy zwrócić uwagę podczas używania DistinctUntilChanged na kolejność jego użycia. Jeśli zostanie wywołany za wcześnie, kolejne eventy mogą nie zostać przesłane. W ramach ćwiczeń można wstawić go przed Select i zobaczyć co się dzieje. W tym przykładzie, pod uwagę brana będzie zawartości kontrolki w textboxie. A więc, gdy zaznaczona zostanie literka i zostanie zastąpiona przez taką samą, obiekt nasłuchujący nie zostanie powiadomiony o takie zmianie.

Następne rozszerzenie dla RX to Throttle, który powoduje że powiadomienia są przesyłane z pewnym opóźnieniem, każda nowa aktualizacja na źródle resetuje ten licznik. Dla przykładu, gdy chcemy udostępnić mechanizm który działa jak słownik z podpowiedziami, aby nie włączać wyszukiwania dla każdej wpisanej literki, można dodać małe opóźnienie rzędu kilkudziesięciu milisekund, które dla użytkownika będzie nie zauważalne, a jego wykorzystanie spowoduje zmniejszenie wykorzystania zasobów, ponieważ ilość generowanych zapytać będzie mniejsza. Poniżej przykład definicji takiego źródła danych:

   1:  var moves = Observable
   2:            .FromEventPattern<MouseEventArgs>(form, "MouseMove")
   3:            .Select(e => e.EventArgs.Location)
   4:            .Throttle(TimeSpan.FromMilliseconds(100));
   5:   
   6:  var texts = Observable
   7:            .FromEventPattern<EventArgs>(textbox, "TextChanged")
   8:            .Select(e => (e.Sender as TextBox).Text)
   9:            .Throttle(TimeSpan.FromMilliseconds(500))
  10:            .DistinctUntilChanged();

W pierwszy przypadku, informację o pozycji myszy otrzymamy gdy użytkownik okres około 100 ms nie będzie poruszać myszą. Należy być świadomym, że nie dostaniemy wszystkich wartości pośrednich, tylko ostatnią pozycję myszy. W drugim przypadku, jest dodatkowe założenie, użytkownik nie może zmieniać wartości tekstu przez około 500ms, a dodatkowo wartość musi być inna niż poprzednia – modyfikator DistinctUntilChanged. W bardzo prosty sposób ograniczyć można ilość powiadomień, które obsłużyć musi obiekt obsługujący kontrolkę.

Wcześniej mówiłem o tym, że będzie o pełniejszej asynchroniczności oraz o kontrolkach, czas najwyższy na rozwiązanie tej tajemnicy. Jeśli wiemy, że obsługa zdarzeń będzie wymagała interakcji z kontrolkami (lub kolekcjami – np. ObservableCollection w WPF) wymagającymi synchronizami z głównym wątkiem UI należy wykorzystać odpowiednie rozszerzenie, które umożliwia obserwację zdarzeń wykonywaną na głównym wątku aplikacji – zamieszane? Kod was oświeci:

   1:  static void Main(string[] args)
   2:  {
   3:      TextBox t1 = new TextBox();
   4:      Label l1 = new Label { Left = t1.Width + 20 };
   5:      Form f1 = new Form
   6:      {
   7:          Controls = { t1, l1 }
   8:      };
   9:   
  10:      var source = Observable
  11:          .FromEventPattern(t1, "TextChanged")
  12:          .Throttle(TimeSpan.FromMilliseconds(250))
  13:          .Select(x=>(x.Sender as TextBox).Text)
  14:          .DistinctUntilChanged();
  15:   
  16:      using (source
  17:          .ObserveOn(WindowsFormsSynchronizationContext.Current)
  18:          .Subscribe(x => l1.Text = x))
  19:      {
  20:          Application.Run(f1);
  21:      }
  22:  }

Jak widać, źródło danych działa teraz prawdziwie asynchronicznie (wykorzystanie Throttle), aby móc wykonać kod zdefiniowany w OnNext (linia 18) należy wcześniej powiedzieć że obsługa (obserwacja) ma nastąpić w WindowsFormsSynchronizationContext.Current (dla WPF to będzie Application.Current.Dispatcher lub Dispatcher.Current) – lub w implementacji wykorzystać mechanizm synchronizacji, jak zawsze to zależy od potrzeb implementacji. W takiej sytuacji źródło działa w osobnym wątku, ale powiadomienia i ich obsługa nastąpi w wątku głównym, gdzie można modyfikować UI.

Następny przykład jest trochę dłuższy, pokazuje jak wykorzystać RX w połączeniu z serwisami implementowanymi w WCF, dokładniejsze omówienie poniżej:

   1:  private static void Main()
   2:  {
   3:      var service = new DictServiceSoapClient("DictServiceSoap");
   4:      Func<string, string, string, IObservable<DictionaryWord[]>> match = Observable
   5:          .FromAsyncPattern<string, string, string, DictionaryWord[]>(
   6:              service.BeginMatchInDict, service.EndMatchInDict);
   7:   
   8:      Func<string, IObservable<DictionaryWord[]>> matchInWordNetByPrefix = term => match("wn", term, "prefix");
   9:   
  10:   
  11:      var t1 = new TextBox();
  12:      var f1 = new Form
  13:                   {
  14:                       Controls = {t1}
  15:                   };
  16:   
  17:   
  18:      IObservable<string> textSource = Observable
  19:          .FromEventPattern<EventArgs>(t1, "TextChanged")
  20:          .Throttle(TimeSpan.FromMilliseconds(250))
  21:          .Select(x => ((TextBox) x.Sender).Text)
  22:          .Where(x => x.Length > 0)
  23:          .DistinctUntilChanged();
  24:   
  25:      IDisposable formRequest = textSource
  26:          .Finally(()=>Console.WriteLine("Finalized"))
  27:          .Subscribe(x => matchInWordNetByPrefix(x)
  28:                              .Finally(() => Console.WriteLine("Finalized: {0}", x))
  29:                              .Subscribe(words =>
  30:                                             {
  31:                                                 Console.WriteLine("{0} - {1}", x, words.Count());
  32:                                                 foreach (DictionaryWord w in words)
  33:                                                 {
  34:                                                     Console.Write("{0},", w.Word);
  35:                                                 }
  36:                                                 Console.WriteLine("n*******************************************");
  37:                                             },
  38:                                         ex =>
  39:                                             {
  40:                                                 Console.WriteLine("n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  41:                                                 Console.WriteLine(ex.Message);
  42:                                                 Console.WriteLine("n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  43:                                             }));
  44:   
  45:   
  46:      using (formRequest)
  47:      {
  48:          Application.Run(f1);
  49:      }
  50:  }

Zacznijmy od początku:
Linia 3 definiuje nowe proxy do serwisu WCF, nie istotne z punktu widzenia RX. Następnie linia 4 definiuje zmienną typu Func, będzie to delegat przyjmujący jako dane wejściowe trzy stringi, a zwracający tablicę wyrazów słownikowych (ach ten mój angielski). Dalej (ciągle w linii 4) przypisujemy do tego delegata metodę z RX wykorzystując wcześniej zdeklarowany serwis WCF, a dokładniej dwie jego metody BeginMatchInDict oraz EndMatchInDict. Są to dwie metody, które umożliwiają asynchroniczne wykorzystanie serwisu. Podczas definiowania dostępu do serwisu WCF zaznaczona została opcja generowania zapytań asynchronicznych. Dalej, linia 8 to ponowne zdefiniowanie delegata, tym razem będzie on przyjmować tylko jeden argument – string, który będzie zawierać właściwe dla nas zapytanie, zwracany typ nie ulegnie zmianie. Pozostałe dwa stringi zostają przypisane na stałe. Ich wartość nie jest istotna dla RX. Teraz możemy korzystać asynchronicznie z WCF podając tylko jednego stringa podczas zapytania. Potem “normalna” definicja formatki, później źródełko z danymi;  pierwsze to textbox, drugie to serwis WCF który na podstawie wysłanej części słowa, zwraca tablicę wyrazów, który mogą je dokończyć (np. “yellow a“: yellow adder’s tongue,yellow ageratum,yellow asphodel,yellow avens). Metoda Finally posłuży aby wykonać jakiś kod w momencie zwalniania zasobów zwróconych przez Subscribe. Kod z linijki 28 zostanie wykonany po tym, gdy wszystkie pasujące słowa zostaną wypisane na ekran. Linijka 26 zostanie wywołana po zamknięciu formatki, czyli  zakończeniu using z linii 46.

Jeśli podczas zabawy z tym przykładem zauważycie, że możliwe jest zdefiniowanie nowego pytania do serwisu WCF zanim, przyjdzie odpowiedź na wcześniejsze – to macie racje (za .net rocks – golfclap for you). Występuje tu taka sytuacja. Na rozwiązanie tego problemu zaproponowano dwa podejścia, dla mnie Switch jest (będzie poniżej) czytelniejszy i zrozumiały. W udostępnionym projekcie są oba rozwiązania.

   1:  static void Main(string[] args)
   2:  {
   3:      var t1 = new TextBox();
   4:      var l1 = new ListBox { Top = t1.Height + 10, Height = 250, Width = 150 };
   5:      var f1 = new Form
   6:      {
   7:          Controls = { t1, l1 }
   8:      };
   9:   
  10:      var textSource = Observable
  11:          .FromEventPattern<EventArgs>(t1, "TextChanged")
  12:          .Throttle(TimeSpan.FromMilliseconds(50))
  13:          .Select(x => (x.Sender as TextBox).Text)
  14:          .Where(x => x.Length >= 3)
  15:          .DistinctUntilChanged()
  16:          .Do(Console.WriteLine);
  17:   
  18:      var service = new DictServiceSoapClient("DictServiceSoap");
  19:      var dictSource = Observable
  20:          .FromAsyncPattern<string, string, string, DictionaryWord[]>(service.BeginMatchInDict, service.EndMatchInDict);
  21:   
  22:      Func<string, IObservable<DictionaryWord[]>> matchInWordNetByPrefix = term => dictSource("wn", term, "prefix");
  23:   
  24:      var data = textSource
  25:          .Select(x => matchInWordNetByPrefix(x))
  26:          .Switch();
  27:   
  28:      using (data
  29:          .ObserveOn(WindowsFormsSynchronizationContext.Current)
  30:          .Subscribe(w =>
  31:          {
  32:              l1.Items.Clear();
  33:              l1.Items.AddRange(w.Select(word => word.Word).ToArray());
  34:          },
  35:          ex =>
  36:          {
  37:              MessageBox.Show(ex.Message);
  38:          }))
  39:   
  40:          Application.Run(f1);
  41:  }

Początek bez zmian, dopiero podczas wykorzystywania tego co wpisze użytkownik i przesyłania tego do serwisu WCF, pojawiają się pierwsze zmiany. Na początku (linia 24) czekamy aż, użytkownik coś wpisze, a następnie wysyłamy to do serwisu. Jeśli w między czasie użytkownik zmieni coś w kontrolce, ponownie wyślemy zapytanie, ale z nową wartością. Aby nie obsługiwać wyników dla poprzedniego zapytania wykorzystana zostaje metoda Switch, której działanie polega na zwróceniu wyniku tylko z ostatniego zapytania, wszystkie poprzednie zostają anulowane. Reszta zmian to tylko kosmetyka, w tym przykładnie wyniki pojawiać się będą w kontrolce na formatce.

Na moje oko RX wydaje się być interesującym rozszerzeniem, umożliwiającym uproszczenie i kodu, a już na pewno metody jak Throttle czy DistinctUntilChanged uczynią aplikacje mniej zasobożernymi. Jak z każdą nowo poznaną technologią trzeba trochę czasu, aby poznać wszystkie za i przeciw. Ja na razie jestem na etapie wow. Poprawcie mnie jeśli się mylę, ale RX jest wyczesany.

Projekt, źródła i książka dostępne pod następującym adresem:
Projekt: https://bitbucket.org/jstadnicki/rx-examples
Git: https://bitbucket.org/jstadnicki/rx-examples.git