Fehlerkorrektur zum Beitrag über Monitor Object in der Thread-Safe Queue

Der Beitrag "Patterns in der Softwarearchitektur: Monitor Object" implementiert eine fehlerhafte Thread-Safe Queue. Zeit für eine Korrektur.

In Pocket speichern vorlesen Druckansicht 36 Kommentare lesen

(Bild: Tatiana Popova/Shutterstock.com)

Lesezeit: 6 Min.
Von
  • Rainer Grimm

In meinem letzten Artikel "Patterns in der Softwarearchitektur: Monitor Object" habe ich eine Thread-Safe Queue implementiert. Ich habe zwei schwere Fehler gemacht. Sorry. Heute werde ich diese Fehler beheben.

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

Um den Zusammenhang zu verstehen, möchte ich zunächst die fehlerhafte Implementierung aus meinem letzten Beitrag nochmals vorstellen.

// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

class Monitor {
public:
    void lock() const {
        monitMutex.lock();
    }

    void unlock() const {
        monitMutex.unlock();
    }

    void notify_one() const noexcept {
        monitCond.notify_one();
    }

    template <typename Predicate>
    void wait(Predicate pred) const {                 // (10)
        std::unique_lock<std::mutex> monitLock(monitMutex);
        monitCond.wait(monitLock, pred);
    }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                  // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        lock();
        myQueue.push(val);                             // (6)
        unlock();
        notify_one();
    }
    
    T get(){ 
        wait( [this] { return ! myQueue.empty(); } );  // (2)
        lock();
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        unlock();
        return val;
    }

private:
    std::queue<T> myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue<int> safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}

Der Kerngedanke des Beispiels ist, dass das Monitor Object in einer Klasse gekapselt ist und daher wiederverwendet werden kann. Die Klasse Monitor verwendet einen std::mutex als Monitor Lock und eine std::condition_variable als Monitor-Bedingung. Die Klasse Monitor bietet die minimale Schnittstelle an, die ein Monitor Object unterstützen sollte.

ThreadSafeQueue in (1) erweitert std::queue um eine Thread-sichere Schnittstelle. ThreadSafeQueue leitet sich von der Klasse Monitor ab und verwendet deren Mitgliedsfunktionen, um die synchronisierten Mitgliedsfunktionen add und get zu unterstützen. Die Mitgliedsfunktionen add und get verwenden das Lock des Monitors, um das Monitor Object zu schützen. Dies gilt insbesondere für die nicht Thread-sichere myQueue. add benachrichtigt den wartenden Thread, wenn ein neues Element zu myQueue hinzugefügt wurde. Diese Benachrichtigung ist Thread-sicher. Die Mitgliedsfunktion get (3) verdient mehr Aufmerksamkeit. Zunächst wird die wait-Mitgliedsfunktion der zugrunde liegenden Bedingungsvariable aufgerufen. Dieser wait-Aufruf benötigt ein zusätzliches Prädikat, um sich vor lost und spurious wakeups (C++ Core Guidelines: Sei dir der Gefahren von Bedingungsvariablen bewusst) zu schützen. Die Operationen zur Änderung der myQueue (4) und (5) müssen ebenfalls geschützt werden, da sie sich mit dem Aufruf myQueue.push(val) (6) überschneiden können. Das Monitor Object safeQueue (7) verwendet die Lambda-Funktionen in (8) und (9), um eine Zahl aus der synchronisierten safeQueue hinzuzufügen oder zu entfernen. ThreadSafeQueue selbst ist ein Klassen-Template und kann Werte eines beliebigen Typs aufnehmen. Einhundert Clients fügen der safeQueue 100 Zufallszahlen zwischen 1 und 6 hinzu (Zeile 7), während einhundert Clients diese 100 Zahlen gleichzeitig aus der safeQueue entfernen. Die Ausgabe des Programms zeigt die Zahlen und die Thread-IDs.

