Programmieren mit Tasks und parallelen Schleifen

Spätestens seitdem jeder Softwareentwickler einen Multicore-Rechner auf dem Schreibtisch oder Schoß hat, sind Threads fester Bestandteil vieler Anwendungen. Die Parallelisierung mit ihnen ist jedoch ein mühsames Unterfangen. Gesucht sind Alternativen und deren Umsetzung in den gängigen Programmiersprachen.

In Pocket speichern vorlesen Druckansicht 13 Kommentare lesen
Lesezeit: 23 Min.
Von
  • Urs Gleim
  • Tobias Schüle
Inhaltsverzeichnis

Spätestens seitdem jeder Softwareentwickler einen Multicore-Rechner auf dem Schreibtisch oder Schoß hat, sind Threads fester Bestandteil vieler Anwendungen. Die Parallelisierung mit ihnen ist jedoch ein mühsames Unterfangen. Gesucht sind Alternativen und deren Umsetzung in den gängigen Programmiersprachen.

Man muss viel Zeit investieren, um ein Programm mit Threads so zu parallelisieren, dass es die verfügbare Rechenleistung effizient ausnutzt. Ein Grund dafür ist, dass das Erzeugen und Verwalten von Threads einen nicht unerheblichen Aufwand verursacht. Hinzu kommt, dass bei einem Thread-Wechsel der aktuelle Kontext (Prozessorregister etc.) zu sichern und der des auszuführenden Threads wiederherzustellen ist. Sind wesentlich mehr Threads rechenbereit als Prozessorkerne vorhanden, kommt es zur Überbelegung des Systems, die zu Leistungseinbußen führen kann.

Mehr Infos

"Multicore-Software" – das Buch

Der Artikel ist ein für heise Developer aufbereiteter Auszug aus dem im dpunkt.verlag erschienenen Buch "Multicore-Software", das sich auf rund 370 Seiten mit den Grundlagen, der Architektur sowie der Implementierung paralleler Software in C/C++, Java und C# auseinandersetzt.

Um das Potenzial von Multicore-Prozessoren ausschöpfen zu können, ist es wünschenswert, Parallelität auf einer feineren Ebene nutzen zu können. Zu dem Zweck bieten zeitgemäße Programmiersprachen und Bibliotheken zunehmend Programmiermodelle mit Tasks an. Diese erlauben es, ein Problem in kleine Teilaufgaben zu zerlegen und parallel auszuführen. Das Erzeugen eines Tasks ist vergleichsweise einfach und geht viel schneller vonstatten als das eines Threads. Deshalb sind Tasks auch gut für die Implementierung datenparalleler Algorithmen geeignet. Ein weiterer Vorteil ist die bessere Skalierbarkeit: Die meisten mit Tasks parallelisierten Programme lassen sich ohne Anpassungen auf Prozessoren mit unterschiedlich vielen Kernen effizient ausführen, sofern die Anzahl der Tasks groß genug ist.

Ein Task ist durch eine parallel zum Aufrufer ausführbare Einsprungsfunktion definiert. In der Regel lassen sich ihr ein oder mehrere Argumente übergeben. Darüber hinaus bieten Tasks spezielle Mechanismen zur Synchronisation an, die je nach Implementierung variieren. So kann man gezielt auf einzelne Tasks warten, auf die von einem "Eltern-Task" erzeugten "Kinder" oder auf eine explizit angegebene Liste beziehungsweise Gruppe von Tasks. Bei manchen Implementierungen hat man sogar die Wahl zwischen mehreren Möglichkeiten. Begonnen sei mit C++ und den Threading Building Blocks (TBB), einer von Intel entwickelten Bibliothek für die parallele Programmierung. Um beispielsweise eine Funktion f parallel zum aktuellen Programmstück auszuführen, legt man eine Task-Gruppe an und ruft anschließend deren run-Methode mit dem auszuführenden Task als Argument auf (mit den seit C++11 verfügbaren Lambda-Funktionen [1] geht das auf kompakte Weise). Die Synchronisation geschieht mit wait:

tbb::task_group group;
group.run([] {f();});
...
group.wait();

In Java verwendet man am besten den in der Version 7 hinzugekommenen ForkJoinPool. Da Java 7 jedoch keine Lambda-Funktionen unterstützt, ist zunächst eine Klasse vom Typ RecursiveAction beziehungsweise RecursiveTask<V> – letztere für Tasks, die ein Ergebnis zurückliefern – zu implementieren:

class MyTask extends RecursiveAction {
// ... Konstruktor
void compute() {
f()
}
}

Nach dem Erzeugen eines neuen ForkJoinPool lässt sich der Task folgendermaßen starten:

static final ForkJoinPool fjPool = new ForkJoinPool();
// ...
MyTask t = new MyTask();
fjPool.invoke(t);

Die Methode invoke wartet implizit auf das Ende des Tasks. Ist das nicht gewünscht, muss man den Task mit execute starten. Tasks, die aus dem Kontext eines laufenden Tasks gestartet werden sollen, stößt man mit invoke beziehungsweise fork (wartet nicht) der Klasse ForkJoinTask an. Das Warten auf mit fork gestartete Tasks erfolgt mit join. Daher der Name des Pools.

