Patterns in der Softwareentwicklung für das Teilen von Daten zwischen Threads

In nebenläufigen Anwendungen besteht die Gefahr von Data Races, wenn die Threads Daten gemeinsam verwenden.

In Pocket speichern vorlesen Druckansicht 6 Kommentare lesen

(Bild: Wolfilser/Shutterstock.com)

Lesezeit: 6 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

Wenn Daten in nebenläufigen Anwendungen nicht geteilt werden, können keine Data Races entstehen. Keine gemeinsame Nutzung bedeutet, dass der Thread mit lokalen Variablen arbeitet. Das kann durch das Kopieren der Daten, die Verwendung von Thread-lokalem Speicher oder die Übertragung des Ergebnisses eines Threads an den zugehörigen Future über einen geschützten Datenkanal erreicht werden.

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

Die Muster in diesem Abschnitt sind ziemlich offensichtlich, aber ich werde sie der Vollständigkeit halber mit einer kurzen Erklärung vorstellen. Beginnen wir mit Copied Value.

Wenn ein Thread seine Argumente per Kopie und nicht per Referenz erhält, muss der Zugriff auf die Daten nicht synchronisiert werden. Es gibt keine Data Races und keine Probleme mit der Lebensdauer der Daten.

Data Races mit Referenzen

Das folgende Programm erstellt drei Threads. Ein Thread erhält sein Argument per Kopie, der andere per Referenz und der letzte per konstanter Referenz.

// copiedValueDataRace.cpp

#include <functional>
#include <iostream>
#include <string>
#include <thread>

using namespace std::chrono_literals;

void byCopy(bool b){
    std::this_thread::sleep_for(1ms);             // (1)
    std::cout << "byCopy: " << b << '\n';
}

void byReference(bool& b){
    std::this_thread::sleep_for(1ms);            // (2)
    std::cout << "byReference: " << b << '\n';
}

void byConstReference(const bool& b){
    std::this_thread::sleep_for(1ms);            // (3)
    std::cout << "byConstReference: " << b << '\n';
}

int main(){

    std::cout << std::boolalpha << '\n';

    bool shared{false};
    
    std::thread t1(byCopy, shared);
    std::thread t2(byReference, std::ref(shared));
    std::thread t3(byConstReference, std::cref(shared));
    
    shared = true;
    
    t1.join();
    t2.join();
    t3.join();

    std::cout << '\n';

}

Jeder Thread schläft eine Millisekunde lang (1, 2 und 3), bevor er den booleschen Wert anzeigt. Nur der Thread t1 hat eine lokale Kopie des booleschen Wertes und hat daher keinen Data Race. Die Ausgabe des Programms zeigt, dass die booleschen Werte der Threads t2 und t3 ohne Synchronisation geändert werden.

Der Gedanke ist naheliegend, dass der Thread t3 aus dem vorherigen Beispiel copiedValueDataRace.cpp einfach durch std::thread t3(byConstReference, shared) ersetzt werden kann. Das Programm kompiliert und läuft, was aber wie eine Referenz aussieht, ist eine Kopie. Der Grund dafür ist, dass die Type Traits-Funktion std::decay auf jedes Thread-Argument angewendet wird. std::decay führt implizite lValue zu rValue, Array-zu-Zeiger und Funktion-zu-Zeiger Konvertierung seines Typs T durch. Insbesondere ruft es in diesem Fall die Funktion std::remove_reference auf den Datentyp T auf.

Das folgende Programm perConstReference.cpp verwendet einen nicht kopierbaren Datentyp NonCopyableClass.

// perConstReference.cpp

#include <thread>

class NonCopyableClass{
    public:

    // the compiler generated default constructor
    NonCopyableClass() = default;

    // disallow copying
    NonCopyableClass& operator = 
      (const NonCopyableClass&) = delete;
    NonCopyableClass (const NonCopyableClass&) = delete;
  
};

void perConstReference(const NonCopyableClass& nonCopy){}

int main(){

    NonCopyableClass nonCopy;                      // (1)

    perConstReference(nonCopy);                    // (2)
    
    std::thread t(perConstReference, nonCopy);     // (3)
    t.join();

}

Das Objekt nonCopy (1) ist nicht kopierbar. Das ist in Ordnung, wenn ich die Funktion perConstReference mit dem Argument nonCopy (2) aufrufe, weil die Funktion ihr Argument per konstanter Referenz annimmt. Wenn ich dieselbe Funktion im Thread t (3) verwende, erzeugt der GCC einen ausführlichen Compilerfehler mit mehr als 300 Zeilen:

Der wesentliche Teil der Fehlermeldung befindet sich in der Mitte des Screenshots in einem roten, abgerundeten Rechteck: "error: use of deleted function". Der Copy-Konstruktor der Klasse NonCopyableClass ist nicht verfügbar.

Wer etwas ausleiht, muss sicherstellen, dass der zugrunde liegende Wert noch beim Verwenden verfügbar ist.

Probleme mit der Lebensdauer von Referenzen

Wenn ein Thread sein Argument per Referenz verwendet und man Thread detach aufruft, ist äußerste Vorsicht geboten. Das kleine Programm copiedValueLifetimeIssues.cpp besitzt undefiniertes Verhalten.

// copiedValueLifetimeIssues.cpp

#include <iostream>
#include <string>
#include <thread>

void executeTwoThreads(){                                   // (1)
    
    const std::string localString("local string");          // (4)
    
    std::thread t1([localString]{
        std::cout << "Per Copy: " << localString << '\n';
    });
    
     std::thread t2([&localString]{
        std::cout << "Per Reference: " << localString << '\n';
    });
    
    t1.detach();                                           // (2)
    t2.detach();                                           // (3)
}
    
