Task- und Datenparallelität mit Rust

Seite 3: Nebenläufigkeit mit Tokio

Inhaltsverzeichnis

Tokio ist eine Bibliothek für Rust, die im Gegensatz zu Rayon von vornherein auf Nebenläufigkeit ausgelegt ist und hauptsächlich für asynchrone I/O-Operationen wie File I/O oder Netzwerkkommunikation zum Einsatz kommt. Technisch nutzt Tokio Futures, deren Ausführung eine Laufzeitumgebung auf die ihr zugewiesenen CPU-Kerne verteilt und notfalls mit work-stealing die Last zwischen den CPU-Kernen ausbalanciert. Futures sind – vergleichbar mit std::future in C++ oder promises in JavaScript – eine Repräsentation eines Ergebnisses, dessen Berechnung unter Umständen noch nicht ausgeführt wurde. Sie ermöglichen es, Aufgaben zu definieren, die sich asynchron ausführen lassen.

Als Beispielanwendung dient hier eine einfache Client/Server-Anwendung, in der der Client eine Zeichenkette an den Server schickt und dieser anschließend mit derselben Zeichenkette antwortet. Sowohl Client als auch Server skalieren – mit Hilfe von Tokio automatisch – auf mehrere CPU Kerne und auch die Integration einer Backoff-Strategie im Client gestaltet sich relativ einfach.

Der folgende Code beschreibt, wie der Server TCP-Verbindungen annimmt und für jede Verbindung den Code ausführen kann. Als Erstes werden eine Socket-Adresse addr angelegt und ein listener erzeugt, der an der entsprechenden Adresse einen Port öffnet. Die Variable server ist ein Future – beziehungsweise bei genauer Betrachtung eine unendliche Menge an Futures und lässt sich daher auch als unendlicher Stream bezeichnen.

Sie legt für jede der ankommenden Verbindungen fest, dass die Verbindung angenommen werden und als Variable socket für die weitere Verarbeitung zur Verfügung gestellt werden soll. Im Falle eines Fehlers beim Verbindungsaufbau wird im Beispiel unten nur der aufgetretene Fehler ausgegeben. Da es sich bei der Variable server um einen Stream handelt, erfolgt beim Anlegen der Variable noch kein Verbindungsaufbau. Dies passiert erst, sobald server an die Tokio-Laufzeitumgebung übergeben ist.

let addr = "127.0.0.1:1234".parse().unwrap();
let listener = TcpListener::bind(&addr).unwrap();
let server = listener
.incoming()
.for_each(|socket| {
// für jede Verbindung
// siehe nächste Code-Snippet
})
.map_err(|err| {
// Fehlerbehandlung
println!("Fehler beim Verbindungsaufbau = {:?}", err);
});
tokio::run(server);

Der bis hier gezeigte Code nimmt Verbindungen an, bearbeitet diese allerdings nicht. Der folgende Code legt zwei miteinander verbundene Futures an. amountF beschreibt, dass alle in socket empfangenen Daten auch wieder über socket versendet beziehungsweise aus dem Empfangsbereich von socket in den Versendebereich kopiert werden sollen. Nach Abschluss dieser Operation enthält amountF entweder die Anzahl der geschriebenen Bytes oder einen Fehler.

Da es sich bei der Variablen server wie erwähnt um ein Future handelt, startet die Ausführung der Operation jedoch noch nicht (io::copy ist hier Teil der Tokio-Bibliothek). Sobald aber das Ergebnis der Operation vorliegt, soll entweder die Anzahl der kopierten Bytes oder der Fehlercode ausgegeben werden. Dies ist im Future msg beschrieben, das anschließend der Tokio-Laufzeitumgebung übergeben und von dieser ausgeführt wird.

let (reader, writer) = socket.split();
let amountF = io::copy(reader, writer);

let msg = amountF.then(|result| {
match result {
Ok((amount, _, _)) => println!("{} Bytes geschrieben", amount),
Err(e) => println!("error: {}", e),
}

Ok(())
});

tokio::spawn(msg);
Ok(())

Da der Client ähnlich wie der Server funktioniert, verzichten die Autoren an dieser Stelle auf eine detailliertere Besprechung. Die Funktion action erzeugt ein Future, das eine Verbindung aufbaut und die Zeichenkette "Hallo Heise Developer" schickt. Im Fall eines Fehlers beim Verbindungsaufbau soll dieser gemäß Exponential-Backoff-Strategie wiederholt werden. Um dieses Future mit der gewählten Strategie number_of_connections häufig auszuführen, lässt sich ein Stream erzeugen und für jeden Eintrag im Stream die Funktion action von der vorher gewählten Fehler-Strategie ausführen. Laufen sowohl Client als auch Server lokal, ist damit zu rechnen, dass der Verbindungsaufbau – abhängig vom verwendeten OS – häufiger scheitert, da das System-Limit der zulässigen offenen Socket-Verbindungen schneller erreicht ist.

fn action() -> impl Future<Item = (), Error = ()> {
let addr = "127.0.0.1:1234".parse().unwrap();
TcpStream::connect(&addr)
.and_then(|stream| {
io::write_all(stream, "Hallo Heise Developer").then(|result| {
println!("Daten geschrieben; Erfolg={:?}",
result.is_ok());
Ok(())
})
})
.map_err(|err| {
println!("Verbindungsfehler = {:?}", err);
})
}

fn main() {
let number_of_connections = 100_000;
let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3);

let client = stream::iter_ok(0..number_of_connections)
.for_each(move |_| Retry::spawn(retry_strategy.clone(), action)
.then(|_| Ok(())));

tokio::run(client);
}

Die Future-Implementierung von Tokio ist Pull-basiert, das heißt, die Laufzeitumgebung fragt die registrierten Futures, ob sie aktuell in der Lage sind, ihre Berechnung fortzuführen oder gegebenenfalls noch auf Daten warten. Dieser Ansatz hat den Vorteil, dass es im Allgemeinen kein Backpressure-Problem durch zu schnell arbeitende Produzenten gibt. Die Laufzeitumgebung verteilt alle registrierten Futures auf alle CPU-Kerne, führt aber zu einem bestimmten Zeitpunkt auf jedem CPU-Kern immer nur ein Future aus. Sollte eines der Future gerade seine Berechnung nicht weiter fortführen können, erfolgt der Wechsel zu einem anderen Future.