Apache Spark 2.0: Zweiter Akt einer Erfolgsgeschichte

Seite 3: Structured Streaming

Inhaltsverzeichnis

Neben den Datasets ist das Structured Streaming die zweite große Neuerung. Ihr erklärtes Ziel ist, dass Entwickler sich keine Gedanken über die Details der Datenströme machen müssen. Waren bei den vorhergehenden Spark-Version die APIs von Batch und Streaming weitestgehend ähnlich, geht die Vereinheitlichung in Spark 2 noch einen Schritt weiter.

In den 1.x-Versionen waren RDDs und DataFrames für die Batch-Verarbeitung vorgesehen, während es für Datenströme DStreams und damit nur RDDs gab. Mit Spark 2.0 erfolgt auch das Streaming auf Basis von Datasets beziehungsweise DataFrames. Ein Stream ist dabei eine Tabelle, die stetig neue Zeilen erhält. Die unendliche Tabelle heißt Input-Tabelle.

Auf ihr laufen die Queries, die die Output-Tabellen füllen. Das Ausführen der Queries erfolgt durch zeitliche Trigger – beispielsweise im Sekundentakt. Der Catalyst Optimizer analysiert und verbessert die ausgeführten Queries ähnlich wie die Operationen auf Datasets und DataFrames.

Vom Input über Query zu Result und Output, gesteuert durch einen regelmäßigen Triggerimpuls (Abb. 1)

(Bild: Apache)

Zum Speichern der Daten existieren unterschiedliche Modi. Das System kann beispielsweise die komplette Ausgabe bei jedem Trigger speichern oder nur die neuen Einträge. Das Aktualisieren bestehender Einträge ist geplant, in der aktuellen Version allerdings noch nicht implementiert. Im Unterschied zum DStream, der auch in Spark 2.0 weiterhin nutzbar ist, baut das Structured Streaming automatisch den State im Lauf der Applikation auf. Die Daten wachsen also von Triggersignal zu Triggersignal an.

Als Beispiel für die API kommt ein klassischer Wortzähler zum Einsatz, der auf einem TCP-Socket-Text-Stream arbeitet:

val spark = SparkSession. ...
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
val query = wordCounts
.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()

Das System legt nach dem Erzeugen der SparkSession den Stream auf localhost:9999 an. Der direkte Vergleich mit dem Lesen einer CSV-Datei mit DataFrame, zeigt die einheitliche API für Batch und Streaming:

val lines = spark.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load("cars.csv")

Beim Streaming kommt statt read nun readStream zum Einsatz. Abgesehen davon ist das Format gleich und unterscheidet sich nur in den Parametern. Nach dem Anlegen des Streams erfolgt die weitere Verarbeitung zum Ergebnis – im konkreten Fall somit die Zahl der Wörter. Der Befehl writeStream startet anschließend einen Ausgabe-Stream, der im Beispiel bei jedem Trigger die kompletten Daten auf der Konsole ausgibt. Weitere Senken neben der Konsole sind aktuell eine Datei (Parquet), eine generische foreach-Schleife sowie eine in-Memory-Tabelle, auf die im Anschluss Abfragen erfolgen können. Mit awaitTermination läuft die Query bis zum Beenden der Streaming-Anwendung. Die Verarbeitung könnte auf die Weise auch in einem Batch implementiert werden.

Wie erwähnt wächst der Input und gegebenenfalls der Output im Lauf der Zeit ins Unendliche. Bei der Verarbeitung von Streaming-Daten gilt der Blick daher häufig nur einem – meist zeitlich begrenztem – Fenster der Daten. Das ist auch beim Structured Streaming möglich. Ein Window entsteht dabei ähnlich wie eine Gruppierung: Statt alle Datensätze einer ID zusammenzufassen, gehören zu einem Fenster alle Datensätze, die in einem bestimmten Zeitrahmen anfallen. In der Ausgabetabelle werden alle Events für die jeweiligen Zeitfenster zusammengefasst. Das folgende Listing erstellt ein Zehnminutenfenster basierend auf der Zeit in der Spalte timestamp, das alle fünf Minuten aktualisiert wird. (12:00 – 12:10, 12:05 – 12:15, 12:10 – 12:20, usw.).

