Programmiersprache C++: Erweiterung für Inclusive Scan in C++26

Dank der Umsetzung von std::execution im kommenden Standard C++26 lässt sich Inclusive scan auch asynchronous ausführen.

In Pocket speichern vorlesen Druckansicht 1 Kommentar lesen
Holzwürfel mit dem Schriftzug C++

(Bild: SerbioVas/Shutterstock)

Lesezeit: 7 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

Der inklusive Scan löst Probleme im Zusammenhang mit Bereichsabfragen, wie die Berechnung der Summe einer Reihe von Elementen in einem Array. Er wird auch bei Bereichsminimumabfragen und verschiedenen anderen Algorithmen verwendet.

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

Lasst uns gemeinsam etwas Großes vollbringen: Vom 1. bis 24. Dezember spende ich die Hälfte des Geldes für die ALS-Forschung, wenn Sie eines meiner Mentoring-Programme buchen.

Ich werde jede Woche ein Update veröffentlichen, damit wir sehen können, was wir bisher erreicht haben.

Hier finden Sie weitere Informationen über mich, die Struktur meiner Mentoring-Programme und die einzelnen Programme:

Wenn Sie oder Ihr Team eine spezielle Auswahl aus meinem Mentoring-Programm wünschen, kontaktieren Sie mich bitte unter rainer.grimm@ModernesCpp.de. Wir finden eine Lösung.

Hier ist eine Aussage der ALS-Ambulanz:

ALS zählt zu den „Waisen der Medizin“ – seltene Erkrankungen, die nicht im gesellschaftlichen Fokus stehen. Die Behandlung ist deutlich unterfinanziert und auch für die ALS-Forschung gibt es bislang keine ausreichenden Mittel. Spenden und andere Drittmittel sind von elementarer Bedeutung.

Diese Unterfinanzierung wurde mit der Ice Bucket Challenge offensichtlich.

Forschung benötigt mehr Geld.

Bevor ich auf den asynchronen inklusiven Scan eingehe, möchte ich den inklusiven Scan, auch Präfixsumme genannt, vorstellen.

In der englischsprachigen Wikipedia findet sich folgende Definition: "In computer science, the prefix sum, cumulative sum, inclusive scan, or simply scan of a sequence of numbers x0, x1, x2, … is a second sequence of numbers y0, y1, y2, …, the sums of prefixes (running totals) of the input sequence:"

    y0 = x0
    y1 = x0 + x1
    y2 = x0 + x1+ x2
    ...

Proposal P2300R10 bietet die Implementierung von Inclusive Scan.

using namespace std::execution;

sender auto async_inclusive_scan(scheduler auto sch,                          // 2
                                 std::span<const double> input,               // 1
                                 std::span<double> output,                    // 1
                                 double init,                                 // 1
                                 std::size_t tile_count)                      // 3
{
  std::size_t const tile_size = (input.size() + tile_count - 1) / tile_count;

  std::vector<double> partials(tile_count + 1);                               // 4
  partials[0] = init;                                                         // 4

  return just(std::move(partials))                                            // 5
       | continues_on(sch)
       | bulk(tile_count,                                                     // 6
           [ = ](std::size_t i, std::vector<double>& partials) {              // 7
             auto start = i * tile_size;                                      // 8
             auto end   = std::min(input.size(), (i + 1) * tile_size);        // 8
             partials[i + 1] = *--std::inclusive_scan(begin(input) + start,   // 9
                                                      begin(input) + end,     // 9
                                                      begin(output) + start); // 9
           })                                                                 // 10
       | then(                                                                // 11
           [](std::vector<double>&& partials) {
             std::inclusive_scan(begin(partials), end(partials),              // 12
                                 begin(partials));                            // 12
             return std::move(partials);                                      // 13
           })
       | bulk(tile_count,                                                     // 14
           [ = ](std::size_t i, std::vector<double>& partials) {              // 14
             auto start = i * tile_size;                                      // 14
             auto end   = std::min(input.size(), (i + 1) * tile_size);        // 14
             std::for_each(begin(output) + start, begin(output) + end,        // 14
               [&] (double& e) { e = partials[i] + e; }                       // 14
             );
           })
       | then(                                                                // 15
           [ = ](std::vector<double>&& partials) {                            // 15
             return output;                                                   // 15
           });                                                                // 15
}

