Spark versus Flink – Rumble in the (Big Data) Jungle

Seite 3: Streaming

Inhaltsverzeichnis

In vielen Geschäftsbereichen kann Geschwindigkeit in der Datenanalyse und -verarbeitung einen Wettbewerbsvorteil bedeuten. Insbesondere für viele Internet-Unternehmen ist schnelle Datenverarbeitung Kerngeschäft. Doch nicht nur, wenn es um Geschwindigkeit geht, kann man von Streaming profitieren, auch Anwendungsfälle, die derzeit mit Batch-Verarbeitung gelöst sind, lassen sich teilweise als Streaming-Anwendung einfacher und performanter darstellen.

Dennoch ist es kein Wunder, dass wichtige Tools im Real-Time-Bereich wie Storm oder Samza von Twitter beziehungsweise LinkedIn stammen. Hier gibt es aber noch Bedarf für ein Tool, das alle Ansprüche erfüllt. Lange war Storm Marktführer, es scheint aber inzwischen an Fahrt zu verlieren. Twitter hat sich inzwischen von der Technik abgewandt und mit Heron ein neues Tool geschaffen.

Deshalb ist es interessant, einen Blick auf Flink zu werfen, da es eine echte Real-Time-Plattform mit vielen weiteren Features bietet und sich durchaus zum Branchenprimus aufschwingen kann. Dabei wird Flink zugute kommen, dass es – wie Spark – die komplette Bandbreite von Batch über Graph Processing und Stream Processing bis hin zu Machine Learning anbietet.

Bei der Batch-Verarbeitung war der Code bei den Flink- und Spark-Programmen konzeptionell noch ähnlich; das ändert sich nun bei der Verarbeitung von Streams. Grund dafür ist, dass Spark kein reines Streaming beherrscht, sondern die Daten in kleinen Zeitabständen zu Batches zusammenfasst – das Konzept nennt sich Micro-Batching. Darunterliegend sind bei Spark dann immer noch die RDDs, mit denen man auch beim Streaming noch Kontakt hat. Bei Flink gibt es eine komplett separate API für Streaming, die DataStream API. Der funktionale Ansatz bei der Datenverarbeitung (map, reduce etc.) gilt aber weiterhin für beide Frameworks.

Bei der Verarbeitung von Streams werden die Daten in der Regel aus einer Queue eingelesen. Diese erfüllt im Wesentlichen zwei Funktionen. Erstens entkoppelt sie Datengenerierung und -verarbeitung und kann Daten zwischenspeichern, falls die Verarbeitung nicht mit der Generierung mithalten kann. Zweitens lassen sich Daten leicht erneut in die Verarbeitung einspeisen, falls es zu Fehlern oder Ausfällen bei der Verarbeitung gekommen ist. Im Big-Data-Umfeld hat sich Apache Kafka als verteilte Queue zum Standard-Tool entwickelt, weshalb die folgenden Beispiele Kafka nutzen.

Für Real-Time Streaming hat Flink eine eigene API namens DataStreams. Sie stellt ganz ähnliche Operationen wie die DataSet API zur Verfügung, zusätzlich aber noch einige weitere, die speziell auf den Streaming-Fall ausgelegt sind. Besondere Bedeutung kommt bei der Stream-Verarbeitung der Definition von Windows auf dem Stream zu. Denn in einem Stream kann man normalerweise immer nur ein einzelnes Event betrachten. Abgesehen von einfachen Filter- und Umformatierungsoperationen lässt sich damit noch nicht viel anfangen.

Interessanter wird es bei der gleichzeitigen Betrachtung von Events über einen gewissen Zeitraum. So könnte es erforderlich sein, gewisse Analysen auf allen Events durchzuführen, die in den letzten fünf Minuten angekommen sind. Ungleich spannender ist die Möglichkeit, solche Fenster auch auf den Zeitstempeln der Events zu definieren. Somit kann man etwa alle Events, die in einer bestimmten Stunde erzeugt wurden, gesammelt verarbeiten, selbst wenn sie über den Zeitraum von vier Stunden angeliefert wurden (bspw. wegen Ausfällen in einer Zwischenkomponente oder wegen eines Backlogs). Gerade bei Auswertungen über so lange Zeiträume ist es wichtig, einen internen Status persistent speichern zu können.

