Coroutinen: Ein Scheduler für Tasks – Teil 2 von Dian-Lun Li

Nachdem Dian-Lun bereits eine Einführung in die Grundlagen zu Coroutinen gegeben hat, stellt er nun einen Single-Thread-Scheduler für Coroutinen vor.

In Pocket speichern vorlesen Druckansicht

(Bild: Kwangmoozaa/Shutterstock.com)

Lesezeit: 2 Min.
Von
  • Rainer Grimm

Der vorliegende Blogbeitrag ist der zweite Teil der Miniserie zu einem Scheduler von Tasks und baut auf dem vorangegangenen Artikel "Softwareentwicklung: Eine kompakte Einführung in Coroutinen von Dian-Lun Li" auf.

In diesem Abschnitt implementiere ich einen Single-Thread-Scheduler, um Coroutinen zu planen. Beginnen wir mit der Schnittstelle:

Task TaskA(Scheduler& sch) {
  std::cout << "Hello from TaskA\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskA\n";
  co_await sch.suspend();
  std::cout << "TaskA is finished\n";
}

Task TaskB(Scheduler& sch) {
  std::cout << "Hello from TaskB\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskB\n";
  co_await sch.suspend();
  std::cout << "TaskB is finished\n";
}


int main() {

  Scheduler sch;

  sch.emplace(TaskA(sch).get_handle());
  sch.emplace(TaskB(sch).get_handle());

  std::cout << "Start scheduling...\n";

  sch.schedule();

Sowohl TaskA als auch TaskB sind Coroutinen. In der main-Funktion konstruiere ich einen Scheduler und platziere die beiden Tasks (Coroutine-Handles) in den Scheduler. Dann rufe ich schedule auf, um die beiden Tasks zu planen. Ein Task ist ein Coroutine-Objekt, das wie folgt definiert ist:

struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle} {}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

Zu beachten ist dabei, dass ich sowohl in der Funktion initial_suspend als auch in der Funktion final_suspend std::suspend_always zurückgebe. Das ist erforderlich, da ich die gesamte Ausführung der Coroutinen an den Scheduler übergeben möchte. Coroutinen werden erst ausgeführt, wenn ich schedule aufrufe. Der Scheduler ist wie folgt definiert:

class Scheduler {

  //std::queue<std::coroutine_handle<>> _tasks;
  std::stack<std::coroutine_handle<>> _tasks;

  public: 

    void emplace(std::coroutine_handle<> task) {
      _tasks.push(task);
    }

    void schedule() {
      while(!_tasks.empty()) {
        //auto task = _tasks.front();
        auto task = _tasks.top();
        _tasks.pop();
        task.resume();

        if(!task.done()) { 
          _tasks.push(task);
        }
        else {
          task.destroy();
        }
      }
    }

    auto suspend() {
      return std::suspend_always{};
    }
};

Im Scheduler speichere ich Tasks in einem Stack, und implementiere die emplace-Member-Funktion, damit Benutzerinnen und Benutzer einen Task auf den Stack schieben können. In der schedule-Member-Funktion entferne ich immer wieder einen Task vom Stack. Wenn ich einen Task wieder aufnehme, prüfe ich, ob der Task erledigt ist. Wenn nicht, schiebe ich den Task zurück auf den Stack, um ihn später einzuplanen. Andernfalls vernichte ich den fertigen Task. Nachdem das Programm ausgeführt wurde, liegen die folgenden Ergebnisse vor:

Der Scheduler verwendet einen Stack (last in, first out), um Tasks zu speichern. Ersetze ich den Stack durch eine Queue (first in, first out), ändert sich die Ausführungsreihenfolge der Tasks:

Der Vollständigkeit halber sind hier nochmals beide Programme zusammengefasst:

// stackScheduler.cpp

#include <coroutine>
#include <iostream>
#include <stack>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle} {}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

class Scheduler {

  std::stack<std::coroutine_handle<>> _tasks;

  public: 

    void emplace(std::coroutine_handle<> task) {
      _tasks.push(task);
    }

    void schedule() {
      while(!_tasks.empty()) {
        auto task = _tasks.top();
        _tasks.pop();
        task.resume();

        if(!task.done()) { 
          _tasks.push(task);
        }
        else {
          task.destroy();
        }
      }
    }

    auto suspend() {
      return std::suspend_always{};
    }
};


Task TaskA(Scheduler& sch) {
  std::cout << "Hello from TaskA\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskA\n";
  co_await sch.suspend();
  std::cout << "TaskA is finished\n";
}

Task TaskB(Scheduler& sch) {
  std::cout << "Hello from TaskB\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskB\n";
  co_await sch.suspend();
  std::cout << "TaskB is finished\n";
}


int main() {

  std::cout << '\n';

  Scheduler sch;

  sch.emplace(TaskA(sch).get_handle());
  sch.emplace(TaskB(sch).get_handle());

  std::cout << "Start scheduling...\n";

  sch.schedule();

  std::cout << '\n';

}
// queueScheduler.cpp

#include <coroutine>
#include <iostream>
#include <queue>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle} {}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

class Scheduler {

  std::queue<std::coroutine_handle<>> _tasks;

  public: 

    void emplace(std::coroutine_handle<> task) {
      _tasks.push(task);
    }

    void schedule() {
      while(!_tasks.empty()) {
        auto task = _tasks.front();
        _tasks.pop();
        task.resume();

        if(!task.done()) { 
          _tasks.push(task);
        }
        else {
          task.destroy();
        }
      }
    }

    auto suspend() {
      return std::suspend_always{};
    }
};


Task TaskA(Scheduler& sch) {
  std::cout << "Hello from TaskA\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskA\n";
  co_await sch.suspend();
  std::cout << "TaskA is finished\n";
}

Task TaskB(Scheduler& sch) {
  std::cout << "Hello from TaskB\n";
  co_await sch.suspend();
  std::cout << "Executing the TaskB\n";
  co_await sch.suspend();
  std::cout << "TaskB is finished\n";
}


int main() {

  std::cout << '\n';

  Scheduler sch;

  sch.emplace(TaskA(sch).get_handle());
  sch.emplace(TaskB(sch).get_handle());

  std::cout << "Start scheduling...\n";

  sch.schedule();

  std::cout << '\n';

}

Dieser Artikel von Dian-Lun Li zeigt einen einfachen Scheduler für Coroutinen. Ich verwende Dian-Luns Scheduler in meinem nächsten Beitrag für weitere Experimente. (map)