Reactive Extensions: Renaissance eines Programmiermodells

Seite 2: Unter der Haube

Inhaltsverzeichnis

Ein anderer Name für Rx ist "LINQ für Events", da die Extensions LINQ-Abfragen auf Ereignisse ermöglichen. Das bezieht sich nicht ausschließlich auf die typischen .NET-Events. Microsoft hatte LINQ, das für Language Integrated Query steht, mit dem .NET-Framework 3.5 eingeführt. Es ermöglicht, die Anwendung SQL-ähnlicher Abfragen auf XML, Collections oder Arrays. Als Beispiel dient eine Liste mit den Werten von 1 bis 10, aus denen ein Programm mit dem Where-Operator eine Liste der enthaltenen geraden Werte erzeugt:

var range = Enumerable.Range(1,10);
var evenRange = range.Where(value => value % 2 == 0);
foreach (var value in evenRange)
Console.WriteLine(value);

Im .NET-Framework lassen sich die Operatoren nur auf das Interface IEnumerable<T> anwenden. T steht dabei für einen generischen Datentypen.

public interface IEnumerable<out T> : IEnumerable
{
IEnumerator<T> GetEnumerator();
}

Das Interface besteht lediglich aus einer Methode, die einen Iterator des Typs IEnumerator<T> zurückgibt. Damit ist es möglich, Element für Element aus der zugrunde liegenden Liste oder Collection abzufragen.

public interface IEnumerator<out T>
{
T Current { get; }
bool MoveNext();
void Reset();
}

Beim klassischen Iterator-Modell fragt eine Instanz die Werte aktiv und möglicherweise blockierend an. Die Variante heißt auch Pull-Modell (Abb. 1).

Das Interface besteht aus einer Property und zwei Methoden, wobei Reset() hier unter den Tisch fallen darf. Die Property Current enthält immer das Element, auf das der Iterator gerade zeigt. Die Methode MoveNext() fordert das nächste Element an. Sollte keines mehr vorhanden sein, liefert die Methode den Wert false zurück. Abbildung 1 stellt das Iterator-Modell schematisch dar.

Für Listen und Collections, deren Elemente bereits beim Beginn der Iteration vorhanden sind, ist das Interface gut geeignet. Wenn der Aufruf von MoveNext() dagegen beispielsweise einen Request zu einem Webserver führt, um das nächste Element bereitzustellen, kann die Methode und damit der zugehörige Thread potenziell blockieren. Auch in Zeiten von Multicore sollen Entwickler blockierende Threads unter allen Umständen vermeiden, denn der Preis für das Bereitstellen neuer Threads und den daraus folgenden Thread-Kontext-Wechsel ist nicht zu unterschätzen.

An dieser Stellen kommen die Observables ins Spiel. Sie können aufgrund ihres Interfaces deutlich besser mit solchen asynchronen Quellen umgehen. Mit Rx stehen dem Interface alle gängigen LINQ-Operatoren zu Verfügung. Das Observable besitzt nur eine einzige Interface-Methode:

public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}

Im Gegensatz zum IEnumerable<T> benötigt die Interface-Methode Subscribe() ein Objekt vom Typ IObserver<T> als Übergabeparameter. Das Interface des IObserver<T> ist ebenfalls übersichtlich gehalten und besteht aus drei Methoden:

public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}

Die Methoden sind aus Sicht des Observer Callbacks, die das Observable aufruft. Letzteres ist ein Objekt, das ein Observer überwachen kann. Dieser kann Informationen über drei Ereignisse erhalten: Über OnNext() bekommt der Observer neue Werte oder Events des Observable zur Verfügung gestellt. Mit der Methode OnCompleted() teilt das Observable mit, dass es keine Werte beziehungsweise Events mehr liefern wird. Sollte innerhalb des Observables ein Fehler auftreten, erfolgt ein Aufruf der OnError()-Callback-Methode des Observer. Die Aufrufe für OnCompleted() und OnError() sind immer final: Nach einem dieser Callbacks sollte und wird das Observable keinen anderen mehr aufrufen.

Das Observable ruft die Callback-Methoden des Observer, sobald ein entsprechendes Ereignis eintritt, und verhindert somit das aktive Warten. Der Ansatz heißt Push-Modell (Abb. 2).

Die Verwendung von Callbacks verhindert das Blockieren des aktuellen Threads, weil das Observable sie nur aufruft, wenn ein entsprechendes Ereignis eintritt. Das ist einer der Gründe, warum Entwickler das Observable gerne für asynchrone Aufgaben heranziehen. Das folgende kleine Beispiel filtert analog zum IEnumerable<T> einen Bereich von Zahlen:

var range = Observable.Range(1,10);
var evenRange = range.Where(value => value % 2 == 0);
var subscription = evenRange.Subscribe(Console.WriteLine);

Anstatt mit einer foreach-Schleife zu iterieren und den Wert auf der Konsole auszugeben, übergibt der Code den Callback für einen neuen Wert der Subscribe()-Methode. Das ist eine vereinfachte Form, die auf die Übergabe eines Observer-Interfaces verzichtet. Die Erweiterungsmethoden von Rx sorgen für die Kompatibilität zum Interface. Dass Entwickler nicht zwangsmäßig alle drei Callbacks angeben müssen, um ein Observable zu nutzen, vereinfacht den Einsatz. Der obige Code übergibt als Callback Console.WriteLine, das für die Ausgabe des Werts auf der Konsole sorgt.