Streams und Collections in Java 8
Seite 4: Pipes & Filters
Pipes & Filters als Grundmuster der Streams
Die gezeigten funktionalen Interfaces Function, Consumer und Predicate sind die Bausteine für die neuen Streams in Java 8, die eine neue Nutzungssicht auf Java-Collections bieten. Statt wie bisher mit einer for-Schleife oder mit einem (externen) Iterator selbst auf den Collections zu operieren, bieten Streams eine höherwertige Sicht auf Daten-Collections. Streams halten dabei keine Daten, sondern erlauben den Zugriff auf Daten der unterliegenden Collection. Sie sind demnach eine Art "View" auf die Collection, vergleichbar dem altbekannten java.util.Iterator.
Das Grundmuster der Streams basiert auf dem Pattern "Pipes & Filters". Dabei handelt es sich um ein bewährtes EAI-Pattern (Enterprise Architecture Integration) [5], und das Grundprinzip kennt jeder, der schon mal Unix-Kommandos verknüpft hat:
ps -ef | grep login | cut -c 50- | head
Die Ausgabe von ps wird mit der Pipe an grep zur Filterung weitergereicht. Dessen lange Ausgabe bildet cut auf einen Teilbereich der Ausgabe (nur Spalten > 50) ab. Abschließend gibt head die ersten Einträge aus. Streams in Java 8 arbeiten letztlich nach demselben Prinzip und verketten Teiloperatoren zu einem Gesamtkontrollfluss auf einer Java-Collection.
Intermediate und terminale Operatoren – lazy und eager
Betrachtet sei noch einmal das eingehende Beispiel:
List<Integer> prices = Arrays.asList(10, 20, 30, 40, 50, 60, 70);
return prices.stream() // interne Iteration
.filter(price -> price >= 42) // Filter
.mapToDouble(price -> price * 0.9) // Map: Rabatt
.sum(); // Reduce: Gesamtsumme
Hier werden filter, map und reduce angewendet und die oben vorgestellten funktionalen Interfaces benutzt, parametriert mit fachlichen Lambda-AusdrĂĽcken:
- filter: Es werden aus dem Stream<Integer> nur die Werte herausgefiltert, die den Test auf >=42 ĂĽberstehen, Ergebnis ist wiederum ein (gefilterter) Stream<Integer>.
- map: Mit mapToDouble konvertiert man jeden einzelnen Integerwert in ein Double. Resultat ist ein Stream<Double>.
- reduce: Mit sum wird abschlieĂźend die Gesamtsumme aller Double-Werte gebildet. Ergebnis ist ein einzelnes Double.
filter und map sind sogenannte intermediäre Operatoren, die "lazy" ausgewertet werden. Dagegen ist reduce ist ein terminaler Operator, der "eager" ausgewertet wird. Dabei ist wichtig zu verstehen, dass die Auswertung der intermediären Operationen erst erfolgt, wenn den Stream eine terminale Operation realisiert hat. Ein Stream wird also erst angestoßen, wenn eine terminale Operationen verarbeitet wird, die den Stream damit sozusagen verbraucht. Konsequenterweise ist somit für einen Stream nur die Angabe einer einzigen terminalen Operation möglich. Gibt man für einen Stream mehr als eine an, resultiert das in einem Laufzeitfehler (java.lang.IllegalStateException: stream has already been operated upon).
Szenarien fĂĽr den Fehlerfall
Was passiert, wenn im Stream ein Fehler geworfen wird? Zu unterscheiden sind hier drei Fälle.
- Fall 1: Eine Exception wird geworfen und nicht mit try-catch behandelt. Hier ein Beispiel:
String[] test = { "abc", "defgh", "ijk" };
// Stream wird aufgebaut
Stream<Character> s = Arrays.stream(test).
map(t -> {
// gezielte Exception fuer alle Strings mit Laenge > 3
if (t.length()>3) throw new RuntimeException("error");
return t.charAt(0);
});
// Stream wird konsumiert
s.forEach( c -> System.out.print(c.charValue() + ",") );
Dieses Beispiel gibt nur ein a aus. Denn der zweite String defgh ist zu lang und löst die RuntimeException aus, was die Stream-Verarbeitung beendet. Der letzte String ijk wird nicht mehr behandelt.
- Fall 2: Eine Ausnahme wird geworfen, aber explizit in der intermediären Operation mit try-catch behandelt. Das nachstehende Beispiel gibt also das erste Zeichen aller Strings aus:
String[] test = { "abc", "defgh", "ijk" };
// Stream wird aufgebaut
Stream<Character> s = Arrays.stream(test).
map(t -> {
// gezielte Exception fuer alle Strings mit Laenge > 3
try {
if (t.length()>3) throw new RuntimeException("error");
} catch (Exception ex) { return null; }
return t.charAt(0);
});
// Stream wird konsumiert
s.forEach( c ->
{ if (c!=null) System.out.print(c.charValue() + ","); } );
- Fall 3: Eine Ausnahme wird geworfen, aber erst in der terminalen Operation mit try-catch behandelt:
String[] test = { "abc", "defgh", "ijk" };
// Stream wird aufgebaut
Stream<Character> s = Arrays.stream(test).
map(t -> {
// gezielte Exception fuer alle Strings mit Laenge > 3
if (t.length()>3) throw new RuntimeException("error");
return t.charAt(0);
});
// Stream wird konsumiert
s.forEach( c ->
{ try {
System.out.print(c.charValue() + ",");
} catch (Exception e) { e.printStackTrace(System.err); }} );
Hier werden (vielleicht ĂĽberraschend) nicht alle Strings behandelt: Wie im ersten Fall wird die Stream-Verarbeitung durch den zu langen String defgh beendet. Die Behandlung der Ausnahme in der terminalen Operation hilft also zum Wiederaufsetzen nach dem Fehler nicht.
Potenzial der Streams-API
Die Streams-API bietet eine Fülle generischer Methoden an, die sich zur Verarbeitung von Collections nutzen lassen. Eine vollständige Liste zeigt die JavaDoc zu java.util.stream.Stream. Nachstehende Tabelle gibt hierzu einen kleinen Ausschnitt:
| Intermediäre Operation | Rückgabe | Interface | Bemerkung |
| filter | Stream<T> | Predicate<T> | Filter |
| map | Stream<R> | Function<T, R> | Map |
| peek | Stream<T> |
- |
Gibt den Stream zurĂĽck und erlaubt gleichzeitig eine Aktion auf allen Elementen |
| sorted | Stream<T> | Comparator<T> | Sortierung |
| limit | Stream<T> |
- |
Nur die ersten k Elemente durchlassen |
|
... |
... |
... |
... |
| Terminale Operation | RĂĽckgabe | Interface | Bemerkung |
| reduce | T | BinaryOperator<T> | Reduce |
| forEach | void | Block<T> | Erlaubt eine Operation |
| findFirst | Optional<T> |
- |
Gibt das erste Element zurĂĽck, das Kriterium erfĂĽllt |
| anyMatch | boolean | Predicate<T> | Gibt es mindestens ein Element im Stream, das das Predicate erfĂĽllt |
|
... |
... |
... |
... |
Tabelle: Liste wichtiger intermediärer Operationen ("lazy" evaluiert) und terminaler Operationen ("eager" evaluiert)
Tipp: Mit peek kann man den Zwischenzustand im Stream ansehen und debuggen:
List<Integer> prices = Arrays.asList(10, 20, 30, 40, 50, 60, 70);
return prices.stream() // interne Iteration
.peek(price -> System.out.println(price))
.filter(price -> price >= 42) // Filter
.peek(price -> System.out.println(price))
.mapToDouble(price -> price * 0.9) // Map: Rabatt
.peek(price -> System.out.println(price))
.sum(); // Reduce: Gesamtsumme