Coroutinen in C++20: Automatisches Fortsetzen eines Jobs auf einem anderen Thread
In meinem letzten Artikel "Jobs starten mit Coroutinen in C++20" fĂĽhrte ich das neue SchlĂĽsselwort co_await ein, um einen Job zu starten. In diesem Artikel werde ich den Arbeitsablauf verbessern und den Job automatisch auf einem separaten Thread vollenden, falls das notwendig ist.
- Rainer Grimm
In meinem letzten Artikel "Jobs starten mit Coroutinen in C++20" fĂĽhrte ich das neue SchlĂĽsselwort co_await
ein, um einen Job zu starten. In diesem Artikel werde ich den Arbeitsablauf verbessern und den Job automatisch auf einem separaten Thread vollenden, falls das notwendig ist.
Das ist der siebte Artikel meiner Miniserie zu den neuen Schlüsselwörtern co_return
, co_yield
und co_await
. Um das meiste aus meiner praktischen Einführung zu Coroutinen herauszuholen, ist ein Grundverständnis der bisherigen Artikel notwendig.
co_return
- Einfache Futures mit Coroutinen implementieren
- Lazy Futures mit Coroutinen in C++20
- Mit Coroutinen einen Future in einem eigenen Thread ausfĂĽhren
co_yield
- Ein unendlicher Datenstrom dank Coroutinen in C++20
- Ein generischer Datenstrom mit Coroutinen in C++20
co_await
Automatisches Fortsetzen des Jobs
Im vorherigen Arbeitsablauf (Jobs starten mit Coroutinen in C++20) habe ich den Awaiter-Arbeitsablauf detailliert vorgestellt und den Job explizit gestartet:
int main() {
std::cout << "Before job" << '\n';
auto job = prepareJob();
job.start();
std::cout << "After job" << '\n';
}
Das direkte Aufrufen von job.start()
war notwendig, denn await_ready
des Awaitable MySuspendAlways
gibt immer false
zurück. Nun möchte ich annehmen, dass await_ready
entweder true
oder false
zurĂĽckgeben kann und der Arbeitsablauf nicht explizit gestartet wird. Zur Erinnerung: Wenn await_ready
true
zurĂĽckgibt, wird die Funktion await_resume
direkt aufgerufen, die Funktion await_suspend
aber nicht ausgefĂĽhrt:
// startJobWithAutomaticResumption.cpp
#include <coroutine>
#include <functional>
#include <iostream>
#include <random>
std::random_device seed;
auto gen = std::bind_front(std::uniform_int_distribution<>(0,1),
std::default_random_engine(seed()));
struct MySuspendAlways {
bool await_ready() const noexcept {
std::cout << " MySuspendAlways::await_ready" << '\n';
return gen();
}
bool await_suspend(std::coroutine_handle<> handle) const noexcept {
std::cout << " MySuspendAlways::await_suspend" << '\n';
handle.resume();
return true;
}
void await_resume() const noexcept {
std::cout << " MySuspendAlways::await_resume" << '\n';
}
};
struct Job {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
handle_type coro;
Job(handle_type h): coro(h){}
~Job() {
if ( coro ) coro.destroy();
}
struct promise_type {
auto get_return_object() {
return Job{handle_type::from_promise(*this)};
}
MySuspendAlways initial_suspend() {
std::cout << " Job prepared" << '\n';
return {};
}
std::suspend_always final_suspend() noexcept {
std::cout << " Job finished" << '\n';
return {};
}
void return_void() {}
void unhandled_exception() {}
};
};
Job performJob() {
co_await std::suspend_never();
}
int main() {
std::cout << "Before jobs" << '\n';
performJob();
performJob();
performJob();
performJob();
std::cout << "After jobs" << '\n';
}
Zuerst einmal besitzt die Coroutine den Namen performJob
und startet automatisch. Der Generator gen
(Zeile 1) erzeugt Zufallszahlen mit den Werten 0 oder 1. Er verwendet den Standardzufallsgenerator, der mit einem Seed initialisiert wird. Dank std::bind_front
lässt sich der Generator mit std::uniform_int_distribution
verknüpfen, sodass dieser eine aufrufbare Einheit erzeugt, die auf Anfrage Zufallszahlen zwischen 0 und 1 erzeugt. Eine aufrufbare Einheit verhält sich wie eine Funktion. Das können nicht nur Funktionen, sondern auch Funktionsobjekte oder Lambda-Ausdrücke sein. Mehr zu der neuen Funktion std::bind_front
lässt sich im Artikel "Noch mehr praktische Werkzeuge in C++20" nachlesen.
In dem Beispiel habe ich mit Ausnahme des Awaitable MySuspendAlways
die im C++-Standard vordefinierten Awaitables verwendet. MySuspendAlways
kommt als RĂĽckgabewert der Methode initial_suspend
(Zeile 2) zum Einsatz. await_ready
(Zeile 3) gibt einen Wahrheitswert zurĂĽck. Wenn dieser Wert true
besitzt, springt der Kontrollfluss direkt zur Methode await_resume
(Zeile 4). Wenn die Funktion false
zurĂĽckgibt, wird die Coroutine sofort pausiert. Damit wird die Funktion await_suspend
(Zeile 5) ausgefĂĽhrt. Die Funktion await_suspend
erhält den Verweis auf die Coroutine und verwendet diesen, um die Coroutine (Zeile 6) wieder aufzuwecken. Anstelle des Werts true
kann await_suspend
auch void
zurĂĽckgeben.
Der folgende Screenhot bringt es auf den Punkt: Wenn await_ready
true
zurĂĽckgibt, wird die Funktion await_resume
aufgerufen; wenn await_ready
false
zurĂĽckgibt, wird die Funktion await_suspend
ausgefĂĽhrt.
Die Programmausführung lässt sich schön mit dem Compiler Explorer visualisieren.
Nun steht die nächste Verbesserung an. Der Awaiter sollte auf einem anderen Thread aufgeweckt werden und seinen Arbeitsablauf fortführen.
Automatisches Fortsetzen des Jobs auf einem eigenen Thread
Das folgende Programm baut auf dem vorherigen auf:
// startJobWithResumptionOnThread.cpp
#include <coroutine>
#include <functional>
#include <iostream>
#include <random>
#include <thread>
#include <vector>
std::random_device seed;
auto gen = std::bind_front(std::uniform_int_distribution<>(0,1),
std::default_random_engine(seed()));
struct MyAwaitable {
std::jthread& outerThread;
bool await_ready() const noexcept {
auto res = gen();
if (res) std::cout << " (executed)" << '\n';
else std::cout << " (suspended)" << '\n';
return res;
}
void await_suspend(std::coroutine_handle<> h) {
outerThread = std::jthread([h] { h.resume(); });
}
void await_resume() {}
};
struct Job{
static inline int JobCounter{1};
Job() {
++JobCounter;
}
struct promise_type {
int JobNumber{JobCounter};
Job get_return_object() { return {}; }
std::suspend_never initial_suspend() {
std::cout << " Job " << JobNumber << " prepared on thread "
<< std::this_thread::get_id();
return {};
}
std::suspend_never final_suspend() noexcept {
std::cout << " Job " << JobNumber << " finished on thread "
<< std::this_thread::get_id() << '\n';
return {};
}
void return_void() {}
void unhandled_exception() { }
};
};
Job performJob(std::jthread& out) {
co_await MyAwaitable{out};
}
int main() {
std::vector<std::jthread> threads(8);
for (auto& thr: threads) performJob(thr);
}
Der Hauptunterschied zum vorherigen Programm besteht darin, dass das neue Awaitable MyAwaitable
in der Coroutine performJob
(Zeile 1) zum Einsatz kommt. Im Gegensatz dazu ist das Coroutinen-Objekt, das die Coroutine performJob
zurĂĽckgibt, einfach gehalten. Im Wesentlichen geben die Methoden inital_suspend
(Zeile 2) und final_suspend
(Zeile 3) das vordefinierte Awaitable std::suspend_never
zurück. Zusätzlich stellen beide Funktionen die Jobnummer des ausgeführten Jobs und die Thread-Id des ausführenden Threads dar. Der Screenshot zeigt deutlich, welche Coroutine sofort ausgeführt und welche pausiert wird. Dank der Thread-Id lässt sich schön visualisieren, dass die pausierten Coroutinen auf einem anderen Thread aufgeweckt werden.
Die Programmausführung lässt sich mithilfe der Wandbox visualisieren:
Gerne möchte ich auf den interessanten Arbeitsablauf des Programms genauer eingehen. Zeile 4 erzeugt acht Default-konstruierte Threads, die die Coroutine performJob
(Zeile 5) mittels Referenz annehmen. DarĂĽber hinaus wird die Referenz als Argument fĂĽr MyAwaitable{out}
verwendet. Abhängig vom Wert von res
(Zeile 6), und damit abhängig vom Rückgabewert der Methode await_ready
, wird die Coroutine (res
ist true
) weiter ausgefĂĽhrt oder pausiert (res
ist false
). Im Fall, dass MyAwaitable
pausiert wird, wird die Funktion await_suspend
(Zeile 7) ausgefĂĽhrt. Dank der Zuweisung von outerThread
(Zeile 8) wird dieser zum ausgeführten Thread. Die Lebenszeit des ausgeführten Threads muss länger sein als die der Coroutine. Aus diesem Grund werden die Threads im Gültigkeitsbereich der main
-Funktion angelegt.
Wie geht's weiter?
Geschafft: Beinahe 100 Artikel habe ich zu C++20 verfasst. In meinem nächsten Artikel möchte das Thema C++20 abschließen und die folgende Frage beantworten: Wie geht es weiter mit C++? ()