zurück zum Artikel

Reaktive Anwendungen mit dem Reactor-Framework

Martin Lehmann, Kristine Schaal, Rüdiger Grammes

Asynchrone, eventgetriebene Architekturen haben in den letzten Jahren stark an Bedeutung gewonnen. Ein Treiber für diese Entwicklung ist das Web mit ständig steigenden Anforderungen an Performanz und Skalierbarkeit. Eine neue Antwort auf diese Herausforderungen liefert das Reactor-Framework.

Reaktive Anwendungen mit dem Reactor-Framework

Asynchrone, eventgetriebene Architekturen haben in den letzten Jahren stark an Bedeutung gewonnen. Ein Treiber für diese Entwicklung ist das Web mit ständig steigenden Anforderungen an Performanz und Skalierbarkeit. Eine neue Antwort auf diese Herausforderungen liefert das Reactor-Framework.

Verschiedene neue Frameworks erlauben die Implementierung eventgetriebener Architekturen für hochperformante Datenverarbeitung durch asynchrone Event-Verarbeitung sowie Non-Blocking IO. Die grundlegende Philosophie dieses Ansatzes beschreibt das im letzten Jahr veröffentlichte und mittlerweile in Version 2.0 aktualisierte Reactive Manifesto [1]. Bekannte Vertreter sind Node.js und Rubys EventMachine, auf der JVM sind vor allem Vert.x [2], Akka [3] und das Basis-Framework Netty zu nennen.

Wesentliches Grundkonzept ist das Reactor-Pattern [4], das namensgebend für das noch junge Framework Reactor [5] von SpringSource war. Es ist:

Als Open-Source-Projekt steht Reactor unter der Apache Software License 2.0. Die Version 1.0 wurde im November 2013 veröffentlicht [7]. Nachstehende Beispiele benutzen die Version 1.1.0 aus dem Mai 2014.

Den Kern von Reactor bildet ein asynchroner Event-Bus, der für das Routing und das Zustellen von Events zuständig ist. An ihm können sich Consumer registrieren und Bus-Nachrichten empfangen, die in Form von Events eintreffen. Ein erstes Hello-World-Beispiel zeigt die wesentlichen Grundelemente: Selektoren, Consumer und Events.

 1: public class HelloReactorWorld {
2: public static void main(String[] args) throws InterruptedException {
3: Reactor reactor = Reactors.reactor(new Environment());
4: CountDownLatch latch = new CountDownLatch(1);
5:
6: // Consumer wird registriert
7: reactor.on(
8: $("Greeting"),
9: (Event<String> ev) -> {
10: LoggingHelper.logInfoString(ev.getData());
11: latch.countDown();
12: });
13:
14: // Event wird generiert
15: LoggingHelper.logInfoString("Send Greeting...");
16: reactor.notify("Greeting", Event.wrap("Hello World"));
17:
18: latch.await();
19: }
20: }

In Zeile 3 wird zunächst eine neue Reactor-Instanz mit einer Standardumgebung erzeugt. Diese Instanz muss in der Anwendung allen Event-Produzenten und -Konsumenten zur Verfügung stehen. Es lassen sich zudem auch mehrere, voneinander unabhängige Reactor-Instanzen in einer Anwendung starten.

Mit reactor.on wird ein Consumer für Events bei der Reactor-Instanz registriert (Zeile 7-12, hier mit Java-8-Lambda-Ausdruck). Auf welche Events er reagiert, legt dessen Selektor fest. Reactor bringt fertige Selektoren mit, die unter anderem auf Klassentyp, reguläre Ausdrücke oder URL-Patterns vergleichen. Das Beispiel verwendet einen Objekt-Selektor (Zeile 7): $ ist dabei eine statische Methode als Kurzform für den Aufruf von Selectors.object. Dieser Selektor passt auf den Vergleich über equals für das Topic "Greeting", das somit als Bus-Adresse fungiert.

