Programmiesprache C++: Ein Prioritäts-Scheduler für Coroutinen
Der Artikel erweitert den in den beiden vorangegangenen Blogbeiträgen gezeigten einfachen Scheduler von Dian-Lun um Prioritäten.
Dies ist der dritte Artikel in meiner Miniserie über Scheduler für C++ Coroutinen. Die ersten beiden Artikel waren Gastbeiträge von Dian-Lun Lin:
- Softwareentwicklung. Eine kompakte Einführung in Coroutinen von Dian-Lun Lin [1]
- Coroutinen: Ein Scheduler für Tasks - Teil 2 von Dian-Lun Lin [2]
Dian-Luns Scheduler basierten auf dem Container-Adapter std::stack [3]und std::queue. [4] Der std::stack
führt seine Tasks nach der Strategie last-in first-out, die std::queue
hingegen nach first-in first-out aus.
Der folgende Codeschnipsel zeigt den Queue-basierten Scheduler:
class Scheduler {
std::queue<std::coroutine_handle<>> _tasks;
public:
void emplace(std::coroutine_handle<> task) {
_tasks.push(task);
}
void schedule() {
while(!_tasks.empty()) {
auto task = _tasks.front();
_tasks.pop();
task.resume();
if(!task.done()) {
_tasks.push(task);
}
else {
task.destroy();
}
}
}
auto suspend() {
return std::suspend_always{};
}
};
Diesen Scheduler mit Prioritäten zu erweitern, ist ziemlich einfach.
Ein Priorität-Queue-basierten Scheduler
std::priority_queue [5]
ist neben std::stack
und std::queue
der dritte Container-Adapter in C++98.
Die std::priority_queue
ist der std::queue
ähnlich. Der Hauptunterschied besteht darin, dass ihr größtes Element immer an der Spitze der Prioritäts-Queue steht. std::priority_queue
verwendet standardmäßig den Vergleichsoperator std::less
. Die Nachschlagezeit
in einer std::priority_queue
ist konstant, aber das Einfügen und Herausnehmen ist logarithmisch.
Ich ersetze die std::queue
im vorherigen Scheduler durch eine std::priority_queue
:
// priority_queueScheduler.cpp
#include <coroutine>
#include <iostream>
#include <queue>
#include <utility>
struct Task {
struct promise_type {
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
Task get_return_object() {
return std::coroutine_handle<promise_type>::from_promise(*this);
}
void return_void() {}
void unhandled_exception() {}
};
Task(std::coroutine_handle<promise_type> handle): handle{handle}{}
auto get_handle() { return handle; }
std::coroutine_handle<promise_type> handle;
};
class Scheduler {
// (1)
std::priority_queue<std::pair<int, std::coroutine_handle<>>> _prioTasks;
public:
void emplace(int prio, std::coroutine_handle<> task) { // (2)
_prioTasks.push(std::make_pair(prio, task));
}
void schedule() {
while(!_prioTasks.empty()) { // (3)
auto [prio, task] = _prioTasks.top();
_prioTasks.pop();
task.resume();
if(!task.done()) {
_prioTasks.push(std::make_pair(prio, task)); // (4)
}
else {
task.destroy();
}
}
}
auto suspend() {
return std::suspend_always{};
}
};
Task TaskA(Scheduler& sch) {
std::cout << "Hello from TaskA\n";
co_await sch.suspend();
std::cout << "Executing the TaskA\n";
co_await sch.suspend();
std::cout << "TaskA is finished\n";
}
Task TaskB(Scheduler& sch) {
std::cout << "Hello from TaskB\n";
co_await sch.suspend();
std::cout << "Executing the TaskB\n";
co_await sch.suspend();
std::cout << "TaskB is finished\n";
}
int main() {
std::cout << '\n';
Scheduler scheduler1;
scheduler1.emplace(0, TaskA(scheduler1).get_handle()); // (5)
scheduler1.emplace(1, TaskB(scheduler1).get_handle());
scheduler1.schedule();
std::cout << '\n';
Scheduler scheduler2;
scheduler2.emplace(1, TaskA(scheduler2).get_handle()); // (6)
scheduler2.emplace(0, TaskB(scheduler2).get_handle());
scheduler2.schedule();
std::cout << '\n';
}
Zunächst verwendet die std::priority_queue
ein Paar (priority, handle) (1). Nun wird dieses Paar auf die _prioTask
gelegt (2). Wenn der Scheduler läuft, prüft er, ob die _prioTask
leer ist (3), wenn nicht, wird der erste Task aufgerufen, entfernt und wieder aufgenommen. Wenn der Task nicht fertig ist, wird er wieder in die _prioTasks
verschoben (4).
Die Verwendung einer std::priority_queue<std::pair<int, std::coroutine_handle<>>>
hat den angenehmen Nebeneffekt, dass Tasks mit höherer Priorität zuerst ausgeführt werden. Es macht keinen Unterschied, in welcher Reihenfolge die Tasks auf dem Scheduler platziert werden (5 und 6); der Task mit Priorität 1 läuft zuerst.
Ich möchte die Coroutine zunächst vereinfachen, bevor ich in meinem nächsten Artikel ihre Prioritätsbehandlung verbessere.
Die vereinfachte Coroutine
Hier sind die vorherigen Coroutinen TaskA
und TaskB
:
Task TaskA(Scheduler& sch) {
std::cout << "Hello from TaskA\n";
co_await sch.suspend();
std::cout << "Executing the TaskA\n";
co_await sch.suspend();
std::cout << "TaskA is finished\n";
}
Task TaskB(Scheduler& sch) {
std::cout << "Hello from TaskB\n";
co_await sch.suspend();
std::cout << "Executing the TaskB\n";
co_await sch.suspend();
std::cout << "TaskB is finished\n";
}
Anstatt co_await
im Scheduler aufzurufen, ersetze ich es durch den direkten Aufruf des vordefinierten awaitable std::suspend_always
. So kann ich die suspend-Member-Funktion des Schedulers entfernen. Zweitens: Die Coroutine erhält den Namen ihres Tasks:
Task createTask(const std::string& name) {
std::cout << name << " start\n";
co_await std::suspend_always();
std::cout << name << " execute\n";
co_await std::suspend_always();
std::cout << name << " finish\n";
}
Hier ist schließlich das vereinfachte Programm mit der entsprechenden Ausgabe.
// priority_queueSchedulerSimplified.cpp
#include <coroutine>
#include <iostream>
#include <queue>
#include <utility>
struct Task {
struct promise_type {
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
Task get_return_object() {
return std::coroutine_handle<promise_type>::from_promise(*this);
}
void return_void() {}
void unhandled_exception() {}
};
Task(std::coroutine_handle<promise_type> handle): handle{handle}{}
auto get_handle() { return handle; }
std::coroutine_handle<promise_type> handle;
};
class Scheduler {
std::priority_queue<std::pair<int, std::coroutine_handle<>>> _prioTasks;
public:
void emplace(int prio, std::coroutine_handle<> task) {
_prioTasks.push(std::make_pair(prio, task));
}
void schedule() {
while(!_prioTasks.empty()) {
auto [prio, task] = _prioTasks.top();
_prioTasks.pop();
task.resume();
if(!task.done()) {
_prioTasks.push(std::make_pair(prio, task));
}
else {
task.destroy();
}
}
}
};
Task createTask(const std::string& name) {
std::cout << name << " start\n";
co_await std::suspend_always();
std::cout << name << " execute\n";
co_await std::suspend_always();
std::cout << name << " finish\n";
}
int main() {
std::cout << '\n';
Scheduler scheduler1;
scheduler1.emplace(0, createTask("TaskA").get_handle());
scheduler1.emplace(1, createTask(" TaskB").get_handle());
scheduler1.schedule();
std::cout << '\n';
Scheduler scheduler2;
scheduler2.emplace(1, createTask("TaskA").get_handle());
scheduler2.emplace(0, createTask(" TaskB").get_handle());
scheduler2.schedule();
std::cout << '\n';
}
Wie geht es weiter?
In meinem nächsten Artikel werde ich die Prioritätsbehandlung der Tasks weiter verbessern. (rme [6])
URL dieses Artikels:
https://www.heise.de/-9534510
Links in diesem Artikel:
[1] https://www.heise.de/blog/Softwareentwicklung-Ein-kompakte-Einfuehrung-in-Coroutinen-von-Dian-Lun-Li-9356895.html
[2] https://www.heise.de/blog/Coroutinen-Einen-Scheduler-fuer-Tasks-Teil-2-von-Dian-Lun-Li-9529813.html
[3] https://en.cppreference.com/w/cpp/container/stack
[4] https://en.cppreference.com/w/cpp/container/queue
[5] https://en.cppreference.com/w/cpp/container/priority_queue
[6] mailto:rme@ix.de
Copyright © 2023 Heise Medien