Softwareentwicklung: Ein Coroutine-basierter Consumer-Producer-Workflow

Ich freue mich, in diesem Gastbeitrag von Ljubic Damir einen typischen Anwendungsfall für Coroutines vorzustellen: einen Producer-Consumer-Workflow.​

In Pocket speichern vorlesen Druckansicht

(Bild: heise online / anw)

Lesezeit: 6 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

Coroutines bieten einen intuitiven und strukturierten Weg, asynchronen Code zu schreiben. Sie ermöglichen es, asynchrone Operationen in einem prozeduralen Stil zu schreiben. Sie sind eine Funktion, die in C++20 eingeführt wurde, um die asynchrone Programmierung zu vereinfachen.

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

Auch wenn dieser Single-Producer-Single-Consumer-Workflow nicht ganz einfach zu verstehen ist, ist er ein guter Ausgangspunkt für Coroutine-Experimente.

Vorhandene Mechanismen wie std::async, std::packaged_task oder events (std::condition_variable & std::mutex) synchronisieren zwei oder mehr Threads über das Ergebnis der Aufgabe, indem sie einen Kommunikationskanal aufbauen. Dieser Kommunikationskanal hat zwei Enden:

  • std::promise, das entweder das Ergebnis oder die Ausnahme in den gemeinsamen Zustand schreibt, und
  • std::future (std::shared_future) - ein empfangendes Ende, das auf das Ergebnis der Aufgabe (oder die Ausnahme) wartet.

Im Gegensatz zu diesem bereits existierenden Mechanismus sind Coroutines nicht direkt mit Threads oder anderen Synchronisierungsprimitiven des Betriebssystems verbunden. Sie sind vielmehr eine reine Softwareabstraktion, die auf dem Steuerungsobjekt der Coroutine und der um sie herum aufgebauten Zustandsmaschinenlogik basiert.

Coroutines sind stackless - das bedeutet, dass das Steuerobjekt auf dem Heap erstellt werden muss. Zufälligerweise handelt es sich um einen Library Wrapper um den promise_type (std::coroutine_handle<promise_type>), der aber eigentlich nichts mit std::promise gemein hat.

Der promise_type ist eine Schnittstelle (ein Anpassungspunkt), die die vordefinierten Übergangszustände in der State-Machine einer Coroutine beschreibt.

Coroutines sind sehr vielseitig und können in verschiedenen Szenarien eingesetzt werden, in denen man einen asynchronen Nachrichtenfluss verwalten muss. Ein gängiges Beispiel ist die Socket-basierte Kommunikation.

Heute werde ich versuchen, Coroutines anhand eines anderen Beispiels zu erläutern: Single Producer-Single-Consumer Workflow.

Zuerst müssen wir den Ergebnistyp für die Coroutine definieren:

class[[nodiscard]] AudioDataResult final
{
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
            
        class promise_type
        {
            ...
        };
};

Dies ist ein Wrapper um den inneren: promise_type Typ. Wir schmücken die umschließende Klasse mit dem Attribut [[nodiscard]], da der Ergebnistyp das Steuerungsobjekt der Coroutine ist: das wir an den Client-Code zurückgeben, um seine Aussetzung/Wiederaufnahme zu verwalten.

@Anmerkung Der Destruktor der Klasse bereinigt die Ressourcen (dynamischer Speicher) in RAII-Manier, sodass der Rückgabetyp streng genommen ignoriert werden kann, wenn keine Notwendigkeit besteht, den Zustand der Coroutine zu verwalten.

~AudioDataResult() { if(handle_) { handle_.destroy(); } }

Der Ergebnistyp ist move-only: Die Kopieroperationen sind verboten - um zu verhindern, dass das Kontrollobjekt vervielfältigt wird.

// Make the result type move-only, 
//due to exclusive ownership over the handle

AudioDataResult(const AudioDataResult& ) = delete;
AudioDataResult& operator= (constAudioDataResult& ) = delete;

AudioDataResult(AudioDataResult&& other) noexcept:
handle_(std::exchange(other.handle_, nullptr))
{}

AudioDataResult& operator = (AudioDataResult&& other) noexcept
{
    using namespace std;
    AudioDataResult tmp =std::move(other);
    swap(*this, tmp);
    return *this;
}

Definieren wir nun die Schnittstelle promise_type selbst:

// Predefined interface that has to be specify 
//in order to implement
// coroutine's state-machine transitions
class promise_type
{
    
    public:
        using value_type = std::vector<int>;
        AudioDataResult get_return_object()
        {
            return AudioDataResult{ handle_type::from_promise(*this) };
        }
        std::suspend_never initial_suspend() noexcept { return{}; }
        std::suspend_always final_suspend() noexcept { return{}; }

        void return_void() {}
        void unhandled_exception()
        {
            std::rethrow_exception(std::current_exception());
        }

