Softwareentwicklung: Ein Coroutine-basierter Consumer-Producer-Workflow
Ich freue mich, in diesem Gastbeitrag von Ljubic Damir einen typischen Anwendungsfall für Coroutines vorzustellen: einen Producer-Consumer-Workflow.​

(Bild: heise online / anw)
- Rainer Grimm
Coroutines bieten einen intuitiven und strukturierten Weg, asynchronen Code zu schreiben. Sie ermöglichen es, asynchrone Operationen in einem prozeduralen Stil zu schreiben. Sie sind eine Funktion, die in C++20 eingeführt wurde, um die asynchrone Programmierung zu vereinfachen.
Auch wenn dieser Single-Producer-Single-Consumer-Workflow nicht ganz einfach zu verstehen ist, ist er ein guter Ausgangspunkt fĂĽr Coroutine-Experimente.
Vorhandene Mechanismen wie std::async,
std::packaged_task
oder events (std::condition_variable & std::mutex
) synchronisieren zwei oder mehr Threads ĂĽber das Ergebnis der Aufgabe, indem sie einen Kommunikationskanal aufbauen. Dieser Kommunikationskanal hat zwei Enden:
std::promise
, das entweder das Ergebnis oder die Ausnahme in den gemeinsamen Zustand schreibt, undstd::future
(std::shared_future
) - ein empfangendes Ende, das auf das Ergebnis der Aufgabe (oder die Ausnahme) wartet.
Im Gegensatz zu diesem bereits existierenden Mechanismus sind Coroutines nicht direkt mit Threads oder anderen Synchronisierungsprimitiven des Betriebssystems verbunden. Sie sind vielmehr eine reine Softwareabstraktion, die auf dem Steuerungsobjekt der Coroutine und der um sie herum aufgebauten Zustandsmaschinenlogik basiert.
Coroutines sind stackless - das bedeutet, dass das Steuerobjekt auf dem Heap erstellt werden muss. Zufälligerweise handelt es sich um einen Library Wrapper um den promise_type
(std::coroutine_handle<promise_type>
), der aber eigentlich nichts mit std::promise
gemein hat.
Der promise_type
ist eine Schnittstelle (ein Anpassungspunkt), die die vordefinierten Übergangszustände in der State-Machine einer Coroutine beschreibt.
Coroutines sind sehr vielseitig und können in verschiedenen Szenarien eingesetzt werden, in denen man einen asynchronen Nachrichtenfluss verwalten muss. Ein gängiges Beispiel ist die Socket-basierte Kommunikation.
Heute werde ich versuchen, Coroutines anhand eines anderen Beispiels zu erläutern: Single Producer-Single-Consumer Workflow.
Implementierung
Zuerst mĂĽssen wir den Ergebnistyp fĂĽr die Coroutine definieren:
class[[nodiscard]] AudioDataResult final
{
public:
class promise_type;
using handle_type = std::coroutine_handle<promise_type>;
class promise_type
{
...
};
};
Dies ist ein Wrapper um den inneren: promise_type
Typ. Wir schmĂĽcken die umschlieĂźende Klasse mit dem Attribut [[nodiscard]]
, da der Ergebnistyp das Steuerungsobjekt der Coroutine ist: das wir an den Client-Code zurĂĽckgeben, um seine Aussetzung/Wiederaufnahme zu verwalten.
@Anmerkung Der Destruktor der Klasse bereinigt die Ressourcen (dynamischer Speicher) in RAII-Manier, sodass der RĂĽckgabetyp streng genommen ignoriert werden kann, wenn keine Notwendigkeit besteht, den Zustand der Coroutine zu verwalten.
~AudioDataResult() { if(handle_) { handle_.destroy(); } }
Der Ergebnistyp ist move-only: Die Kopieroperationen sind verboten - um zu verhindern, dass das Kontrollobjekt vervielfältigt wird.
// Make the result type move-only,
//due to exclusive ownership over the handle
AudioDataResult(const AudioDataResult& ) = delete;
AudioDataResult& operator= (constAudioDataResult& ) = delete;
AudioDataResult(AudioDataResult&& other) noexcept:
handle_(std::exchange(other.handle_, nullptr))
{}
AudioDataResult& operator = (AudioDataResult&& other) noexcept
{
using namespace std;
AudioDataResult tmp =std::move(other);
swap(*this, tmp);
return *this;
}
Definieren wir nun die Schnittstelle promise_type selbst:
// Predefined interface that has to be specify
//in order to implement
// coroutine's state-machine transitions
class promise_type
{
public:
using value_type = std::vector<int>;
AudioDataResult get_return_object()
{
return AudioDataResult{ handle_type::from_promise(*this) };
}
std::suspend_never initial_suspend() noexcept { return{}; }
std::suspend_always final_suspend() noexcept { return{}; }
void return_void() {}
void unhandled_exception()
{
std::rethrow_exception(std::current_exception());
}
// Generates the value and suspend the "producer"
template <typename Data>
requires std::convertible_to<std::decay_t<Data>, value_type>
std::suspend_always yield_value(Data&& value)
{
data_ = std::forward<Data>(value);
data_ready_.store(true, std::memory_order::relaxed);
return {};
}
private:
value_type data_;
std::atomic<bool> data_ready_;
};//promise_type interface
Der promise_type
definiert die notwendige Infrastruktur der Coroutine. Zusätzlich muss promise_type
für alle Coroutines, die als Generator – "Producer" – fungieren wollen, um die Werte auszugeben, mit der yield_value
-Methode erweitert werden (co_yield ≡ co_await promise_.yield_value
). Wenn die Daten verbraucht sind, mĂĽssen wir die entsprechende Wrapper-Methode resume()
bereitstellen, um den Producer wieder aufzunehmen.
void resume() { if( not handle_.done()) { handle_.resume();} }
Jetzt mĂĽssen wir die Coroutine so erweitern, dass sie die Anforderungen des Consumer erfĂĽllt: Sie muss mit der Datenbereitschaft synchronisiert werden. Mit anderen Worten: Der Consumer wird angehalten, bis die Daten vom Producer als verfĂĽgbar gemeldet werden. Dazu mĂĽssen wir die Schnittstelle Awaiter
implementieren:
class promise_type
{
// Awaiter interface: for consumer waiting on data being ready
struct AudioDataAwaiter
{
explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}
bool await_ready() const
{
return promise_.data_ready_.load(std::memory_order::relaxed);
}
void await_suspend(handle_type) const
{
while( not promise_.data_ready_.exchange(false))
{
std::this_thread::yield();
}
}
// move assignment at client invocation side:
// const auto data = co_await audioDataResult;
// This requires that coroutine's result type provides
// the co_await unary operator
value_type&& await_resume() const
{
return std::move(promise_.data_);
}
private:
promise_type& promise_;
};//Awaiter interface
};//promise_type
In der State-Machine ist await_ready()
der erste Übergangszustand: Er wird auf die Bereitschaft der Daten geprüft. Wenn die Daten nicht bereit sind, wird als nächstes await_suspend()
aufgerufen. Hier wird tatsächlich gewartet und zwar bis das entsprechende Flag gesetzt wird. Schließlich wird await_resume()
aufgerufen: Wir "verschieben" den Wert aus dem promise_type
, indem wir ihn bedingungslos in die rvalue-Referenz umwandeln. Auf der Seite des Client-Aufrufs fĂĽhrt dies dazu, dass der Zuweisungsoperator fĂĽr den zurĂĽckgegebenen Wert - data
- aufgerufen wird.
const auto data = co_await audioDataResult;
Damit das funktioniert, muss der Ergebnistyp den unären Operator co_await
bereitstellen, der unsere Awaiter
-Schnittstelle zurĂĽckgibt.
class AudioDataResult
{
auto operator co_await() noexcept
{
return promise_type::AudioDataAwaiter{handle_.promise()};
}
};
<Beispiel 1>: https://godbolt.org/z/MvYfbEP8r
Das folgende Programm producerConsumer.cpp
zeigt eine vereinfachte Version von Beispiel 1:
// producerConsumer.cpp
#include <algorithm>
#include <atomic>
#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <source_location>
#include <thread>
#include <utility>
#include <vector>
void funcName(const std::source_location location = std::source_location::current()) {
std::cout << location.function_name() << '\n';
}
template <typename Container>
void printContainer(const Container& container)
{
typedef typename Container::value_type value_type;
auto first = std::cbegin(container);
auto last = std::cend(container);
std::cout << " [";
std::copy(first, std::prev(last), std::ostream_iterator<value_type>(std::cout, ", "));
std::cout << *std::prev(last) << "]\n";
}
class [[nodiscard]] AudioDataResult final
{
public:
class promise_type;
using handle_type = std::coroutine_handle<promise_type>;
// Predefined interface that has to be specify in order to implement
// coroutine's state-machine transitions
class promise_type
{
public:
using value_type = std::vector<int>;
AudioDataResult get_return_object()
{
return AudioDataResult{handle_type::from_promise(*this)};
}
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception()
{
std::rethrow_exception(std::current_exception());
}
// Generates the value and suspend the "producer"
template <typename Data>
requires std::convertible_to<std::decay_t<Data>, value_type>
std::suspend_always yield_value(Data&& value)
{
data_ = std::forward<Data>(value);
data_ready_.store(true);
return {};
}
// Awaiter interface: for consumer waiting on data being ready
struct AudioDataAwaiter
{
explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}
bool await_ready() const { return promise_.data_ready_.load();}
void await_suspend(handle_type) const
{
while(not promise_.data_ready_.exchange(false)) {
std::this_thread::yield();
}
}
// move assignment at client invocation side: const auto data = co_await audioDataResult;
// This requires that coroutine's result type provides the co_await unary operator
value_type&& await_resume() const
{
return std::move(promise_.data_);
}
private:
promise_type& promise_;
};//Awaiter interface
private:
value_type data_;
std::atomic<bool> data_ready_;
}; //promise_type interface
auto operator co_await() noexcept
{
return promise_type::AudioDataAwaiter{handle_.promise()};
}
// Make the result type move-only, due to ownership over the handle
AudioDataResult(const AudioDataResult&) = delete;
AudioDataResult& operator=(const AudioDataResult&) = delete;
AudioDataResult(AudioDataResult&& other) noexcept: handle_(std::exchange(other.handle_, nullptr)){}
AudioDataResult& operator=(AudioDataResult&& other) noexcept
{
using namespace std;
AudioDataResult tmp = std::move(other);
swap(*this, tmp);
return *this;
}
// d-tor: RAII
~AudioDataResult() { if (handle_) {funcName(); handle_.destroy();}}
// For resuming the producer - at the point when the data are consumed
void resume() {if (not handle_.done()) { funcName(); handle_.resume();}}
private:
AudioDataResult(handle_type handle) noexcept : handle_(handle) {}
private:
handle_type handle_;
};
using data_type = std::vector<int>;
AudioDataResult producer(const data_type& data)
{
for (std::size_t i = 0; i < 5; ++i) {
funcName();
co_yield data;
}
co_yield data_type{}; // exit criteria
}
AudioDataResult consumer(AudioDataResult& audioDataResult)
{
while(true)
{
funcName();
const auto data = co_await audioDataResult;
if (data.empty()) {std::cout << "No data - exit!\n"; break;}
std::cout << "Data received:";
printContainer(data);
audioDataResult.resume(); // resume producer
}
}
int main()
{
{
const data_type data = {1, 2, 3, 4};
auto audioDataProducer = producer(data);
std::thread t ([&]{auto audioRecorded = consumer(audioDataProducer);});
t.join();
}
std::cout << "bye-bye!\n";
}
Hier ist schlieĂźlich die Ausgabe des Programms:
Die andere Möglichkeit ist die Verwendung von promise_type::await_transform()
, um auf den Wert zu warten, der in der promise_type
-Instanz gespeichert ist, die vom Producer verwendet wird.
class promise_type
{
auto await_transform(handle_type other)
{
// Awaiter interface: remained the same
struct AudioDataAwaiter
{
explicit AudioDataAwaiter(promise_type& promise)noexcept: promise_(promise) {}
...
};
return AudioDataAwaiter{other.promise()};
}
};
Auf diese Weise müssen wir nicht mehr den unären Operator co_await
des Ergebnistyps angeben, sondern einen (expliziten) Umwandlungsoperator,
explicit operator handle_type() const {return handle_;}
sodass wir ihn an dem Punkt übergeben können, an dem der Konsument co_await
aufruft, was intern in den Aufruf await_transform()
ĂĽbersetzt wird.
const auto data = co_await static_cast<AudioDataResult::handle_type>(audioDataResult);
Wir können dies veranschaulichen als: me.handle_.promise().await_transform(other.handle_)
<Beispiel 2>: https://godbolt.org/z/57zsK9rEn
Schlussfolgerung
In diesem einfachen Beispiel wird der Producer angehalten, ohne dass es zu einer Bestrafung kommt, da er nach der Wiederaufnahme genau die gleiche – im Voraus bekannte – Abfolge von Daten liefert. In einem realen Szenario ist das wahrscheinlich nicht der Fall: Der Producer selbst wird wahrscheinlich eine Art Vermittler sein – Empfänger von asynchron gesendeten Daten, die an den Consumer zurückgesendet werden. Deshalb muss auf der Producerseite eine Warteschlangenlogik implementiert werden, um Datenverluste zu vermeiden, wenn die Daten angehalten werden und darauf warten, dass der Consumer sie wieder aufnimmt - um die Unterschiede zwischen der Ankunftsrate der Producer-Daten und der Verbrauchsrate des Consumer auszugleichen.
Wie geht's weiter?
In C++20 lässt sich der Drei-Wege definieren oder per default
anfordern. Damit stehen alle sechs Vergleichsoperatoren zur VerfĂĽgung: ==, !=, <, <=, >
und >=
. Man kann auch den Gleichheitsoperator (==
) definieren oder mit default
anfordern.
Meine kurze Weihnachtspause
Mein Blog legt eine kleine Weihnachtspause ein. Den nächste Artikel werde ich am 8. Januar veröffentlichen. Allen Lesern wünsche ich eine gute Zeit. (rme)