async/await in Python: Nebenläufigkeit leicht gemacht
Mit überschaubarem Aufwand, und ohne auf komplizierte Threading-Mechanismen zurückgreifen zu müssen, lassen sich Programme nebenläufig entwickeln.
- Martin Meeser
Das mächtige Konstrukt async/await stellt Python bereits seit vielen Jahren zur Verfügung. Coroutines mit den Schlüsselwörtern async
und await
hielten 2015 in Python 3.5 Einzug. Damit lassen sich nahezu ohne Mehraufwand nebenläufige Programme entwickeln. Mit etwas Hintergrundwissen eröffnen sich darüber hinaus Möglichkeiten, komplexe nebenläufige Programme elegant zu entwerfen, ohne dafür komplizierte Threading-Mechanismen kennen oder gar anwenden zu müssen.
Kooperative Routinen
Eine asynchrone Funktions-Definition wird englisch als Coroutine – eine kooperative Routine – bezeichnet. Um sie zu verwenden, muss dem def
-Schlüsselwort das async
-Schlüsselwort vorangestellt sein:
async def af():
i = await y()
print(i)
print(await y())
Listing 1: Definition einer Coroutine
Der Grundgedanke dabei ist, dass eine kooperative Funktion (Routine ist nur ein anderer Begriff für Funktion) von sich aus anderen Programmteilen anbietet, die eigene Ausführung zu unterbrechen, und dadurch Rechen- und Ressourcen-Kapazitäten zur Verfügung stellt.
Eine async def
kann an all jenen Stellen definiert werden, an denen man eine standardmäßige def
verwenden kann – beispielsweise auch innerhalb von Klassen:
class MyClass:
async def caf(self):
i = await y()
print(i)
print(await y())
Listing 2: Definition einer Coroutine als Klassen-Methode
Das await
-Schlüsselwort darf, wie in den obigen Beispielen bereits zu sehen ist, ausschließlich innerhalb von Coroutines zum Einsatz kommen. An den durch await
gekennzeichneten Stellen ist die Coroutine kooperativ: Hier lässt sich das Ausführen der Funktion unterbrechen. Sobald das Ergebnis der asynchronen Operation vorliegt (im Beispiel await y()
), wird das Abarbeiten der Funktion fortgesetzt. Es kommt an dieser Stelle aber nicht zu einem Blockieren des Threads.
Innerhalb einer Coroutine darf es auch Anweisungen ohne await
geben. Eine Coroutine ohne await
ist technisch möglich, aber nicht sinnvoll.
Coroutines in der Event-Loop
Eine Coroutine kann nur innerhalb einer Event-Loop ausgeführt werden, die sich mit der Funktion asyncio.run
aus dem Modul asyncio
erzeugen lässt. Das empfohlene Vorgehen dabei ist es, eine asynchrone main
-Funktion zu definieren und den gesamten Programmablauf direkt an die Event-Loop zu übergeben:
async def main():
pass # non blocking program
if __name__ == "__main__":
asyncio.run(main())
Listing 3: Main-Methode in einem Python-Programm, in dem async/await zum Einsatz kommt
Die Event-Loop endet, sobald keine asynchronen Aufgaben mehr vorliegen. Je Thread kann es nur eine Event-Loop geben. Entsprechend sollte man die Funktion asyncio.run
nur einmal pro Thread aufrufen.
Blocking I/O vermeiden
Klassische Programme haben ein wesentliches Problem: Sie blockieren, wenn sie auf Ressourcen wie Speicher oder Netzwerk zugreifen. Man betrachte dabei folgendes Beispiel:
def main():
with open('file.txt', 'r') as file:
s: str = file.readline()
Listing 4: Blockierendes Öffnen einer Datei
Was genau passiert hier? Das Python-Programm läuft innerhalb eines CPython-Prozesses auf dem Betriebssystem. Um eine Datei zu öffnen und aus ihr zu lesen, muss CPython Anfragen an das Betriebssystem senden. Jenes prüft unter anderem, ob die Datei existiert. Dabei kann etwas Zeit vergehen, beispielsweise, weil eine mechanische Festplatte erst anlaufen muss oder ein Netzlaufwerk die Gegenstelle kontaktiert. Der Aufruf an open
kehrt erst zurück, nachdem die Operation vollständig ausgeführt wurde.
Während der Thread aus dem CPython-Programm auf die Rückmeldung des Betriebssystems wartet, ist er blockiert – kann also keine weiteren Aktionen oder Berechnungen ausführen. In dieser Zeit stoppt der Programmfortschritt.
Don’t call us, we'll call you
Um ein Blockieren bei Ressourcen-Zugriffen zu vermeiden, kann man konzeptuell entsprechend des Hollywood Principle "Don’t call us, we'll call you" eine Rückruf-Funktion angeben (Callback).
def on_opened(f):
read_line_noblock(f, on_line_read)
def on_line_read(f, line):
pass
open_nonblocking('file.txt', 'r', on_opened)
Listing 5: Nicht-blockierendes Öffnen einer Datei via Callback anhand eines Pseudo-Beispiels
In diesem Beispiel erhält die Pseudo-Funktion open_nonblocking
die Anweisung, die Datei file.txt lesend zu öffnen und, nachdem die Datei erfolgreich geöffnet wurde, den Callback on_opened
auszuführen.
Der Aufruf an open_nonblocking
blockiert nicht, und open_nonblocking
kehrt sofort zurück. Während im Hintergrund das Betriebssystem die Datei öffnet, läuft das Programm weiter. Sobald das Ereignis eintritt, dass die Datei geöffnet wurde, erfolgt das Ausführen des Callbacks on_opened
. Der genaue Zeitpunkt, zu dem der Callback-Aufruf geschieht, ist unbekannt.
Die Blockade aus Listing 4 ließ sich eliminieren, indem nicht mehr das Programm das Betriebssystem ruft und auf die Antwort wartet, sondern ein Callback platziert wird. Das Betriebssystem meldet sich dann, sobald das Ergebnis vorliegt.
Allerdings sind auch in diesem Fall einige Nachteile auszumachen:
- Les- und Wartbarkeit sind schlechter als zuvor, denn eigentlich soll das Programm eine Datei öffnen und dann einlesen. Dieser Prozess erstreckt sich nun über mehrere Funktionen. Der intendierte Ablauf lässt sich aus dem Quelltext nicht mehr implizit herauslesen.
- Das Debugging ist erschwert, da die Ausgangssituation und ihre Variablenwerte zum Zeitpunkt des Aufrufs von
open_nonblocking
beim Ausführen des Callbacks nicht mehr bekannt sind. - Durch Debugging verändert man den zeitlichen Ablauf einzelner asynchroner Funktionen, wodurch das Verhalten anders sein kann als beim normalen Ablauf des Programms.
- Es ist nicht mehr möglich, Exceptions zusammenhängend abzufangen, sondern lediglich innerhalb des einzelnen Callbacks.
- Bei Callbacks mit vielen Verschachtelungen spricht man auch von der "Callback Hell" (der Rückruf-Hölle).
Nicht-blockierende I/O mit async await
Mit async await
lässt sich ein nicht-blockierender Dateizugriff umsetzen, der aber nicht die genannten Nachteile mit sich bringt. Die implizierte Reihenfolge des Quelltextes bleibt erhalten.
import asyncio
from aiofile import async_open
async def main():
async with async_open("./hello.txt", 'r') as afp:
line: str = await afp.readline()
asyncio.run(main())
Listing 6: Nicht-blockierendes Öffnen einer Datei mit async/await
Das Beispiel im vorangehenden Listing verwendet das async with
-Konstrukt. Es verhält sich wie ein herkömmlicheswith
– die Coroutine bietet zusätzlich an, das Ausführen an dieser Stelle zu unterbrechen.
Der Thread blockiert jedoch nicht, sondern die Kontrolle geht an die Event-Loop zurück. Sie kann nun andere Coroutines starten oder fortsetzen.
Sobald das Betriebssystem die Datei geöffnet hat, sendet es das Ereignis an den CPython-Prozess. Dort nimmt die Event-Loop das Ereignis an. Sobald eine laufende Coroutine beendet oder mit await
eine mögliche Unterbrechung markiert ist, bewirkt die Event-Loop den Wiedereintritt in die Beispiel-Routine.
Das Listing 6 zeigt vor allem die Verwendung von async await
. Der eigentliche Vorteil ergibt sich jedoch erst in einer echten Applikation wie einem Webserver, der pro Sekunde Tausende von Anfragen verarbeitet.
Coroutines und Awaitable Objects
Der Aufruf einer Coroutine löst nicht deren Ausführung aus, sondern das Erzeugen eines Awaitable Object
, das sich anschließend mit await
verwenden lässt. Erst beim Verwenden erfolgt der Aufruf der Coroutine. Ein Awaitable Object
lässt sich nur einmal mit await
verwenden. Außerhalb einer Event-Loop ist es nicht verwendbar.
import asyncio
async def say_hello():
print("Greetings from say hello!")
async def main():
await say_hello_aw
# await say_hello_aw => can not reuse
local_awaitable = say_hello()
await say_hello()
say_hello_aw = say_hello()
asyncio.run(main())
Listing 7: Wie oft wird “say hello” ausgeführt?
In Zeile 15 des Listings erzeugt der Aufruf von say_hello
ein Awaitable Object
. Es zeichnet sich durch eine "magische" __await__
-Methode aus. In Zeile 9 wird das Awaitable Object
innerhalb der Event-Loop mit await
verwendet. Anschließend erfolgt das erstmalige Ausführen von say_hello
.
Zeile 11 zeigt das Erstellen eines weiteren Awaitable Object
und dessen Binden an eine lokale Variable. Diese wird jedoch nicht mit await
eingesetzt und say_hello
nicht durch das Awaitable Object
ausgeführt. Beim Verlassen von main()
gibt CPython daher am Terminal eine Warnung aus.
In Zeile 12 folgt das dritte Awaitable Object
, das nun direkt mit await
verwendet wird und anschließend das zweite Ausführen von say_hello
auslöst.
Nebenläufige Abläufe mit Tasks erzeugen
Mit Tasks kann ein Programm in einen neuen nebenläufigen (concurrent) Ablaufzweig ausscheren. Die Funktion asyncio.create_task
erzeugt Tasks. Als Argument übergibt man ein noch nicht verwendetes Awaitable Object
.
Übergibt man den Aufruf einer Coroutine an create_task
, führt die Event-Loop die Coroutine so bald wie möglich nebenläufig aus – nicht erst beim Verwenden mit await
. Es ist dabei unbekannt und unerheblich, ob das Ausführen tatsächlich parallel oder verzahnt erfolgt. Es handelt sich lediglich um ein Implementierungsdetail der Event-Loop. Wartet man mit await
auf den Ablaufzweig, so wird der aktuelle Ablauf mit dem anderen Zweig synchronisiert: Dabei unterbricht der aktuelle Ablauf, bis der erwartete Task beendet ist. Konzeptuell ist das vergleichbar mit dem Thread.join()
, technisch kommt es aber nicht zum Blockieren. Die Event-Loop stellt automatisch die erforderliche Synchronisierung her.
Mithilfe von Tasks lassen sich auch Werte aus nebenläufigen Vorgängen zurückgeben. Das ist eine Aufgabe, die mit Threads bereits einigen Aufwand erfordern würde, einen Lock, eine while
-Schleife oder ein notifyAll()
.
t1 = asyncio.create_task(...)
i:int = await t1
Das folgende Listing macht die Unterschiede beim Verwenden von Tasks im Gegensatz zum direkten Aufrufen der Koroutinen deutlich.
import asyncio
import time
async def provide_after(delay_sec: float, to_say: str):
await asyncio.sleep(delay_sec)
print(to_say + f" said at {time.strftime('%X')}")
return to_say
async def main():
print(f"started at {time.strftime('%X')}")
await provide_after(1, "1st awaitable")
await provide_after(1, "2nd awaitable")
print(f"finished at {time.strftime('%X')}")
await asyncio.sleep(2) # just to have some gap
t1 = asyncio.create_task(provide_after(1., "1st task"))
t2 = asyncio.create_task(provide_after(1., "2nd task"))
print(f"started at {time.strftime('%X')}")
await t1
await t2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
Listing 8: Einsatz von Tasks und Koroutinen im Vergleich
Zeile 14 des Codes wird eine Sekunde nach Zeile 13 ausgeführt. In der Coroutine provide_after
erfolgt der Aufruf von asyncio.sleep
, der die Kontrolle an die Event-Loop zurückgibt und einen Wiedereintritt nach der übergebenen Zeitspanne herbeiführt. Entsprechend wird Zeile 15 eine Sekunde nach Zeile 14 aufgerufen.
Dagegen kehren die Aufrufe an create_task
in den Zeilen 17 und 18 sofort zurück, während nebenläufig die Funktion provide_after
zur Ausführung kommt.
#Ausgabe von Listing 8:
started at 10:24:49
1st awaitable said at 10:24:50
2nd awaitable said at 10:24:51
finished at 10:24:51
started at 10:24:53
1st task said at 10:24:54
2nd task said at 10:24:54
finished at 10:24:54
Catch me if you can: asynchrone Exceptions
Asynchrone Exceptions werden Task-übergreifend abgefangen, wie das folgende Listing zeigt. Dadurch ist das Abfangen von Exceptions zentral einfach realisierbar. Auch die Lesbarkeit ist dabei ausgezeichnet.
import asyncio
async def will_ex():
raise Exception("This is unavoidable")
async def main():
try:
await asyncio.create_task(will_ex())
except Exception as ex:
print(ex)
asyncio.run(main())
Listing 9: Asynchrone Exceptions werden Task-übergreifend abgefangen
Im Gegensatz zu Awaitable Objects
lassen sich Tasks mehrfach mit await
verwenden. Wie im nächsten Listing dargestellt, lässt sich auf diese Weise eine Reihe von Abläufen als Reaktion auf einen Einzelablauf auslösen. Der in Zeile 10 erzeugte Task wird als Parameter an die drei Tasks übergeben, die in Zeile 12 innerhalb der Schleife erzeugt werden. Sobald der Task t aus Zeile 10 beendet ist, fahren alle Koroutinen fort, die diesen Task abgewartet haben. Das erinnert konzeptuell an ein "notify-all"-Muster.
import asyncio
async def trigger(t, id):
s = await t
print(s, id)
async def main():
t = asyncio.create_task(asyncio.sleep(1, 'Hello'))
for i in range(3):
asyncio.create_task(trigger(t, i))
await t
asyncio.run(main())
Listing 10: Ein Task wird mehrfach von verschiedenen anderen Tasks erwartet
Zusammenführen mehrerer Ablaufstränge
Häufig möchte man mehrere nebenläufige Abläufe wieder zu einem synchronisieren, wie es bei einem Task über das Anwenden von await
möglich ist. Damit man mehrere Tasks nicht immer wieder manuell mit einer Schleife synchronisieren muss, steht die Funktion asyncio.wait
zur Verfügung (siehe nachfolgendes Listing). Ohne diese nützliche Funktion würde man immer wieder gleichen oder sehr ähnlichen Quelltext erzeugen. Übergibt man nun asyncio.wait
eine Coroutine, so wird sie automatisch in einen neu erzeugten Task gelegt. Das Ausführen geschieht dabei nebenläufig und startet unmittelbar. Mit dem zweiten Argument der Funktion kann man steuern, wie das Synchronisieren mit den anderen Tasks konkret auszuführen ist.
import asyncio
import time
async def x(i: float):
await asyncio.sleep(i)
return i
async def main():
print(f"started at {time.strftime('%X')}")
done, pending = await asyncio.wait([x(i) for i in range(10)], return_when="FIRST_COMPLETED")
done, pending = await asyncio.wait([x(i) for i in range(10)], return_when="ALL_COMPLETED")
done, pending = await asyncio.wait([x(i) for i in range(10)], return_when="FIRST_EXCEPTION")
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
Listing 11: Anwendung von asyncio.wait
Übergibt man FIRST_COMPLETED
, dann kehrt die Funktion zurück, sobald der erste der übergebenen Tasks zurückkehrt. Der zuerst zurückgekehrte Task ist dann in der Variable done
abgelegt, alle anderen noch nicht beendeten Tasks !!!befinden sich zu diesem Zeitpunkt in der Variablen pending
.
Übergibt man ALL_COMPLETED
, dann kehrt das erwartete asyncio.wait
erst zurück, wenn alle Tasks ihre Ausführung beendet haben. In dem Fall ist pending
leer und alle übergebenen Tasks sind in done
enthalten.
Tasks der Reihe nach abarbeiten
In Python steht eine besonders komfortable Funktion zur Verfügung: asyncio.as_completed
. Damit kann man innerhalb einer Schleife die Tasks in der Reihenfolge bearbeiten, in der die Tasks ihre Ausführung beenden. Das ist im folgenden Listing dargestellt:
import asyncio
import time
async def x(i: float):
await asyncio.sleep(i)
return i
async def main():
print(f"started at {time.strftime('%X')}")
for f in asyncio.as_completed([x(i) for i in range(10)]):
print(await f)
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
Listing 12: Anwendung von asyncio.as_completed
Zusammenfassung und Ausblick
Mit den async await
-Schlüsselwörtern und dem asyncio
-Modul bietet Python ein mächtiges Programmierwerkzeug. I/O-Blockaden finden nicht statt und nebenläufige Programme lassen sich ohne komplizierte Thread-Mechanismen beschreiben. Die Entwicklung ist einfacher bei zugleich sehr guter Lesbarkeit, besserem Debugging und effizienterer Ausführung.
In Zukunft dürfte die Beschränkung auf einen einzigen Thread und das Python-typische GIL-Problem (Global Interpreter Lock) keine Rolle mehr spielen. Man denke beispielsweise an die GraalVM oder spezialisierte Event-Loop-Implementierungen, wie sie schon im Internet frei zu finden sind. Ein heute noch mit async await
entwickeltes Programm nutzt morgen bereits ohne Anpassungsbedarf die Fähigkeiten einer besseren Event-Loop.
Martin Meeser
ist freiberuflicher Diplominformatiker (Uni) und entwickelt seit seinem neunten Lebensjahr Software, seit 2004 professionell. In zahlreichen Projekten betreute er bisher Kunden unter anderem aus den Bereichen Automotive, Finance, Raumfahrt, Radioastronomie und Medizintechnik.
(map)