Fehlerkorrektur zum Beitrag über Monitor Object in der Thread-Safe Queue
Der Beitrag "Patterns in der Softwarearchitektur: Monitor Object" implementiert eine fehlerhafte Thread-Safe Queue. Zeit für eine Korrektur.
- Rainer Grimm
In meinem letzten Artikel "Patterns in der Softwarearchitektur: Monitor Object" habe ich eine Thread-Safe Queue implementiert. Ich habe zwei schwere Fehler gemacht. Sorry. Heute werde ich diese Fehler beheben.
Um den Zusammenhang zu verstehen, möchte ich zunächst die fehlerhafte Implementierung aus meinem letzten Beitrag nochmals vorstellen.
// monitorObject.cpp
#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
class Monitor {
public:
void lock() const {
monitMutex.lock();
}
void unlock() const {
monitMutex.unlock();
}
void notify_one() const noexcept {
monitCond.notify_one();
}
template <typename Predicate>
void wait(Predicate pred) const { // (10)
std::unique_lock<std::mutex> monitLock(monitMutex);
monitCond.wait(monitLock, pred);
}
private:
mutable std::mutex monitMutex;
mutable std::condition_variable monitCond;
};
template <typename T> // (1)
class ThreadSafeQueue: public Monitor {
public:
void add(T val){
lock();
myQueue.push(val); // (6)
unlock();
notify_one();
}
T get(){
wait( [this] { return ! myQueue.empty(); } ); // (2)
lock();
auto val = myQueue.front(); // (4)
myQueue.pop(); // (5)
unlock();
return val;
}
private:
std::queue<T> myQueue; // (3)
};
class Dice {
public:
int operator()(){ return rand(); }
private:
std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};
int main(){
std::cout << '\n';
constexpr auto NumberThreads = 100;
ThreadSafeQueue<int> safeQueue; // (7)
auto addLambda = [&safeQueue](int val){ safeQueue.add(val); // (8)
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.get(); }; // (9)
std::vector<std::thread> addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
std::vector<std::thread> getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);
for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();
std::cout << "\n\n";
}
Der Kerngedanke des Beispiels ist, dass das Monitor Object in einer Klasse gekapselt ist und daher wiederverwendet werden kann. Die Klasse Monitor
verwendet einen std::mutex
als Monitor Lock und eine std::condition_variable
als Monitor-Bedingung. Die Klasse Monitor
bietet die minimale Schnittstelle an, die ein Monitor Object unterstützen sollte.
ThreadSafeQueue
in (1) erweitert std::queue
um eine Thread-sichere Schnittstelle. ThreadSafeQueue
leitet sich von der Klasse Monitor
ab und verwendet deren Mitgliedsfunktionen, um die synchronisierten Mitgliedsfunktionen add
und get
zu unterstützen. Die Mitgliedsfunktionen add
und get
verwenden das Lock des Monitors, um das Monitor Object zu schützen. Dies gilt insbesondere für die nicht Thread-sichere myQueue
. add
benachrichtigt den wartenden Thread, wenn ein neues Element zu myQueue
hinzugefügt wurde. Diese Benachrichtigung ist Thread-sicher. Die Mitgliedsfunktion get
(3) verdient mehr Aufmerksamkeit. Zunächst wird die wait
-Mitgliedsfunktion der zugrunde liegenden Bedingungsvariable aufgerufen. Dieser wait
-Aufruf benötigt ein zusätzliches Prädikat, um sich vor lost und spurious wakeups (C++ Core Guidelines: Sei dir der Gefahren von Bedingungsvariablen bewusst) zu schützen. Die Operationen zur Änderung der myQueue
(4) und (5) müssen ebenfalls geschützt werden, da sie sich mit dem Aufruf myQueue.push(val)
(6) überschneiden können. Das Monitor Object safeQueue
(7) verwendet die Lambda-Funktionen in (8) und (9), um eine Zahl aus der synchronisierten safeQueue
hinzuzufügen oder zu entfernen. ThreadSafeQueue
selbst ist ein Klassen-Template und kann Werte eines beliebigen Typs aufnehmen. Einhundert Clients fügen der safeQueue
100 Zufallszahlen zwischen 1 und 6 hinzu (Zeile 7), während einhundert Clients diese 100 Zahlen gleichzeitig aus der safeQueue
entfernen. Die Ausgabe des Programms zeigt die Zahlen und die Thread-IDs.
Dieses Programm hat zwei ernste Probleme. Dietmar Kühl und Frank Birbacher haben die Probleme in einer E-Mail beschrieben. Hier sind ihre Worte. Meine Anmerkungen sind kursiv und in fetter Schrift.
- In
ThreadSafeQueue::get()
wird mittelsMonitor::wait()
getestet, obmyQueue
ein Element enthält, beziehungsweise darauf gewartet, dass ein Element enthalten ist. Allerdings wird der Lock nur innerhalb vonwait()
gehalten, das heißt inget()
kann man nicht sicher sein, dass das Element noch inmyQueue
ist: ein anderer Thread kann das Lock bekommen und das Element entfernen, was zu undefined behavior bei dem Aufruf von myQueue.front() führt. - Wenn der Copy/Move-Konstructor von
T
eine Exception wirft, ist dieThreadSafeQueue
in einem inkonsistenten Zustand: Keine Member-Funktion ist aktiv, aber das Mutex ist gelockt.
Die Korrektur besteht darin, dass Monitor::wait()
nur aufgerufen werden kann, wenn ein unique_lock
gehalten wird. Das kann man zum Beispiel dadurch erreichen, dass Monitor eine entsprechende (protected?) Funktion zur Verfügung stellt, die ein geeignetes Objekt zurückgibt und eine Referenz zu diesem in wait()
verlangt:
struct Monitor {
using Lock = std::unique_lock<std::mutex>; // could be wrapper if you prefer
[[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
template <typename Predicate>
void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
// …
};
template <typename T>
T ThreadSafeQueue<T>::get() {
auto kerberos = receiveGuard();
wait(kerberos, [this]{ return not myQueue.empty(); });
T rc = std::move(myQueue.front());
myqueue.pop();
return rc;
}
Diese Version korrigiert das Exception-Problem für get().
Für add()
kann man einfach das Monitor-Objekt mit einem lock_guard
verwenden:
template <typename T>
void add(T val) {
{
std::lock_guard<Monitor> kerberos(*this);
myqueue.push(std::move(val));
}
notify_one();
}
Vermutlich würde ich die Notifikation in ein “SendGuard
” verpacken, dass einen lock_guard
und eine Reference auf die condition_variable
enthält und bei der Destruktion die Notifikation sendet:
class SendGuard {
friend class Monitor;
using deleter = decltype([](auto& cond){ cond->notify_one(); });
std::unique_ptr<std::condition_variable, deleter> notifier;
std::lock_guard<std::mutex> kerberos;
SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};
Der Move-Konstructor und der Destruktor sollten trotzdem public
sein und stellen das gesamte Interface dar! Damit wäre die Benutzung in add()
auch deutlich einfacher:
template <typename T>
void add(T val) {
auto kerberos = sendGuard();
myqueue.push(val);
}
Hier ist schließlich die vollständige Implementierung von Dietmar. Die Zahlen entsprechen den Zahlen in meinem monitorObjec.cpp
Beispiel.
// monitorObject.cpp
#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
class Monitor {
public:
using Lock = std::unique_lock<std::mutex>;
[[nodiscard]] Lock receiveGuard() {
return Lock(monitMutex);
}
template <typename Predicate>
void wait(Lock& kerberos, Predicate pred) {
monitCond.wait(kerberos, pred);
}
class SendGuard {
friend class Monitor;
using deleter = decltype([](auto* cond){ cond->notify_one(); });
std::unique_ptr<std::condition_variable, deleter> notifier;
std::lock_guard<std::mutex> kerberos;
SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};
SendGuard sendGuard() { return {monitMutex, monitCond}; }
private:
mutable std::mutex monitMutex;
mutable std::condition_variable monitCond;
};
template <typename T> // (1)
class ThreadSafeQueue: public Monitor {
public:
void add(T val){
auto kerberos = sendGuard();
myQueue.push(val); // (6)
}
T get(){
auto kerberos = receiveGuard();
wait(kerberos, [this] { return ! myQueue.empty(); } ); // (2)
auto val = myQueue.front(); // (4)
myQueue.pop(); // (5)
return val;
}
private:
std::queue<T> myQueue; // (3)
};
class Dice {
public:
int operator()(){ return rand(); }
private:
std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};
int main(){
std::cout << '\n';
constexpr auto NumberThreads = 100;
ThreadSafeQueue<int> safeQueue; // (7)
auto addLambda = [&safeQueue](int val){ safeQueue.add(val); // (8)
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.get(); }; // (9)
std::vector<std::thread> addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
std::vector<std::thread> getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);
for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();
std::cout << "\n\n";
}
Als Ergebnis der Diskussion oben hat Frank die folgende Version unten vorschlagen, die ein konsistentes und einfach zu benutzendes Interface für Monitor hat.
// threadSafeQueue.cpp
#ifndef INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
#include <atomic>
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <iterator>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>
class Monitor {
public:
struct UnlockAndNotify {
std::mutex d_mutex;
std::condition_variable d_condition;
void lock() { d_mutex.lock(); }
void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
};
private:
UnlockAndNotify d_combined;
public:
std::unique_lock<UnlockAndNotify> makeLockWithNotify() {
return std::unique_lock{d_combined};
}
template <typename PRED>
std::unique_lock<std::mutex> makeLockWithWait(PRED waitForCondition) {
std::unique_lock lock{d_combined.d_mutex};
d_combined.d_condition.wait(lock, waitForCondition);
return lock;
}
};
class ThreadQueue {
Monitor d_monitor;
std::deque<int> d_numberQueue;
auto makeLockWhenNotEmpty() {
return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
}
public:
void addNumber(int number) {
const auto lock = d_monitor.makeLockWithNotify();
d_numberQueue.push_back(number);
}
int removeNumber() {
const auto lock = makeLockWhenNotEmpty();
const auto number = d_numberQueue.front();
d_numberQueue.pop_front();
return number;
}
};
#endif
int main() {
ThreadQueue queue;
std::atomic<int> sharedSum{};
std::atomic<int> sharedCounter{};
std::vector<std::jthread> threads;
threads.reserve(200);
std::generate_n(std::back_inserter(threads), 100, [&] {
return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
});
std::generate_n(std::back_inserter(threads), 100, [&] {
return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
});
threads.clear(); // wait for all threads to finish
if (sharedSum.load() != 5050) {
throw std::logic_error("Wrong result for sum of 1..100");
}
}
Die Implementierung des Monitor Object basiert auf der Flexibilität von std::unique_lock
durch seinen Template-Parameter. Alle Locks des C++ Standards können mit jeder Klasse verwendet werden, die über lock()
- und unlock()
-Methoden verfügt. Die Klasse UnlockAndNotify
implementiert diese Schnittstelle und setzt ihre Bedingungsvariable in der unlock()
-Methode frei. Darüber hinaus bietet die Klasse Monitor
eine reduzierte öffentliche Schnittstelle an, mit der zwei verschiedene Arten von Locks erstellt werden können, eine mit und eine ohne Benachrichtigung, indem ein std::unique_lock
entweder auf der gesamten UnlockAndNotify
-Instanz oder nur auf dem enthaltenen std::mutex
erstellt wird.
Bei der Wahl zwischen std::unique_lock
und std::lock_guard
bevorzuge ich (Frank) die unique_lock
in der Schnittstelle. Diese Wahl ermöglicht dem Benutzer der Monitorklasse mehr Flexibilität. Ich schätze diese Flexibilität höher ein als einen möglichen Performanz-Unterschied zu lock_guard
, der ohnehin erst einmal gemessen werden muss. Ich gebe zu, dass die gegebenen Beispiele diese zusätzliche Flexibilität nicht ausnutzen.
Danach entwickelte Dietmar Franks Idee weiter: Hier werden die geschützten Daten im Monitor aufbewahrt, was es schwieriger macht, ungeschützt auf sie zuzugreifen.
// threadsafequeue2.cpp
#ifndef INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <random>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>
namespace patterns::monitor3 {
template <typename T>
class Monitor {
public:
struct UnlockAndNotify {
std::mutex d_mutex;
std::condition_variable d_condition;
void lock() { d_mutex.lock(); }
void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
};
private:
mutable UnlockAndNotify d_combined;
mutable T d_data;
public:
std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const {
return { d_data, std::unique_lock{d_combined} };
}
template <typename PRED>
std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const {
std::unique_lock lock{d_combined.d_mutex};
d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
return { d_data, std::move(lock) };
}
};
template <typename T>
class ThreadQueue {
Monitor<std::deque<T>> d_monitor;
public:
void add(T number) {
auto[numberQueue, lock] = d_monitor.makeProducerLock();
numberQueue.push_back(number);
}
T remove() {
auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
const auto number = numberQueue.front();
numberQueue.pop_front();
return number;
}
};
}
#endif
class Dice {
public:
int operator()(){ return rand(); }
private:
std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};
int main(){
std::cout << '\n';
constexpr auto NumberThreads = 100;
patterns::monitor3::ThreadQueue<int> safeQueue;
auto addLambda = [&safeQueue](int val){ safeQueue.add(val);
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.remove(); };
std::vector<std::thread> addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
std::vector<std::thread> getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);
for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();
std::cout << "\n\n";
}
Nochmals herzlichen Dank an Frank und Dietmar. Ich wollte mit meiner fehlerhaften Implementierung einer Thread-Safe Queue in meinem letzten Artikel nicht beweisen, dass Concurrency sehr anspruchsvoll ist. Besonders ärgerlich ist, dass ich die Mutex nicht in ein Lock gepackt habe (Fehler 2). Ich lehre dies in meinen C++-Kursen: NNM (No Naked Mutex).
Wie geht's weiter?
In meinem nächsten Artikel werde ich tief in die Zukunft von C++20 eintauchen: C++23. (rme)