Reaktive Anwendungen mit dem Reactor-Framework

Seite 2: Streams

Inhaltsverzeichnis

Mit asynchroner Programmierung verbindet man oft die sogenannte Callback-Hölle: Da eine asynchrone Verarbeitung nicht blockiert, wird für die Behandlung des Ergebnisses ein Callback übergeben. Mehrere asynchrone Verarbeitungsschritte in Folge führen demnach zu verschachtelten Callbacks. Gibt es – zum Beispiel für eine Fehlerbehandlung – dann noch mehrere Verarbeitungspfade, entstehen baumartig verschaltete Callback-Strukturen. Der Code wird entsprechend schwer lesbar.

Daher ermöglicht Reactor, Actions zu sequenziellen Verarbeitungsketten zu komponieren. Basis hierfür ist eine Fluent API mit verschiedenen funktionalen Interfaces für Funktionen, Prädikate und Consumer sowie zwei Arten sogenannter Composables: Ein Promise verarbeitet ein einzelnes Event, ein Stream dagegen wird für jedes übergebene Event durchlaufen. Da beide prinzipiell ähnlich arbeiten, geht der Artikel im Folgenden nur auf Stream ein.

Seine API ähnelt stark den Streams aus Java 8: Die funktionalen Interfaces von Reactor sind sogar mit ihrem Java-Gegenstück identisch (wenn auch in anderen Packages). Daher lässt sich Reactor auch mit Java 7 einsetzen. Allerdings unterscheidet sich die Verarbeitungslogik deutlich: Während Java-8-Streams auf einer vorab erstellten Collection arbeiten, wird die Verarbeitung eines Reactor-Streams immer wieder (asynchron) angestoßen, wenn ein Producer ein einzelnes Event übergibt. Auch Durchführung und Zusammenspiel der Einzelschritte einer Verarbeitungskette erfolgen in Reactor asynchron.

Im nachstehenden Beispiel werden Werte nach verschiedenen Kriterien gefiltert und verarbeitet. Eingangsdaten sind Float-Werte, die in ein fachliches Data-Objekt verpackt sind. Werte unterhalb eines Schwellwerts werden verworfen, negative Werte als ungültig aussortiert, alle übrigen Werte aufsummiert und ihre Gesamtsumme ausgegeben.

Zuerst definiert man zwei Klassen für die beschriebene Verarbeitungslogik: Für die Schwellwertprüfung ist das funktionale Interface Predicate von Reactor zuständig. Außerdem führt ein negativer Wert zu einer IllegalArgumentException.

 1: public class ThresholdPredicate implements Predicate<Data> {
2: private final static float THRESHOLD = 30f;
3:
4: @Override
5: public boolean test(Data data) throws IllegalArgumentException {
6: if (data.getValue() < 0)
7: throw new IllegalArgumentException("ungueltig: " +
data.getValue());
8: return data.getValue() > THRESHOLD;
9: }
10: }

Die Summenbildung nutzt das funktionale Interface Function und addiert den Wert auf die Zwischensumme:

1: public class AggregateFunction implements Function<Tuple2<Data,Float>,
Float> {
2: public Float apply(Tuple2<Data, Float> dataAcc) {
3: float data = dataAcc.getT1().getValue();
4: float accumulator = dataAcc.getT2();
5: LOG.info("Alte Summe: " + accumulator + ", addiere Wert: " + data);
6: return data + accumulator;
7: }
8: }

Streams werden in Reactor nicht direkt erzeugt, sondern in ein Deferred-Objekt verpackt. Über dieses werden Events an den zugrunde liegenden Stream zur Verarbeitung übergeben. Dazu dient die Utility-Klasse Streams (hier mit Angabe des Ringbuffer-Dispatchers, Zeilen 1-2). Dabei erzeugt sie automatisch eine Reactor-Instanz und einen Dispatcher. compose gibt Stream<Data> zurück, der Deferred zugrunde liegt (Zeile 4).

1: Deferred<Data,Stream<Data>> deferred = 
2: Streams.defer(env,
env.getDispatcher(Environment.RING_BUFFER));
3:
4: Stream<Data> stream = deferred.compose();
5: Stream<Float> aggregated = stream
6: .filter (new ThresholdPredicate())
7: .reduce (new AggregateFunction(), 0f)
8: .consume (v -> LOG.info("Summe: " + v))
9: .when(IllegalArgumentException.class,
LoggingHelper::logException);

