C++20: Thread-Synchronisation mit Coroutinen
HĂ€ufig mĂŒssen Threads synchronisiert werden. Das typische Szenario ist, dass ein Thread ein Arbeitspaket vorbereitet, auf das ein anderer Thread wartet.
HĂ€ufig mĂŒssen Threads synchronisiert werden. Das typische Szenario ist, dass ein Thread ein Arbeitspaket vorbereitet, auf das ein anderer Thread wartet.
Ich nehme an, die meisten Anwender verwenden Bedingungsvariablen fĂŒr einen Sender/EmpfĂ€nger- oder Producer/Consumer-Arbeitsablauf. Bedingungsvariablen besitzen viele implizite Gefahren wie Spurious Wakeups oder Lost Wakeups. Bevor ich daher die Thread-Synchronisation mit Coroutinen umsetzen will, möchte ich einen vorherigen Artikel, der die inhĂ€renten Herausforderungen von Bedingungsvariablen beschreibt, kurz ins GedĂ€chtnis zurĂŒckrufen.
Die Herausforderungen von Bedingungsvariablen
Zuerst einmal der richtige Einsatz von Bedingungsvariablen:
// conditionVariables.cpp
#include <condition_variable>
#include <iostream>
#include <thread>
std::mutex mutex_;
std::condition_variable condVar;
bool dataReady{false};
void waitingForWork(){
std::cout << "Waiting " << std::endl;
std::unique_lock<std::mutex> lck(mutex_);
condVar.wait(lck, []{ return dataReady; }); // (4)
std::cout << "Running " << std::endl;
}
void setDataReady(){
{
std::lock_guard<std::mutex> lck(mutex_);
dataReady = true;
}
std::cout << "Data prepared" << std::endl;
condVar.notify_one(); // (3)
}
int main(){
std::cout << std::endl;
std::thread t1(waitingForWork); // (1)
std::thread t2(setDataReady); // (2)
t1.join();
t2.join();
std::cout << std::endl;
}
Wie funktioniert die Synchronisation? Das Programm besitzt zwei Kinder-Threads: t1 und t2. Diese erhalten ihr Arbeitspaket waitingForWork und setDataReady in den Zeilen (1) und (2). setDataReady sendet seine Nachricht, dass er mit der Vorbereitung der Arbeit fertig ist, mithilfe der Bedingungsvariable condVar: condVar.notify_one() (Zeile 3). WÀhrend der Thread t1 den Lock besitzt, wartet er auf seine Benachrichtigung: condVar.wait(lck,[] return dataReady;) (Zeile 4). Sowohl der Sender als auch der EmpfÀnger der Nachricht benötigen einen Lock. Im Falle des Senders ist ein einfacher std::lock_guard ausreichend, da er einen Mutex nur ein einziges Mal "lockt" und wieder freigibt. Der EmpfÀnger benötigt hingegen ein std::unique_lock, da er gegebenenfalls einen Mutex mehrmals "locken" und wieder freigeben muss.
Das Programm besitzt die erwartete Ausgabe:
(Bild:Â https://www.educative.io/)
Vermutlich wunderst du dich: Warum benötigt der wait-Aufruf ein PrĂ€dikat, denn es sich auch ohne dieses verwenden lĂ€sst? Dieser Ablauf wirkt viel zu kompliziert fĂŒr eine solch einfache Aufgabe wie die Synchronisation von Threads.
Jetzt komme ich auf das fehlende GedĂ€chtnis von Bedingungsvariablen und die zwei PhĂ€nomene Lost Wakeup und Spurious Wakeup zurĂŒck.
- Lost Wakeup: Der Sender verschickt seine Benachrichtigung, bevor der EmpfÀnger im Wartezustand ist. Als Konsequenz geht die Benachrichtigung verloren.
- Spurious Wakeup: Der EmpfÀnger der Nachricht wacht auf, obwohl der Sender keine Benachrichtigung geschickt hat.
Als Schutz gegen diese beiden PhĂ€nomene benötigt der EmpfĂ€nger ein zusĂ€tzliches PrĂ€dikat als GedĂ€chtnis, das er prĂŒft. Damit beginnt die KomplexitĂ€t. Falls du kein PrĂ€dikat in diesem Fall einsetzt, besteht eine 50/50-Wahrscheinlichkeit fĂŒr ein Lost Wakeup. Dieses verursacht vereinfachend gesagt ein Deadlock, da ein Thread auf ein Ereignis wartet, das nicht eintritt.
Das ist nicht die letzte Falle, die bei der Verwendung von Bedingungsvariablen lauert. Die Details dazu gibt es in dem frĂŒheren Artikel: "C++ Core Guidelines: Sei dir der Fallen von Bedingungsvariablen bewusst [1]".
Dank Coroutinen ist die Synchronisation von Threads deutlich einfacher. DarĂŒber hinaus lauern keine Fallen wie Spurious Wakeups oder Lost Wakeups.
Thread-Synchronisation mit co_await
// senderReceiver.cpp
#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>
class Event {
public:
Event() = default;
Event(const Event&) = delete;
Event(Event&&) = delete;
Event& operator=(const Event&) = delete;
Event& operator=(Event&&) = delete;
class Awaiter;
Awaiter operator co_await() const noexcept;
void notify() noexcept;
private:
friend class Awaiter;
mutable std::atomic<void*> suspendedWaiter{nullptr};
mutable std::atomic<bool> notified{false};
};
class Event::Awaiter {
public:
Awaiter(const Event& eve): event(eve) {}
bool await_ready() const;
bool await_suspend(std::coroutine_handle<> corHandle) noexcept;
void await_resume() noexcept {}
private:
friend class Event;
const Event& event;
std::coroutine_handle<> coroutineHandle;
};
bool Event::Awaiter::await_ready() const { // (7)
// allow at most one waiter
if (event.suspendedWaiter.load() != nullptr){
throw std::runtime_error("More than one waiter is not valid");
}
// event.notified == false; suspends the coroutine
// event.notified == true; the coroutine is executed such as a usual function
return event.notified;
}
// (8)
bool Event::Awaiter::await_suspend(std::coroutine_handle<> corHandle) noexcept {
coroutineHandle = corHandle;
if (event.notified) return false;
// store the waiter for later notification
event.suspendedWaiter.store(this);
return true;
}
void Event::notify() noexcept { // (6)
notified = true;
// try to load the waiter
auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load());
// check if a waiter is available
if (waiter != nullptr) {
// resume the coroutine => await_resume
waiter->coroutineHandle.resume();
}
}
Event::Awaiter Event::operator co_await() const noexcept {
return Awaiter{ *this };
}
struct Task {
struct promise_type {
Task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
Task receiver(Event& event) { // (3)
auto start = std::chrono::high_resolution_clock::now();
co_await event;
std::cout << "Got the notification! " << std::endl;
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;
std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}
using namespace std::chrono_literals;
int main(){
std::cout << std::endl;
std::cout << "Notification before waiting" << std::endl;
Event event1{};
auto senderThread1 = std::thread([&event1]{ event1.notify(); });// (1)
auto receiverThread1 = std::thread(receiver, std::ref(event1)); // (4)
receiverThread1.join();
senderThread1.join();
std::cout << std::endl;
std::cout << "Notification after 2 seconds waiting" << std::endl;
Event event2{};
auto receiverThread2 = std::thread(receiver, std::ref(event2));// (5)
auto senderThread2 = std::thread([&event2]{
std::this_thread::sleep_for(2s);
event2.notify(); // (2)
});
receiverThread2.join();
senderThread2.join();
std::cout << std::endl;
}
Thread-Synchronisation mit Coroutinen geht aus der Anwendersicht leicht von der Hand. Die Threads senderThread1 (Zeile 1) und senderThread2 (Zeile 2) verwenden Objekte vom Datentyp Event, um ihre Benachrichtigungen zu verschicken. Die Funktion receiver in Zeile (3) ist die Coroutine, die in dem Thread receiverThread1 (Zeile 4) und receiverThread2 (Zeile 5) zum Einsatz kommt. Ich habe die Zeit zwischen dem Beginn und dem Ende der Coroutine gemessen und ausgegeben. Die Zahl zeigt schön, wie lange die Coroutine warten muss. Der folgende Screenshot stellt die Ausgabe des Programms mit dem Online-Compiler Wandbox [2] vor. Der Compiler Explorer [3] erlaubt noch nicht das Erzeugen von Threads, sein Entwickler ist aber gerade dran.
Die AusfĂŒhrung der zweiten Coroutine benötigt im Gegensatz zur ersten zwei Sekunden. Der Grund ist, dass event1 seine Benachrichtigung verschickt (Zeile 1), bevor die Coroutine wartet. Hingegen verschickt event2 seine Benachrichtigungen, nachdem die zweite Coroutine bereits zwei Sekunden gewartet hat.
Nun werde ich den Arbeitsablauf des Coroutinen-Frameworks aus der Sicht der Implementierung vereinfacht darstellen.
Der vereinfachte Arbeitsablauf
Wenn du die Klasse Generator im letzten Artikel "C++20: Ein unendlicher Datenstrom mit Coroutinen [4]" mit der Klasse Event in diesem Beispiel vergleichst, fĂ€llt dir eventuell ein feiner Unterschied auf. Im ersten Fall ist der Generator der "Awaitable" und der "Awaiter"; im zweiten Fall verwendet Event den Operator co_await, um den "Awaiter" zurĂŒckzugeben. Diese Trennung der ZustĂ€ndigkeiten verbessert die Struktur des Codes.
In meiner ErklĂ€rung der beiden AblĂ€ufe nehme ich an, dass im ersten Fall die Benachrichtigung stattfindet, bevor die Coroutine auf diese wartet. FĂŒr den zweiten Fall nehme ich das Gegenteil an.
Zuerst beschĂ€ftige ich mich mit event1 und dem ersten Arbeitsablauf. event1 schickt seine Benachrichtigung, bevor der receiverThread1 startet. Der Aufruf event1 (line 1) stöĂt die Methode notify (Zeile 6) an. Zuerst wird das Benachrichtigungs-Flag flag gesetzt und dann lĂ€dt der Aufruf auto* waiter = static_cast<awaiter*>(suspendedWaiter.load()); den möglichen Warter. In diesem Fall ist dieser ein nullptr, da er davor noch nicht gesetzt wurde. Das bedeutet, dass der folgende resume-Aufruf nicht ausgefĂŒhrt wird. Die darauffolgend ausgefĂŒhrte Funktion await_ready (Zeile 7) prĂŒft, ob es mehr als einen Warter gibt. Der Einfachheit halber löse ich eine std::runtime-Ausnahme aus. Der entscheidende Punkt der Methode ist ihr RĂŒckgabewert. event.notification wurde in der notify-Methode bereits auf true gesetzt. true als RĂŒckgabewert bedeutet fĂŒr den Arbeitsablauf, dass die Coroutine nicht angehalten und entsprechend einer gewöhnlichen Funktion ausgefĂŒhrt wird.
Im zweiten Arbeitsablauf findet co_await event2 statt, bevor event2 seine Benachrichtigung verschickt. co_wait event2 stöĂt den Aurfruf await_ready in Zeile (7) an. Der groĂe Unterschied ist es, dass in diesem Fall event.notified false ist. Der false-Wert verursacht die Suspension der Coroutine. Damit wird die Methode await_suspend (Zeile 8) ausgefĂŒhrt. await_suspend erhĂ€lt den Verweis auf die Coroutine dank corHandle und speichert sie fĂŒr ihre spĂ€tere AusfĂŒhrung in der Variable coroutineHandle. SpĂ€tere AusfĂŒhrung heiĂt natĂŒrlich die Wiederaufnahme der AusfĂŒhrung. DarĂŒber hinaus wird der Warter in der Variable suspendedWaiter gespeichert. Wenn somit spĂ€ter event2.notify seine Benachrichtigung schickt, wird die Methode notify (Zeile 6) ausgefĂŒhrt. Der Unterschied zum ersten Arbeitsablauf ist es, dass die Bedingung waiter != nullptr true ergibt. Die Konsequenz ist, dass der Warter coroutineHandle verwendet, um die AusfĂŒhrung der Coroutine fortzusetzen.
Wie geht's weiter?
Wenn ich einen Schluss auf diesem und meinem letzten Artikel "C++20: Ein unendlicher Datenstrom mit Coroutinen [5]" ziehen möchte, dann diesen: Implementiere nicht deine eigenen Coroutinen. Verwende existierende Coroutinen wie die, die Lewis Barker in der Bibliothek cppcoro [6] zur VerfĂŒgung stellt. Genau diesen Ratschlag werde ich in meinem nĂ€chsten Artikel befolgen.
Vier Gutscheine fĂŒr Educative zu gewinnen
Auf meinem englischen Blog gibt es vier Gutscheine fĂŒr Educative zu gewinnen: https://bit.ly/VoucherEducative. [7] Die Gutscheine erlauben es dir, ein Viertel Jahr auf alle Kurse von educative.io [8] zuzugreifen. ( [9])
URL dieses Artikels:
https://www.heise.de/-4701396
Links in diesem Artikel:
[1] https://heise.de/-4063822
[2] https://wandbox.org/permlink/qClEIVff0OXp51Av
[3] https://godbolt.org/
[4] https://heise.de/-4697142
[5] https://heise.de/-4697142
[6] https://github.com/lewissbaker/cppcoro
[7] https://bit.ly/VoucherEducative
[8] https://www.educative.io/
[9] mailto:rainer@grimm-jaud.de
Copyright © 2020 Heise Medien