Hier ist die Erklärung aus dem Proposal P2300R10, übersetzt ins Deutsche.

  1. Die Funktion scannt eine Folge von doubles (dargestellt als std::span<const double> input) und speichert das Ergebnis in einer anderen Folge von doubles (dargestellt als std::span<double> output).
  2. Es wird ein Scheduler verwendet, der angibt, auf welcher Ausführungsressource der Scan gestartet werden soll.
  3. Es wird auch ein tile_count-Parameter verwendet, der die Anzahl der auszuführenden Agenten steuert, die erzeugt werden.
  4. Zuerst müssen wir den für den Algorithmus benötigten temporären Speicher zuweisen, was wir mit einem std::vector, partials erledigen. Wir benötigen einen doppelten temporären Speicher für jeden von uns erstellten Ausführungsagenten.
  5. Als Nächstes erstellen wir unseren ersten Sender mit execution::just und execution::continues_on. Diese Sender senden den temporären Speicher, den wir in den Sender verschoben haben. Der Sender hat einen Fertigstellungs-Scheduler von sch, was bedeutet, dass das nächste Element in der Kette sch verwendet.
  6. Sender und Senderadapter unterstützen die Komposition über operator|, ähnlich wie bei C++-Bereichen. Wir verwenden operator |, um das nächste Stück Arbeit anzuhängen, das tile_count-Ausführungsagenten unter Verwendung von execution::bulks erzeugt.
  7. Jeder Agent ruft eine std::invocable auf und übergibt ihr zwei Argumente. Das erste ist der Index des Agenten (i) in der execution::bulk-Operation, in diesem Fall eine eindeutige Ganzzahl in [0, tile_count). Das zweite Argument ist, wie der Eingabesender gesendet hat – der temporäre Speicher.
  8. Wir beginnen mit der Berechnung von Anfang und Ende des Bereichs der Eingabe- und Ausgabeelemente, für die dieser Agent verantwortlich ist, basierend auf unserem Agentenindex.
  9. Dann führen wir einen sequenziellen std::inclusive_scan über unsere Elemente durch. Wir speichern das Scan-Ergebnis für unser letztes Element, das die Summe aller unserer Elemente ist, in unserem temporären Speicher "partials".
  10. Nachdem alle Berechnungen in diesem ersten Durchlauf in großen Mengen abgeschlossen sind, hat jeder der erzeugten Ausführungsagenten die Summe seiner Elemente in seinen Slot in Teilmengen geschrieben.
  11. Jetzt müssen wir alle Werte in Teilmengen scannen. Das machen wir mit einem einzelnen Ausführungsagenten, der nach Abschluss von execution::bulk ausgeführt wird. Wir erstellen diesen Ausführungsagenten mit execution::then.
  12. execution::then nimmt einen Eingabesender und ein std::invocable und ruft das std::invocable mit dem vom Eingabesender gesendeten Wert auf. Innerhalb unseres std::invocable rufen wir std::inclusive_scan für Teilmengen auf, die die Eingabesender an uns senden werden.
  13. Dann geben wir Teilmengen zurück, die in der nächsten Phase benötigt werden.
  14. Schließlich führen wir eine weitere execution::bulk in der gleichen Form wie zuvor durch. In dieser execution::bulk verwenden wir die gescannten Werte in Teilbereichen, um die Summen aus anderen Kacheln in unsere Elemente zu integrieren und den inklusiven Scan abzuschließen.
  15. async_inclusive_scan gibt einen Sender zurück, der die Ausgabe std::span<double> sendet. Ein Verbraucher des Algorithmus kann zusätzliche Arbeit verketten, die das Scan-Ergebnis verwendet. Zu dem Zeitpunkt, an dem async_inclusive_scan zurückgegeben wird, ist die Berechnung möglicherweise noch nicht abgeschlossen. Tatsächlich hat sie möglicherweise noch nicht einmal begonnen.

Sender

  • just(values): Gibt einen Sender ohne Abschlussplaner zurück, der die bereitgestellten Werte sendet. just ist eine Senderfabrik.
  • bulk(input, shape, call): Gibt einen Sender zurück, der den aufrufbaren call beschreibt, der auf input gemäß shape aufgerufen wird.
  • continues_on(input, scheduler): Gibt einen Sender zurück, der den Übergang vom Ausführungsagenten des Senders für die Eingabe zum Ausführungsagenten des Ziel-schedulers beschreibt.
  • then(input, call): then gibt einen Sender zurück, der die Fortsetzung der Task des Senders für die Eingabe auf einem hinzugefügten Knoten des Aufrufs der bereitgestellten Funktion call beschreibt.

