Asynchrone Auftragsverarbeitung in Webdiensten

Eine Message-Queue lässt sich dazu verwenden, Aufgaben an verschiedene Instanzen eines Dienstes zu verteilen. Doch was, wenn man den Aufwand für eine Message-Queue scheut? Wie wäre es, einen solchen Dispatcher stattdessen auf Basis einer Datenbank zu implementieren?

In Pocket speichern vorlesen Druckansicht 15 Kommentare lesen
Lesezeit: 9 Min.
Von
  • Golo Roden
Inhaltsverzeichnis

Eine Message-Queue lässt sich dazu verwenden, Aufgaben an verschiedene Instanzen eines Dienstes zu verteilen. Doch was, wenn man den Aufwand für eine Message-Queue scheut? Wie wäre es, einen solchen Dispatcher stattdessen auf Basis einer Datenbank zu implementieren?

Viele Webdienste bedienen Abfragen: Man stellt eine Anfrage und erhält das gewünschte Ergebnis als Antwort. Das Vorgehen passt hervorragend zu dem Request-Reponse-Modell des HTTP-Protokolls. Auffällig ist, dass die Anfragen synchron verarbeitet werden. Bis der Webdienst antwortet, muss der Client auf das Ergebnis warten.

Das gilt allerdings nicht für alle Webdienste. Außer Abfragen gibt es nämlich auch Aufträge, die zu verarbeiten sind, wofür unter Umständen eine längere Zeit benötigt wird. Am Ende eines Auftrags steht zwar auch ein abfragbares Ergebnis, doch ist die eigentliche Aufgabe des Webdienstes das Verarbeiten des Auftrags. Aufgrund der benötigten Zeit arbeiten solche Webdienste in der Regel asynchron.

Beispiele für derartige Webdienste gibt es viele. Dazu zählen das Versenden von E-Mails, das Konvertieren von Videos oder die Analyse von Dokumenten. Der Client will hierauf nicht warten, sondern den Auftrag möglichst schnell aufgeben und dann später ermitteln können oder benachrichtigt werden, wie der Stand der Dinge ist.

Im Prinzip ist auch ein solcher Webdienst leicht zu implementieren: Der Client schickt eine Anfrage mit dem Auftrag, der Webdienst legt ihn in einen Auftragspool ab und bestätigt den Empfang, eventuell mit einer zugeteilten Auftrags-ID. Der Client weiß nun lediglich, dass der Auftrag angenommen wurde. Um den aktuellen Status zu ermitteln, muss er entweder regelmäßig beim Webdienst nachfragen, oder sich von diesem benachrichtigen lassen.

Schwierig wird es, wenn das Verarbeiten von Aufträgen so lange dauert, dass der Client schneller Aufträge schickt, als der Webdienst sie verarbeiten kann. Dann wäre es hilfreich, die Aufträge auf mehrere Instanzen des Dienstes verteilen zu können. Dazu lässt sich eine Message-Queue wie RabbitMQ verwenden.

Verwendet man Node.js, lässt sich das am einfachsten mit dem Modul hase implementieren. Die Installation erfolgt regulär mithilfe von npm:

$ npm install hase

Anschließend gilt es, hase in die eigene Anwendung zu integrieren und eine Verbindung zu RabbitMQ aufzubauen:

const hase = require('hase');

hase.connect('amqp://...', (err, mq) => {
// ...
});

Will man auf Fehler beziehungsweise Verbindungsabbrüche reagieren, empfiehlt es sich, auf das error- beziehungsweise disconnect-Ereignis des mq-Objekts zu reagieren:

mq.on('error', err => {
// ...
});

mq.on('disconnect', () => {
// ...
});

Schließlich gilt es, einen sogenannten Worker zu erzeugen, der als Verteiler für die Aufträge fungiert. Dazu dient die Funktion worker des mq-Objekts, der als Parameter der Name des Verteilers zu übergeben ist:

const worker = mq.worker('test');

Der Worker stellt anschließend die beiden Funktionen createReadStream und createWriteStream zur Verfügung, die jeweils einen Stream erzeugen, mit dem sich Aufträge empfangen beziehungsweise aufgeben lassen. Ein Auftrag ist dabei nichts anderes als ein reguläres JSON-Objekt, das in den schreibbaren Stream geschrieben wird:

worker.createWriteStream((err, stream) => {
const task = {
// ...
};

stream.write(task);
});

Auf der Leseseite lässt sich der Auftrag empfangen, indem man sich auf das data-Ereignis des lesbaren Streams registriert. Wichtig ist, nach der Auftragsverarbeitung die Funktion next aufzurufen, um zu signalisieren, dass die Instant für die Verarbeitung des nächsten Auftrags zur Verfügung steht:

worker.createReadStream((err, stream) => {
stream.on('data', task => {
// ...
task.next();
});
});

Ist das Verarbeiten fehlgeschlagen, kann man den Auftrag durch den Aufruf von

task.discard();

verwerfen, oder ihn durch Aufruf von

task.defer();

für eine spätere erneute Verarbeitung kennzeichnen. Stürzt der den Auftrag verarbeitende Prozess ab, bevor eine der genannten Funktionen aufgerufen wurde, bemerkt RabbitMQ das und stellt den Auftrag im folgenden erneut zu.

Das bisher gezeigte Vorgehen ist einfach und funktioniert, wirft aber trotzdem einige Probleme auf. Am gravierendsten davon ist, dass RabbitMQ einen ziemlich hohen administrativen Aufwand verursacht, da es alles andere als leichtgewichtig ist.

