Ferris Talk #5: Tokio als asynchrone Laufzeitumgebung ist ein Fast-Alleskönner

Seite 2: Tasks und deren Tücken

Inhaltsverzeichnis

Tokio Tasks haben ein paar interessante Eigenschaften. So sind sie zum Beispiel ‘static bound. ‘static bezeichnet eine besondere Lifetime, die angibt, dass sämtliche Referenzen für die Ausführung der Aufgabe Gültigkeit besitzen. In diesem Fall bedeutet das, dass Tasks keine Referenzen zu Daten außerhalb ihrer Einheit besitzen dürfen. Mit dem Schlüsselwort move kann der Task allerdings die Ownership über solche Daten übernehmen.

Des Weiteren sind Tasks Send bound. Send ist ein spezieller Trait in Rust, der signalisiert, dass sich die darin enthaltenen Daten sicher zu verschiedenen Threads senden lassen. Nachdem zum Zeitpunkt des Absendens an die Tokio Runtime noch nicht klar ist, in welchem Thread der Task landet, müssen alle Daten innerhalb des Tasks auch Send sein.

Diese Bedingung kann zu spannenden Fehlern führen. Folgendes Beispiel "spawnt" (erzeugt) einen Task, der einen Reference Counter anlegt. Reference Counter (kurz: Rc) sind nötig, um Rust explizit mitzuteilen, dass mehrere Teile der Applikation Ownership über die darunterliegenden Daten übernehmen können. Ein Reference Counter zählt mit jedem .clone-Aufruf eine Referenz hoch, mit jedem Verschwinden der Referenz wieder herunter. Diese Referenzen können den Besitzer wechseln, lassen allerdings die Ownership über das Original unangetastet. Reference Counter sind allerdings nicht Send, das heißt, man kann sie nicht zwischen Threads hin- und herschicken.

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        yield_now().await;

        println!("{}", rc); // ERROR
    });
}

yield_now() zwingt die Tokio Runtime, den Task wieder zurück zum Scheduler zu schicken. Dadurch gibt es die Möglichkeit, dass die folgende Zeile mit der Ausgabe des Reference Counters auf einem anderen Thread landet. Da Reference Counter nicht Send, also nicht threadsicher sind, wirft Rust zur Übersetzungszeit einen Fehler.

Das heißt allerdings nicht, dass Reference Counter gar nicht zulässig wären. In der folgenden Anpassung ist die Nutzung von Rc in einem Block gekapselt. Rust versteht, dass sämtliche Referenzen auf Rc nach dem Block verschwinden. Es wird also nie passieren, dass die Daten auf einem anderen Thread landen. Dieses Codestück kompiliert:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        yield_now().await;
    });
}

Es gibt in der Runtime auch noch andere Workarounds für ähnliche Situationen:

  • Mit spawn_blocking kommt der Task auf einem Thread zur Ausführung, auf dem das Blockieren zulässig ist. Dazu legt Tokio eigene Threads zur Seite, um den Rest der Ausführung nicht zu behindern.
  • Mit spawn_local stellt Tokio sicher, dass die Tasks wieder auf dem gleichen Thread landen. Damit können auch Datenstrukturen, die nicht Send sind, verwendet werden.

Unter den Zero Cost Abstractions von Rust können diese Bedingungen zu Beginn verwirrend wirken und zu Fehlern führen. Die Funktionsweise der Runtimes zu kennen und zu verstehen, hilft auch bei der Fehlerbehandlung.

Neben äquivalenten asynchronen Implementierungen der relevanten Standardbibliothek-Structs gibt es auch Werkzeuge, die die Kommunikation zwischen Tasks vereinfachen. Dazu zählen die Channels, die in Tokio in verschiedenen Varianten vorliegen.

Channels erlauben das Verschicken von Nachrichten zwischen Sender (oder Transmitter, tx) und Empfänger (Receiver, rx). Einer der bekanntesten, da auch in der Standardbibliothek verfügbaren Channels ist "Multiple Producer, Single Consumer (MPSC)". Wie der Name sagt, können hier mehrere Produzenten Resultate an einen Empfänger schicken, wie in diesem Beispiel:

async fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("www.example.com:1234").await?;
    let (tx, mut rx) = mpsc::channel(100); // 1

    for _ in 0..10 {
       let tx = tx.clone(); // 2

        tokio::spawn(async move { // 3
            tx.send(&b"data to write"[..]).await.unwrap();
        });
    }

    drop(tx); // 4

    while let Some(res) = rx.recv().await { // 5
        socket.write_all(res).await?;
    }

    Ok(())
}