In Zeile 16 erzeugt reactor.notify ein Event an den Reactor: Im Beispiel wird als Event-Nutzdatum "Hello World" an die Adresse "Greeting" übergeben. Events können beliebige Objekte transportieren. Es empfiehlt sich, nur "immutable" Datentypen zu verwenden. Reactor ist ein asynchrones Framework. Nach Aufruf von notify beendet sich eigentlich das Programm, aber Event-Zustellung und -Verarbeitung sind zu diesem Zeitpunkt eventuell noch nicht erfolgt. Daher wird ein CountDownLatch eingesetzt.

Für Event-Zustellung und die Ausführung des Consumer ist in Reactor ein Dispatcher zuständig: Verschiedene Implementierungen stehen zur Auswahl, die für unterschiedliche Zwecke optimiert sind. Dispatcher stellen der Event-Verarbeitung einen Thread zur Verfügung. Wie viele Threads es gibt, wie diese verwaltet werden und wie genau die Verteilung auf Threads passiert, hängt von der gewählten Implementierung ab.

Standardmäßig nutzt Reactor einen Ringbuffer-Dispatcher. Damit werden Events in einem Ringbuffer gespeichert und mit nur einem einzelnen Thread abgearbeitet, und zwar anhand der Konzepte und Implementierungen [8] des LMAX Disruptor [9] (für dessen Performance siehe hier [10]). Der Ringbuffer-Dispatcher ermöglicht einen hohen Durchsatz für Consumer, die nicht blockieren und nur kurz laufen. Das Ergebnis der Log-Ausgabe mit diesem Dispatcher ist folglich:

2014-07-31T09:58:20.090Z: [main] Send Greeting...
2014-07-31T09:58:20.216Z: [ringBuffer-9] Hello World

Da Consumer ihren Dispatcher-Thread bis zur vollständigen Abarbeitung nichtunterbrechbar belegen, nimmt man für blockierende und lang laufende Consumer besser eine Dispatcher-Variante, die Events mit mehr als einem Thread verarbeitet. Eine solche Variante ist der Thread-Pool-Dispatcher. Er lässt sich beim Erzeugen der Reactor-Umgebung mitgeben:

...
3: Reactor reactor = Reactors.reactor(new Environment(),
Environment.THREAD_POOL);
...
14: // Events werden generiert
15: LoggingHelper.logInfoString("Send Greeting...");
16: reactor.notify("Greeting", Event.wrap("Hello World"));
17: reactor.notify("Greeting", Event.wrap("Goodbye World"));
...

Schickt man nun zwei Events (Zeilen 16-17), werden diese durch zwei verschiedene Threads konsumiert. Die Reihenfolge ihrer Verarbeitung ist nicht festgelegt:

2014-08-01T07:14:44.983Z: [main] Send Greeting...
2014-08-01T07:14:45.124Z: [threadPoolExecutor-10] Hello World
2014-08-01T07:14:45.124Z: [threadPoolExecutor-11] Goodbye World

Mit asynchroner Programmierung verbindet man oft die sogenannte Callback-Hölle: Da eine asynchrone Verarbeitung nicht blockiert, wird für die Behandlung des Ergebnisses ein Callback übergeben. Mehrere asynchrone Verarbeitungsschritte in Folge führen demnach zu verschachtelten Callbacks. Gibt es – zum Beispiel für eine Fehlerbehandlung – dann noch mehrere Verarbeitungspfade, entstehen baumartig verschaltete Callback-Strukturen. Der Code wird entsprechend schwer lesbar.

Daher ermöglicht Reactor, Actions zu sequenziellen Verarbeitungsketten zu komponieren. Basis hierfür ist eine Fluent API mit verschiedenen funktionalen Interfaces für Funktionen, Prädikate und Consumer sowie zwei Arten sogenannter Composables: Ein Promise [11] verarbeitet ein einzelnes Event, ein Stream dagegen wird für jedes übergebene Event durchlaufen. Da beide prinzipiell ähnlich arbeiten, geht der Artikel im Folgenden nur auf Stream ein.