Außerdem ist fraglich, wie gut RabbitMQ für den Produktivbetrieb geeignet ist, da es im Cluster unter Umständen Daten verliert – was ein Cluster eigentlich gerade verhindern soll!

Da die meisten Webdienste ohnehin eine Datenbank verwenden, stellt sich die Frage, ob sich das Konzept einer Auftrags-Queue nicht auch auf Basis der Datenbank implementieren lässt. Auf den ersten Blick scheint das sehr einfach zu sein. Das Annehmen eines Auftrags entspricht dem Einfügen eines Datensatzes in eine Tabelle. Zum Verarbeiten muss der Client lediglich einen Auftrag aus der Tabelle auslesen, der noch von keinem anderen Client verarbeitet wird, und ihn abschließend löschen.

Außerdem muss man sich lediglich darum kümmern, im Falle eines Absturzes den Eintrag wieder zurückzusetzen, was sich irgendwie mithilfe einer Transaktion lösen lassen sollte …

Unglücklicherweise stellt sich rasch heraus, dass das alles nicht ganz so einfach ist wie zunächst gedacht: Es beginnt bereits mit dem Problem, dass das automatische Zurücksetzen der Sperre nur dann funktioniert, wenn die Transaktion abbricht, dann die Sperre aber auch nur innerhalb der Transaktion sichtbar wäre.

Das ließe sich zwar durch ein geeignetes Isolationslevel wie Read Uncommitted umgehen, doch fühlt sich das Lesen von noch in einer Transaktion befindlichen Änderungen unsauber an: Im wahrsten Sinn des Wortes handelt es sich dabei um Dirty Reads.

Doch selbst wenn man dieses Problem für einen Moment ausklammert, zeigen sich noch an anderer Stelle unerwartete Probleme. Selbst das Ermitteln des nächsten Auftrags, der noch nicht von einer anderen Instanz verarbeitet wird, gestaltet sich schwierig. Dazu wäre nämlich SQL-Code wie der folgende erforderlich, wobei als Datenbank PostgreSQL angenommen wird:

UPDATE queue
SET isProcessed = 't'
WHERE id = (
SELECT id
FROM queue
WHERE NOT isProcessed
ORDER BY id
LIMIT 1
)
RETURNING id

Das Problem hierbei ist, dass zwei Instanzen gleichzeitig das innere SELECT ausführen könnten und beide die gleiche Auftrags-ID als Ergebnis erhielten. Das würde schließlich zur doppelten Verarbeitung des Auftrags führen.

Lösen lässt sich das Problem scheinbar, indem man dem inneren SELECT zusätzlich noch die Anweisung FOR UPDATE mitgibt, so dass der gefundene Datensatz gesperrt wird. In dem Fall würde das zweite SELECT aber auf die Freigabe der Sperre des ersten warten, womit die weitere Verarbeitung erst einmal blockiert wird.

Natürlich lässt sich die ganze auf einen Schlag lösen, indem man als Isolationslevel SERIALIZABLE verwendet, allerdings leidet dann die Skalierbarkeit, was der ursprünglichen Motivation zuwider läuft.

Glücklicherweise kennt PostgreSQL eine SQL-Erweiterung, die das Problem tatsächlich auf effiziente Weise löst: SKIP LOCKED. Der gleiche Ansatz funktioniert auch für Oracle und SQL Server, wobei bei SQL Server das Schlüsselwort READPAST zu verwenden ist.

Die Idee ist, dass man mit SKIP LOCKED angeben kann, dass die Datenbank bei einer Abfrage all jene Datensätze auslässt, die bereits durch eine andere Transaktion gesperrt wurden. Damit lässt sich das Aktualisieren des Zustands tatsächlich in eine Transaktion verpacken, indem man den Datensatz einfach löscht.

Kann der Auftrag erfolgreich verarbeitet werden, bestätigt man das Löschen durch das Abschließen der Transaktion. Schlägt die Verarbeitung fehl, rollt man die Transaktion zurück. Stürzt die Instanz zwischendurch ab, wird die Transaktion automatisch zurückgerollt. Damit sind alle Probleme gelöst:

DELETE FROM queue
WHERE id = (
SELECT id
FROM queue
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *;

Obwohl diese Lösung weitaus besser funktioniert als die vorherigen Versuche, ist auch sie nicht perfekt: Das SKIP LOCKED bewirkt, dass Datensätze übersprungen werden müssen. Insbesondere bei einer hohen Anzahl an Workern kann das aufwändig sein. Abgesehen davon ist die Lösung aber ausgesprochen elegant und lässt sich leicht implementieren.

Zusammenfassend lässt sich festhalten, dass es durchaus möglich ist, einen Dispatcher für Aufträge auf Basis einer Datenbank zu entwickeln, sofern sie über gewisse Sprachmerkmale verfügt, die über den SQL-Standard hinausgehen. Ansonsten ist man wahrscheinlich besser beraten, in den sauren Apfel zu beißen und eine dedizierte Queue zu verwenden.

Das gilt insbesondere dann, wenn noch andere Aufgaben durch die Message-Queue zu erfüllen sind. Schließlich können diese Produkt weit mehr als lediglich das Zustellen von Aufgaben an verschiedene Dienstinstanzen.

tl;dr: PostgreSQL, Oracle und SQL Server kennen das Sprachmerkmal SKIP LOCKED beziehungsweise READPAST, mit dem sich effiziente Queues entwickeln lassen. Das kann eine interessante Alternative zum Einsatz einer Message-Queue wie RabbitMQ sein. ()