Effiziente Datenverarbeitung mit Kafka

Seite 2: Die Evolution von Apache Kafka

Inhaltsverzeichnis

Das soziale Netz LinkedIn stand am Anfang des Jahrzehnts vor dem Problem, dass keine der am Markt erhältlichen Messaging-Produkte für das hohe Nachrichtenvolumen geeignet waren. Daher beschloss das Unternehmen, eine eigene Software zu entwickeln. Die Macher haben den Code der Apache Software Foundation (ASF) übergeben, der dort großen Erfolg unter dem Projektnamen Kafka hat. Wie häufig in Silicon Valley ließ die Gründung eines Kafka-fokussierten Unternehmens nicht lange auf sich warten. 2014 verließen Jay Kreps, Neha Narkhede und Jun Rao LinkedIn, um sich mit Confluent selbstständig zu machen. Seitdem ist dieses Unternehmen einer der stärksten, aber bei weitem nicht der einzige Treiber der Weiterentwicklung von Kafka.

Mit Version 0.9, die im November 2015 erschien, wurde deutlich, wohin die Reise gehen sollte. Mit Kafka Connect erhielt die Distribution ein Framework zur Integration mit externen Datenquellen und -senken. Mittlerweile existiert eine Vielzahl an offiziellen (von Confluent zertifizierten) und Community-getriebenen Konnektoren, um nur ĂĽber Konfiguration und ohne eigenen Quellcode Kafka skalierbar, resillient und hochverfĂĽgbar mit Systemen wie Cassandra oder Amazons S3 zu verbinden.

Damit aber nicht genug – Kafka 0.10.0 brachte die Client-Bibliothek Kafka Streams, die die Entwicklung von Stream-Processing-Anwendungen ermöglicht, inklusive Funktionen wie das Zusammenfügen verschiedener Kafka Topics oder der Aggregationen von Daten. Kafka Streams tritt damit in direkte Konkurrenz zu Streaming-Frameworks wie Spark oder Flink. Version 0.10.1 enthielt eine auf dem Papier kleine, aber sehr bedeutsame Änderung: Eine API exponiert den internen Zustand von Streams-Anwendungen.

Kafka hat sich durch Connect und Streams von einem Message Broker zu einer Streaming-Plattform weiterentwickelt.

Kafka Connect ist ein Framework zur Integration von Kafka mit anderen Systemen. Ein Quellkonnektor lädt Daten aus einer externen Datenquelle in ein Kafka Topic. Beispielsweise lässt sich über eine JDBC-Quelle eine relationale Datenbank abfragen und das Ergebnis in Kafka ablegen. Ein Zielkonnektor liest Daten aus einem Kafka Topic und schreibt sie beispielsweise in HDFS.

Apache Kafka definiert eine einheitliche API für Konnektoren und stellt die Laufzeitumgebung – sogenannte "Worker" – bereit. Ein Worker lässt sich entweder eigenständig oder im Clusterverbund starten, wobei Letzteres für einen produktiven Einsatz zu empfehlen ist. Die Worker haben eine REST-API, über die sich Konnektorinstanzen erstellen, verwalten und entfernen lassen. Ein Worker Cluster sorgt für Hochverfügbarkeit, indem er dezentral den Cluster-Zustand überwacht und beim Verlust eines Worker dafür sorgt, dass die ihm zugewiesenen Konnektorinstanzen auf einem anderen Worker neu gestartet werden. Die Instanzen skalieren außerdem je nach Bedarf in einem definierbaren Rahmen. Da sie periodisch melden, welche Daten sie zuletzt verarbeitet haben – ein sogenanntes "Offset" –, können neue Instanzen dort die Arbeit wieder aufnehmen, wo ihr jeweiliger Vorgänger aufgehört hat.

