C++20: Mächtige Coroutinen mit cppcoro

Nach dem ersten Einblick in die Coroutinen-Bibliothek cppcoro geht es nun um die Erweiterung von Tasks durch Threads – mit dem Ergebnis mächtiger Coroutinen.

In Pocket speichern vorlesen Druckansicht 42 Kommentare lesen
Lesezeit: 6 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

Mein letzter Artikel "C++20: Coroutinen mit cppcoro" gab einen ersten Einblick in die Coroutinen-Bibliothek von Lewis Baker. Dieser Einblick stellt die elementaren Coroutinen Tasks und Generatoren vor. Heute erweitere ich Tasks um Threads. Das Ergebnis sind mächtige Coroutinen.

Kannst du dich an den Artikel "C++20: Thread-Synchronisation mit Coroutinen" erinnern? Falls nicht, ich habe in dem Artikel die Herausforderungen von Bedingungsvariablen vorgestellt. Bedingungsvariablen sind die klassische Strategie, um Thread-Synchronisation wie Sender/Empfänger- oder Producer/Consumer-Arbeitsabläufe umzusetzen. Sie besitzen aber eine große Design-Schwäche. Der Empfänger kann eine vermeintliche Benachrichtigung vom Sender erhalten (spuious wakeup) oder der Empfänger kann die Benachrichtigung vom Sender überhören (lost wakeup). In beiden Fällen lauert ein Deadlock. Mein anschließendes Beispiel zur Thread-Synchronisation mit Coroutinen hatte zwar nicht das inhärente Risiko der Bedingungsvariablen wie Spurious Wakeup und Lost Wakeup. Dafür besaß die Lösung eine neue Unzulänglichkeit: Sie war viel zu kompliziert.

Dank cppcoro lässt sich das Beste aus beiden Welten vereinen: ein einfacher Benachrichtigungsmechanismus, der nicht die Design-Schwächen der Bedingungsvariablen besitzt.

Laut der Dokumentation von cppcoro ist ein single_consumer_event "a simple manual-reset event type that supports only a single coroutine awaiting it at a time". Das ist genau das, was ich benötige:

// cppcororProducerConsumer.cpp

#include <cppcoro/single_consumer_event.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>

#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <chrono>

cppcoro::single_consumer_event event;

cppcoro::task<> consumer() {

auto start = std::chrono::high_resolution_clock::now();

co_await event; // suspended until some thread calls event.set()

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;
std::cout << "Consumer waited " << elapsed.count() << " seconds." << std::endl;

co_return;
}

void producer() {

using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);

event.set(); // resumes the consumer

}

int main() {

std::cout << std::endl;

auto con = std::async([]{ cppcoro::sync_wait(consumer()); }); // (1)
auto prod = std::async(producer); // (2)

con.get(), prod.prod();

std::cout << std::endl;

}

Das Beispiel sollte sich fast selbst erklären. Der Empfänger (Zeile 1) und der Sender (Zeile 2) laufen in ihrem eigenen Thread. Der Aufruf cppcoro::sync_wait(consumer()) (Zeile 1) dient als startender Top-Level Task, da die main-Funktion keine Coroutine sein kann. Der Aufruf wartet, bis die Coroutine consumer fertig ist. Die Coroutine consumer wartet in dem Ausdruck call co_await, bis event.set() aufgerufen wird. Nachdem die Funktion producer zwei Sekunden geschlafen hat, sendet sie das notwendige Event.

Dank der cppcoro-Bibliothek lassen sich Threads auch beenden.

Der Aufrufer und der Aufgerufene können mithilfe des cppcoro::cancellation_token kommunizieren. Die aufgerufene Funktion, die die Aufforderung an ihre Beendigung erhält, kann auf zwei Weisen interagieren:

  1. Sie frägt in regelmäßigen Zeitabständen, ob sie die Aufforderung für ihre Beendigung erhalten hat. Für diesen Ablauf bietet das cppcoro::cancellation_token zwei Methoden an: is_cancellation_requested() und throw_if_cancellation_requested().
  2. Sie registriert einen Callback, der bei einer Anfrage zur Beendigung ausgeführt wird.