Im Detail ist der Ablauf wie folgt:

  1. Das Erzeugen eines Channels (hier mit einem Maximum an 100 gleichzeitigen Nachrichten) erstellt Sender (tx) und Empfänger (rx).
  2. Über einen .clone-Aufruf lassen sich neue Sender erzeugen. Dieser kann ebenfalls an den gleichen Empfänger rx senden. So entstehen Multiple Producers.
  3. Jede Schleifeniteration legt einen neuen Task ab. Der Task übernimmt Ownership über den neu erstellten Sender beziehungsweise Produzenten. Der Produzent schickt Daten an den Empfänger. Nach diesem Schritt bestimmen die Ownership-Regeln von Rust, dass der Speicher für Sender tx freizugeben ist.
  4. Nach dem Erzeugen aller Tasks bleibt der ursprüngliche Sender übrig. Er erhält nun die explizite Freigabe. Intern führt Rust darüber Buch, wie viele Produzenten noch an den Receiver senden können.
  5. Das hat Einfluss auf den nächsten Schritt, der die Nachrichten aller Produzenten abarbeitet. Sobald alle Produzenten freigegeben sind, gibt der Aufruf auf rx.recv() keine Ergebnisse mehr zurück. Das Programm kann also terminieren.

MPSC-Channels erlauben eine elegante Kommunikation zwischen Tasks. Dank Ownership-Regeln gibt es auch keine Überraschung, welche Produzenten noch Nachrichten schicken können. Rust nimmt seinen Anwendern hier wieder einmal viele Aufgaben ab.

Neben den bekannten MPSC-Channels gibt es noch folgende Varianten in Tokio:

  • Oneshot-Channels. Wie der Name schon sagt, gibt es hier eine einzige Nachricht. Es gibt jeweils nur einen Sender und Empfänger. Sobald die Nachricht versandt ist, haben beide keine Aufgabe mehr und können weggeworfen werden. Auf diese Weise lassen sich zum einen Resultate von lange dauernden Tasks wieder zurückschicken, zum anderen lässt sich aber auch das Send-the-Sender-Muster damit umsetzen. Hier können Multiple Producer eine Aufgabe an einen Single Consumer schicken (wie ein Webserver an eine Maschinensteuerung) und erhalten mit der Aufgabe auch gleich einen Oneshot-Sender, über den sie die Rückgabe an den ursprünglichen Task erwarten.
  • Broadcasts. Alle Sender schicken an alle. Das heißt auch, dass jeder Empfänger alle Nachrichten von allen Sendern empfängt. Das klingt ein wenig nach Chat und ist auch perfekt dafür geeignet. Pub/Sub- und Fan-Out-Muster lassen sich gut damit umsetzen.
  • Watch-Channels implementieren Single Producer und Multi Consumer. Produzenten schicken eine Nachricht ab und alle Empfänger erhalten sie. Das ist besonders gut geeignet, um Änderungen in Konfigurationen an alle ausführenden Einheiten zu übermitteln.

Beispiele dazu gibt es in den Schulungsunterlagen zum Workshop "Netzwerkapplikationen mit Rust und Tokio", der im Rahmen der betterCode() Rust im Herbst 2021 stattfand.

Ein weiterer, nicht ganz unwichtiger Punkt sind Makros, die dabei helfen, genauere Kontrolle über die Ausführung von Futures zu erlangen. Eines davon ist join!(), das die Ergebnisse aller übergebenen Futures zurückgibt.

Ein anderes Makro, das des Öfteren zum Einsatz kommt, ist select!. Hier können mehrere Futures an Tokio geschickt werden. Tokio gibt allerdings nur das Resultat der Future zurück, die als erste fertig ist. Alle anderen werden abgebrochen. Im folgenden Beispiel warten wir hier auf Ergebnisse eines Streams oder lassen die Schleife mittels Timeout unterbrechen, sollte sie zu lange dauern.

let mut stream = stream::iter(vec![1, 2, 3]);
let mut delay = time::delay_for(Duration::from_secs(1));

loop {
    tokio::select! {
        maybe_v = stream.next() => {
            if let Some(v) = maybe_v {
                println!("got = {}", v);
            } else {
                break;
            }
        }
        _ = &mut delay => {
            println!("timeout");
            break;
        }
    }
}

Als Beispiel dient eine TCP-Chat-Applikation, die sich auf GitHub einsehen lässt. Sie wartet über select! darauf, ob zuerst eine Nachricht empfangen oder gesendet wurde. Eine solche Logik wäre ohne select! schwer umzusetzen. Wer hier an das Schlüsselwort select und die Goroutines aus Go denkt, liegt richtig. Vieles davon ist direkt von Go inspiriert.