Zur Veranschaulichung der Stream API kommt ein ähnliches Beispiel zum Einsatz wie im Batch-Fall: Aus einem Strom von Sale-Events ist zu bestimmen werden, welcher Artikel für welchen Tag wie oft verkauft wurde.

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> kafkaSource = env.addSource(new
FlinkKafkaConsumer08<>("topic",
new SimpleStringSchema(), kafkaProperties));

kafkaSource.map(line -> line.split(","))
.map(new MapFunction<String[], Tuple3<Integer, String,
Integer>>() {
@Override
public Tuple3<Integer, String, Integer> map(String[] parts)
throws Exception {
int id = Integer.parseInt(parts[0]), amount =
Integer.parseInt(parts[1]);
String date = parts[2];
return new Tuple3<Integer, String, Integer>(id, date,
amount);
}
})
.keyBy(0, 1)
.sum(2)
.print();

env.execute(); //Start processing

Der erste Unterschied zum Batch-Code ist die Verwendung der StreamExecutionEnvironment, die Zugang zur Streaming API gewährt. Auf ihr fügt man mit addSource die Datenquellen hinzu. Es gibt, insbesondere zum Testen und Ausprobieren, einige fertige Quellen, die man über Methoden auf StreamExecutionEvironment bekommt, etwa readTextFile(), das Daten aus einer Datei liest und als Stream verarbeitet. Für das Beispiel wird Kafka als Quelle verwendet. Dazu liefert Flink einen Kafka-Consumer mit, derzeit sowohl für Kafka 0.8 als auch Kafka 0.9. An Unterstützung für Kafka 0.10 arbeitet das Team aktuell.

Um die Binärdaten aus Kafka als String zu deserialisieren, nutzen die Autoren das SimpleStringSchema, das Flink ebenfalls bereitstellt. Den Kafka Topic sowie einige Kafka-Einstellungen (etwa die Broker bzw- Zookeeper-Adresse) muss man hier ebenfalls übergeben. Daraus ergibt sich nun eine DataStreamSource, die von der Hauptklasse der Streaming API (DataStream) erbt und somit den Zugriff auf die Streaming API bereitstellt. Im Gegensatz zur Batch API liefert Flink keine eingebaute Methode, mit CSV umzugehen, deshalb ist hier manuell zu splitten.

Der Rest der Verarbeitung ist ähnlich wie zuvor im Batch-Code. Diesmal wird die Funktion keyBy verwendet, nicht groupBy. Sie liefert einen KeyedDataStream der nach den angegebenen Schlüsseln partitioniert ist. Den praktischen Aggregation-Operator sum gibt es auch hier. Er bewirkt, dass in dem gerade betrachteten Event das angegebene Feld durch den bisher aggregierten Wert ersetzt wird.

Je mehr Nachrichten für einen bestimmten Schlüssel eingehen, desto größer wird demnach der Wert. Damit ist die Umsetzung abgeschlossen. Wenn Events in die Kafka Queue geschrieben werden, wird ausgegeben, wie viele Artikel bisher pro Tag und Artikel verkauft wurden.

Bei Spark Streaming gibt es kein echtes Real-Time. Stattdessen fasst es die empfangenen Events alle 0,5 bis 2 Sekunden in sogenannte Micro-Batches zusammen. Das hat den großen Vorteil, dass Spark mit Batching gut umgehen und man so sehr viele Operationen wiederverwenden kann. Zudem lassen sich Streaming- und Batch-Daten einfach kombinieren; es sind im Grunde ja beides RDDs. Es hat aber den Nachteil, dass es eine gewisse Latenz mit sich bringt. Dem Vorgehen fallen auch einige Windowing-Arten zum Opfer. Während Windows auf Processing Time einfach umzusetzen sind (zumindest solange das Fenster ein Vielfaches der Micro-Batch-Länge ist), sind Event-Time Windows oder Windows mit fester Anzahl an Events mit Spark derzeit nicht trivial möglich.

Das gleiche Beispiel wie oben sieht in Spark wie folgt aus:

SparkConf conf = new SparkConf().setAppName("Spark 
Demo").setMaster("local[*]");
JavaStreamingContext streamContext = new JavaStreamingContext(conf,
Durations.seconds(1));

JavaPairDStream<Sale,Integer> aggregatedSales =
KafkaUtils.createDirectStream(streamContext, String.class,
String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics)
.map(tuple -> tuple._2);
.map(line -> line.split(","))
.filter(array -> array.length == 3) // Rausfiltern ungültiger Zeilen
.mapToPair(parts -> { // Umwandeln der Werte in einen POJO
int id = Integer.parseInt(parts[0]), amount =
Integer.parseInt(parts[1]);
String date = parts[2];
return new Tuple2<Sale,Integer>(new Sale(id, date), amount);
})
.reduceByKey((x, y) -> x + y);

aggregatedSales.foreachRDD(rdd -> {
rdd.map(tuple->tuple).sortBy(tuple -> tuple._2, false, 1)
.take(1)
.forEach(System.out::println);
});

streamContext.start();
streamContext.awaitTermination();

Der Ausgangspunkt ist ein StreamingContext. Für ihn ist eine Duration zu übergeben. Durch das Micro-Batching gibt sie an, wie lang diese Batches sein sollen. Mit der Einstellung wird Spark jede Sekunde ein neues Paket mit Daten erzeugen.

Auch bei Spark ist die Datenquelle Kafka. Die KafkaUtils-Klasse von Spark, mit der sich Kafka als Quelle und Senke verwenden lässt, bietet verschiedene Methoden an. Die hier verwendete Methode createDirectStream ist die momentan empfohlene. Derzeit unterstützt Spark nur Kafka 0.8, an der Kafka-0.10-Unterstützung wird gearbeitet.

Die Spark API gibt in einem Kafka Stream sowohl den Schlüssel als auch den Wert. Wie bei Flink ist hier anzugeben, wie man aus den Kafka-Nachrichten die gewünschten Rückgabetypen extrahieren kann. Die verwendete Klasse StringDecoder stellt Kafka bereit. Wie bei Flink sind der Topic und einige Kafka-Einstellungen anzugeben. Als Ergebnis bekommt man einen JavaDStream und damit Zugriff auf die Operatoren, die sich auf dem Stream aufrufen lassen.

Zuerst entfernen Entwickler den Key, da sie nur mit dem Value weiterarbeiten sollen. Dann wird analog zum Batch-Beispiel das CSV geparst und ein POJO erstellt. Danach ruft man erneut reduceByKey auf, gefolgt von der Ausgabe der Daten. Der Code ist fast der gleiche wie im Batch-Beispiel. Vor dem Sortieren und Ausgeben muss man jedoch foreachRDD aufrufen. Damit erhalten Entwickler die RDDs, also Batches, aus denen der DStream aufgebaut ist. Darauf hin lässt sich einfach auf die Operatoren zurückgreifen, die nur auf RDDs verfügbar sind, etwa das Sortieren. Auf einem Stream ergibt der Operator keinen großen Sinn, auf den einzelnen Batches kann aber sinnvoll sortiert werden. Die Tatsache, dass Spark Micro-Batching nutzt, tritt so in den APIs recht häufig zu Tage.

Damit bekommt man nun für jeden Ein-Sekunden-Batch die Summe der darin verkauften Artikel pro Tag. Das entspricht nicht ganz der Aufgabenstellung. Um die Summe über einen längeren Zeitraum zu persistieren, muss man in Spark deutlich mehr Aufwand betreiben. Das aktuelle Spark 1.6 führte eine neue Methode ein, mit der man sich einen Zustand merken kann. Der vollständige Code würde hier aber den Rahmen sprengen und lässt sich im GitHub-Repository zum Artikel nachschauen.

Mit der kommenden Version 2.0 von Spark wird sich einiges ändern. Der neue Ansatz für Streaming soll deutlich schlanker sein und auf den neuen Dataframes/Dataset APIs aufsetzen. Hier wird die Zeit zeigen, ob Spark mit der neuen API im Bereich Streaming zu Flink aufschließen kann.