Apache Kafka als Backend für Webanwendungen?

Seite 2: Setup der Anwendung

Inhaltsverzeichnis

Kafka ist schnell aufgesetzt: Herunterladen, Auspacken, Zookeeper starten, Kafka starten, fertig. Schon läuft eine Instanz, mit der man entwickeln kann. Um sie aus einem Node-Projekt anzusprechen, bietet sich die Bibliothek kafka-node an, die aktiv weiterentwickelt wird und alle wichtigen Features in der Kommunikation mit Kafka anbietet.

const kafka         = require('kafka-node');
const Producer = kafka.Producer;
const Client = kafka.Client;

let client = new Client('localhost:2181');
let producer = new Producer(client);

let kafkaTopic = 'ADD_USER';
let kafkaMessage = '{"type":"USERS_LIST","users":[{"name":"Alejandro","id":1}]}';

producer.on('ready', function () {

let payload = [{
topic: kafkaTopic,
partition: 0,
messages: [JSON.stringify(kafkaMessage)],
attributes: 0
}];

producer.send(payload, function(error,result) {
if (error) {
console.error(error);
} else {
console.log('result: ', result)
}
});
});

producer.on('error', function (err) {
console.error(err);
});

Im Listing sieht man bespielhaft, was nötig ist, um einen Producer für Kafka-Nachrichten zu schreiben. Zunächst initalisiert man einen Client für den verwendeten Kafka-Server und den Producer. Sobald er bereit ist (ready-Event), können Nutzer Kafka-Nachrichten senden. Neben Topic und Message wird die Partition angegeben und dass die Nachricht nicht gzip-komprimiert (attribute: 0) abgelegt wird. Kafka-node kennt HighLevelProducer und "normale" Producer. Bei ersteren muss man sich nicht um das Management der Partitionen eines Topics kümmern, bei Letzteren hingegen schon. Fehler, die beim Senden auftreten, gibt die Konsole aus, ansonsten das Ergebnis beim erfolgreichen Ablegen der Nachricht.

result:  { ADD_USER: { '0': 76 } }

Die Zeile liefert statistische Informationen darüber, wie viele Nachrichten zum Topic (ADD_MESSAGE) auf den einzelnen Partitionen liegen. Da der Einfachheit halber mit einer Partition gearbeitet wurde, liegen alle bisherigen 76 Nachrichten dort. Sollte der Producer auf technische Fehler stoßen (error-Event), kann man darauf reagieren.

const kafka                 = require('kafka-node');
const HighLevelProducer = kafka.HighLevelProducer;
const Client = kafka.Client;

let client = new Client('localhost:2181');
let producer = new HighLevelProducer(client);
let topics = ['ADD_USER', 'ADD_MESSAGE'];

producer.on('ready', function (errProducer, dataProducer) {
producer.createTopics(topics, true, function (err, data) {
if (err) {
console.error('Error create Topics', err);
}
});
})

Bevor Nutzer jedoch das erste Mal Nachrichten zu einem neuen Topic erzeugen, sollte man sie Kafka mit createTopics bekannt machen. Das sollte beim Starten der Middleware, oder bevor man die Consumer initalisiert, passieren.

const kafka                 = require('kafka-node');
const HighLevelConsumer = kafka.HighLevelConsumer;
const Client = kafka.Client;

let client = new Client('localhost:2181');
let consumerTopics = [{ topic: 'ADD_USER', partition: 0 },
{ topic: 'ADD_MESSAGE', partition: 0 }];
let consumer = new HighLevelConsumer(client, consumerTopics,
{ autoCommit: true });

consumer.on('message', function (data) {
console.log(JSON.parse(data.value));
// später: ws.emit('broadcast', JSON.parse(data.value));
});

consumer.on('error', function (err) {
console.log('error', err);
});

process.on('SIGINT', function () {
consumer.close(true, function () {
process.exit();
});
});

Das Listing zeigt zum Schluss die nötigen Schritte, um einen Consumer zu erzeugen. Kafka-node unterscheidet erneut HighLevelConsumer und "normale" Consumer. Bei Verwendung von ersterem hat man keine Kontrolle über die Offsets beim Lesen der Nachrichten und es werden einfach nur fortlaufende neue Nachrichten konsumiert. Braucht man Features wie Replay oder besondere Leserichtungen, bleibt nur der zweite Weg über den Consumer. Beim Definieren der gelesenen Topics können Entwickler wiederum die Partitionen mit angeben, was im Beispiel aber nur pro forma ist, da es ja nur eine gibt. Um auf neue Nachrichten reagieren zu können, hängt man sich einfach an das message-Event. Als Übergabeparameter bekommen Nutzer die komplette Kafka-Nachricht an die Hand.

{ topic: 'ADD_USER',
value:
'{"type":"USERS_LIST","users":[{"name":"Alejandro","id":1}]}',
offset: 77,
partition: 0,
highWaterOffset: 79,
key: -1 }

Der eigentliche Nachrichteninhalt befindet sich im Attribut value, das im Beispiel vorerst auf der Konsole geloggt wird. Ebenso wie beim Producer können Anwender mit dem error-Event auf technische Fehler reagieren.

Falls der Serverprozess stoppt, sollten laufende Consumer ebenfalls geschlossen werden. Das gehört zum guten Ton, da Kafka minimale Informationen zu aktiven Consumern behält und andernfalls nicht sofort mitbekommen würde, dass sie geschlossen sind und deshalb unnötige Informationen erhalten bleiben. Bei einer großen Anzahl von Consumern wäre das ein Problem