Programmiersprache C++: Ein anspruchsvoller Prioritäts-Scheduler für Coroutinen

Der Blogbeitrag verwendet den einfachen Scheduler aus dem vorherigen Artikel und verbessere seine Prioritätsbehandlung.

In Pocket speichern vorlesen Druckansicht 3 Kommentare lesen

(Bild: Sepp photography/Shutterstock.com)

Lesezeit: 3 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

Dies ist der vierte Artikel in meiner Miniserie über Scheduler für C++ Coroutinen. Die ersten beiden Artikel waren Gastbeiträge von Dian-Lun Lin:

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

  1. Softwareentwicklung: Eine kompakte Einführung in Coroutinen von Dian-Lun Li
  2. Coroutinen: Ein Scheduler für Tasks - Teil 2 von Dian-Lun Li
  3. Programmiersprache C++: Ein Prioritäts-Scheduler für Coroutinen

Dian-Luns Scheduler basierten auf dem Container-Adapter std::stack und std::queue. Der std::stack plant seine Tasks nach der Strategie last-in first-out, aber die std::queue wendet first-in first-out an. Schließlich unterstützt mein std::priority_queue-basierter Scheduler aus dem letzten Artikel die Priorisierung von Tasks.

In diesem Artikel werde ich den letzten Scheduler in zweierlei Hinsicht verbessern. Zunächst möchte ich mit der Vergleichsfunktion von std::priority_queue spielen.

Die Prioritätswarteschlange hat ihr größtes Element immer an der Spitze. std::priority_queue verwendet standardmäßig den Vergleichsoperator std::less. Der folgende Scheduler besitzt die Vergleichsfunktion als Template-Parameter.

// priority_queueSchedulerComparator.cpp

#include <concepts>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <utility>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle}{}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

using job = std::pair<int, std::coroutine_handle<>>;        // (1)

template <typename Comparator = std::ranges::less>          // (2)
requires std::predicate<decltype(Comparator()), job, job>   // (3)
class Scheduler {

  std::priority_queue<job, std::vector<job>, Comparator> _prioTasks;

  public: 

    void emplace(int prio, std::coroutine_handle<> task) {
      _prioTasks.push(std::make_pair(prio, task));
    }

    void schedule() {
      while(!_prioTasks.empty()) {
        auto [prio, task] = _prioTasks.top();
        _prioTasks.pop();
        task.resume();

        if(!task.done()) { 
          _prioTasks.push(std::make_pair(prio, task));     // (6)
        }
        else {
          task.destroy();
        }
      }
    }

};

Task createTask(const std::string& name) {
  std::cout << name << " start\n";
  co_await std::suspend_always();
  std::cout << name << " execute\n";
  co_await std::suspend_always();
   std::cout << name << " finish\n";
}


int main() {

  std::cout << '\n';

  Scheduler scheduler1;                                     // (4)

  scheduler1.emplace(0, createTask("TaskA").get_handle());            
  scheduler1.emplace(1, createTask("  TaskB").get_handle());

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler<std::ranges::greater> scheduler2;               // (5)

  scheduler2.emplace(0, createTask("TaskA").get_handle());            
  scheduler2.emplace(1, createTask("  TaskB").get_handle());

  scheduler2.schedule();

  std::cout << '\n';

}

Diejenigen, die der Scheduler zu sehr überwältigt, sollten die vorherigen Artikel der Serie lesen. Zunächst nenne ich die Kombination aus einer Priorität und einem Task einen Job (1). Der Scheduler benötigt einen Template-Parameter Comparator, der standardmäßig std::ranges::less ist (2). Außerdem prüft das Concept std:::predicate in (3), ob das Prädikat mit zwei Jobs aufgerufen werden kann.

scheduler1 (5) beginnt mit dem Job mit der höchsten Priorität, scheduler2 (6) dagegen mit dem Job mit der niedrigsten Priorität.

In (6) schiebe ich den Job auf den Scheduler zurück. Wäre es nicht schön, wenn ich die Priorität eines zurückgeschobenen Auftrags verändern könnte?

Der folgende Scheduler kann zusätzlich die Priorität eines zurückgeschobenen Jobs aktualisieren.

// priority_queueSchedulerPriority.cpp

#include <concepts>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <utility>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle}{}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

using job = std::pair<int, std::coroutine_handle<>>;

template <typename Updater = std::identity,            // (1)
          typename Comperator = std::ranges::less>                   
requires std::invocable<decltype(Updater()), int> &&   // (2)
         std::predicate<decltype(Comperator()), job, job>             
class Scheduler {

  std::priority_queue<job, std::vector<job>, Comperator> _prioTasks;

  public: 
    void emplace(int prio, std::coroutine_handle<> task) {
      _prioTasks.push(std::make_pair(prio, task));
    }

    void schedule() {
      Updater upd = {};                                // (3)
      while(!_prioTasks.empty()) {
        auto [prio, task] = _prioTasks.top();
        _prioTasks.pop();
        task.resume();

        if(!task.done()) { 
          _prioTasks.push(std::make_pair(upd(prio), task)); // (4)
        }
        else {
          task.destroy();
        }
      }
    }

};


Task createTask(const std::string& name) {
  std::cout << name << " start\n";
  co_await std::suspend_always();
  for (int i = 0; i <= 3; ++i ) { 
    std::cout << name << " execute " << i << "\n";        // (5)
    co_await std::suspend_always();
  }
  co_await std::suspend_always();
  std::cout << name << " finish\n";
}


int main() {

  std::cout << '\n';

  Scheduler scheduler1;                                     // (6)

  scheduler1.emplace(0, createTask("TaskA").get_handle());
  scheduler1.emplace(1, createTask("  TaskB").get_handle());
  scheduler1.emplace(2, createTask("    TaskC").get_handle());

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler<decltype([](int a) { return a - 1; })> scheduler2; //(7)

  scheduler2.emplace(0, createTask("TaskA").get_handle());
  scheduler2.emplace(1, createTask("  TaskB").get_handle());
  scheduler2.emplace(2, createTask("    TaskC").get_handle());

  scheduler2.schedule();

  std::cout << '\n';

}

Nur wenige Änderungen sind am Scheduler im Programm priority_queueSchedulerComparator.cpp notwendig, um die Aktualisierung der Prioritäten zu unterstützen.

Zunächst erhält der Scheduler einen zusätzlichen Template-Parameter Updater (1), der standardmäßig std::identity verwendet. Updater muss aufrufbar sein und einen int annehmen. Natürlich ist std::invocable ein Concept (2). Der Updater wird in (3) erstellt und in (4) angewendet. Außerdem zeigt die Coroutine in (5) an, welcher Teil des Tasks ausgeführt wird. scheduler1 (6) führt seine Aufgabe aus, beginnend mit der höchsten Priorität, aber scheduler2 (7) senkt die Priorität eines zurückgestellten Tasks um eins. Ich verwende ein Lambda als aufrufbare Einheit.

Die Ausgabe des Programms zeigt die verschiedenen Scheduling-Strategien.

Coroutinen bieten eine intuitive Möglichkeit, einen asynchronen Code zu schreiben. Mein nächster Artikel wird ein Gastbeitrag von Ljubic Damir sein, in dem er einen Single Producer - Single Consumer Workflow basierend auf Coroutinen vorstellt. (rme)