zurück zum Artikel

CompletableFuture: Fibers in Java 8

Markus Karg
CompletableFuture: Fibers in Java 8

Fibers, also leichtgewichtige Threads, erhöhen die Nebenläufigkeit und reduzieren die Anfälligkeit für Deadlocks und Race Conditions. Java 8 bietet mit der neuen Klasse CompletableFuture einen einfachen Weg, Fibers in eigenen Anwendungen zu nutzen.

Jahrzehntelang haben sich Anwendungsentwickler wenig Gedanken um Performance machen müssen, da mit jeder Generation noch schnellere Prozessoren zur Verfügung standen. Der Trend hat sich in den letzten Jahren leider nicht weiter fortgesetzt. Statt höherer Taktraten bieten moderne CPUs eine Vielzahl von Rechenkernen. Um diese zu nutzen, sind Änderungen an den entwickelten Programmen zwingend notwendig. Hierzu bedienten sich Entwickler üblicherweise entweder Prozessen oder Threads. Beide sind jedoch nur bedingt geeignet, eine zeitgemäße Anwendung massiv zu beschleunigen.

Der Grund hierfür liegt in der Natur der Sache: Prozesse, wie auch Threads, sind Betriebssystemartefakte. Im Vergleich zum inzwischen nahezu "unbegrenzt" verfügbaren (virtuellen) Arbeitsspeicher ist die Menge der effektiv erzeugbaren Prozesse und Threads endlicher Natur. Ebenso setzt das durch das Betriebssystem kontrollierte Multitasking Grenzen. Zur Herstellung einer gewissen Fairness zwischen den Threads wendet das Betriebssystem typischerweise ein Zeitscheibenverfahren an. Das hierzu notwendige Anhalten und Fortführen der Threads durch das Betriebssystem benötigt eine ziemlich lange Zeit, zumindest im Vergleich mit einem einfachen Funktionsaufruf innerhalb der Anwendung.

Diese begründet sich unter anderem im Speichern des aktuellen Prozessorzustands im RAM beziehungsweise dem entsprechend umgekehrten Weg wie auch dem Verlust des jeweiligen Cache-Inhalts. Je nach Art des Wechsels ist der Effekt unterschiedlich ausgeprägt: stark bei Prozesswechseln, da Prozesse sich typischerweise keinen gemeinsamen Speicher teilen oder schwächer bei Thread-Wechseln, da sich alle Threads eines Prozesses den zugeteilten Heap-Speicher teilen und lediglich über getrennte Stack-Speicher verfügen. Dieser Overhead wird in vielen Anwendungen zunehmend zum Ballast, der stärker zutage tritt, je mehr Prozesse beziehungsweise Threads eine Anwendung nutzen möchte.

Ebenso ist es ein bekannter Nachteil der von Prozessen wie auch Threads zur Synchronisierung eingesetzten, blockierenden Mechanismen (Monitore, Semaphoren etc.), dass sie nicht nur einen vorzeitigen Kontextwechsel des jeweiligen Rechenkerns erzwingen (und somit umso mehr Overhead erzeugen), sondern auch komplex in der Umsetzung sind: Deadlocks und Race Conditions sind allseits bekannte Probleme nebenläufiger Programmierung.

Um gegenüber Prozessen und Threads verbesserte, also effizientere und einfachere Nebenläufigkeit zu erhalten, sind Fibers ein probates Mittel: Weil sie keinen Kontextwechsel des Rechenkerns
benötigen und somit vom Betriebssystem nicht wahrgenommen werden, gelten sie als erheblich "leichtgewichtiger". Da sie im optimalen Fall nur einen Thread pro CPU-Kern nutzen, reduzieren sie statistisch gesehen zudem die Anfälligkeit für Deadlocks und Race Conditions – weniger Threads, weniger Locks.

