Reactive Extensions: Renaissance eines Programmiermodells

Seite 3: Asynchron & Fazit

Inhaltsverzeichnis

Die bisherigen Beispiele liefen komplett synchron ab. Beim Observable werden direkt beim Aufruf von Subscribe() die Werte 1 bis 10 an den Callback für den nächsten Wert zugestellt. Die Rx Library ermöglicht auf einfache Art das Erstellen eines asynchronen Observable. Die statische Methode Interval() erwartet als Parameter ein zeitliches Intervall, in dem neue Events generiert werden – im folgenden Beispiel im Sekundentakt. Anschließend kombiniert der Code das neue Observable mit dem vorhanden. Der Operator zip führt schließlich die beiden Streams zusammen. Er überwacht die beiden Quell-Observables und generiert auf dem zusammengeführten erst ein neues Event, wenn jede Quelle jeweils ein Ereignis meldet. Das Ereignis besteht aus den Tupple der beiden neuen Werte der Quell-Observables.

var range = Observable.Range(1, 10);
var evenRange = range.Where(value => value % 2 == 0);
var interval = Observable.Interval(TimeSpan.FromSeconds(1));
var zipStream = interval.Zip(evenRange, (i, r) => r);

var subscription = zipStream.Subscribe(Console.WriteLine);

Das kombinierte Observable ist asynchron, da die Events nun einen zeitlichen Bezug besitzen. Somit gibt das Beispielprogramm jede Sekunde einen Wert aus dem bereits gefilterten Observable aus und zeigt anschaulich, was mit "Array über Zeit" gemeint ist.

Die Registrierung auf ein Observable mit der Methode Subscribe() gibt ein Objekt mit dem IDisposable-Interface zurück. Dieses Objekt wird als Subscription bezeichnet und ist ein Grund, warum Observables für die asynchrone Programmierung sehr attraktiv sind. Nach dem Aufruf der Methode Dispose() auf eine Subscription endet die Zustellung der Events. Für asynchrone Operation öffnet das Vorgehen eine elegante Möglichkeit, die Operation abzubrechen. Mithilfe des Dispose() drückt der Aufrufer aus, dass ihn das Ergebnis der noch laufenden Operation nicht mehr interessiert. Die Abarbeitung der eigentlichen asynchronen Operation erfolgt bis zu ihrem Abschluss, aber sie stellt das Ergebnis nicht mehr über das Observable zu. Die Ereignisse OnError() und OnClompeted() fallen nach dem Dispose() ebenfalls unter den Tisch.

Sollte das Observable auf einem .NET-Event basieren, erfolgt beim Aufruf von Dispose() die korrekte Deregistrierung des zugehörigen Ereignisses.

Generell bringt das System Observables auch nach dem Eintreten von OnError() und OnCompleted() automatisch in einen aufgeräumten Zustand. Wenn eines der beiden Events eintritt, ist es das letzte Ereignis, das das zugehörige Observable seinem Beobachter meldet.

Die Observables stellen Entwicklern ein Interface zu Verfügung, das sich vor allem für zeitlich abhängige (asynchrone) Daten eignet. Im Vergleich zu den Tasks der Task Parallel Library (TPL) können Observables nicht nur exakt einen Wert zurückgeben: Ein Observable kann kein, eines oder mehrere Ergebnisse liefern und ist damit deutlich flexibler. Der im Interface enthaltene Callback für auftrende Fehler erleichtert deren Behandlung. Eine weiterer Pluspunkt ist die elegante integrierte Option, Observables abzubrechen. Zudem müssen Entwickler nicht auf das async/await-Pattern verzichten.

Ein Blick auf das GitHub-Repository oder die Projektseite lohnt sich auf jeden Fall. Interessierte finden unter Introduction to Rx ein Tutorial.

Marko Beelmann
arbeitet seit 16 Jahren in der professionellen Softwareentwicklung und derzeit bei Philips Medizin Systeme. Schwerpunkte seiner Arbeit liegt im Design und der Entwicklung von Software im .NET Umfeld. Zudem ist er als Sprecher auf Konferenzen zu diversen Themen rund um Softwareentwicklung tätig.
(rme)