Seine API ähnelt stark den Streams aus Java 8 [12]: Die funktionalen Interfaces von Reactor sind sogar mit ihrem Java-Gegenstück identisch (wenn auch in anderen Packages). Daher lässt sich Reactor auch mit Java 7 einsetzen. Allerdings unterscheidet sich die Verarbeitungslogik deutlich: Während Java-8-Streams auf einer vorab erstellten Collection arbeiten, wird die Verarbeitung eines Reactor-Streams immer wieder (asynchron) angestoßen, wenn ein Producer ein einzelnes Event übergibt. Auch Durchführung und Zusammenspiel der Einzelschritte einer Verarbeitungskette erfolgen in Reactor asynchron.

Im nachstehenden Beispiel werden Werte nach verschiedenen Kriterien gefiltert und verarbeitet. Eingangsdaten sind Float-Werte, die in ein fachliches Data-Objekt verpackt sind. Werte unterhalb eines Schwellwerts werden verworfen, negative Werte als ungültig aussortiert, alle übrigen Werte aufsummiert und ihre Gesamtsumme ausgegeben.

Zuerst definiert man zwei Klassen für die beschriebene Verarbeitungslogik: Für die Schwellwertprüfung ist das funktionale Interface Predicate von Reactor zuständig. Außerdem führt ein negativer Wert zu einer IllegalArgumentException.

 1: public class ThresholdPredicate implements Predicate<Data> {
2: private final static float THRESHOLD = 30f;
3:
4: @Override
5: public boolean test(Data data) throws IllegalArgumentException {
6: if (data.getValue() < 0)
7: throw new IllegalArgumentException("ungueltig: " +
data.getValue());
8: return data.getValue() > THRESHOLD;
9: }
10: }

Die Summenbildung nutzt das funktionale Interface Function und addiert den Wert auf die Zwischensumme:

1: public class AggregateFunction implements Function<Tuple2<Data,Float>,
Float> {
2: public Float apply(Tuple2<Data, Float> dataAcc) {
3: float data = dataAcc.getT1().getValue();
4: float accumulator = dataAcc.getT2();
5: LOG.info("Alte Summe: " + accumulator + ", addiere Wert: " + data);
6: return data + accumulator;
7: }
8: }

Streams werden in Reactor nicht direkt erzeugt, sondern in ein Deferred-Objekt verpackt. Über dieses werden Events an den zugrunde liegenden Stream zur Verarbeitung übergeben. Dazu dient die Utility-Klasse Streams (hier mit Angabe des Ringbuffer-Dispatchers, Zeilen 1-2). Dabei erzeugt sie automatisch eine Reactor-Instanz und einen Dispatcher. compose gibt Stream<Data> zurück, der Deferred zugrunde liegt (Zeile 4).

1: Deferred<Data,Stream<Data>> deferred = 
2: Streams.defer(env,
env.getDispatcher(Environment.RING_BUFFER));
3:
4: Stream<Data> stream = deferred.compose();
5: Stream<Float> aggregated = stream
6: .filter (new ThresholdPredicate())
7: .reduce (new AggregateFunction(), 0f)
8: .consume (v -> LOG.info("Summe: " + v))
9: .when(IllegalArgumentException.class,
LoggingHelper::logException);

Der Stream wird nun um eine Kette sogenannter Actions erweitert:

Abbildung 1 zeigt den so aufgebauten Stream als Verkettung von Filter, Reduce und Consume zu einer Sequenz sogenannter Actions (eine Callback-Action setzt hier Consume um). Fehler werden direkt an die When-Action für eine zentrale Fehlerbehandlung weitergeleitet – aus jedem der Verarbeitungsschritte heraus, auch wenn die Fluent-API-Schreibweise suggeriert, dass dies nur am Ende geschieht. Den Aufbau des Streams kann man sich mit dem Aufruf von debug ausgeben lassen, im Beispiel also mit aggregated.debug().

Verarbeitungskette aus Actions für Filter, Reduce und Callback (Consume) sowie Fehlerbehandlung (Abb. 1)

Verarbeitungskette aus Actions für Filter, Reduce und Callback (Consume) sowie Fehlerbehandlung (Abb. 1)