Deadlocks können bei Fibers nur noch auftreten, wenn zwei Fibers auf das gleiche Objekt zugreifen und dabei von getrennten Threads ausgeführt werden würden. Fibers, die hingegen von ein und demselben Thread ausgeführt werden, können sich nicht gegenseitig blockieren und sind somit immun, da sich ein Thread üblicherweise nicht selbst blockieren kann. Race Conditions werden zumindest reduziert, da sich Fibers, die der gleiche Thread ausführt, nicht gegenseitig überholen können. Das liegt daran, dass sie eher mit gleicher Geschwindigkeit ausgeführt werden, als das bei getrennten Threads der Fall ist. Das soll keinesfalls dazu anregen, Code zu schreiben, der potenziell solche Probleme verursachen kann; vielmehr sollte man in ernsthaften Anwendungen diese Tatsache als statistisch willkommenen Seiteneffekt betrachten.

Der Begriff "Fibers" ist in der Informatik relativ lange bekannt und wird gerne mit "leichtgewichtigen Threads" oder "User-Threads" gleichgesetzt. Er war jedoch eine gewisse Zeit in Vergessenheit geraten – bis sich eben Multicore-CPUs verbreiteten. Stellt man sich Prozesse als ein Seil vor, besteht es aus verschränktem Zwirn, den Threads. Jeder Zwirn wiederum besteht aus verschränkten Fäden, den Fibers. Diese wiederum bestehen aus Fasern, kurzen Code-Sequenzen, die nicht weiter unterteilt sind.

Während sich das Betriebssystem um Lebenszeit und Ausführungsfairness von Prozessen und Threads kümmert, übernimmt diese Aufgaben die Anwendung bei Fibers hingegen selbst. Sie hat die absolute und alleinige Kontrolle darüber, welche Fiber beziehungsweise welche Code-Sequenz aktuell ausgeführt wird und wann ein Wechsel auf eine andere Fiber oder Code-Sequenz
stattfindet. Technisch vereinfacht ausgedrückt liegt der Anwendung eine Liste mit Code-Sequenzen vor, welche die Anwendung Schritt für Schritt durch eine limitierte Anzahl an Threads abarbeiten lässt. Wie die Anwendung die Aufgabe umsetzt, bleibt ihr überlassen.

Unter anderem kann Windows mit Fibers nativ umgehen, was im Folgenden jedoch nicht weiter beachtet sei. Die diesem Artikel zugrunde liegenden, per CompletableFuture erzeugten Fibers sind hingegen weder dem Betriebssystem noch der Java Virtual Machine bekannt: Es sind somit "echte" Fibers, deren Existenz lediglich die Anwendung selbst wahrnimmt und kontrolliert. Theoretisch geht es auch ohne CompletableFuture, mit viel Aufwand sowie schwer lesbarem Code, also sogar prinzipiell bereits in Java 1. Das Konstrukt CompletableFuture vereinfacht aber das Leben dermaßen, dass sich Fibers erst mit Java 8 wieder zu einem diskutierten Thema auf der JVM entwickelt haben.

Den Kern der Fiber-Unterstützung in Java 8 bilden die Klasse CompletableFuture sowie die von selbiger implementierte Schnittstelle CompletionStage. Betrachtet man CompletableFuture als Builder-Klasse mit einer Fluent API, handelt es sich bei CompletionStage um kurze, am Stück ablaufende Code-Sequenzen, deren Verkettung wiederum eine Fiber darstellt. CompletableFuture dient also genau genommen dazu, CompletionStages zu erzeugen und gleichzeitig zu verketten. Prinzipiell spricht nichts dagegen, andere Wege zu finden, doch das JDK 8 bringt neben CompletionStage keine solchen mit, weshalb diese Alternativen im Folgenden nicht weiter beachtet werden.

Doch wozu dienen CompletionStages überhaupt, und wieso kann CompletableFuture nicht einfach Code "am Stück" in eine Fiber wandeln? Die Antwort ist ebenso trivial wie offensichtlich. Im Gegensatz zu Threads unterliegen Fibers keinem präemptiven Multitasking. Somit muss die Anwendung einen Weg finden, zwischen mehreren parallel laufenden Fibern zu wechseln, das heißt eine Fiber beiseitezulegen und mit einer anderen zu arbeiten.

Da die Java Virtual Machine ebenso wie die Sprache Java selbst keinerlei native Unterstützung für
Fibers bietet, sondern diese ausschließlich in "reinem Java" formuliert sind, ist ein Weg zu finden, das mit den herkömmlichen Sprachkonstrukten zu lösen. Der einzige Weg ist somit, Code-Sequenzen (in Java also Methoden-Handles oder Lambda-Ausdrücke) unterbrechungsfrei zu durchlaufen und an deren Ende (Completion) einen Callback auszuführen. Der Callback erlaubt es der Anwendung, eine Entscheidung zu treffen, in der gleichen Fiber weiterzuarbeiten (d. h. die nächste Code-Sequenz) oder zu einer anderen zu wechseln.

