Ferris Talk #5: Tokio als asynchrone Laufzeitumgebung ist ein Fast-Alleskönner
Seite 2: Tasks und deren Tücken
Tokio Tasks haben ein paar interessante Eigenschaften. So sind sie zum Beispiel ‘static
bound. ‘static
bezeichnet eine besondere Lifetime, die angibt, dass sämtliche Referenzen für die Ausführung der Aufgabe Gültigkeit besitzen. In diesem Fall bedeutet das, dass Tasks keine Referenzen zu Daten außerhalb ihrer Einheit besitzen dürfen. Mit dem Schlüsselwort move
kann der Task allerdings die Ownership über solche Daten übernehmen.
Des Weiteren sind Tasks Send
bound. Send
ist ein spezieller Trait in Rust, der signalisiert, dass sich die darin enthaltenen Daten sicher zu verschiedenen Threads senden lassen. Nachdem zum Zeitpunkt des Absendens an die Tokio Runtime noch nicht klar ist, in welchem Thread der Task landet, müssen alle Daten innerhalb des Tasks auch Send
sein.
Diese Bedingung kann zu spannenden Fehlern führen. Folgendes Beispiel "spawnt" (erzeugt) einen Task, der einen Reference Counter anlegt. Reference Counter (kurz: Rc) sind nötig, um Rust explizit mitzuteilen, dass mehrere Teile der Applikation Ownership über die darunterliegenden Daten übernehmen können. Ein Reference Counter zählt mit jedem .clone
-Aufruf eine Referenz hoch, mit jedem Verschwinden der Referenz wieder herunter. Diese Referenzen können den Besitzer wechseln, lassen allerdings die Ownership über das Original unangetastet. Reference Counter sind allerdings nicht Send
, das heißt, man kann sie nicht zwischen Threads hin- und herschicken.
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
yield_now().await;
println!("{}", rc); // ERROR
});
}
yield_now()
zwingt die Tokio Runtime, den Task wieder zurück zum Scheduler zu schicken. Dadurch gibt es die Möglichkeit, dass die folgende Zeile mit der Ausgabe des Reference Counters auf einem anderen Thread landet. Da Reference Counter nicht Send
, also nicht threadsicher sind, wirft Rust zur Übersetzungszeit einen Fehler.
Das heißt allerdings nicht, dass Reference Counter gar nicht zulässig wären. In der folgenden Anpassung ist die Nutzung von Rc
in einem Block gekapselt. Rust versteht, dass sämtliche Referenzen auf Rc
nach dem Block verschwinden. Es wird also nie passieren, dass die Daten auf einem anderen Thread landen. Dieses Codestück kompiliert:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
{
let rc = Rc::new("hello");
println!("{}", rc);
}
yield_now().await;
});
}
Es gibt in der Runtime auch noch andere Workarounds für ähnliche Situationen:
- Mit
spawn_blocking
kommt der Task auf einem Thread zur Ausführung, auf dem das Blockieren zulässig ist. Dazu legt Tokio eigene Threads zur Seite, um den Rest der Ausführung nicht zu behindern. - Mit
spawn_local
stellt Tokio sicher, dass die Tasks wieder auf dem gleichen Thread landen. Damit können auch Datenstrukturen, die nichtSend
sind, verwendet werden.
Unter den Zero Cost Abstractions von Rust können diese Bedingungen zu Beginn verwirrend wirken und zu Fehlern führen. Die Funktionsweise der Runtimes zu kennen und zu verstehen, hilft auch bei der Fehlerbehandlung.
Channels zum Senden von Nachrichten
Neben äquivalenten asynchronen Implementierungen der relevanten Standardbibliothek-Structs gibt es auch Werkzeuge, die die Kommunikation zwischen Tasks vereinfachen. Dazu zählen die Channels, die in Tokio in verschiedenen Varianten vorliegen.
Channels erlauben das Verschicken von Nachrichten zwischen Sender (oder Transmitter, tx
) und Empfänger (Receiver, rx
). Einer der bekanntesten, da auch in der Standardbibliothek verfügbaren Channels ist "Multiple Producer, Single Consumer (MPSC)". Wie der Name sagt, können hier mehrere Produzenten Resultate an einen Empfänger schicken, wie in diesem Beispiel:
async fn main() -> io::Result<()> {
let mut socket = TcpStream::connect("www.example.com:1234").await?;
let (tx, mut rx) = mpsc::channel(100); // 1
for _ in 0..10 {
let tx = tx.clone(); // 2
tokio::spawn(async move { // 3
tx.send(&b"data to write"[..]).await.unwrap();
});
}
drop(tx); // 4
while let Some(res) = rx.recv().await { // 5
socket.write_all(res).await?;
}
Ok(())
}
Im Detail ist der Ablauf wie folgt:
- Das Erzeugen eines Channels (hier mit einem Maximum an 100 gleichzeitigen Nachrichten) erstellt Sender (
tx
) und Empfänger (rx
). - Über einen
.clone
-Aufruf lassen sich neue Sender erzeugen. Dieser kann ebenfalls an den gleichen Empfängerrx
senden. So entstehen Multiple Producers. - Jede Schleifeniteration legt einen neuen Task ab. Der Task übernimmt Ownership über den neu erstellten Sender beziehungsweise Produzenten. Der Produzent schickt Daten an den Empfänger. Nach diesem Schritt bestimmen die Ownership-Regeln von Rust, dass der Speicher für Sender
tx
freizugeben ist. - Nach dem Erzeugen aller Tasks bleibt der ursprüngliche Sender übrig. Er erhält nun die explizite Freigabe. Intern führt Rust darüber Buch, wie viele Produzenten noch an den Receiver senden können.
- Das hat Einfluss auf den nächsten Schritt, der die Nachrichten aller Produzenten abarbeitet. Sobald alle Produzenten freigegeben sind, gibt der Aufruf auf
rx.recv()
keine Ergebnisse mehr zurück. Das Programm kann also terminieren.
MPSC-Channels erlauben eine elegante Kommunikation zwischen Tasks. Dank Ownership-Regeln gibt es auch keine Überraschung, welche Produzenten noch Nachrichten schicken können. Rust nimmt seinen Anwendern hier wieder einmal viele Aufgaben ab.
Neben den bekannten MPSC-Channels gibt es noch folgende Varianten in Tokio:
- Oneshot-Channels. Wie der Name schon sagt, gibt es hier eine einzige Nachricht. Es gibt jeweils nur einen Sender und Empfänger. Sobald die Nachricht versandt ist, haben beide keine Aufgabe mehr und können weggeworfen werden. Auf diese Weise lassen sich zum einen Resultate von lange dauernden Tasks wieder zurückschicken, zum anderen lässt sich aber auch das Send-the-Sender-Muster damit umsetzen. Hier können Multiple Producer eine Aufgabe an einen Single Consumer schicken (wie ein Webserver an eine Maschinensteuerung) und erhalten mit der Aufgabe auch gleich einen Oneshot-Sender, über den sie die Rückgabe an den ursprünglichen Task erwarten.
- Broadcasts. Alle Sender schicken an alle. Das heißt auch, dass jeder Empfänger alle Nachrichten von allen Sendern empfängt. Das klingt ein wenig nach Chat und ist auch perfekt dafür geeignet. Pub/Sub- und Fan-Out-Muster lassen sich gut damit umsetzen.
- Watch-Channels implementieren Single Producer und Multi Consumer. Produzenten schicken eine Nachricht ab und alle Empfänger erhalten sie. Das ist besonders gut geeignet, um Änderungen in Konfigurationen an alle ausführenden Einheiten zu übermitteln.
Beispiele dazu gibt es in den Schulungsunterlagen zum Workshop "Netzwerkapplikationen mit Rust und Tokio", der im Rahmen der betterCode() Rust im Herbst 2021 stattfand.
Makros zur Kontrolle von Futures
Ein weiterer, nicht ganz unwichtiger Punkt sind Makros, die dabei helfen, genauere Kontrolle über die Ausführung von Futures zu erlangen. Eines davon ist join!()
, das die Ergebnisse aller übergebenen Futures zurückgibt.
Ein anderes Makro, das des Öfteren zum Einsatz kommt, ist select!
. Hier können mehrere Futures an Tokio geschickt werden. Tokio gibt allerdings nur das Resultat der Future zurück, die als erste fertig ist. Alle anderen werden abgebrochen. Im folgenden Beispiel warten wir hier auf Ergebnisse eines Streams oder lassen die Schleife mittels Timeout unterbrechen, sollte sie zu lange dauern.
let mut stream = stream::iter(vec![1, 2, 3]);
let mut delay = time::delay_for(Duration::from_secs(1));
loop {
tokio::select! {
maybe_v = stream.next() => {
if let Some(v) = maybe_v {
println!("got = {}", v);
} else {
break;
}
}
_ = &mut delay => {
println!("timeout");
break;
}
}
}
Als Beispiel dient eine TCP-Chat-Applikation, die sich auf GitHub einsehen lässt. Sie wartet über select!
darauf, ob zuerst eine Nachricht empfangen oder gesendet wurde. Eine solche Logik wäre ohne select!
schwer umzusetzen. Wer hier an das Schlüsselwort select
und die Goroutines aus Go denkt, liegt richtig. Vieles davon ist direkt von Go inspiriert.