Programmiersprache C++: Ein anspruchsvoller Prioritäts-Scheduler für Coroutinen
Der Blogbeitrag verwendet den einfachen Scheduler aus dem vorherigen Artikel und verbessere seine Prioritätsbehandlung.
- Rainer Grimm
Dies ist der vierte Artikel in meiner Miniserie über Scheduler für C++ Coroutinen. Die ersten beiden Artikel waren Gastbeiträge von Dian-Lun Lin:
- Softwareentwicklung: Eine kompakte Einführung in Coroutinen von Dian-Lun Li
- Coroutinen: Ein Scheduler für Tasks - Teil 2 von Dian-Lun Li
- Programmiersprache C++: Ein Prioritäts-Scheduler für Coroutinen
Dian-Luns Scheduler basierten auf dem Container-Adapter std::stack
und std::queue
. Der std::stack
plant seine Tasks nach der Strategie last-in first-out, aber die std::queue
wendet first-in first-out an. Schließlich unterstützt mein std::priority_queue
-basierter Scheduler aus dem letzten Artikel die Priorisierung von Tasks.
In diesem Artikel werde ich den letzten Scheduler in zweierlei Hinsicht verbessern. Zunächst möchte ich mit der Vergleichsfunktion von std::priority_queue
spielen.
Die Vergleichsfunktion
Die Prioritätswarteschlange hat ihr größtes Element immer an der Spitze. std::priority_queue
verwendet standardmäßig den Vergleichsoperator std::less
. Der folgende Scheduler besitzt die Vergleichsfunktion als Template-Parameter.
// priority_queueSchedulerComparator.cpp
#include <concepts>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <utility>
struct Task {
struct promise_type {
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
Task get_return_object() {
return std::coroutine_handle<promise_type>::from_promise(*this);
}
void return_void() {}
void unhandled_exception() {}
};
Task(std::coroutine_handle<promise_type> handle): handle{handle}{}
auto get_handle() { return handle; }
std::coroutine_handle<promise_type> handle;
};
using job = std::pair<int, std::coroutine_handle<>>; // (1)
template <typename Comparator = std::ranges::less> // (2)
requires std::predicate<decltype(Comparator()), job, job> // (3)
class Scheduler {
std::priority_queue<job, std::vector<job>, Comparator> _prioTasks;
public:
void emplace(int prio, std::coroutine_handle<> task) {
_prioTasks.push(std::make_pair(prio, task));
}
void schedule() {
while(!_prioTasks.empty()) {
auto [prio, task] = _prioTasks.top();
_prioTasks.pop();
task.resume();
if(!task.done()) {
_prioTasks.push(std::make_pair(prio, task)); // (6)
}
else {
task.destroy();
}
}
}
};
Task createTask(const std::string& name) {
std::cout << name << " start\n";
co_await std::suspend_always();
std::cout << name << " execute\n";
co_await std::suspend_always();
std::cout << name << " finish\n";
}
int main() {
std::cout << '\n';
Scheduler scheduler1; // (4)
scheduler1.emplace(0, createTask("TaskA").get_handle());
scheduler1.emplace(1, createTask(" TaskB").get_handle());
scheduler1.schedule();
std::cout << '\n';
Scheduler<std::ranges::greater> scheduler2; // (5)
scheduler2.emplace(0, createTask("TaskA").get_handle());
scheduler2.emplace(1, createTask(" TaskB").get_handle());
scheduler2.schedule();
std::cout << '\n';
}
Diejenigen, die der Scheduler zu sehr überwältigt, sollten die vorherigen Artikel der Serie lesen. Zunächst nenne ich die Kombination aus einer Priorität und einem Task einen Job (1). Der Scheduler benötigt einen Template-Parameter Comparator
, der standardmäßig std::ranges::less
ist (2). Außerdem prüft das Concept std:::predicate
in (3), ob das Prädikat mit zwei Jobs aufgerufen werden kann.
scheduler1
(5) beginnt mit dem Job mit der höchsten Priorität, scheduler2
(6) dagegen mit dem Job mit der niedrigsten Priorität.
In (6) schiebe ich den Job auf den Scheduler zurück. Wäre es nicht schön, wenn ich die Priorität eines zurückgeschobenen Auftrags verändern könnte?
Aktualisieren der Priorität
Der folgende Scheduler kann zusätzlich die Priorität eines zurückgeschobenen Jobs aktualisieren.
// priority_queueSchedulerPriority.cpp
#include <concepts>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <utility>
struct Task {
struct promise_type {
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
Task get_return_object() {
return std::coroutine_handle<promise_type>::from_promise(*this);
}
void return_void() {}
void unhandled_exception() {}
};
Task(std::coroutine_handle<promise_type> handle): handle{handle}{}
auto get_handle() { return handle; }
std::coroutine_handle<promise_type> handle;
};
using job = std::pair<int, std::coroutine_handle<>>;
template <typename Updater = std::identity, // (1)
typename Comperator = std::ranges::less>
requires std::invocable<decltype(Updater()), int> && // (2)
std::predicate<decltype(Comperator()), job, job>
class Scheduler {
std::priority_queue<job, std::vector<job>, Comperator> _prioTasks;
public:
void emplace(int prio, std::coroutine_handle<> task) {
_prioTasks.push(std::make_pair(prio, task));
}
void schedule() {
Updater upd = {}; // (3)
while(!_prioTasks.empty()) {
auto [prio, task] = _prioTasks.top();
_prioTasks.pop();
task.resume();
if(!task.done()) {
_prioTasks.push(std::make_pair(upd(prio), task)); // (4)
}
else {
task.destroy();
}
}
}
};
Task createTask(const std::string& name) {
std::cout << name << " start\n";
co_await std::suspend_always();
for (int i = 0; i <= 3; ++i ) {
std::cout << name << " execute " << i << "\n"; // (5)
co_await std::suspend_always();
}
co_await std::suspend_always();
std::cout << name << " finish\n";
}
int main() {
std::cout << '\n';
Scheduler scheduler1; // (6)
scheduler1.emplace(0, createTask("TaskA").get_handle());
scheduler1.emplace(1, createTask(" TaskB").get_handle());
scheduler1.emplace(2, createTask(" TaskC").get_handle());
scheduler1.schedule();
std::cout << '\n';
Scheduler<decltype([](int a) { return a - 1; })> scheduler2; //(7)
scheduler2.emplace(0, createTask("TaskA").get_handle());
scheduler2.emplace(1, createTask(" TaskB").get_handle());
scheduler2.emplace(2, createTask(" TaskC").get_handle());
scheduler2.schedule();
std::cout << '\n';
}
Nur wenige Änderungen sind am Scheduler im Programm priority_queueSchedulerComparator.cpp
notwendig, um die Aktualisierung der Prioritäten zu unterstützen.
Zunächst erhält der Scheduler einen zusätzlichen Template-Parameter Updater
(1), der standardmäßig std::identity
verwendet. Updater
muss aufrufbar sein und einen int
annehmen. Natürlich ist std::invocable
ein Concept (2). Der Updater
wird in (3) erstellt und in (4) angewendet. Außerdem zeigt die Coroutine in (5) an, welcher Teil des Tasks ausgeführt wird. scheduler1
(6) führt seine Aufgabe aus, beginnend mit der höchsten Priorität, aber scheduler2
(7) senkt die Priorität eines zurückgestellten Tasks um eins. Ich verwende ein Lambda als aufrufbare Einheit.
Die Ausgabe des Programms zeigt die verschiedenen Scheduling-Strategien.
Wie geht's weiter?
Coroutinen bieten eine intuitive Möglichkeit, einen asynchronen Code zu schreiben. Mein nächster Artikel wird ein Gastbeitrag von Ljubic Damir sein, in dem er einen Single Producer - Single Consumer Workflow basierend auf Coroutinen vorstellt. (rme)