Dieses Programm hat zwei ernste Probleme. Dietmar Kühl und Frank Birbacher haben die Probleme in einer E-Mail beschrieben. Hier sind ihre Worte. Meine Anmerkungen sind kursiv und in fetter Schrift.

  1. In ThreadSafeQueue::get() wird mittels Monitor::wait() getestet, ob myQueue ein Element enthält, beziehungsweise darauf gewartet, dass ein Element enthalten ist. Allerdings wird der Lock nur innerhalb von wait() gehalten, das heißt in get() kann man nicht sicher sein, dass das Element noch in myQueue ist: ein anderer Thread kann das Lock bekommen und das Element entfernen, was zu undefined behavior bei dem Aufruf von myQueue.front() führt.
  2. Wenn der Copy/Move-Konstructor von T eine Exception wirft, ist die ThreadSafeQueue in einem inkonsistenten Zustand: Keine Member-Funktion ist aktiv, aber das Mutex ist gelockt.

Die Korrektur besteht darin, dass Monitor::wait() nur aufgerufen werden kann, wenn ein unique_lock gehalten wird. Das kann man zum Beispiel dadurch erreichen, dass Monitor eine entsprechende (protected?) Funktion zur Verfügung stellt, die ein geeignetes Objekt zurückgibt und eine Referenz zu diesem in wait() verlangt:

struct Monitor {
   using Lock = std::unique_lock<std::mutex>; // could be wrapper if you prefer
   [[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
   template <typename Predicate>
   void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
   // …
};

template <typename T>
T ThreadSafeQueue<T>::get() {
   auto kerberos = receiveGuard();
   wait(kerberos, [this]{ return not myQueue.empty(); });
   T rc = std::move(myQueue.front());
   myqueue.pop();
   return rc;
}

Diese Version korrigiert das Exception-Problem für get(). Für add() kann man einfach das Monitor-Objekt mit einem lock_guard verwenden:

template <typename T>
void add(T val) {
   {
        std::lock_guard<Monitor> kerberos(*this);
        myqueue.push(std::move(val));
    }
    notify_one();
}

Vermutlich würde ich die Notifikation in ein “SendGuard” verpacken, dass einen lock_guard und eine Reference auf die condition_variable enthält und bei der Destruktion die Notifikation sendet:

class SendGuard {
    friend class Monitor;
    using deleter = decltype([](auto& cond){ cond->notify_one(); });
    std::unique_ptr<std::condition_variable, deleter> notifier;
    std::lock_guard<std::mutex> kerberos;
    SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};

Der Move-Konstructor und der Destruktor sollten trotzdem public sein und stellen das gesamte Interface dar! Damit wäre die Benutzung in add() auch deutlich einfacher:

template <typename T>
void add(T val) {
   auto kerberos = sendGuard();
   myqueue.push(val);
}

Hier ist schließlich die vollständige Implementierung von Dietmar. Die Zahlen entsprechen den Zahlen in meinem monitorObjec.cpp Beispiel.

// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

class Monitor {
public:
    using Lock = std::unique_lock<std::mutex>;
    [[nodiscard]] Lock receiveGuard() {
        return Lock(monitMutex);
    }

    template <typename Predicate>
    void wait(Lock& kerberos, Predicate pred) {
        monitCond.wait(kerberos, pred);
    }

    class SendGuard {
        friend class Monitor;
        using deleter = decltype([](auto* cond){ cond->notify_one(); });
        std::unique_ptr<std::condition_variable, deleter> notifier;
        std::lock_guard<std::mutex> kerberos;
        SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
    };

    SendGuard sendGuard() { return {monitMutex, monitCond}; }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                  // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        auto kerberos = sendGuard();
        myQueue.push(val);                             // (6)
    }
    
    T get(){ 
        auto kerberos = receiveGuard();
        wait(kerberos, [this] { return ! myQueue.empty(); } );  // (2)
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        return val;
    }

private:
    std::queue<T> myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue<int> safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}

Als Ergebnis der Diskussion oben hat Frank die folgende Version unten vorschlagen, die ein konsistentes und einfach zu benutzendes Interface für Monitor hat.

// threadSafeQueue.cpp

#ifndef INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP

#include <atomic>
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <iterator>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>


class Monitor {
public:
    struct UnlockAndNotify {
        std::mutex d_mutex;
        std::condition_variable d_condition;

        void lock() { d_mutex.lock(); }
        void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
    };

private:
    UnlockAndNotify d_combined;

public:
    std::unique_lock<UnlockAndNotify> makeLockWithNotify() {
        return std::unique_lock{d_combined};
    }