Der Stream wird nun um eine Kette sogenannter Actions erweitert:

  • Zeile 6: filter hängt eine FilterAction mit dem zuvor definierten ThresholdPredicate an den Stream. Resultat ist wieder Stream<Data>.
  • Zeile 7: reduce summiert die Werte im Stream auf. Es führt einen Akkumulator ein (mit 0 initialisiert), womit der Stream nicht mehr zustandslos ist. Für jeden eingehenden Wert wird die übergebene Funktion (hier also: AggregateFunction) auf Wert und alte Summe im Akkumulator angewendet. Ergebnis ist eine neue Summe im aktualisierten Akkumulator. Da AggregateFunction ein Float zurückgibt, wird Stream<Data> in Stream<Float> umgewandelt.
  • Zeile 8: consume führt auf einen eingehenden Wert einen Consumer aus, der den Wert in das Log schreibt. Im Beispiel wird so die Gesamtsumme aller Werte ausgegeben.
  • Zeile 9: when definiert einen Callback auf einen weiteren Consumer für die Fehlerbehandlung (im Beispiel für das Auftreten einer IllegalArgumentException, die bei negativen Werten von ThresholdPredicate geworfen wird).

Abbildung 1 zeigt den so aufgebauten Stream als Verkettung von Filter, Reduce und Consume zu einer Sequenz sogenannter Actions (eine Callback-Action setzt hier Consume um). Fehler werden direkt an die When-Action für eine zentrale Fehlerbehandlung weitergeleitet – aus jedem der Verarbeitungsschritte heraus, auch wenn die Fluent-API-Schreibweise suggeriert, dass dies nur am Ende geschieht. Den Aufbau des Streams kann man sich mit dem Aufruf von debug ausgeben lassen, im Beispiel also mit aggregated.debug().

Verarbeitungskette aus Actions für Filter, Reduce und Callback (Consume) sowie Fehlerbehandlung (Abb. 1)

Die Verarbeitung durch den Stream erfolgt für jedes übergebene Event durch den Aufruf von accept auf Deferred (Zeile 11). Zeile 13 zeigt eine Besonderheit von reduce: Die Summe der Werte wird erst beim expliziten Aufruf von flush auf dem Stream an consume weitergegeben. Da das erst nach der Schleife nur einmal passiert (Zeile 13), wird consume in Zeile 8 also nur einmal für die Gesamtsumme durchlaufen.

10: for (int i=0; i < NUMBER_OF_EVENTS; i++) {
11: deferred.accept( new Data(randomGenerator.nextInt(500)/10f - 5f) );
12: }
13: deferred.flush();

Das Deferred-Objekt (Zeile 1) erstellt im Hintergrund automatisch eine Reactor-Instanz inklusive Dispatcher, die die Actions des Streams ausführt. Die Kommunikation zwischen diesen einzelnen Actions erfolgt asynchron über den Event-Bus der Reactor-Instanz. Dabei kommen die bei der Definition des Streams automatisch generierten internen Schlüssel zum Einsatz: Jede Action bekommt eigene Schlüssel (verschiedene für erfolgreiche und nicht erfolgreiche Verarbeitung) und registriert sich auf dem Event-Bus auf die Schlüssel der sie auslösenden Actions: Beispielsweise registriert sich die Reduce-Action auf die erfolgreiche Verarbeitung der Filter-Action, die When-Action auf die fehlerhafte Verarbeitung aller anderen Actions.

Zusammenspiel des Streams mit Dispatcher und Event-Bus (Fehlerverarbeitung nicht dargestellt) (Abb. 2)

In welchen Threads die einzelnen Actions ablaufen – ob im gleichen oder in verschiedenen – hängt wieder von der Dispatcher-Implementierung ab. Die Tücken im Detail machen sich bemerkbar, wenn die Dispatcher-Implementierung von Ringbuffer auf Thread-Pool umgestellt wird: Damit werden die Events durch mehr als einen Thread parallel abgearbeitet.

Für den zustandslosen Filter ist eine solche Parallelisierung kein Problem, aber die Summenbildung im Akkumulator hat einen Zustand. In der Log-Ausgabe unten sieht man, wie drei Threads versuchen, Werte auf den Akkumulator aufzuaddieren, der im Beispiel noch auf seinem initialen Stand 0.0 steht. Zwei der drei zu addierenden Werte gehen dabei verloren und werden folglich nicht in der Summe berücksichtigt. Grund dafür ist, dass der Akkumulator in der Reduce-Action nur durch ein Volatile geschützt ist. In Kombination mit einem Thread-Pool-Dispatcher funktioniert die Reduce-Action daher nicht korrekt.

[PoolExecutor-12] AggregateFunction: Alte Summe: 0.0, addiere Wert: 38.5
[PoolExecutor-13] AggregateFunction: Alte Summe: 0.0, addiere Wert: 36.2
[PoolExecutor-10] AggregateFunction: Alte Summe: 0.0, addiere Wert: 30.9