using namespace std::chrono_literals;

int main(){
    
    std::cout << '\n';
    
    executeTwoThreads();
    
    std::this_thread::sleep_for(1s);
    
    std::cout << '\n';
    
}

executeTwoThreads (1) startet zwei Threads. Beide Threads werden detached (2 und 3) und geben die lokale Variable localString aus (4). Der erste Thread bindet die lokale Variable per Kopie und der zweite per Referenz. Der Einfachheit halber habe ich in beiden Fällen einen Lambda-Ausdruck verwendet, um die Argumente zu binden. Da die Funktion executeTwoThreads nicht wartet, bis die beiden Threads beendet sind, bezieht sich der Thread t2 auf den lokalen String, der an die Lebensdauer der aufrufenden Funktion gebunden ist. Das führt zu undefiniertem Verhalten. Seltsamerweise scheint mit GCC die maximal optimierte ausführbare Datei -O3 zu funktionieren, während die nicht optimierte ausführbare Datei abstürzt.

Dank Thread-lokalen Speichers kann ein Thread problemlos auf seinen Daten arbeiten.

Thread-lokaler Speicher ermöglicht es mehreren Threads, den lokalen Speicher über einen globalen Zugriffspunkt zu nutzen. Durch die Verwendung des Spezifiziers thread_local wird eine Variable zu einer Thread-lokalen Variable. Das bedeutet, dass du die thread-local Variable ohne Synchronisierung verwenden kannst. Angenommen, du willst die Summe aller Elemente eines Vektors randValues berechnen. Dies lässt sich mit einer Range-based for-Schleife ganz einfach umsetzen.

unsigned long long sum{};
for (auto n: randValues) sum += n;

Für einen PC mit vier Kernen macht man aus dem sequenziellen Programm ein concurrent Programm:

// threadLocallSummation.cpp

#include <atomic>
#include <iostream>
#include <random>
#include <thread>
#include <utility>
#include <vector>

constexpr long long size = 10000000;   

constexpr long long fir =  2500000;
constexpr long long sec =  5000000;
constexpr long long thi =  7500000;
constexpr long long fou = 10000000;

thread_local unsigned long long tmpSum = 0;

void sumUp(std::atomic<unsigned long long>& sum, 
           const std::vector<int>& val, 
           unsigned long long beg, unsigned long long end) {
    for (auto i = beg; i < end; ++i){
        tmpSum += val[i];
    }
    sum.fetch_add(tmpSum);
}

int main(){

  std::cout << '\n';

  std::vector<int> randValues;
  randValues.reserve(size);

  std::mt19937 engine;
  std::uniform_int_distribution<> uniformDist(1, 10);
  for (long long i = 0; i < size; ++i) 
      randValues.push_back(uniformDist(engine));
 
  std::atomic<unsigned long long> sum{}; 
  
  std::thread t1(sumUp, std::ref(sum), 
                 std::ref(randValues), 0, fir);
  std::thread t2(sumUp, std::ref(sum), 
                 std::ref(randValues), fir, sec);
  std::thread t3(sumUp, std::ref(sum), 
                 std::ref(randValues), sec, thi);
  std::thread t4(sumUp, std::ref(sum), 
                 std::ref(randValues), thi, fou);   
  
  t1.join();
  t2.join();
  t3.join();
  t4.join();

  std::cout << "Result: " << sum << '\n';

  std::cout << '\n';

}

Man packt die Range-based for-Schleife in eine Funktion und lässt jeden Thread ein Viertel der Summe in der thread_local-Variable tmpSum berechnen. Die Zeile sum.fetch_add(tmpSum) (1) summiert schließlich alle Werte in der atomaren Summe. Mehr über thread_local Speicher lässt sich in dem Artikel "Threadlokale Daten" nachlesen.

Promises und Futures teilen sich einen geschützten Datenkanal.

C++11 bietet Futures und Promises in drei Varianten an: std::async, std::packaged_task und das Paar std::promise und std::future. Das Future ist ein geschützter Platzhalter für den Wert, den der Promise setzt. Aus Sicht der Synchronisierung ist die entscheidende Eigenschaft eines promise/future-Paares, dass ein geschützter Datenkanal beide miteinander verbindet. Bei der Implementierung eines Future sind einige Entscheidungen zu treffen.

  • Ein Future kann seinen Wert explizit mit dem get-Aufruf abfragen und es
  • kann die Berechnung lazy (nur auf Anfrage) oder eager (sofort) starten. Nur der Promise std::async unterstützt die lazy Evaluation mit ein Launch Policy.
auto lazyOrEager = std::async([]{ return "LazyOrEager"; });
auto lazy = std::async(std::launch::deferred, 
  []{ return "Lazy"; });
auto eager = std::async(std::launch::async, []{ return "Eager"; });

lazyOrEager.get();
lazy.get();
eager.get();

Wenn ich keine Launch Policy angebe, entscheidet das System, ob der Job sofort oder auf Anfrage gestartet wird. Mit der Launch-Policy std::launch::async wird ein neuer Thread erstellt, und der Promise beginnt sofort mit seiner Arbeit. Diese steht im Gegensatz zur Launch-Policy std::launch::deferred. Der Aufruf lazy.get() startet den Promise. Außerdem wird der Promise in dem Thread ausgeführt, der das Ergebnis mit get anfordert.

Mehr über Futures in C++ findet sich in dem Artikel "Asynchrone Funktionsaufrufe".

Data Races können nicht passieren, wenn Daten nicht gleichzeitig geschrieben und gelesen werden. In meinem nächsten Artikel werde ich über Muster schreiben, die dir helfen, dich vor Veränderung zu schützen. (rme)