Programmiersprache C++: Erweiterung für Inclusive Scan in C++26
Dank der Umsetzung von std::execution im kommenden Standard C++26 lässt sich Inclusive scan auch asynchronous ausführen.
- Rainer Grimm
Der inklusive Scan löst Probleme im Zusammenhang mit Bereichsabfragen, wie die Berechnung der Summe einer Reihe von Elementen in einem Array. Er wird auch bei Bereichsminimumabfragen und verschiedenen anderen Algorithmen verwendet.
Christmas Special
Mache den Unterschied
Lasst uns gemeinsam etwas Großes vollbringen: Vom 1. bis 24. Dezember spende ich die Hälfte des Geldes für die ALS-Forschung, wenn Sie eines meiner Mentoring-Programme buchen.
Ich werde jede Woche ein Update veröffentlichen, damit wir sehen können, was wir bisher erreicht haben.
Hier finden Sie weitere Informationen über mich, die Struktur meiner Mentoring-Programme und die einzelnen Programme:
- Fundamentals for C++ Professionals
- Design Patterns and Architectural Patterns with C++
- C++20: Get the Details
- Concurrency with Modern C++
- Generic Programming (Templates) with C++
- Embedded Programming with Modern C++
Wenn Sie oder Ihr Team eine spezielle Auswahl aus meinem Mentoring-Programm wünschen, kontaktieren Sie mich bitte unter rainer.grimm@ModernesCpp.de. Wir finden eine Lösung.
Warum?
Hier ist eine Aussage der ALS-Ambulanz:
ALS zählt zu den „Waisen der Medizin“ – seltene Erkrankungen, die nicht im gesellschaftlichen Fokus stehen. Die Behandlung ist deutlich unterfinanziert und auch für die ALS-Forschung gibt es bislang keine ausreichenden Mittel. Spenden und andere Drittmittel sind von elementarer Bedeutung.
Diese Unterfinanzierung wurde mit der Ice Bucket Challenge offensichtlich.
Forschung benötigt mehr Geld.
Präfixsumme
Bevor ich auf den asynchronen inklusiven Scan eingehe, möchte ich den inklusiven Scan, auch Präfixsumme genannt, vorstellen.
In der englischsprachigen Wikipedia findet sich folgende Definition: "In computer science, the prefix sum, cumulative sum, inclusive scan, or simply scan of a sequence of numbers x0, x1, x2, … is a second sequence of numbers y0, y1, y2, …, the sums of prefixes (running totals) of the input sequence:"
y0 = x0
y1 = x0 + x1
y2 = x0 + x1+ x2
...
Proposal P2300R10 bietet die Implementierung von Inclusive Scan.
using namespace std::execution;
sender auto async_inclusive_scan(scheduler auto sch, // 2
std::span<const double> input, // 1
std::span<double> output, // 1
double init, // 1
std::size_t tile_count) // 3
{
std::size_t const tile_size = (input.size() + tile_count - 1) / tile_count;
std::vector<double> partials(tile_count + 1); // 4
partials[0] = init; // 4
return just(std::move(partials)) // 5
| continues_on(sch)
| bulk(tile_count, // 6
[ = ](std::size_t i, std::vector<double>& partials) { // 7
auto start = i * tile_size; // 8
auto end = std::min(input.size(), (i + 1) * tile_size); // 8
partials[i + 1] = *--std::inclusive_scan(begin(input) + start, // 9
begin(input) + end, // 9
begin(output) + start); // 9
}) // 10
| then( // 11
[](std::vector<double>&& partials) {
std::inclusive_scan(begin(partials), end(partials), // 12
begin(partials)); // 12
return std::move(partials); // 13
})
| bulk(tile_count, // 14
[ = ](std::size_t i, std::vector<double>& partials) { // 14
auto start = i * tile_size; // 14
auto end = std::min(input.size(), (i + 1) * tile_size); // 14
std::for_each(begin(output) + start, begin(output) + end, // 14
[&] (double& e) { e = partials[i] + e; } // 14
);
})
| then( // 15
[ = ](std::vector<double>&& partials) { // 15
return output; // 15
}); // 15
}
Hier ist die Erklärung aus dem Proposal P2300R10, übersetzt ins Deutsche.
- Die Funktion scannt eine Folge von
double
s (dargestellt alsstd::span<const double> input
) und speichert das Ergebnis in einer anderen Folge vondouble
s (dargestellt alsstd::span<double> output
). - Es wird ein Scheduler verwendet, der angibt, auf welcher Ausführungsressource der Scan gestartet werden soll.
- Es wird auch ein
tile_count
-Parameter verwendet, der die Anzahl der auszuführenden Agenten steuert, die erzeugt werden. - Zuerst müssen wir den für den Algorithmus benötigten temporären Speicher zuweisen, was wir mit einem
std::vector, partials
erledigen. Wir benötigen einen doppelten temporären Speicher für jeden von uns erstellten Ausführungsagenten. - Als Nächstes erstellen wir unseren ersten Sender mit
execution::just
undexecution::continues_on
. Diese Sender senden den temporären Speicher, den wir in den Sender verschoben haben. Der Sender hat einen Fertigstellungs-Scheduler vonsch
, was bedeutet, dass das nächste Element in der Kettesch
verwendet. - Sender und Senderadapter unterstützen die Komposition über
operator|
, ähnlich wie bei C++-Bereichen. Wir verwendenoperator |
, um das nächste Stück Arbeit anzuhängen, dastile_count
-Ausführungsagenten unter Verwendung vonexecution::bulk
s erzeugt. - Jeder Agent ruft eine
std::invocable
auf und übergibt ihr zwei Argumente. Das erste ist der Index des Agenten (i
) in derexecution::bulk
-Operation, in diesem Fall eine eindeutige Ganzzahl in [0, tile_count
). Das zweite Argument ist, wie der Eingabesender gesendet hat – der temporäre Speicher. - Wir beginnen mit der Berechnung von Anfang und Ende des Bereichs der Eingabe- und Ausgabeelemente, für die dieser Agent verantwortlich ist, basierend auf unserem Agentenindex.
- Dann führen wir einen sequenziellen
std::inclusive_scan
über unsere Elemente durch. Wir speichern das Scan-Ergebnis für unser letztes Element, das die Summe aller unserer Elemente ist, in unserem temporären Speicher "partials
". - Nachdem alle Berechnungen in diesem ersten Durchlauf in großen Mengen abgeschlossen sind, hat jeder der erzeugten Ausführungsagenten die Summe seiner Elemente in seinen Slot in Teilmengen geschrieben.
- Jetzt müssen wir alle Werte in Teilmengen scannen. Das machen wir mit einem einzelnen Ausführungsagenten, der nach Abschluss von
execution::bulk
ausgeführt wird. Wir erstellen diesen Ausführungsagenten mitexecution::then
. execution::then
nimmt einen Eingabesender und einstd::invocable
und ruft dasstd::invocable
mit dem vom Eingabesender gesendeten Wert auf. Innerhalb unseresstd::invocable
rufen wirstd::inclusive_scan
für Teilmengen auf, die die Eingabesender an uns senden werden.- Dann geben wir Teilmengen zurück, die in der nächsten Phase benötigt werden.
- Schließlich führen wir eine weitere
execution::bulk
in der gleichen Form wie zuvor durch. In dieserexecution::bulk
verwenden wir die gescannten Werte in Teilbereichen, um die Summen aus anderen Kacheln in unsere Elemente zu integrieren und den inklusiven Scan abzuschließen. async_inclusive_scan
gibt einen Sender zurück, der die Ausgabestd::span<double>
sendet. Ein Verbraucher des Algorithmus kann zusätzliche Arbeit verketten, die das Scan-Ergebnis verwendet. Zu dem Zeitpunkt, an demasync_inclusive_scan
zurückgegeben wird, ist die Berechnung möglicherweise noch nicht abgeschlossen. Tatsächlich hat sie möglicherweise noch nicht einmal begonnen.
Sender
just(values)
: Gibt einen Sender ohne Abschlussplaner zurück, der die bereitgestellten Werte sendet.just
ist eine Senderfabrik.bulk(input, shape, call)
: Gibt einen Sender zurück, der den aufrufbarencall
beschreibt, der aufinput
gemäßshape
aufgerufen wird.continues_on(input, scheduler)
: Gibt einen Sender zurück, der den Übergang vom Ausführungsagenten des Senders für die Eingabe zum Ausführungsagenten des Ziel-scheduler
s beschreibt.then(input, call)
:then
gibt einen Sender zurück, der die Fortsetzung der Task des Senders für die Eingabe auf einem hinzugefügten Knoten des Aufrufs der bereitgestellten Funktioncall
beschreibt.
Wäre es nicht schön, dieses Programm in Aktion zu sehen? Derzeit (Dezember 2024) unterstützt kein Compiler std::execution
oder die Concepts sender
und scheduler
.
Hier hilft die Referenzimplementierung stdexec, wobei ich den Datentyp der verarbeiteten Elemente von double
in int
geändert habe:
// inclusiveScanExecution.cpp
#include <algorithm>
#include <exec/static_thread_pool.hpp>
#include <iostream>
#include <numeric>
#include <span>
#include <stdexec/execution.hpp>
#include <vector>
auto async_inclusive_scan(auto sch, // 2
std::span<const int> input, // 1
std::span<int> output, // 1
int init, // 1
std::size_t tile_count) // 3
{
std::size_t const tile_size = (input.size() + tile_count - 1) / tile_count;
std::vector<int> partials(tile_count + 1); // 4
partials[0] = init; // 4
return stdexec::just(std::move(partials)) // 5
| stdexec::continues_on(sch) |
stdexec::bulk(tile_count, // 6
[=](std::size_t i, std::vector<int> &partials) { // 7
auto start = i * tile_size; // 8
auto end =
std::min(input.size(), (i + 1) * tile_size); // 8
partials[i + 1] =
*--std::inclusive_scan(begin(input) + start, // 9
begin(input) + end, // 9
begin(output) + start); // 9
}) // 10
| stdexec::then( // 11
[](std::vector<int> &&partials) {
std::inclusive_scan(begin(partials), end(partials), // 12
begin(partials)); // 12
return std::move(partials); // 13
}) |
stdexec::bulk(
tile_count, // 14
[=](std::size_t i, std::vector<int> &partials) { // 14
auto start = i * tile_size; // 14
auto end = std::min(input.size(), (i + 1) * tile_size); // 14
std::for_each(begin(output) + start, begin(output) + end, // 14
[&](int &e) { e = partials[i] + e; } // 14
);
}) |
stdexec::then( // 15
[=](std::vector<int> &&partials) { // 15
return output; // 15
}); // 15
}
int main() {
std::cout << '\n';
std::vector<int> input(30);
std::iota(begin(input), end(input), 0);
for (auto e : input) {
std::cout << e << ' ';
}
std::cout << '\n';
std::vector<int> output(input.size());
exec::static_thread_pool pool(8);
auto sch = pool.get_scheduler();
auto [out] =
stdexec::sync_wait(async_inclusive_scan(sch, input, output, 0, 4))
.value();
for (auto e : out) {
std::cout << e << ' ';
}
std::cout << '\n';
}
In main
wird ein std::vector<int>
input mit 30 Elementen erstellt. Die std::iota
-Funktion füllt den input
-Vektor mit aufeinanderfolgenden Ganzzahlen, beginnend mit 0
. Das Programm gibt dann den Inhalt des Vektors in der Konsole aus.
Als Nächstes wird ein std::vector<int> output
mit der gleichen Größe wie der input
-Vektor erstellt, um die Ergebnisse der Inclusive-Scan-Operation zu speichern. Der exec::static_thread_pool pool
verfügt über Threads, die zur gleichzeitigen Ausführung von Tasks verwendet werden. Die get_scheduler
-Memberfunktion des Thread-Pools erstellt ein Scheduler-Objekt sch
.
Die Funktion async_inclusive_scan
verwendet den Scheduler sch
, den Vektor input
, den Vektor output
, einen Anfangswert von 0
und eine Kachelanzahl von 4
. Diese Funktion führt die Inclusive-Scan-Operation asynchron unter Verwendung des angegebenen Scheduler aus und gibt ein Future-ähnliches Objekt zurück. Die Funktion stdexec::sync_wait
wartet synchron auf den Abschluss der async_inclusive_scan
-Operation, und das Ergebnis wird in die Variable out
entpackt.
Schließlich gibt das Programm den Inhalt des Vektors out
in der Konsole aus:
Wie geht es weiter?
In meinem nächsten Blogpost werde ich einen Schritt zurückgehen und die Komposition von Sendern mittels des Operators |
erklären.
(rme)