In C# sieht das aufgeräumter aus, insbesondere ist der Pool nicht explizit zu erzeugen:

Task t = new Task(f);
t.Start();
// ...
t.Wait();

Mit der alternativen Factory-Methode aus Task.Factory und Lambda-Funktionen ist das C#-Beispiel dem TBB-Beispiel sehr ähnlich:

Task t = Task.Factory.StartNew(() => f());
// ...
t.Wait();

Es ist deutlich zu erkennen, dass die zugrunde liegenden Konzepte überall die gleichen sind. Die Umsetzungen unterscheiden sich vor allem im Komfort der Benutzung sowie im Verhalten des Laufzeitsystems.

Wie bei Threads ist bei der parallelen Ausführung mehrerer Tasks Vorsicht geboten, da Zugriffe auf gemeinsame Variablen oder Ein-/Ausgabeoperationen zu unvorhergesehenen Fehlern führen können. Deshalb sollten Tasks keine Daten modifizieren, auf die parallel andere Tasks oder Threads zugreifen.

Eine nützliche Eigenschaft von Tasks ist die Möglichkeit der verschachtelten Parallelität. Das bedeutet, dass sich innerhalb eines Tasks weitere Tasks erzeugen lassen. Auf diese Weise lassen sich rekursive Algorithmen nach dem "Teile und herrsche"-Prinzip oder das Durchlaufen baumartiger Datenstrukturen einfach parallelisieren. Ein typisches Beispiel ist der Quicksort-Algorithmus, der das zu sortierende Feld anhand eines Pivot-Elements zunächst in zwei Teile partitioniert. In einem zweiten Schritt werden die beiden Teile dann rekursiv sortiert. Die so entstehenden Teilbäume lassen sich parallel ausführen, indem für jeden Aufruf ein neuer Task erzeugt wird. Der folgende Quellcode zeigt die Vorgehensweise mit TBB (auf die Darstellung der Funktion partition haben die Autoren der Übersichtlichkeit halber verzichtet).

template <typename RAI> // RandomAccessIterator
void quicksort(const RAI begin, const RAI end) {
if(size_t(end-begin) > 1) { // Abbruch, falls nur noch ein Element
const RAI pivot = partition(begin, end); // Partitionierung
tbb::task_group group;
group.run([=] {quicksort(begin, pivot);});
group.run([=] {quicksort(pivot+1, end);});
group.wait();
}
}

Beim Ausführen des Algorithmus auf einem Multicore-Rechner stellt man schnell fest, dass er langsamer als die sequenzielle Variante ist. Der Grund dafür ist, dass sehr viele Tasks erzeugt werden, die einen erheblichen Mehraufwand verursachen. Zur Lösung des Problems muss man die Anzahl der erzeugten Tasks reduzieren. Dazu bricht man die Rekursion ab, wenn die Teilfelder eine bestimmte Größe erreicht haben, und sortiert sie sequenziell. Ihre Größe bezeichnet man als Körnung (engl. grain size) und ist so zu wählen, dass alle Prozessorkerne gut ausgelastet sind. Dabei sollte man eine Reserve einplanen, um Leerlaufzeiten zu vermeiden, die entstehen können, wenn die Tasks unterschiedliche Laufzeiten haben. Bei Quicksort kann es je nach Wahl des Pivot-Elements nämlich passieren, dass die Teile unterschiedlich groß sind und das Sortieren der Teilfelder somit unterschiedlich viel Zeit in Anspruch nimmt.

Eine weitere Optimierung ist, den zweiten Aufruf von quicksort direkt im aktuellen Task auszuführen, wodurch nicht für beide Aufrufe jeweils ein neuer Task zu erzeugen ist. Außerdem genügt es, nur einmal am Ende mit wait zu synchronisieren. Ein Aufruf von wait bei jedem Rekursionsschritt ist nur notwendig, wenn die Tasks Ergebnisse zurückliefern, die weiterzuverarbeiten sind. Die optimierte Fassung des Beispiels ist in dem nächsten Codestück zu sehen.

template <typename RAI> // RandomAccessIterator
void quicksort_recursive(const RAI begin, const RAI end,
const size_t grainsize, tbb::task_group& group) {
if(size_t(end-begin) > grainsize) {
const RAI pivot = partition(begin, end);
group.run([=, &group] {quicksort_recursive(begin,
pivot, grainsize, group);});
quicksort_recursive(pivot+1, end, grainsize, group);
}
else
std::sort(begin, end); // sequenziell sortieren
}

template <typename RAI>
void quicksort(const RAI begin, const RAI end) {
const size_t grainsize = size_t(end-begin) /
(NUMBER_OF_CORES * C); // z.B. C = 2
tbb::task_group group;
quicksort_recursive(begin, end, grainsize, group);
group.wait(); // auf das Ende aller Tasks warten
}

Weiterführende Informationen zu parallelen Sortierverfahren sind in den Schriften von Cormen, Leiserson, Rivest und Stein [2] sowie von Miller und Boxer [3] zu finden.