        // Generates the value and suspend the "producer"
        template <typename Data>
        requires std::convertible_to<std::decay_t<Data>, value_type>
        std::suspend_always yield_value(Data&& value)
        {
            data_ = std::forward<Data>(value);
            data_ready_.store(true, std::memory_order::relaxed);
            return {};
        }

    private:
        value_type data_;
        std::atomic<bool> data_ready_;
};//promise_type interface

Der promise_type definiert die notwendige Infrastruktur der Coroutine. Zusätzlich muss promise_type für alle Coroutines, die als Generator – "Producer" – fungieren wollen, um die Werte auszugeben, mit der yield_value-Methode erweitert werden (co_yield ≡ co_await promise_.yield_value). Wenn die Daten verbraucht sind, müssen wir die entsprechende Wrapper-Methode resume() bereitstellen, um den Producer wieder aufzunehmen.

void resume() { if( not handle_.done()) { handle_.resume();} }

Jetzt müssen wir die Coroutine so erweitern, dass sie die Anforderungen des Consumer erfüllt: Sie muss mit der Datenbereitschaft synchronisiert werden. Mit anderen Worten: Der Consumer wird angehalten, bis die Daten vom Producer als verfügbar gemeldet werden. Dazu müssen wir die Schnittstelle Awaiter implementieren:

class promise_type
{
    // Awaiter interface: for consumer waiting on data being ready
    struct AudioDataAwaiter
    {

        explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}

        bool await_ready() const
        {
            return promise_.data_ready_.load(std::memory_order::relaxed);
        }

        void await_suspend(handle_type) const
        {
            while( not promise_.data_ready_.exchange(false))
            {
                std::this_thread::yield();
            }
        }

        // move assignment at client invocation side:
        //        const auto data = co_await audioDataResult;
        // This requires that coroutine's result type provides
        // the co_await unary operator

        value_type&& await_resume() const
        {
            return std::move(promise_.data_);
        }

    private:
            promise_type& promise_;

    };//Awaiter interface

};//promise_type

In der State-Machine ist await_ready() der erste Übergangszustand: Er wird auf die Bereitschaft der Daten geprüft. Wenn die Daten nicht bereit sind, wird als nächstes await_suspend() aufgerufen. Hier wird tatsächlich gewartet und zwar bis das entsprechende Flag gesetzt wird. Schließlich wird await_resume() aufgerufen: Wir "verschieben" den Wert aus dem promise_type, indem wir ihn bedingungslos in die rvalue-Referenz umwandeln. Auf der Seite des Client-Aufrufs führt dies dazu, dass der Zuweisungsoperator für den zurückgegebenen Wert - data - aufgerufen wird.

const auto data = co_await audioDataResult;

Damit das funktioniert, muss der Ergebnistyp den unären Operator co_await bereitstellen, der unsere Awaiter-Schnittstelle zurückgibt.

class AudioDataResult
{
    auto operator co_await() noexcept
    {
        return promise_type::AudioDataAwaiter{handle_.promise()};
    }
};

<Beispiel 1>: https://godbolt.org/z/MvYfbEP8r

Das folgende Programm producerConsumer.cpp zeigt eine vereinfachte Version von Beispiel 1:

// producerConsumer.cpp

#include <algorithm>
#include <atomic>
#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <source_location>
#include <thread>
#include <utility>
#include <vector>


void funcName(const std::source_location location = std::source_location::current()) {
    std::cout << location.function_name() << '\n';
}


template <typename Container>
void printContainer(const Container& container)
{
    typedef typename Container::value_type value_type;
    auto first = std::cbegin(container);
    auto last = std::cend(container);

    std::cout << " [";
    std::copy(first, std::prev(last), std::ostream_iterator<value_type>(std::cout, ", "));
    std::cout << *std::prev(last) << "]\n";
}




class [[nodiscard]] AudioDataResult final
{
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
        
        // Predefined interface that has to be specify in order to implement
        // coroutine's state-machine transitions
        class promise_type 
        {
            
            public:
                
                using value_type = std::vector<int>;

                AudioDataResult get_return_object() 
                {
                    return AudioDataResult{handle_type::from_promise(*this)};
                }
                std::suspend_never initial_suspend() noexcept { return {}; }
                std::suspend_always final_suspend() noexcept { return {}; }
                void return_void() {}
                void unhandled_exception() 
                {
                    std::rethrow_exception(std::current_exception());
                }

                // Generates the value and suspend the "producer"
                template <typename Data>
                requires std::convertible_to<std::decay_t<Data>, value_type>
                std::suspend_always yield_value(Data&& value) 
                {
                    data_ = std::forward<Data>(value);
                    data_ready_.store(true);
                    return {};
                }

                // Awaiter interface: for consumer waiting on data being ready
                struct AudioDataAwaiter 
                {
                    explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}

                    bool await_ready() const { return promise_.data_ready_.load();}
                    