Wie sieht das beispielsweise bei einer JDBC-Quelle aus? Der passende Konnektor von Confluent bietet unterschiedliche Konfigurationsoptionen. Prinzipiell führt er periodisch Abfragen auf die Quelldatenbank durch. Idealerweise möchte man bei diesen Abfragen nicht immer die komplette Tabelle, sondern nur die inkrementellen Änderungen seit der letzten Abfrage lesen. Damit das funktioniert, müssen die Tabellen bestimmte Voraussetzungen erfüllen. Zum Beispiel gibt es den Modus "Incrementing Column", bei dem die Instanz neue Zeilen anhand einer fortlaufenden ID identifiziert. Er eignet sich für Fälle, in denen eine Zeile unveränderlich ist und nie aktualisiert wird. Die Konnektor-Instanz beginnt bei einem definierten Startwert für diese ID, führt eine Abfrage aus, legt die Daten in Kafka ab und persistiert die maximal gelesene ID als Offset über das Connect-Framework. Durch Anpassung der WHERE-Bedingung im folgenden Durchlauf liefert die Abfrage nur Datensätze mit einer höheren ID. Da nicht alle Tabellen diese Voraussetzung erfüllen, kann alternativ unter anderem ein Zeitstempel zum Einsatz kommen.

Die Konfiguration der JDBC-Quelle sieht folgendermaĂźen aus:

{
"name":"jdbc-source",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":2,
"connection.url":
"jdbc:mysql://127.0.0.1:3306/rsvpdbt?user=franz&password=kafka",
"mode":"incrementing",
"table.whitelist": "rsvps,members",
"incrementing.column.name":"id",
"topic.prefix":"jdbc-",
"poll.interval.ms":1000
}
}

FĂĽr diese Quelle werden zwei Instanzen des Konnektors gestartet, die die Daten in den Tabellen "rsvps" und "members" basierend auf der Spalte "id" abfragt. Der Konnektor schreibt die Ergebnisse im Avro-Format in ein Topic namens "jdbc-rsvps" beziehungsweise "jdbc-members". Die Abfrage erfolgt sekĂĽndlich.

Bei einer Senke ist es einfacher, da ein Kafka Topic als Datenquelle für den Konnektor dient. Dabei kommen Kafkas Standardmechanismen zum Speichern der "Consumer Offsets" zum Einsatz. Manche Konnektoren wie der S3-Konnektor von Confluent unterstützen "exactly once"-Garantien. Die Quality of Service hängt aber von der jeweiligen Implementierung ab.

Die Kafka-Distribution enthält keine eigenen Konnektoren. Confluent pflegt eine Liste in diversen Kategorieren. Zum einen gibt es von Confluent entwickelte und unterstützte Konnektoren für Amazon S3, Elasticsearch, HDFS und JDBC. Darüber hinaus zertifiziert das Unternehmen Anbindungen anderer Hersteller, die den eigenen Qualitätsansprüchen entsprechen. Dazu gehören Konnektoren für Cassandra, SAP HANA, Vertica und mehr. Die dritte Kategorie umfasst Community-Konnektoren, die in der Regel nicht von Firmen, sondern eben von der Community stammen und in unterschiedliche Qualitätsstufen fallen. Unter anderem existieren solche Anbindungen für Amazon Kinesis und InfluxDB. Das Fallbeispiel nutzt die zertifizierte Cassandra-Senke.

Kafka Streams ist eine Bibliothek zum Entwickeln von Anwendungen, die kontinuierlich Daten aus einem oder mehreren Kafka-Topics lesen, die Daten transformieren oder in irgendeiner Form aggregieren und die Ergebnisse in ein weiteres Kafka-Topic schreiben, von wo aus die Weiterverarbeitung erfolgen kann.

Kafka Streams setzt auf zwei Hauptabstraktionen: KStreams und KTables. Ersteres ist die logische Abstraktion eines Stroms eingehender Nachrichten. Im einfachen Fall wird ein KStream direkt auf einem Kafka Topic erzeugt. Jede neue Nachricht auf dem Topic führt zu einem neuen Datensatz im KStream. KStreams können auch aus Transformationen entstehen. Diese können beispielsweise eine Funktion auf jedes neue Element eines KStream anwenden. Die API enthält aus Scala, Spark oder Java-Streams-API bekannte Methoden wie map, filter oder flatMap.