Diese Code-Sequenzen inklusive des Aufrufs der Entscheidungslogik verbergen sich hinter den CompletionStages. Implementiert wird diese Logik faktisch durch Interna der CompletableFuture, das die CompletionStage erzeugt und verkettet hat. Trickreich und gleichsam effektiv: Ein Thread-Wechsel ist somit nur wenige Java-Codezeilen lang und entsprechend schnell.

Um Fibers in Java effizient zu formulieren, ist es unabdingbar, die auf den ersten Blick ungewohnt umfangreiche und komplexe Schnittstelle CompletableStage umfassend zu verstehen. Glücklicherweise haben die Autoren der Fiber-Unterstützung die zahlreichen Methoden jedoch weitgehend nach einem relativ einfachen Muster aufgebaut, das sich zumindest vom Grundsatz her anhand unten stehender Matrizen entschlüsseln lässt:

Auf einen Vorgänger warten Auf zwei Vorgänger warten
In gleichem Thread weiter thenInOut thenInOutBoth
In anderem Thread weiter thenInOutAsync thenInOutBothAsync

Beginnt eine Methode mit then, folgt sie obigem Muster. Der Namensteil InOut gibt an, ob Werte in eine Stage hineingehen und herauskommen. Der einfachste Fall ist eine Stage, die Werte weder nimmt noch liefert. Hier heißt der Namensteil InOut schlicht Run (z. B. thenRun) und benötigt als Parameter folglich ein Runnable. Nimmt eine Stage Werte entgegen, akzeptiert die Builder-Methode folglich einen Consumer<T> und heißt Accept (z. B. thenAccept). Liefert die Stage zudem einen Rückgabewert, wird sie mit einer Function<T, R> beschickt und folglich Apply (z. B. thenApply) genannt et cetera.

Keine Ausgabewerte Ein Ausgabewert
Keine Eingabewerte thenRun()
Ein Eingabewert thenAccept(Consumer) thenApply(Function)

Folgt die Endung Async, wird die Stage in einem anderen Thread ausgeführt als die vorherige, wobei sich fakultativ ein Executor angeben lässt, um die Abarbeitung explizit zu beeinflussen; hierzu später mehr. Ansonsten wird der ForkJoinPool.commonPool() des Fork-Join-Frameworks verwendet. Fehlt das Async-Suffix, erfolgt die Verarbeitung der Stage in jenem Thread, der die vorhergehende Stage effektiv beendet hat. Vereinfacht gesagt also der gleiche Thread, der auch bislang diese Fiber abgearbeitet hat.

Komplexer wird es mit der Endung Both. Sie bewirkt ein Warten auf zwei Vorgänger-Stages und benötigt daher eine weitere Stage als Parameter. Da verschiedene Threads beide Stages abarbeiten können, erschließt sich der Sinn der Endung Both.

Die JDK-Dokumentation [1] nennt noch viele weitere Endungen, die ebenfalls solchen Benennungsmustern folgern, aber hier nicht berücksichtigt werden. Exemplarisch sei an der Stelle lediglich auf den Namensbestandteil Compose verwiesen, der darauf hindeutet, dass das Ergebnis der Stage kein Wert ist, sondern wiederum eine Stage, die einen Wert liefern kann. Das erweitert die Syntax um die Möglichkeit, dynamische Entscheidungen zu treffen, das heißt erst zur tatsächlichen Ausführungszeit einer Stage zu bestimmen, was die Folge-Stage sein wird. Das Programm ändert sich dadurch praktisch zur Laufzeit selbst, was zum Beispiel Konzepte wie Rekursion ermöglicht, da die durch Compose erzeugte Stage ja grundsätzlich wieder eine Stage (beispielsweise sich selbst) als Wert liefern kann. Compose ist somit neben Rekursion ein Schlüssel für unter anderem nebenläufige Iteration als Fibers. Doch um es nicht zu verkomplizieren, sei an dieser Stelle zunächst mit einem wesentlich einfacheren Beispiel die Nutzung der API demonstriert.

