C++20: Thread-Synchronisation mit Coroutinen

Häufig müssen Threads synchronisiert werden. Das typische Szenario ist, dass ein Thread ein Arbeitspaket vorbereitet, auf das ein anderer Thread wartet.

In Pocket speichern vorlesen Druckansicht 6 Kommentare lesen
Lesezeit: 10 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

Häufig müssen Threads synchronisiert werden. Das typische Szenario ist, dass ein Thread ein Arbeitspaket vorbereitet, auf das ein anderer Thread wartet.

Ich nehme an, die meisten Anwender verwenden Bedingungsvariablen für einen Sender/Empfänger- oder Producer/Consumer-Arbeitsablauf. Bedingungsvariablen besitzen viele implizite Gefahren wie Spurious Wakeups oder Lost Wakeups. Bevor ich daher die Thread-Synchronisation mit Coroutinen umsetzen will, möchte ich einen vorherigen Artikel, der die inhärenten Herausforderungen von Bedingungsvariablen beschreibt, kurz ins Gedächtnis zurückrufen.

Zuerst einmal der richtige Einsatz von Bedingungsvariablen:

// conditionVariables.cpp

#include <condition_variable>
#include <iostream>
#include <thread>

std::mutex mutex_;
std::condition_variable condVar;

bool dataReady{false};

void waitingForWork(){
std::cout << "Waiting " << std::endl;
std::unique_lock<std::mutex> lck(mutex_);
condVar.wait(lck, []{ return dataReady; }); // (4)
std::cout << "Running " << std::endl;
}

void setDataReady(){
{
std::lock_guard<std::mutex> lck(mutex_);
dataReady = true;
}
std::cout << "Data prepared" << std::endl;
condVar.notify_one(); // (3)
}

int main(){

std::cout << std::endl;

std::thread t1(waitingForWork); // (1)
std::thread t2(setDataReady); // (2)

t1.join();
t2.join();

std::cout << std::endl;

}

Wie funktioniert die Synchronisation? Das Programm besitzt zwei Kinder-Threads: t1 und t2. Diese erhalten ihr Arbeitspaket waitingForWork und setDataReady in den Zeilen (1) und (2). setDataReady sendet seine Nachricht, dass er mit der Vorbereitung der Arbeit fertig ist, mithilfe der Bedingungsvariable condVar: condVar.notify_one() (Zeile 3). Während der Thread t1 den Lock besitzt, wartet er auf seine Benachrichtigung: condVar.wait(lck,[] return dataReady;) (Zeile 4). Sowohl der Sender als auch der Empfänger der Nachricht benötigen einen Lock. Im Falle des Senders ist ein einfacher std::lock_guard ausreichend, da er einen Mutex nur ein einziges Mal "lockt" und wieder freigibt. Der Empfänger benötigt hingegen ein std::unique_lock, da er gegebenenfalls einen Mutex mehrmals "locken" und wieder freigeben muss.

Das Programm besitzt die erwartete Ausgabe:

(Bild: https://www.educative.io/)

Vermutlich wunderst du dich: Warum benötigt der wait-Aufruf ein Prädikat, denn es sich auch ohne dieses verwenden lässt? Dieser Ablauf wirkt viel zu kompliziert für eine solch einfache Aufgabe wie die Synchronisation von Threads.

Jetzt komme ich auf das fehlende Gedächtnis von Bedingungsvariablen und die zwei Phänomene Lost Wakeup und Spurious Wakeup zurück.

  • Lost Wakeup: Der Sender verschickt seine Benachrichtigung, bevor der Empfänger im Wartezustand ist. Als Konsequenz geht die Benachrichtigung verloren.
  • Spurious Wakeup: Der Empfänger der Nachricht wacht auf, obwohl der Sender keine Benachrichtigung geschickt hat.

Als Schutz gegen diese beiden Phänomene benötigt der Empfänger ein zusätzliches Prädikat als Gedächtnis, das er prüft. Damit beginnt die Komplexität. Falls du kein Prädikat in diesem Fall einsetzt, besteht eine 50/50-Wahrscheinlichkeit für ein Lost Wakeup. Dieses verursacht vereinfachend gesagt ein Deadlock, da ein Thread auf ein Ereignis wartet, das nicht eintritt.

Das ist nicht die letzte Falle, die bei der Verwendung von Bedingungsvariablen lauert. Die Details dazu gibt es in dem früheren Artikel: "C++ Core Guidelines: Sei dir der Fallen von Bedingungsvariablen bewusst".

Dank Coroutinen ist die Synchronisation von Threads deutlich einfacher. Darüber hinaus lauern keine Fallen wie Spurious Wakeups oder Lost Wakeups.

// senderReceiver.cpp

#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>

class Event {
public:

Event() = default;

Event(const Event&) = delete;
Event(Event&&) = delete;
Event& operator=(const Event&) = delete;
Event& operator=(Event&&) = delete;

class Awaiter;
Awaiter operator co_await() const noexcept;

void notify() noexcept;

private:

friend class Awaiter;

mutable std::atomic<void*> suspendedWaiter{nullptr};
mutable std::atomic<bool> notified{false};

};

class Event::Awaiter {
public:
Awaiter(const Event& eve): event(eve) {}

bool await_ready() const;
bool await_suspend(std::coroutine_handle<> corHandle) noexcept;
void await_resume() noexcept {}

private:
friend class Event;

const Event& event;
std::coroutine_handle<> coroutineHandle;
};

bool Event::Awaiter::await_ready() const { // (7)

// allow at most one waiter
if (event.suspendedWaiter.load() != nullptr){
throw std::runtime_error("More than one waiter is not valid");
}

// event.notified == false; suspends the coroutine
// event.notified == true; the coroutine is executed such as a usual function
return event.notified;
}
// (8)
bool Event::Awaiter::await_suspend(std::coroutine_handle<> corHandle) noexcept {

coroutineHandle = corHandle;

if (event.notified) return false;

// store the waiter for later notification
event.suspendedWaiter.store(this);

return true;
}

void Event::notify() noexcept { // (6)
notified = true;

// try to load the waiter
auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load());

// check if a waiter is available
if (waiter != nullptr) {
// resume the coroutine => await_resume
waiter->coroutineHandle.resume();
}
}

Event::Awaiter Event::operator co_await() const noexcept {
return Awaiter{ *this };
}

