C++20: Thread-Pools mit cppcoro
Der dritte und letzte Artikel der Miniserie zu Lewis Bakers Coroutinen-Abstraktion cppcoro fĂŒhrt Thread-Pools ein.
Der dritte und letzte Artikel der Miniserie zu Lewis Bakers Coroutinen-Abstraktion cppcoro [1] fĂŒhrt Thread-Pools ein.
Der heutige Artikel baut auf meinen zwei vorherigen Artikeln zu cppcoro auf:
- C++20: Coroutinen mit cppcoro: [2] EinfĂŒhrung in cppcoro inklusive der elementaren Coroutinen-Tasks und -Generatoren
- C++20: MĂ€chtige Coroutinen mit cppcoro: [3] MĂ€chtigere Coroutinen dank Threads
ZusĂ€tzlich zuer cppcoro::sync_wait-Funktion, mit der sich einfach auf die vollstĂ€ndige AusfĂŒhrung eines Awaitable warten lĂ€sst, bietet cppcoro die interessante cppcoro::when_all-Funktion an.
when_all
when_all: erzeugt ein Awaitable, das auf alle Input-Awaitable wartet und das Aggregat der einzelnen Ergebnisse zurĂŒckgibt.
Ich habe die Definition cppcoro::when_all ein wenig vereinfacht. Das folgende Beispiel soll einen ersten Eindruck geben:
// cppcoroWhenAll.cpp
#include <chrono>
#include <iostream>
#include <thread>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/when_all.hpp>
using namespace std::chrono_literals;
cppcoro::task<std::string> getFirst() {
std::this_thread::sleep_for(1s); // (3)
co_return "First";
}
cppcoro::task<std::string> getSecond() {
std::this_thread::sleep_for(1s); // (3)
co_return "Second";
}
cppcoro::task<std::string> getThird() {
std::this_thread::sleep_for(1s); // (3)
co_return "Third";
}
cppcoro::task<> runAll() {
// (2)
auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());
std::cout << fir << " " << sec << " " << thi << std::endl;
}
int main() {
std::cout << std::endl;
auto start = std::chrono::steady_clock::now();
cppcoro::sync_wait(runAll()); // (1)
std::cout << std::endl;
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start; // (4)
std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
std::cout << std::endl;
}
Der Top-Level Task cppcoro::sync_wait(runAll()) (Zeite 1) wartet auf das Awaitable runAll. Dieses wiederum wartet auf die Awaitables getFirst, getSecond und getThird (Zeile 2). Die Awaitables runAll, getFirst, getSecond und getThird sind Coroutinen. Jede der get-Funktionen schlĂ€ft fĂŒr eine Sekunde (Zeite 3). Drei mal eins macht drei. Diese drei Sekunden sind genau die Zeit, die der Aufruf cppcoro::sync_wait(runAll()) auf alle Coroutinen wartet. Zeile 4 gibt die Zeitdauer aus:
Das waren die Grundlagen zur Funktion cppcoro::when_all. Jetzt erweitere ich das Beispiel um einen Thread-Pool.
static_thread_pool
static_thread_pool verwaltet (schedule) Arbeitspakete auf einem Thread-Pool fester LĂ€nge. cppcoro::static_thread_pool kann mit und ohne Angabe einer Anzahl aufgerufen werden. Die Anzahl steht fĂŒr die Anzahl der Threads, die erzeugt werden. Falls du die Anzahl nicht angibst, kommt die Funktion std::thread::hardware_concurrency() [4] zum Einsatz. Sie gibt einen Hinweis darauf, wie viele Hardware-Threads auf dem System unterstĂŒtzt werden. Dies ist meist die Anzahl der Prozessoren oder Kerne, die zur VerfĂŒgung stehen.
Das folgende Programm basiert auf dem vorherigen Programm, lediglich die Coroutinen getFirst, getSecond und getThird werden gleichzeitig ausgefĂŒhrt:
// cppcoroWhenAllOnThreadPool.cpp
#include <chrono>
#include <iostream>
#include <thread>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/static_thread_pool.hpp>
#include <cppcoro/when_all.hpp>
using namespace std::chrono_literals;
cppcoro::task<std::string> getFirst() {
std::this_thread::sleep_for(1s);
co_return "First";
}
cppcoro::task<std::string> getSecond() {
std::this_thread::sleep_for(1s);
co_return "Second";
}
cppcoro::task<std::string> getThird() {
std::this_thread::sleep_for(1s);
co_return "Third";
}
template <typename Func>
cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {
co_await tp.schedule();
auto res = co_await func();
co_return res;
}
cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {
auto[fir, sec, thi] = co_await cppcoro::when_all( // (3)
runOnThreadPool(tp, getFirst),
runOnThreadPool(tp, getSecond),
runOnThreadPool(tp, getThird));
std::cout << fir << " " << sec << " " << thi << std::endl;
}
int main() {
std::cout << std::endl;
auto start = std::chrono::steady_clock::now();
cppcoro::static_thread_pool tp; // (1)
cppcoro::sync_wait(runAll(tp)); // (2)
std::cout << std::endl;
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start; // (4)
std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
std::cout << std::endl;
}
Dieses sind die entscheidenden Unterschiede zum vorherigen Programm cppcoroWhenAll.cpp. Ich erzeuge in Zeile (1) einen Thread-Pool tp und verwende diesen als Argument der Funktion runAll(tp) (Zeile 2). Die Funktionen runAll verwendet den Thread-Pool, um die Coroutinen gleichzeitig zu starten. Dank Structured Binding (Zeile 3) lassen sich die Werte aller Coroutinen einfach einsammeln und Variablen zuweisen. Das AusfĂŒhren des Programms benötigt jetzt eine statt bisher drei Sekunden.
Du weiĂt es eventuell bereits, dass wir mit C++20 Latches und Barriers erhalten. Beise sind einfache Synchronisationsmechanismen, die es erlauben, Threads zu blockieren, bis ein ZĂ€hler den Wert null besitzt. cppcoro bietet auch Latches und Barriers an.
async_latch
async_latch erlaubt es, Coroutinen asynchron zu warten, bis ein ZĂ€hler den Wert null besitzt. Das folgende Programm cppcoroLatch.cpp stellt Thread-Synchronisation mit cppcoro::async_latch vor:
// cppcoroLatch.cpp
#include <chrono>
#include <iostream>
#include <future>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/async_latch.hpp>
#include <cppcoro/task.hpp>
using namespace std::chrono_literals;
cppcoro::task<> waitFor(cppcoro::async_latch& latch) {
std::cout << "Before co_await" << std::endl;
co_await latch; // (3)
std::cout << "After co_await" << std::endl;
}
int main() {
std::cout << std::endl;
cppcoro::async_latch latch(3); // (1)
// (2)
auto waiter = std::async([&latch]{ cppcoro::sync_wait(waitFor(latch)); });
auto counter1 = std::async([&latch] { // (2)
std::this_thread::sleep_for(2s);
std::cout << "counter1: latch.count_down() " << std::endl;
latch.count_down();
});
auto counter2 = std::async([&latch] { // (2)
std::this_thread::sleep_for(1s);
std::cout << "counter2: latch.count_down(2) " << std::endl;
latch.count_down(2);
});
waiter.get(), counter1.get(), counter2.get();
std::cout << std::endl;
}
In Zeile (1) habe ich einen cppcoro::asynch_latch erzeugt und seinen ZĂ€hler mit drei initialisiert. Dieses Mal verwende ich std::async (Zeile 2), um die drei Coroutinen gleichzeitig auszufĂŒhren. Jeder std::async-Aufruf erhĂ€lt den Latch per Referenz. Die waitFor-Coroutine warten in Zeile 3, bis der ZĂ€hler den Wert null besitzt. Die Coroutine counter1 schlĂ€ft fĂŒr zwei Sekunden, bevor sie den ZĂ€hler um zwei reduziert. Im Gegensatz dazu schlĂ€ft counter2 eine Sekunde und reduziert den ZĂ€hler um zwei. Der Screenshot zeigt das wechselnde AusfĂŒhren der Threads:
Wie geht's weiter?
Bisher habe ich ĂŒber drei der vier groĂen Neuerungen [5] in C++20 geschrieben: Concepts, Ranges und Coroutinen. Module fehlen noch in meiner Tour durch die groĂen Vier in C++20 und werden daher Thema der nĂ€chsten Artikel sein.
Eine kleine Anmerkung möchte ich noch loswerden. Wenn du gerne einen Artikel zu einem C++20-Feature schreiben möchtest, das ich noch vorstellen werde, schreibe mir eine E-Mail. Ich freue mich auf den Artikel und werde ihn gegebenenfalls in Englisch/Deutsch ĂŒbersetzen, falls dies notwendig ist. ( [6])
URL dieses Artikels:
https://www.heise.de/-4713235
Links in diesem Artikel:
[1] https://github.com/lewissbaker/cppcoro
[2] https://heise.de/-4705161
[3] https://heise.de/-4708995
[4] https://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency
[5] https://heise.de/-4568956
[6] mailto:rainer@grimm-jaud.de
Copyright © 2020 Heise Medien