Fibers zu bilden ist mit CompletableFuture grundsätzlich einfach, jedoch lassen sich durchaus komplexe Ablaufstränge damit modellieren. Hierzu bedient man sich der Möglichkeit, explizit parallele Ablaufstränge zu beginnen und wieder zusammenzuführen.

Das Aufsplitten in zwei (oder mehr) Ablaufstränge erfolgt, indem man nicht die Fluent-Schreibweise verwendet, sondern zunächst aus einer zwei (oder mehr) Stages erzeugt:

Stage a = CompletableFuture.supplyAsync(() -> ...);
Stage b1 = a.apply(x -> ...);
Stage b2 = a.apply(x -> ...);
Branchen erlaubt parallele Ausführung von Fibers (Abb. 1)

Branchen erlaubt parallele Ausführung von Fibers (Abb. 1)

Die Stages b1 und b2 können somit prinzipiell parallel ablaufen. Ob sie das tun, hängt vom eingesetzten Executor ab, doch dazu später mehr. Bei der Zusammenführung der Stages b1 und b2 kann entschieden werden, ob auf die Vollendung nur einer oder tatsächlich beider Stages gewartet wird und ob deren Ergebnisse weiterverarbeitet werden sollen. Auch hierzu finden sich in der API reichlich Methoden für alle möglichen Kombinationen.

Die Steuerung, welcher Thread beziehungsweise wie viele letztlich mit der Abarbeitung einer Fiber beschäftigt werden, hängt maßgeblich davon ab, ob explizit ein Exekutor, also eine Ausführungseinheit, angegeben wird. Grundsätzlich benutzt CompletableFuture den gemeinsamen Exekutor des Fork-Join-Frameworks, also ForkJoinPool.commonPool(). In vielen Fällen wird man jedoch davon abweichen und explizit bei einem oder mehreren Stages andere Exekutor-Arten einsetzen wollen. Die Hauptgründe hierfür sind zum einen die Nutzung blockierender APIs, das heißt I/O-Aufrufe mit theoretisch unendlicher Wartezeit, zum anderen gezielte Lastkontrolle. Beispielsweise geht es darum zu verhindern, dass eine bestimmte Fiber die gesamte Rechenzeit aller Cores verbraucht, wodurch andere Prozesse wie das Rendern auf dem Bildschirm ruckeln oder hängen.

Exekutoren entkoppeln I/O und CPU (Abb. 2)

Exekutoren entkoppeln I/O und CPU (Abb. 2)

Die Provision eines expliziten Exekutors ist einfach: Er wird als zusätzlicher Parameter beim Erstellen der Stage (d. h. an thenApply, thenRun usw.) mitgegeben. Üblicherweise kommen hierbei bestimmte Muster zur Anwendung, um die Komplexität einzudämmen. So macht es beispielsweise Sinn, jeder Festplatte einen eigenen Exekutor zuzuweisen, um die Last von CPUs und HDD beziehungsweise HDD1 und HDD2 et cetera zu entkoppeln und es damit zu erlauben, weiter zu rechnen oder mit HDD2 zu sprechen, während alle neuen Aufgaben für HDD1 in eine gerätebezogene Warteschlange laufen.

Da je nach Bauart eine HDD durchaus mehr als eine Aufgabe gleichzeitig verarbeiten kann, ist es durchaus sinnig, hierbei mehr als einen Thread zu verwenden, beispielsweise also einen statischen Thread-Pool zu nutzen, wie ihn Executors.newFixedThreadPool(int) implementiert, wobei als Parameter die Anzahl der maximal gleichzeitig möglichen Operationen pro Gerät anzugeben ist. Für die CPU sollte ein einzelner, statisch bemessener Pool mit n Threads (also so viele Threads wie Cores) zum Einsatz kommen, unter der Prämisse, dass keine Stage darauf ausgeführt wird, die blockierende Befehle verwendet. Hierdurch werden die Kerne nahezu dauerhaft mit sinnvoller Arbeit belegt, ohne zusätzlichen Management-Overhead hervorzurufen, wie er durch "sinnlos mehr" Threads hervorgerufen werden würde. Exakt solch ein Exekutor lässt sich über Executors.newWorkStealingPool() erzeugen.