struct Task {
struct promise_type {
Task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};

Task receiver(Event& event) { // (3)
auto start = std::chrono::high_resolution_clock::now();
co_await event;
std::cout << "Got the notification! " << std::endl;
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;
std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}

using namespace std::chrono_literals;

int main(){

std::cout << std::endl;

std::cout << "Notification before waiting" << std::endl;
Event event1{};
auto senderThread1 = std::thread([&event1]{ event1.notify(); });// (1)
auto receiverThread1 = std::thread(receiver, std::ref(event1)); // (4)

receiverThread1.join();
senderThread1.join();

std::cout << std::endl;

std::cout << "Notification after 2 seconds waiting" << std::endl;
Event event2{};
auto receiverThread2 = std::thread(receiver, std::ref(event2));// (5)
auto senderThread2 = std::thread([&event2]{
std::this_thread::sleep_for(2s);
event2.notify(); // (2)
});

receiverThread2.join();
senderThread2.join();

std::cout << std::endl;

}

Thread-Synchronisation mit Coroutinen geht aus der Anwendersicht leicht von der Hand. Die Threads senderThread1 (Zeile 1) und senderThread2 (Zeile 2) verwenden Objekte vom Datentyp Event, um ihre Benachrichtigungen zu verschicken. Die Funktion receiver in Zeile (3) ist die Coroutine, die in dem Thread receiverThread1 (Zeile 4) und receiverThread2 (Zeile 5) zum Einsatz kommt. Ich habe die Zeit zwischen dem Beginn und dem Ende der Coroutine gemessen und ausgegeben. Die Zahl zeigt schön, wie lange die Coroutine warten muss. Der folgende Screenshot stellt die Ausgabe des Programms mit dem Online-Compiler Wandbox vor. Der Compiler Explorer erlaubt noch nicht das Erzeugen von Threads, sein Entwickler ist aber gerade dran.

Die Ausführung der zweiten Coroutine benötigt im Gegensatz zur ersten zwei Sekunden. Der Grund ist, dass event1 seine Benachrichtigung verschickt (Zeile 1), bevor die Coroutine wartet. Hingegen verschickt event2 seine Benachrichtigungen, nachdem die zweite Coroutine bereits zwei Sekunden gewartet hat.

Nun werde ich den Arbeitsablauf des Coroutinen-Frameworks aus der Sicht der Implementierung vereinfacht darstellen.

Wenn du die Klasse Generator im letzten Artikel "C++20: Ein unendlicher Datenstrom mit Coroutinen" mit der Klasse Event in diesem Beispiel vergleichst, fällt dir eventuell ein feiner Unterschied auf. Im ersten Fall ist der Generator der "Awaitable" und der "Awaiter"; im zweiten Fall verwendet Event den Operator co_await, um den "Awaiter" zurückzugeben. Diese Trennung der Zuständigkeiten verbessert die Struktur des Codes.

In meiner Erklärung der beiden Abläufe nehme ich an, dass im ersten Fall die Benachrichtigung stattfindet, bevor die Coroutine auf diese wartet. Für den zweiten Fall nehme ich das Gegenteil an.

Zuerst beschäftige ich mich mit event1 und dem ersten Arbeitsablauf. event1 schickt seine Benachrichtigung, bevor der receiverThread1 startet. Der Aufruf event1 (line 1) stößt die Methode notify (Zeile 6) an. Zuerst wird das Benachrichtigungs-Flag flag gesetzt und dann lädt der Aufruf auto* waiter = static_cast<awaiter*>(suspendedWaiter.load()); den möglichen Warter. In diesem Fall ist dieser ein nullptr, da er davor noch nicht gesetzt wurde. Das bedeutet, dass der folgende resume-Aufruf nicht ausgeführt wird. Die darauffolgend ausgeführte Funktion await_ready (Zeile 7) prüft, ob es mehr als einen Warter gibt. Der Einfachheit halber löse ich eine std::runtime-Ausnahme aus. Der entscheidende Punkt der Methode ist ihr Rückgabewert. event.notification wurde in der notify-Methode bereits auf true gesetzt. true als Rückgabewert bedeutet für den Arbeitsablauf, dass die Coroutine nicht angehalten und entsprechend einer gewöhnlichen Funktion ausgeführt wird.

Im zweiten Arbeitsablauf findet co_await event2 statt, bevor event2 seine Benachrichtigung verschickt. co_wait event2 stößt den Aurfruf await_ready in Zeile (7) an. Der große Unterschied ist es, dass in diesem Fall event.notified false ist. Der false-Wert verursacht die Suspension der Coroutine. Damit wird die Methode await_suspend (Zeile 8) ausgeführt. await_suspend erhält den Verweis auf die Coroutine dank corHandle und speichert sie für ihre spätere Ausführung in der Variable coroutineHandle. Spätere Ausführung heißt natürlich die Wiederaufnahme der Ausführung. Darüber hinaus wird der Warter in der Variable suspendedWaiter gespeichert. Wenn somit später event2.notify seine Benachrichtigung schickt, wird die Methode notify (Zeile 6) ausgeführt. Der Unterschied zum ersten Arbeitsablauf ist es, dass die Bedingung waiter != nullptr true ergibt. Die Konsequenz ist, dass der Warter coroutineHandle verwendet, um die Ausführung der Coroutine fortzusetzen.

Wenn ich einen Schluss auf diesem und meinem letzten Artikel "C++20: Ein unendlicher Datenstrom mit Coroutinen" ziehen möchte, dann diesen: Implementiere nicht deine eigenen Coroutinen. Verwende existierende Coroutinen wie die, die Lewis Barker in der Bibliothek cppcoro zur Verfügung stellt. Genau diesen Ratschlag werde ich in meinem nächsten Artikel befolgen.

Auf meinem englischen Blog gibt es vier Gutscheine für Educative zu gewinnen: https://bit.ly/VoucherEducative. Die Gutscheine erlauben es dir, ein Viertel Jahr auf alle Kurse von educative.io zuzugreifen. ()