Programmiesprache C++: Ein Prioritäts-Scheduler für Coroutinen​

Der Artikel erweitert den in den beiden vorangegangenen Blogbeiträgen gezeigten einfachen Scheduler von Dian-Lun um Prioritäten.

In Pocket speichern vorlesen Druckansicht 1 Kommentar lesen
Zwei Verkehrsampeln vor wolkigem Abendhimmel.

(Bild: monticello/Shutterstock.com)

Lesezeit: 2 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

Dies ist der dritte Artikel in meiner Miniserie über Scheduler für C++ Coroutinen. Die ersten beiden Artikel waren Gastbeiträge von Dian-Lun Lin:

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

Dian-Luns Scheduler basierten auf dem Container-Adapter std::stack und std::queue. Der std::stack führt seine Tasks nach der Strategie last-in first-out, die std::queue hingegen nach first-in first-out aus.

Der folgende Codeschnipsel zeigt den Queue-basierten Scheduler:

class Scheduler {

  std::queue<std::coroutine_handle<>> _tasks;

  public: 

    void emplace(std::coroutine_handle<> task) {
      _tasks.push(task);
    }

    void schedule() {
      while(!_tasks.empty()) {
        auto task = _tasks.front();
        _tasks.pop();
        task.resume();

        if(!task.done()) { 
          _tasks.push(task);
        }
        else {
          task.destroy();
        }
      }
    }

    auto suspend() {
      return std::suspend_always{};
    }
};

Diesen Scheduler mit Prioritäten zu erweitern, ist ziemlich einfach.

std::priority_queue ist neben std::stack und std::queue der dritte Container-Adapter in C++98.

Die std::priority_queue ist der std::queue ähnlich. Der Hauptunterschied besteht darin, dass ihr größtes Element immer an der Spitze der Prioritäts-Queue steht. std::priority_queue verwendet standardmäßig den Vergleichsoperator std::less. Die Nachschlagezeit in einer std::priority_queue ist konstant, aber das Einfügen und Herausnehmen ist logarithmisch.

Ich ersetze die std::queue im vorherigen Scheduler durch eine std::priority_queue:

// priority_queueScheduler.cpp

#include <coroutine>
#include <iostream>
#include <queue>
#include <utility>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle}{}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

class Scheduler {
                                                            // (1)
  std::priority_queue<std::pair<int, std::coroutine_handle<>>> _prioTasks;

  public: 

    void emplace(int prio, std::coroutine_handle<> task) {  // (2)
      _prioTasks.push(std::make_pair(prio, task));
    }

    void schedule() {
      while(!_prioTasks.empty()) {                          // (3)
        auto [prio, task] = _prioTasks.top();
        _prioTasks.pop();
        task.resume();

        if(!task.done()) { 
          _prioTasks.push(std::make_pair(prio, task));      // (4)
        }
        else {
          task.destroy();
        }
      }
    }

    auto suspend() {
      return std::suspend_always{};
    }
};


Task TaskA(Scheduler& sch) {
  std::cout << "Hello from TaskA\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskA\n";
  co_await sch.suspend();
  std::cout << "TaskA is finished\n";
}

Task TaskB(Scheduler& sch) {
  std::cout << "Hello from TaskB\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskB\n";
  co_await sch.suspend();
  std::cout << "TaskB is finished\n";
}


int main() {

  std::cout << '\n';

  Scheduler scheduler1;

  scheduler1.emplace(0, TaskA(scheduler1).get_handle());    // (5)   
  scheduler1.emplace(1, TaskB(scheduler1).get_handle());

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler scheduler2;

  scheduler2.emplace(1, TaskA(scheduler2).get_handle());    // (6)
  scheduler2.emplace(0, TaskB(scheduler2).get_handle());

  scheduler2.schedule();

  std::cout << '\n';

}

Zunächst verwendet die std::priority_queue ein Paar (priority, handle) (1). Nun wird dieses Paar auf die _prioTask gelegt (2). Wenn der Scheduler läuft, prüft er, ob die _prioTask leer ist (3), wenn nicht, wird der erste Task aufgerufen, entfernt und wieder aufgenommen. Wenn der Task nicht fertig ist, wird er wieder in die _prioTasks verschoben (4).

Die Verwendung einer std::priority_queue<std::pair<int, std::coroutine_handle<>>> hat den angenehmen Nebeneffekt, dass Tasks mit höherer Priorität zuerst ausgeführt werden. Es macht keinen Unterschied, in welcher Reihenfolge die Tasks auf dem Scheduler platziert werden (5 und 6); der Task mit Priorität 1 läuft zuerst.

Ich möchte die Coroutine zunächst vereinfachen, bevor ich in meinem nächsten Artikel ihre Prioritätsbehandlung verbessere.

Hier sind die vorherigen Coroutinen TaskA und TaskB:

Task TaskA(Scheduler& sch) {
  std::cout << "Hello from TaskA\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskA\n";
  co_await sch.suspend();
  std::cout << "TaskA is finished\n";
}

Task TaskB(Scheduler& sch) {
  std::cout << "Hello from TaskB\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskB\n";
  co_await sch.suspend();
  std::cout << "TaskB is finished\n";
}

Anstatt co_await im Scheduler aufzurufen, ersetze ich es durch den direkten Aufruf des vordefinierten awaitable std::suspend_always. So kann ich die suspend-Member-Funktion des Schedulers entfernen. Zweitens: Die Coroutine erhält den Namen ihres Tasks:

Task createTask(const std::string& name) {
  std::cout << name << " start\n";
  co_await std::suspend_always();
  std::cout << name << " execute\n";
  co_await std::suspend_always();
  std::cout << name << " finish\n";
}

Hier ist schließlich das vereinfachte Programm mit der entsprechenden Ausgabe.

// priority_queueSchedulerSimplified.cpp

#include <coroutine>
#include <iostream>
#include <queue>
#include <utility>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle}{}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

class Scheduler {

  std::priority_queue<std::pair<int, std::coroutine_handle<>>> _prioTasks;

  public: 

    void emplace(int prio, std::coroutine_handle<> task) {
      _prioTasks.push(std::make_pair(prio, task));
    }

    void schedule() {
      while(!_prioTasks.empty()) {
        auto [prio, task] = _prioTasks.top();
        _prioTasks.pop();
        task.resume();

        if(!task.done()) { 
          _prioTasks.push(std::make_pair(prio, task));
        }
        else {
          task.destroy();
        }
      }
    }

};


Task createTask(const std::string& name) {
  std::cout << name << " start\n";
  co_await std::suspend_always();
  std::cout << name << " execute\n";
  co_await std::suspend_always();
  std::cout << name << " finish\n";
}


int main() {

  std::cout << '\n';

  Scheduler scheduler1;

  scheduler1.emplace(0, createTask("TaskA").get_handle());
  scheduler1.emplace(1, createTask("  TaskB").get_handle());

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler scheduler2;

  scheduler2.emplace(1, createTask("TaskA").get_handle());
  scheduler2.emplace(0, createTask("  TaskB").get_handle());

  scheduler2.schedule();

  std::cout << '\n';

}

In meinem nächsten Artikel werde ich die Prioritätsbehandlung der Tasks weiter verbessern. (rme)