Coroutinen in C++20: Automatisches Fortsetzen eines Jobs auf einem anderen Thread

In meinem letzten Artikel "Jobs starten mit Coroutinen in C++20" führte ich das neue Schlüsselwort co_await ein, um einen Job zu starten. In diesem Artikel werde ich den Arbeitsablauf verbessern und den Job automatisch auf einem separaten Thread vollenden, falls das notwendig ist.

In Pocket speichern vorlesen Druckansicht 127 Kommentare lesen
Lesezeit: 7 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

In meinem letzten Artikel "Jobs starten mit Coroutinen in C++20" führte ich das neue Schlüsselwort co_await ein, um einen Job zu starten. In diesem Artikel werde ich den Arbeitsablauf verbessern und den Job automatisch auf einem separaten Thread vollenden, falls das notwendig ist.

Das ist der siebte Artikel meiner Miniserie zu den neuen Schlüsselwörtern co_return, co_yield und co_await. Um das meiste aus meiner praktischen Einführung zu Coroutinen herauszuholen, ist ein Grundverständnis der bisherigen Artikel notwendig.

co_return

co_yield

co_await

Im vorherigen Arbeitsablauf (Jobs starten mit Coroutinen in C++20) habe ich den Awaiter-Arbeitsablauf detailliert vorgestellt und den Job explizit gestartet:

int main() {

std::cout << "Before job" << '\n';

auto job = prepareJob();
job.start();

std::cout << "After job" << '\n';

}

Das direkte Aufrufen von job.start() war notwendig, denn await_ready des Awaitable MySuspendAlways gibt immer false zurück. Nun möchte ich annehmen, dass await_ready entweder true oder false zurückgeben kann und der Arbeitsablauf nicht explizit gestartet wird. Zur Erinnerung: Wenn await_ready true zurückgibt, wird die Funktion await_resume direkt aufgerufen, die Funktion await_suspend aber nicht ausgeführt:

// startJobWithAutomaticResumption.cpp

#include <coroutine>
#include <functional>
#include <iostream>
#include <random>

std::random_device seed;
auto gen = std::bind_front(std::uniform_int_distribution<>(0,1),
std::default_random_engine(seed()));

struct MySuspendAlways {
bool await_ready() const noexcept {
std::cout << " MySuspendAlways::await_ready" << '\n';
return gen();
}
bool await_suspend(std::coroutine_handle<> handle) const noexcept {
std::cout << " MySuspendAlways::await_suspend" << '\n';
handle.resume();
return true;

}
void await_resume() const noexcept {
std::cout << " MySuspendAlways::await_resume" << '\n';
}
};

struct Job {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
handle_type coro;
Job(handle_type h): coro(h){}
~Job() {
if ( coro ) coro.destroy();
}

struct promise_type {
auto get_return_object() {
return Job{handle_type::from_promise(*this)};
}
MySuspendAlways initial_suspend() {
std::cout << " Job prepared" << '\n';
return {};
}
std::suspend_always final_suspend() noexcept {
std::cout << " Job finished" << '\n';
return {};
}
void return_void() {}
void unhandled_exception() {}

};
};

Job performJob() {
co_await std::suspend_never();
}

int main() {

std::cout << "Before jobs" << '\n';

performJob();
performJob();
performJob();
performJob();

std::cout << "After jobs" << '\n';

}

Zuerst einmal besitzt die Coroutine den Namen performJob und startet automatisch. Der Generator gen (Zeile 1) erzeugt Zufallszahlen mit den Werten 0 oder 1. Er verwendet den Standardzufallsgenerator, der mit einem Seed initialisiert wird. Dank std::bind_front lässt sich der Generator mit std::uniform_int_distribution verknüpfen, sodass dieser eine aufrufbare Einheit erzeugt, die auf Anfrage Zufallszahlen zwischen 0 und 1 erzeugt. Eine aufrufbare Einheit verhält sich wie eine Funktion. Das können nicht nur Funktionen, sondern auch Funktionsobjekte oder Lambda-Ausdrücke sein. Mehr zu der neuen Funktion std::bind_front lässt sich im Artikel "Noch mehr praktische Werkzeuge in C++20" nachlesen.