Werden theoretisch-parallele Ablaufstränge wieder zusammengeführt ("gemergt"), jedoch bei Konstruktion der zusammenführenden Stage kein Exekutor explizit angegeben, führt laut JavaDocs immer jener Thread (somit Exekutor) die folgende Stage aus, der die "letzte" der beiden vorhergehenden Stages beendet hat. Daraus ergibt sich ein gewisses Zufallsverhalten, sofern mehr als ein Exekutor an der Fiber beteiligt ist. Im Sinne einer möglichst weitgehenden Kontrolle über
die Fiber-Ausführung ist es daher ratsam, nach der Regel "einmal Exekutor, immer Exekutor" vorzugehen und bei der zusammenführenden Stage stets den zu benutzenden Exekutor anzugeben.

Einen besonders erwähnenswerten Fall für die explizite Angabe von Exekutoren stellen übrigens die Frameworks AWT, Swing und JavaFX dar. Sie eint bekanntlich der Grundsatz, dass bestimmte Aufrufe
(z. B. das Verändern von auf dem Bildschirm sichtbaren JavaFX-Knoten) nur in ganz bestimmten Threads ausgeführt werden dürfen. Bei JavaFX ist das beispielsweise der sogenannte JavaFX Application Thread. Sollen Fibers mit solchen Frameworks interagieren, ist es somit zwingend notwendig, sie auch dann in Stages zu aufzuteilen, wenn sie gar nicht parallel, sondern sequenziell, aber eben in alternierenden Threads ablaufen sollen: Worker-Threads für zeitaufwendigen und/oder blockierenden Code sowie den jeweiligen "Framework-Thread".

Executor HDD = Executors.newSingleThreadExecutor();

Executor JFX = runnable -> {
if (Platform.isFxApplicationThread())
runnable.run();
else
Platform.runLater(runnable);
};

supplyAsync(() -> this::loadFromDisk, HDD).thenConsume(v ->
myJfxProperty::set, JFX);

In vielen Fällen kann man sich so sehr viel Code sparen, der durch den Einsatz von Swing- oder JavaFX-Workern entstehen würde. Die Fallunterscheidung im JFX-Exekutor in diesem Beispiel dient zur Performance-Optimierung: Wurde die vorangehende Stage bereits vom JavaFX Application Thread ausgeführt, besteht keine Notwendigkeit, die durch Platform.runLater verursachte Verzögerung hinzunehmen. Für den HDD-Zugriff wird im Beispiel ein einziger Thread verwendet. Die Nutzung weiterer Threads kann, je nach Gerätetyp, zu einer Leistungssteigerung führen, sofern Techniken wie NCQ (Native Command Queueing) zur Verfügung stehen. Sinnvoll sind hierbei, je nach Gerätetyp, die im JRE (Java Runtime Environment) vorhandenen Factories Executors.newFixedThreadPool() ebenso wie Executors.newCachedThreadPool() und Executors.newWorkStealingPool().

Auf die Spitze treiben lässt sich der Detaillierungsgrad, indem mittels java.nio.FileStore geprüft wird, wie viele unterschiedliche Geräte tatsächlich von der Fiber genutzt werden, um jedem seinen eigenen, speziell angepassten Exekutor zu gönnen. Dadurch erreicht man die höchstmögliche Parallelisierung in Abhängigkeit der jeweiligen Hardwareausstattung.

Stage-Sequenzen werden von Exekutoren abgearbeitet (Abb. 3)

Stage-Sequenzen werden von Exekutoren abgearbeitet (Abb. 3)

Die Nutzung der CompletableFuture-API ermöglicht es, kooperatives Multitasking in Java auf saubere und lesbare Art und Weise umzusetzen. Durch den gezielten Einsatz verschiedener Exekutoren ist ein adaptives und detailliertes Performance-Tuning des Parallelitätslevels bis hinab auf Geräteebene möglich.

Markus Karg
gibt seine über 30-jährige Programmiererfahrung in Artikeln und Vorträgen weiter.
(ane [2])


URL dieses Artikels:
https://www.heise.de/-3319381

Links in diesem Artikel:
[1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html
[2] mailto:ane@heise.de