val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()

Durch die explizite Deklaration der Zeitspalte ist im Gegensatz zum DStream eine Gruppierung auf Basis der Event-Zeit möglich. Außerdem lassen sich Out-of-order-Events verarbeiten und nachträglich zu den – weiterhin in-Memory gespeicherten – Output-Tabellen hinzufügen. Die Implementierung der Strategien, um die States zu einem bestimmten Zeitpunkt zu schließen – beispielsweise über Watermarks – erfolgt allerdings erst in einer späteren Version von Spark.

Entwickler dürfen Daten in einem Stream über einen Join mit zusätzlichen statischen Daten anreichern. Zwei Streams können sie hingegen nicht über einen Join verbinden. Aus Performance- und Effizienzgründen bietet die Software zudem einige Dataset-Operationen auf einem Stream wie das Zählen und Sortieren ohne vorherige Aggregation nicht an. Außerdem erlaubt Spark 2.0 das Ausführen mehrerer Aggregationen auf einem Stream hintereinander noch nicht

Das obige Beispiel hält das Ergebnis des wordCount.writeStream ... .start() als StreamingQuery fest. Entwickler können dieses Objekt in Streaming-Anwendungen nutzen, um Abfragen zu überwachen und zu administrieren. Auf die Weise können sie beispielsweise die Anzahl der gelesenen und geschriebenen Einträge abfragen.

Die Beispiele hatten einen Trigger von einer Sekunde zur Grundlage. Das entspricht jedoch nicht dem Batch-Intervall aus vorherigen Spark-Versionen: Während Letzteres Einfluss auf mögliche Fenstergrößen hatte, ist der Trigger nur der Auslöser für eine neue Verarbeitung. Standardmäßig läuft der Trigger alle 0 Sekunden: Das Framework kümmert sich um die Daten, sobald sie eintreffen. Spark 2.0 nutzt weiterhin Microbatching: Es sammelt alle neuen Daten und startet die Verarbeitung, sobald die vorhergehende abgeschlossen ist. Im Gegensatz zum Vorgänger gibt es dafür allerdings keinen festgelegten Rhythmus mehr. Die neue Version bietet zudem die grundsätzliche Möglichkeit, intern die Microbatch-Verarbeitung zu ersetzen, ohne Änderungen in der API vornehmen zu müssen.

Fehlertoleranz erreicht Spark weiterhin durch Write Ahead Logs (WAL) und Checkpoints. Dazu schreibt die Software den Offset der bislang gelesen Daten wie die Zeilen von Dateien oder Kafka-Offsets in ein ausfallsicheres Dateisystem wie HDFS. Checkpoints kommen zum Speichern des in-Memory States – beispielsweise von Fensteraggregationen – zum Einsatz. Mit diesen Techniken ist zumindest eine minimale Garantie gegeben. Wenn die Senke der Verarbeitung idempotent ist, können Entwickler mit Streaming eine Exactly-once-Vorgehensweise erreichen.

Mit dem Structured Streaming erreicht Spark die Integration der Datasets für das Stream Processing. Außerdem schafft es die Grundlagen, um die Verarbeitung auf Basis der Eventzeit inklusive der Berücksichtigung von out-of-order Events zu ermöglichen. Die Unterstützung interaktiver Queries klingt verheißungsvoll, ist aber in der neuen Streaming API noch als Preview zu betrachten. Bislang sind die unterstützten Quellen für Streams zudem noch recht dürftig. Beispielsweise fehlt die Integration von Kafka, die aber zeitnah folgen soll. Auf der Roadmap stehen zudem weiterführende Konzepte bei der Eventzeitverarbeitung und eine tiefere Integration der Machine Learning API.