Effiziente Datenverarbeitung mit Kafka

Seite 3: Von Spark zu Kafka Connect und Kafka Streams

Inhaltsverzeichnis

Im ersten Schritt ersetzen Kafka Streams und Kafka Connect Spark wie in Abbildung 4 dargestellt. Zunächst erzeugt der Code einen KStream auf dem Eingangstopic. Daraufhin erfolgt die Extraktion von Land und Stadt eines Meet-up und die Repartitionierung der Daten über ein Ausgangs-Topic:

KStream<String, String> geschluesselt =
builder
.stream(Serdes.String(), Serdes.String(), "rsvps")
.map((key, value) -> {
//extrahiere Land und Stadt aus JSON
String land = ...;
String stadt = ...;
//setze eine Kombination aus Stadt und Land
//als Kafka-NachrichtenschlĂĽssel
return new KeyValue<>(land + "|" + stadt, value);
})
//repartioniere die geschlĂĽsselten Daten
.through("rsvp_geschluesselt");

Im Fallbeispiel kann Spark Streaming durch die Kombination aus Kafka Streams und Kafka Connect ersetzt werden (Abb. 4).

Da die Daten nun geschlüsselt sind, lassen sich alle Vorkommen eines Schlüssels (Land-Stadt-Paar) zählen:

//Zähle die Nachrichten pro Schlüssel
KTable<String, Long> count = geschluesselt.groupByKey().count("count");

Um von auĂźen Zugriff zu erlangen, mĂĽssen die Inhalte der KTables in ein Ausgangstopic namens "rsvp_pro_stadt" geleitet werden:

count.toStream()
.map((key, value) -> {
String[] split = key.split("\\|");
String land = split[0];
String stadt = split[1];

//erzeuge JSON mit Land,
//Stadt und Anzahl
String json = ...;
//key darf null sein, da Partitionierung
//ab hier irrelevant ist
return new KeyValue<>(null, json);

}).to("rsvp_pro_stadt");

Der Cassandra-Konnektor ist folgendermaĂźen konfiguriert:

{
"name": "cassandra-sink-meetup",
"config": {
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"connector.class":
"com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
"tasks.max": "10",
"topics": "rsvp_pro_stadt",
"connect.cassandra.kcql": "INSERT INTO rsvps SELECT * FROMrsvp_pro_stadt",
"connect.cassandra.contact.points": "localhost",
"connect.cassandra.port": "9042",
"connect.cassandra.key.space": "rsvp"
  }
}

Die entscheidende Property ist connect.cassandra.kcql: Sie verknüpft Topics und Tabellen. Der Beispielcode schreibt alle Felder aus der Quellnachricht in die Tabelle "rsvps" in Cassandra. Entwickler können an der Stelle zusätzlich Feldeinschränkungen vornehmen. Der Konnektor benutzt im Code die in Cassandra eingebaute JSON-Fähigkeit.

Das Beispiel hat Spark mit Kafka-Bordmitteln ersetzt. Das ist interessant, aber im Endeffekt nur ein 1:1-Ersatz einer existierenden Technik. Kafka geht allerdings noch einen Schritt weiter.

Um die Anzahl der RSVPs pro Meet-up zu zählen, baut Kafka Streams einen sogenannten "State Store" auf, der den jeweils aktuellen Zustand der Anwendung enthält. In der Beispielanwendung heißt er count. Der Store lässt sich seit Kafka 0.10.1 direkt abfragen, ohne den Umweg über das Ausgangstopic rsvps_pro_stadt und Cassandra (vgl. Abb. 5). Die Datenbank komplett zu streichen klingt nach Zukunftsmusik, aber für einige Anwendungsfälle kann der Weg dorthin führen.

Beim Verwenden von Interactive Queries können Kafka Connect und Cassandra entfallen (Abb. 5).

Eine einfache Abfrage der aktuellen Anzahl aller RSVPs in Berlin sähe folgendermaßen aus:

ReadOnlyKeyValueStore<String, Long> store =
kafkaStreams.store("count",
QueryableStoreTypes
.<String, Long>keyValueStore());
Long countBerlin = store.get("de|Berlin");