Ferris Talk #4: Asynchrone Programmierung in Rust

Seite 3: Stillgestanden: automatisch generierte Futures als Zustandsautomaten

Inhaltsverzeichnis

Hier ist ein Moment zum Innehalten erreicht, denn ein Detail lässt grübeln: Da ist ja nicht nur die eine Future, die aufgerufen wird. Auch die Main-Funktion wird nun mit dem async-Schlüsselwort ausgezeichnet. Das heißt, dass auch diese Funktion nun eine Future zurückgibt. Aber welche bloß?

Kommen nun async/await für die Auszeichnung der Funktionen zum Einsatz, behandelt Rust diese Await Statements wie eine Future, die einen Zustandsautomaten implementiert. Dazu wird bei jedem .await ein Schnitt gemacht. Alles vor future.await ist ein Schritt im Zustandsautomaten, und alles bis zum nächsten .await ist ein weiterer Schritt. Das .await selbst wird über die Abfrage der eigentlichen Future abgebildet und gibt entsprechend ein Poll-Ergebnis zurück.

Den Zustandsautomaten kann man sich als automatisch generierten Code vorstellen, und solch eine automatisch generierte Future kann folgendermaßen aussehen:

enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on `Delay`, i.e. the `future.await` line.
    State1(Delay),
    // The future has completed.
    Terminated,
}

Die Implementierung dazu wäre:

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    let when = Instant::now() +
                        Duration::from_millis(10);
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

Wie lobenswert ist dagegen doch der Syntaxzucker! Allerdings sieht man recht eindeutig, dass es sich auch bei den async/await-Auszeichnungen "nur" um gewöhnliche Futures handelt, dem ursprünglichen Konzept treu bleibend.

Bevor man jedoch eine Future ausführen kann, muss diese sich frei im Speicher bewegen können. Sobald es zur Ausführung kommt, darf sie nicht mehr im Speicher bewegt werden, weil der Ausführungs-Thread Zugriff auf die Daten braucht. Der Pin<T> Smart Pointer garantiert, dass die Future an ihrem Platz bleibt.

Im Vergleich zu dem früheren Beispiel mit dem "billigen" HTTP-Request sieht eine asynchrone Version folgendermaßen aus:

async fn request(host: &str, port: u16, path: &str) -> 
  std::io::Result<String> {
    // from Tokio (1):
    let mut socket =  net::TcpStream::connect((host, port)).await?;

    let request = 
      format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
    socket.write_all(request.as_bytes()).await?; // from Tokio (2)
    socket.shutdown(net::Shutdown::Write)?;

    let mut response = String::new();
    socket.read_to_string(&mut response).await?; // from Tokio (3)

    Ok(response)
}

Das async-Schlüsselwort macht aus "request" eine Funktion, die Futures baut. Die blockierenden Varianten aus der Standardbibliothek werden gegen die nicht-blockierenden Äquivalente aus einer asynchronen Laufzeitumgebung ausgetauscht. Jedes await-Statement trennt Codeblöcke als Zustände im Automaten. Die IO Structs von Tokio reagieren dabei auf Betriebssystemereignisse.

Das Ausführen des obigen Programmstücks spielt sich nun wie folgt ab:

    Wenn wir request aufrufen, kommt eine Future A heraus, die es an eine asynchrone Laufzeitumgebung weiterzugeben gilt.Diese Umgebung pollt Future A. Damit wird die Funktion TcpStream::connect aufgerufen (1), die Future B zurückgibt.Future B wird gepollt und es passiert zunächst offenbar nichts. Tatsächlich kommt allerdings Poll::Pending zurück und Future B stellt sicher, dass sie wieder aufgerufen wird, sobald ein TCP-Socket steht.Future A gibt auch Poll::Pending zurück, somit steht die Runtime.Irgendwann ist Future B bereit und der TCP-Socket steht. Future B weckt die Runtime auf. Future A wird gepollt, die pollt wiederum Future B, bekommt jetzt allerdings ein Poll::Ready mitsamt TCP Socket als Ergebnis zurück. Damit erreicht Future A den nächsten Zustand, und kann weiter Code ausführen.write_all wird aufgerufen, und gibt eine Future C zurück (2).Nun beginnt das gleiche Spiel: Future C gibt Poll::Pending zurück, und die Runtime stoppt. Nach einiger Zeit wird die Runtime aufgeweckt. A wird gepollt, C wird gepollt. C retourniert Poll::Ready, Future A erreicht den nächsten Zustand.Read_to_string wird aufgerufen (3) und Future D kommt als Antwort zurück.Future D wird gepollt. Ab hier gibt es nichts Neues zu berichten: Der nun bekannte Vorgang wiederholt sich.

Am Ende des Prozesses erreicht auch Future A den Zustand der Bereitschaft (Ready) und es sind keine weiteren Durchgänge mehr notwendig. Unsere request-Funktion ist beendet, allerdings diesmal in einer asynchronen Laufzeitumgebung.

Im obigen Beispiel ist zunächst kein direkter Nutzen davon erkennbar. Im Gegenteil, das Einführen einer asynchronen Laufzeitumgebung führt sogar zu zusätzlichem Overhead. Spannend wird es hingegen ab dem Punkt, wenn die Wartezeiten sich nutzen lassen und das Absetzen weiterer Requests möglich ist: Dann landet der nächste Aufbau einer TCP-Verbindung genau zwischen den Ausführungseinheiten der geschilderten Funktion, und die Kapazitäten der CPUs lassen sich vollständig ausschöpfen.