Ferris Talk #4: Asynchrone Programmierung in Rust

Seite 2: Futures: Willkommen in der Zukunft

Inhaltsverzeichnis

Um die als Task bezeichneten Einheiten zu erstellen, hat Rust das Konzept der Futures angenommen und in einem Future Trait abstrahiert. Die Future beschreibt einen zukünftigen Wert (daher auch der Name), bei dem nachzufragen ist, ob es ihn bereits gibt.

trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
      Poll<Self::Output>;
}

Der Output legt dabei den Typ dieses Wertes fest, die poll-Methode kommt mit einem Context daher (dazu später mehr) und gibt eine Ausprägung des Poll-Enums zurück.

enum Poll<T> {
    Ready(T),
    Pending
}

Dieses Poll-Enum hat zwei mögliche Ausprägungen: Entweder hat die Operation, die über die Future abstrahiert wird, schon einen Wert hervorgebracht, dann lässt er sich jetzt zurückgeben. Oder aber man wartet noch, bis die Operation im Hintergrund abgearbeitet worden ist (Pending). Wichtig ist, dass bei der Ausführung der poll-Methode sofort ein Wert zurückkommen soll.

Eine Ausführungslaufzeitumgebung für Futures dieser Art kann jetzt kontinuierlich alle bestehenden Futures durchlaufen und so lange den Status abklappern, bis alle Futures von Pending auf Ready geschaltet haben. Man kann sich allerdings vorstellen, dass das recht ineffektiv ist. Effektiver wäre es, einen Waker zu benachrichtigen, der bei einem Ereignis wie “TCP Socket steht!” oder “Timer Event vom Betriebssystem!” die Laufzeitumgebung regelrecht aufweckt, um nach neuen Werten zu fragen.

Um also aus dem Codestück eine Funktion zu machen, die sich optimal auf einen Thread legen lässt und von solch einer Laufzeitumgebung ausgeführt wird, muss sie eine Future zurückgeben.

Aus

fn read_to_string(&mut self, buf: &mut String) -> Result<usize>;

wird also

fn read_to_string(&mut self, buf: &mut String) -> 
  Future<Output = Result<usize>>;

Oder mit dem nötigen Syntaxzucker:

async fn read_to_string(&mut self, buf: &mut String) -> 
  Result<usize>;

Beim Aufruf dieser Funktion wird nur eine Future konstruiert und zurückgegeben. Erst wenn wir mit dem Polling anfangen, wird auch tatsächlich Programmcode ausgeführt. Für das Polling gibt es auch etwas Syntax, um das Ganze zu versüßen. Mit einem angehängten .await geben wir Rust bekannt, dass diese Future bitte dem Executor zu übergeben ist.

Empfohlener redaktioneller Inhalt

Mit Ihrer Zustimmmung wird hier ein externes YouTube-Video (Google Ireland Limited) geladen.

Ich bin damit einverstanden, dass mir externe Inhalte angezeigt werden. Damit können personenbezogene Daten an Drittplattformen (Google Ireland Limited) übermittelt werden. Mehr dazu in unserer Datenschutzerklärung.

Apropos Executor und Ausführung, wie geht das eigentlich? Um das zu veranschaulichen, bietet es sich an, eine dieser Futures zu implementieren. Im folgenden Beispiel gilt es einen Delay von einer gewissen Anzahl an Millisekunden als Future zu abstrahieren, dem "Hello World" der asynchronen Programmierung. Das Ziel war eigentlich, einen bestehenden Thread optimal auszunutzen und nicht im Hintergrund neue Threads als Ausweichmöglichkeit zu erzeugen. Dazu kommen Ereignisse des Betriebssystems ins Spiel, die an dieser Stelle der Einfachheit halber außen vor bleiben. Zur Veranschaulichung genügt die vereinfachte Darstellung allemal.

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str> {
        if Instant::now() >= self.when {
            Poll::Ready("done")
        } else {
            let waker = cx.waker().clone();
            let when = self.when;

           thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }
                waker.wake();
            });
            Poll::Pending
        }
    }
}

Gegeben ist ein Struct namens Delay, das mit einem Instant befüllt wird. Dieses Struct implementiert nun den Future Trait. Als Ausgabe gibt es einen Status-String. In der poll-Methode ist Folgendes der nächste Schritt: Sobald die vorgegebene Zeitspanne vergangen ist (der aktuelle Zeitpunkt ist also größer als oder gleich wie der angegebene), dann ist die Funktion mit Poll::Ready bereit und gibt einen Rückgabewert des bestimmten Typs zurück. Wie bereits gesagt, retourniert eine Future Ergebnisse sofort.

Falls die nötige Zeitspanne noch nicht vergangen ist, bietet sich folgendes Vorgehen an: einen Hintergrund-Thread absetzen, den wir für die angegebene Zeit schlafen lassen. Wenn der Thread aus seinem Schlaf zurückkommt, ruft er den über den Context mitgebrachten Waker auf. Wie erwähnt würde das in einer echten Runtime über Runtime-interne Konstrukte funktionieren, die bei Ereignissen des Betriebssystem-Timers reagieren.

Im Hintergrund passiert Folgendes:

    Mit der Future besteht eine Ausführungseinheit, die sich an den Executor einer asynchronen Laufzeitumgebung übergeben lässt.Die Übergabe dieser Einheit findet meist über einen Spawner statt.Dieser Spawner legt die Future auf die Verarbeitungs-Queue des Executors.Dort wird gepollt. Die Future kann nun bereits einen Wert über den Ready State zurückgeben, sofern es schon einen gibt.Lässt das Ergebnis allerdings noch auf sich warten, ist der Status Pending zurückzugeben, und der Executor führt so lange weiter Tasks aus, bis die Future bekannt gibt, dass sie wieder abrufbar ist.Der Executor pollt die Future, hat jetzt einen Wert und gibt ihn an das Programm zurück.

Entwickler und Entwicklerinnen brauchen von all dem nichts zu wissen, denn für sie reicht es, wenn sie bekanntgeben, über welche Laufzeitumgebung sie arbeiten wollen. Mit .await müssen sie dann auf die Ergebnisse warten.

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

Im obigen Beispiel kommt Tokio zum Einsatz, eine populäre asynchrone Laufzeitumgebung. Die Main-Funktion lässt sich mit einem entsprechenden Makro aufpolieren, und in Zeile 3 ruft das Programm die Delay Future mit .await auf.