Die Verarbeitung durch den Stream erfolgt für jedes übergebene Event durch den Aufruf von accept auf Deferred (Zeile 11). Zeile 13 zeigt eine Besonderheit von reduce: Die Summe der Werte wird erst beim expliziten Aufruf von flush auf dem Stream an consume weitergegeben. Da das erst nach der Schleife nur einmal passiert (Zeile 13), wird consume in Zeile 8 also nur einmal für die Gesamtsumme durchlaufen.

10: for (int i=0; i < NUMBER_OF_EVENTS; i++) {
11: deferred.accept( new Data(randomGenerator.nextInt(500)/10f - 5f) );
12: }
13: deferred.flush();

Das Deferred-Objekt (Zeile 1) erstellt im Hintergrund automatisch eine Reactor-Instanz inklusive Dispatcher, die die Actions des Streams ausführt. Die Kommunikation zwischen diesen einzelnen Actions erfolgt asynchron über den Event-Bus der Reactor-Instanz. Dabei kommen die bei der Definition des Streams automatisch generierten internen Schlüssel zum Einsatz: Jede Action bekommt eigene Schlüssel (verschiedene für erfolgreiche und nicht erfolgreiche Verarbeitung) und registriert sich auf dem Event-Bus auf die Schlüssel der sie auslösenden Actions: Beispielsweise registriert sich die Reduce-Action auf die erfolgreiche Verarbeitung der Filter-Action, die When-Action auf die fehlerhafte Verarbeitung aller anderen Actions.

Zusammenspiel des Streams mit Dispatcher und Event-Bus (Fehlerverarbeitung nicht dargestellt) (Abb. 2)

Zusammenspiel des Streams mit Dispatcher und Event-Bus (Fehlerverarbeitung nicht dargestellt) (Abb. 2)

In welchen Threads die einzelnen Actions ablaufen – ob im gleichen oder in verschiedenen – hängt wieder von der Dispatcher-Implementierung ab. Die Tücken im Detail machen sich bemerkbar, wenn die Dispatcher-Implementierung von Ringbuffer auf Thread-Pool umgestellt wird: Damit werden die Events durch mehr als einen Thread parallel abgearbeitet.

Für den zustandslosen Filter ist eine solche Parallelisierung kein Problem, aber die Summenbildung im Akkumulator hat einen Zustand. In der Log-Ausgabe unten sieht man, wie drei Threads versuchen, Werte auf den Akkumulator aufzuaddieren, der im Beispiel noch auf seinem initialen Stand 0.0 steht. Zwei der drei zu addierenden Werte gehen dabei verloren und werden folglich nicht in der Summe berücksichtigt. Grund dafür ist, dass der Akkumulator in der Reduce-Action nur durch ein Volatile geschützt ist. In Kombination mit einem Thread-Pool-Dispatcher funktioniert die Reduce-Action daher nicht korrekt.

[PoolExecutor-12] AggregateFunction: Alte Summe: 0.0, addiere Wert: 38.5
[PoolExecutor-13] AggregateFunction: Alte Summe: 0.0, addiere Wert: 36.2
[PoolExecutor-10] AggregateFunction: Alte Summe: 0.0, addiere Wert: 30.9

Die Verarbeitung von Events in Reactor erinnert auf den ersten Blick an das asynchrone Framework Vert.x [13], dessen Basis ebenfalls ein asynchroner Event-Bus ist. Der wesentliche Unterschied ist, dass Reactor ein Basis-Framework ist. Es kommt also als Bibliothek innerhalb einer Anwendung zum Einsatz, bringt jedoch keine eigene Laufzeitumgebung mit. Vert.x dagegen ist ein Anwendungsframework mit eigener Laufzeitumgebung sowie Techniken für Clustering und Hochverfügbarkeit. Und auch bei den Event-Bus-Konzepten von Reactor und Vert.x finden sich diverse Unterschiede:

Reactor Vert.x
Die Adressierung ist durch Reactors Selektoren-Konzept sehr mächtig. Vert.x bietet nur eine simple Bus-Adressierung mit String-Adressen (ohne weitere Strukturvorgaben).
Nachrichten in Reactor können beliebige Daten enthalten. Nachrichten sind primitive Datentypen oder JSON-Objekte.
In Reactor hat man mit den verschiedenen Dispatcher-Implementierungen die Möglichkeit, ein passendes Laufzeitmodell selbst zu wählen. (Allerdings birgt dies auch Gefahren, wie obige Ausführungen zu reduce zeigen.) Vert.x bietet mit sogenannten Verticles auf Event-Loops bzw. Worker-Verticles im Thread-Pool eine klar strukturierte Laufzeitumgebung. In Verticles kann man programmieren, als ob man die Anwendung "single-threaded"implementieren würde.
Nachrichten werden über eine Reactor-Instanz nur lokal in einer Anwendung (also in einer JVM) zugestellt. Eine Verteilung an andere Prozesse ist derzeit nicht möglich (aber für zukünftige Releases geplant). Vert.x bietet einen Event-Bus für Nachrichten in einem Cluster, also auch über Prozess- und Maschinengrenzen hinweg.

Reactor ist ein einfach zu bedienendes Framework, das Basisfunktionen für reaktive Anwendungen in Java und als Groovy-DSL zur Verfügung stellt. Reactor ist noch recht neu, was sich auch darin bemerkbar macht, dass Dokumentation und Beispiele derzeit noch dürftig sind, sodass man sich vieles durch Ausprobieren und Framework-Debugging erarbeiten muss. Das sollte sich hoffentlich durch weitere Investitionen von SpringSource bald ändern: In Spring XD und Spring Batch wird Reactor eingesetzt, für das Spring Framework/WebSockets ist es in Diskussion. Auch wurde Reactor in die neue Pivotal-App-Suite [14] integriert.

Martin Lehmann
ist Diplom-Informatiker und als Cheftechnologe und Softwarearchitekt bei der Accso – Accelerated Solutions GmbH tätig. Seit Ende der 90er-Jahre arbeitet er in der Softwareentwicklung in diversen Projekten der Individualentwicklung für Kunden verschiedener Branchen.

Dr. Kristine Schaal
ist als Softwarearchitektin beim gleichen Unternehmen tätig. Sie arbeitet seit fast 20 Jahren in der Softwareentwicklung und ist in Projekten der Individualentwicklung für Kunden verschiedener Branchen unterwegs.

Dr. Rüdiger Grammes
ist seit November 2011 als Managing Consultant ebenfalls beim gleichen Unternehmen und dort als Softwarearchitekt in verschiedenen Projekten unterwegs.

(ane [19])


URL dieses Artikels:
https://www.heise.de/-2405139

Links in diesem Artikel:
[1] http://www.reactivemanifesto.org/
[2] https://www.heise.de/hintergrund/Vert-x-2-das-reaktive-modulare-Framework-2248970.html
[3] https://www.heise.de/hintergrund/Einfuehrung-in-die-Aktor-Programmierung-mit-Akka-1921580.html
[4] http://www.dre.vanderbilt.edu/~schmidt/POSA/POSA2/event-patterns.html
[5] https://github.com/reactor/
[6] https://www.youtube.com/watch?v=8jPJh1VFNpk
[7] https://www.heise.de/news/Reaktive-Anwendungen-mit-Reactor-1-0-2046605.html
[8] https://lmax-exchange.github.io/disruptor/
[9] https://www.heise.de/hintergrund/Parallele-Verarbeitung-mit-dem-Disruptor-2215433.html
[10] https://github.com/codepitbull/lockperformance
[11] http://promises-aplus.github.io/promises-spec
[12] https://www.heise.de/hintergrund/Streams-und-Collections-in-Java-8-2151376.html
[13] https://www.heise.de/hintergrund/Vert-x-2-das-reaktive-modulare-Framework-2248970.html
[14] https://www.heise.de/news/Pivotal-stellt-App-Suite-vor-2287988.html
[15] https://lmax-exchange.github.io/disruptor/
[16] https://www.heise.de/hintergrund/Streams-und-Collections-in-Java-8-2151376.html
[17] https://github.com/codepitbull/lockperformance
[18] http://www.dre.vanderbilt.edu/~schmidt/POSA/POSA2/event-patterns.html
[19] mailto:ane@heise.de