    template <typename PRED>
    std::unique_lock<std::mutex> makeLockWithWait(PRED waitForCondition) {
        std::unique_lock lock{d_combined.d_mutex};
        d_combined.d_condition.wait(lock, waitForCondition);
        return lock;
    }
};

class ThreadQueue {
    Monitor d_monitor;
    std::deque<int> d_numberQueue;

    auto makeLockWhenNotEmpty() {
        return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
    }

public:
    void addNumber(int number) {
        const auto lock = d_monitor.makeLockWithNotify();
        d_numberQueue.push_back(number);
    }

    int removeNumber() {
        const auto lock = makeLockWhenNotEmpty();
        const auto number = d_numberQueue.front();
        d_numberQueue.pop_front();
        return number;
    }
};

#endif

int main() {
   ThreadQueue queue;
   std::atomic<int> sharedSum{};
   std::atomic<int> sharedCounter{};

   std::vector<std::jthread> threads;
   threads.reserve(200);
   std::generate_n(std::back_inserter(threads), 100, [&] {
       return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
   });
   std::generate_n(std::back_inserter(threads), 100, [&] {
       return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
   });

   threads.clear(); // wait for all threads to finish
   if (sharedSum.load() != 5050) {
       throw std::logic_error("Wrong result for sum of 1..100");
   }
}

Die Implementierung des Monitor Object basiert auf der Flexibilität von std::unique_lock durch seinen Template-Parameter. Alle Locks des C++ Standards können mit jeder Klasse verwendet werden, die über lock()- und unlock()-Methoden verfügt. Die Klasse UnlockAndNotify implementiert diese Schnittstelle und setzt ihre Bedingungsvariable in der unlock()-Methode frei. Darüber hinaus bietet die Klasse Monitor eine reduzierte öffentliche Schnittstelle an, mit der zwei verschiedene Arten von Locks erstellt werden können, eine mit und eine ohne Benachrichtigung, indem ein std::unique_lock entweder auf der gesamten UnlockAndNotify-Instanz oder nur auf dem enthaltenen std::mutex erstellt wird.

Bei der Wahl zwischen std::unique_lock und std::lock_guard bevorzuge ich (Frank) die unique_lock in der Schnittstelle. Diese Wahl ermöglicht dem Benutzer der Monitorklasse mehr Flexibilität. Ich schätze diese Flexibilität höher ein als einen möglichen Performanz-Unterschied zu lock_guard, der ohnehin erst einmal gemessen werden muss. Ich gebe zu, dass die gegebenen Beispiele diese zusätzliche Flexibilität nicht ausnutzen.

Danach entwickelte Dietmar Franks Idee weiter: Hier werden die geschützten Daten im Monitor aufbewahrt, was es schwieriger macht, ungeschützt auf sie zuzugreifen.

// threadsafequeue2.cpp

#ifndef INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <random>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>

namespace patterns::monitor3 {

template <typename T>
class Monitor {
public:
   struct UnlockAndNotify {
       std::mutex d_mutex;
       std::condition_variable d_condition;
   
       void lock() { d_mutex.lock(); }
       void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
   };

private:
   mutable UnlockAndNotify d_combined;
   mutable T               d_data;

public:
   std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const {
       return { d_data, std::unique_lock{d_combined} };
   }

   template <typename PRED>
   std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const {
       std::unique_lock lock{d_combined.d_mutex};
       d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
       return { d_data, std::move(lock) };
   }
};

template <typename T>
class ThreadQueue {
   Monitor<std::deque<T>> d_monitor;

public:
   void add(T number) {
       auto[numberQueue, lock] = d_monitor.makeProducerLock();
       numberQueue.push_back(number);
   }

   T remove() {
       auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
       const auto number = numberQueue.front();
       numberQueue.pop_front();
       return number;
   }
};
}

#endif

class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};

int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    patterns::monitor3::ThreadQueue<int> safeQueue;                     

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);         
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.remove(); };  

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}

Nochmals herzlichen Dank an Frank und Dietmar. Ich wollte mit meiner fehlerhaften Implementierung einer Thread-Safe Queue in meinem letzten Artikel nicht beweisen, dass Concurrency sehr anspruchsvoll ist. Besonders ärgerlich ist, dass ich die Mutex nicht in ein Lock gepackt habe (Fehler 2). Ich lehre dies in meinen C++-Kursen: NNM (No Naked Mutex).

In meinem nächsten Artikel werde ich tief in die Zukunft von C++20 eintauchen: C++23. (rme)