Das folgende Programm stellt den ersten Anwendungsfall vor:

// cppcoroCancellation.cpp

#include <chrono>
#include <iostream>
#include <future>

#include <cppcoro/cancellation_token.hpp>
#include <cppcoro/cancellation_source.hpp>

using namespace std::chrono_literals;

int main() {

std::cout << std::endl;

cppcoro::cancellation_source canSource;
cppcoro::cancellation_token canToken = canSource.token(); // (1)

auto cancelSender = std::async([&canSource] {
std::this_thread::sleep_for(2s);
canSource.request_cancellation(); // (3)
std::cout << "canSource.request_cancellation() " << std::endl;
});

auto cancelReceiver = std::async([&canToken] {
while(true) {
std::cout << "Wait for cancellation request" << std::endl;
std::this_thread::sleep_for(200ms);
if (canToken.is_cancellation_requested()) return; // (2)
}
});

cancelSender.get(), cancelReceiver.get();

std::cout << std::endl;

}

Zeile (1) zeigt das cancellation_token, das durch die cancellation_source erzeugt wurde. Der Aufrufer cancelSender erhält die cancellation_source und der Aufgerufene cancelReceiver das cancellation_token. Der Aufgerufene fragt permanent nach der Aufforderung zu seiner Beendigung nach (Zeile 2). Diese sendet der Aufrufer mittels call.request_cancellation() (Zeile 3) nach zwei Sekunden.

Zwei interessante Beobachtungen möchte ich noch ergänzen:

  1. Die Beendigung ist kooperativ. Falls der Aufgerufene die Anfrage zu seiner Beendigung ignoriert, passiert nichts.
  2. Mit C++20 erhalten wir auch einen verbesserten std::thread: std::jthread. Er ruft automatisch join in seinem Destruktor auf und kann mit einem Interrupt-Token beendet werden. Mein Artikel "Ein neuer Thread mit C++20: std::jthread" bietet mehr Details zum std::jthread an.

cppcore besitzt auch Mutexe.

Ein Mutex wie cppcoro::async_mutex ist ein Synchronisationswerkzeug, um geteilte Daten vor dem gleichzeitigen Zugriff mehrerer Threads zu schützen:

// cppcoroMutex.cpp

#include <cppcoro/async_mutex.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>

#include <iostream>
#include <thread>
#include <vector>


cppcoro::async_mutex mutex;

int sum{}; // (2)

cppcoro::task<> addToSum(int num) {
cppcoro::async_mutex_lock lockSum = co_await mutex.scoped_lock_async(); // (3)
sum += num;

} // (4)

int main() {

std::cout << std::endl;

std::vector<std::thread> vec(10); // (1)

for(auto& thr: vec) {
thr = std::thread([]{ for(int n = 0; n < 10; ++n) cppcoro::sync_wait(addToSum(n)); } );
}

for(auto& thr: vec) thr.join();

std::cout << "sum: " << sum << std::endl;

std::cout << std::endl;

}

In der Zeile (1) werden zehn Threads erzeugt. Jeder Thread fügt die Zahlen 0 bis 9 zu der geteilten Summationsvariable sum (Zeile 2) hinzu. Die Funktion addToSum stellt die Coroutine dar. Die Coroutine wartet in dem Ausdruck co_await mutex.scoped_lock() (Zeile 3), bis sie den Mutex erhält. Die Coroutine, die auf den Mutex wartet, wird nicht blockiert, sondern schlafen gelegt. Der vorherige Besitzer des Mutex weckte in seinem unlock-Aufruf die Coroutine wieder auf. Wie es der Name vermuten lässt, bleibt der Mutex bis zum Ende seines Bereichs (Zeile 4) gelockt.

Dank der Funktion cppcoro::when_all kann nicht nicht nur auf eine, sondern auf mehrere Coroutinen gewartet werden. Ich wende cppcoro::when_all zusammen mit cppcoro::static_thread_pool in meinem nächsten Artikel an, um mächtige Arbeitsabläufe zu modellieren. ()