Wäre es nicht schön, dieses Programm in Aktion zu sehen? Derzeit (Dezember 2024) unterstützt kein Compiler std::execution oder die Concepts sender und scheduler.

Hier hilft die Referenzimplementierung stdexec, wobei ich den Datentyp der verarbeiteten Elemente von double in int geändert habe:

// inclusiveScanExecution.cpp

#include <algorithm>
#include <exec/static_thread_pool.hpp>
#include <iostream>
#include <numeric>
#include <span>
#include <stdexec/execution.hpp>
#include <vector>

auto async_inclusive_scan(auto sch,                   // 2
                          std::span<const int> input, // 1
                          std::span<int> output,      // 1
                          int init,                   // 1
                          std::size_t tile_count)     // 3
{
  std::size_t const tile_size = (input.size() + tile_count - 1) / tile_count;

  std::vector<int> partials(tile_count + 1); // 4
  partials[0] = init;                        // 4

  return stdexec::just(std::move(partials)) // 5
         | stdexec::continues_on(sch) |
         stdexec::bulk(tile_count,                                      // 6
                       [=](std::size_t i, std::vector<int> &partials) { // 7
                         auto start = i * tile_size;                    // 8
                         auto end =
                             std::min(input.size(), (i + 1) * tile_size); // 8
                         partials[i + 1] =
                             *--std::inclusive_scan(begin(input) + start,   // 9
                                                    begin(input) + end,     // 9
                                                    begin(output) + start); // 9
                       }) // 10
         | stdexec::then( // 11
               [](std::vector<int> &&partials) {
                 std::inclusive_scan(begin(partials), end(partials), // 12
                                     begin(partials));               // 12
                 return std::move(partials);                         // 13
               }) |
         stdexec::bulk(
             tile_count,                                                 // 14
             [=](std::size_t i, std::vector<int> &partials) {            // 14
               auto start = i * tile_size;                               // 14
               auto end = std::min(input.size(), (i + 1) * tile_size);   // 14
               std::for_each(begin(output) + start, begin(output) + end, // 14
                             [&](int &e) { e = partials[i] + e; }        // 14
               );
             }) |
         stdexec::then(                         // 15
             [=](std::vector<int> &&partials) { // 15
               return output;                   // 15
             });                                // 15
}

int main() {

  std::cout << '\n';

  std::vector<int> input(30);
  std::iota(begin(input), end(input), 0);
  for (auto e : input) {
    std::cout << e << ' ';
  }
  std::cout << '\n';

  std::vector<int> output(input.size());

  exec::static_thread_pool pool(8);
  auto sch = pool.get_scheduler();

  auto [out] =
      stdexec::sync_wait(async_inclusive_scan(sch, input, output, 0, 4))
          .value();

  for (auto e : out) {
    std::cout << e << ' ';
  }

  std::cout << '\n';
}

In main wird ein std::vector<int> input mit 30 Elementen erstellt. Die std::iota-Funktion füllt den input-Vektor mit aufeinanderfolgenden Ganzzahlen, beginnend mit 0. Das Programm gibt dann den Inhalt des Vektors in der Konsole aus.

Als Nächstes wird ein std::vector<int> output mit der gleichen Größe wie der input-Vektor erstellt, um die Ergebnisse der Inclusive-Scan-Operation zu speichern. Der exec::static_thread_pool pool verfügt über Threads, die zur gleichzeitigen Ausführung von Tasks verwendet werden. Die get_scheduler-Memberfunktion des Thread-Pools erstellt ein Scheduler-Objekt sch.

Die Funktion async_inclusive_scan verwendet den Scheduler sch, den Vektor input, den Vektor output, einen Anfangswert von 0 und eine Kachelanzahl von 4. Diese Funktion führt die Inclusive-Scan-Operation asynchron unter Verwendung des angegebenen Scheduler aus und gibt ein Future-ähnliches Objekt zurück. Die Funktion stdexec::sync_wait wartet synchron auf den Abschluss der async_inclusive_scan-Operation, und das Ergebnis wird in die Variable out entpackt.

Schließlich gibt das Programm den Inhalt des Vektors out in der Konsole aus:

In meinem nächsten Blogpost werde ich einen Schritt zurückgehen und die Komposition von Sendern mittels des Operators | erklären. (rme)