In dem Beispiel habe ich mit Ausnahme des Awaitable MySuspendAlways die im C++-Standard vordefinierten Awaitables verwendet. MySuspendAlways kommt als Rückgabewert der Methode initial_suspend (Zeile 2) zum Einsatz. await_ready (Zeile 3) gibt einen Wahrheitswert zurück. Wenn dieser Wert true besitzt, springt der Kontrollfluss direkt zur Methode await_resume (Zeile 4). Wenn die Funktion false zurückgibt, wird die Coroutine sofort pausiert. Damit wird die Funktion await_suspend (Zeile 5) ausgeführt. Die Funktion await_suspend erhält den Verweis auf die Coroutine und verwendet diesen, um die Coroutine (Zeile 6) wieder aufzuwecken. Anstelle des Werts true kann await_suspend auch void zurückgeben.

Der folgende Screenhot bringt es auf den Punkt: Wenn await_ready true zurückgibt, wird die Funktion await_resume aufgerufen; wenn await_ready false zurückgibt, wird die Funktion await_suspend ausgeführt.

Die Programmausführung lässt sich schön mit dem Compiler Explorer visualisieren.

Nun steht die nächste Verbesserung an. Der Awaiter sollte auf einem anderen Thread aufgeweckt werden und seinen Arbeitsablauf fortführen.

Das folgende Programm baut auf dem vorherigen auf:

// startJobWithResumptionOnThread.cpp

#include <coroutine>
#include <functional>
#include <iostream>
#include <random>
#include <thread>
#include <vector>

std::random_device seed;
auto gen = std::bind_front(std::uniform_int_distribution<>(0,1),
std::default_random_engine(seed()));

struct MyAwaitable {
std::jthread& outerThread;
bool await_ready() const noexcept {
auto res = gen();
if (res) std::cout << " (executed)" << '\n';
else std::cout << " (suspended)" << '\n';
return res;
}
void await_suspend(std::coroutine_handle<> h) {
outerThread = std::jthread([h] { h.resume(); });
}
void await_resume() {}
};


struct Job{
static inline int JobCounter{1};
Job() {
++JobCounter;
}

struct promise_type {
int JobNumber{JobCounter};
Job get_return_object() { return {}; }
std::suspend_never initial_suspend() {
std::cout << " Job " << JobNumber << " prepared on thread "
<< std::this_thread::get_id();
return {};
}
std::suspend_never final_suspend() noexcept {
std::cout << " Job " << JobNumber << " finished on thread "
<< std::this_thread::get_id() << '\n';
return {};
}
void return_void() {}
void unhandled_exception() { }
};
};

Job performJob(std::jthread& out) {
co_await MyAwaitable{out};
}

int main() {

std::vector<std::jthread> threads(8);
for (auto& thr: threads) performJob(thr);

}

Der Hauptunterschied zum vorherigen Programm besteht darin, dass das neue Awaitable MyAwaitable in der Coroutine performJob (Zeile 1) zum Einsatz kommt. Im Gegensatz dazu ist das Coroutinen-Objekt, das die Coroutine performJob zurückgibt, einfach gehalten. Im Wesentlichen geben die Methoden inital_suspend (Zeile 2) und final_suspend (Zeile 3) das vordefinierte Awaitable std::suspend_never zurück. Zusätzlich stellen beide Funktionen die Jobnummer des ausgeführten Jobs und die Thread-Id des ausführenden Threads dar. Der Screenshot zeigt deutlich, welche Coroutine sofort ausgeführt und welche pausiert wird. Dank der Thread-Id lässt sich schön visualisieren, dass die pausierten Coroutinen auf einem anderen Thread aufgeweckt werden.

Die Programmausführung lässt sich mithilfe der Wandbox visualisieren:

Gerne möchte ich auf den interessanten Arbeitsablauf des Programms genauer eingehen. Zeile 4 erzeugt acht Default-konstruierte Threads, die die Coroutine performJob (Zeile 5) mittels Referenz annehmen. Darüber hinaus wird die Referenz als Argument für MyAwaitable{out} verwendet. Abhängig vom Wert von res (Zeile 6), und damit abhängig vom Rückgabewert der Methode await_ready, wird die Coroutine (res ist true) weiter ausgeführt oder pausiert (res ist false). Im Fall, dass MyAwaitable pausiert wird, wird die Funktion await_suspend (Zeile 7) ausgeführt. Dank der Zuweisung von outerThread (Zeile 8) wird dieser zum ausgeführten Thread. Die Lebenszeit des ausgeführten Threads muss länger sein als die der Coroutine. Aus diesem Grund werden die Threads im Gültigkeitsbereich der main-Funktion angelegt.

Geschafft: Beinahe 100 Artikel habe ich zu C++20 verfasst. In meinem nächsten Artikel möchte das Thema C++20 abschließen und die folgende Frage beantworten: Wie geht es weiter mit C++? ()