                    void await_suspend(handle_type) const
                    {
                        while(not promise_.data_ready_.exchange(false)) {
                             std::this_thread::yield(); 
                        }
                    }
                    // move assignment at client invocation side: const auto data = co_await audioDataResult;
                    // This requires that coroutine's result type provides the co_await unary operator
                    value_type&& await_resume() const 
                    {
                        return std::move(promise_.data_);
                    }

                    private: 
                        promise_type& promise_;
                };//Awaiter interface

        
            private:
                value_type data_;
                std::atomic<bool> data_ready_;
        }; //promise_type interface

        
        auto operator co_await() noexcept   
        {
            return promise_type::AudioDataAwaiter{handle_.promise()};
        }

        // Make the result type move-only, due to ownership over the handle
        AudioDataResult(const AudioDataResult&) = delete;
        AudioDataResult& operator=(const AudioDataResult&) = delete;

        AudioDataResult(AudioDataResult&& other) noexcept: handle_(std::exchange(other.handle_, nullptr)){}
        AudioDataResult& operator=(AudioDataResult&& other) noexcept 
        {
            using namespace std;
            AudioDataResult tmp = std::move(other);
            swap(*this, tmp);
            return *this;
        }

        // d-tor: RAII
        ~AudioDataResult() { if (handle_) {funcName(); handle_.destroy();}}

        // For resuming the producer - at the point when the data are consumed
        void resume() {if (not handle_.done()) { funcName(); handle_.resume();}}
    
    private:
        AudioDataResult(handle_type handle) noexcept : handle_(handle) {}

    private:
    handle_type handle_;
};


using data_type = std::vector<int>;
AudioDataResult producer(const data_type& data) 
{
    for (std::size_t i = 0; i < 5; ++i) {
        funcName();
        co_yield data;
    }
    co_yield data_type{}; // exit criteria
}

AudioDataResult consumer(AudioDataResult& audioDataResult) 
{
    while(true)
    {
        funcName();
        const auto data = co_await audioDataResult;
        if (data.empty()) {std::cout << "No data - exit!\n"; break;}
        std::cout << "Data received:";
        printContainer(data);
        audioDataResult.resume(); // resume producer
    }
}

int main() 
{
    {
        const data_type data = {1, 2, 3, 4};
        auto audioDataProducer = producer(data);
        std::thread t ([&]{auto audioRecorded = consumer(audioDataProducer);});
        t.join();
    }

    std::cout << "bye-bye!\n";
}

Hier ist schließlich die Ausgabe des Programms:

Die andere Möglichkeit ist die Verwendung von promise_type::await_transform(), um auf den Wert zu warten, der in der promise_type-Instanz gespeichert ist, die vom Producer verwendet wird.

class promise_type
{
    auto await_transform(handle_type other)
    {
        // Awaiter interface: remained the same
        struct AudioDataAwaiter
        {
            explicit AudioDataAwaiter(promise_type& promise)noexcept: promise_(promise) {}
            ...
        };

        return AudioDataAwaiter{other.promise()};
    }
};

Auf diese Weise müssen wir nicht mehr den unären Operator co_await des Ergebnistyps angeben, sondern einen (expliziten) Umwandlungsoperator,

explicit operator handle_type() const {return handle_;}

sodass wir ihn an dem Punkt übergeben können, an dem der Konsument co_await aufruft, was intern in den Aufruf await_transform() übersetzt wird.

const auto data = co_await static_cast<AudioDataResult::handle_type>(audioDataResult);

Wir können dies veranschaulichen als: me.handle_.promise().await_transform(other.handle_)

<Beispiel 2>: https://godbolt.org/z/57zsK9rEn

In diesem einfachen Beispiel wird der Producer angehalten, ohne dass es zu einer Bestrafung kommt, da er nach der Wiederaufnahme genau die gleiche – im Voraus bekannte – Abfolge von Daten liefert. In einem realen Szenario ist das wahrscheinlich nicht der Fall: Der Producer selbst wird wahrscheinlich eine Art Vermittler sein – Empfänger von asynchron gesendeten Daten, die an den Consumer zurückgesendet werden. Deshalb muss auf der Producerseite eine Warteschlangenlogik implementiert werden, um Datenverluste zu vermeiden, wenn die Daten angehalten werden und darauf warten, dass der Consumer sie wieder aufnimmt - um die Unterschiede zwischen der Ankunftsrate der Producer-Daten und der Verbrauchsrate des Consumer auszugleichen.

In C++20 lässt sich der Drei-Wege definieren oder per default anfordern. Damit stehen alle sechs Vergleichsoperatoren zur Verfügung: ==, !=, <, <=, > und >=. Man kann auch den Gleichheitsoperator (==) definieren oder mit default anfordern.

Mein Blog legt eine kleine Weihnachtspause ein. Den nächste Artikel werde ich am 8. Januar veröffentlichen. Allen Lesern wünsche ich eine gute Zeit. (rme)