Bei einer KTable wird nicht jeder Datensatz einer Tabelle getrennt betrachtet, sondern anhand des NachrichtenschlĂĽssels nur der jeweils letzte Wert gespeichert ("Changelog Stream"). Im Gegensatz zu KStreams halten KTables einen aktuellen Zustand. Im Normalfall exponieren ausgehende Kafka Topics die Daten innerhalb von KStreams und KTables.

Beispielsweise kommt ein KStream zum Einsatz, um den textuellen Inhalt einer Nachricht auf Großschreibung zu transformieren und nur Nachrichten mit einem bestimmten Schlüssel zu behalten. Die gefilterten und transformierten Nachrichten werden in ein Topic geschrieben. Mit einer KTable hingegen könnte man unter anderem alle Nachrichten zu einem Schlüssel zählen und den jeweils aktuellen Wert pro Schlüssel über ein ausgehendes Topic veröffentlichen. Darüber hinaus sind komplexere Aggregationen möglich. Außerdem gibt es eine ganze Reihe an Join-Operationen, um KTables und KStreams zu verknüpfen.

Das erinnert auf den ersten Blick an bekannte Mechanismen von Spark oder Flink. Es gibt aber einen großen Unterschied: Während Flink und Spark Frameworks sind und Jobs in einem Cluster laufen, verfolgt Kafka Streams eine andere Strategie: Es ist lediglich eine weitere Clientbibliothek für Kafka. Um eine Kafka-Streams-Anwendung zu skalieren, starten Administratoren einfach eine neue Instanz der Anwendung – in der Regel in Form eines gewöhnlichen JAR. Die Lastverteilung erfolgt unter Anwendung der üblichen Mechanismen. Die dazu verteilten Instanzen der Anwendungen müssen sich untereinander nicht kennen und könnten theoretisch sogar in völlig getrennten Netzwerken laufen – sie benötigen nur Zugriff auf Kafka. Das hat auch Nachteile: Das bei Spark und Flink erforderliche Umverteilen der Daten zwischen Cluster-Knoten ist bei Kafka-Streams nicht ohne Weiteres möglich. Die Repartitionierung als Alternative müssen Entwickler eigenhändig vornehmen – mit Standardmechanismen von Kafka. Der Prozess belegt zusätzlichen Speicher auf den Kafka-Brokern.

Kafka 0.10.1 führt eine Option ein, mit der sich der Zustand einer Streams-Anwendung beziehungsweise KTable abfragen lässt, ohne die Daten in ein Ausgangs-Topic zu exportieren. Sogenannte "Interactive Queries" ermöglichen die direkte schlüsselbasierte Abfrage von KTables. Dazu exponiert die Streams-API Zugriffsmethoden.

Interactive Queries stehen noch am Anfang ihrer Entwicklung. Der Philosophie von Kafka Streams, eine API und kein Framework zu sein, ist geschuldet, dass die Ausführung von Abfragen über eine verteilte Kafka-Streams-Anwendung nicht automatisch erfolgt. Entwickler müssen eigentätig dafür sorgen, herauszufinden, welche Instanz für einen Schlüssel zuständig ist, um sie dann anzufragen. Kafka Streams unterstützt hierbei die Entwickler – jede Anwendung kann ihren Endpunkt clusterweit bekannt machen.

Die Zuständigkeitsfrage für einen Schlüssel kann jede Instanz beantworten, aber es fehlt ein automatisches Abfrage-Framework. Zudem ist die Anwendung in den Zeiträumen, in denen die Neuverteilung der Partitionen über die Instanzen erfolgt, nicht abfragbar und damit nicht hochverfügbar. Daher bieten Interactive Queries eine Vorschau auf die Zukunft, sind aber vermutlich noch nicht weiträumig sinnvoll einsatzbar.