zurück zum Artikel

async/await in Python: Nebenläufigkeit leicht gemacht

Martin Meeser

(Bild: rodho/Shutterstock.com)

Mit überschaubarem Aufwand, und ohne auf komplizierte Threading-Mechanismen zurückgreifen zu müssen, lassen sich Programme nebenläufig entwickeln.

Das mächtige Konstrukt async/await stellt Python bereits seit vielen Jahren zur Verfügung. Coroutines mit den Schlüsselwörtern async und await hielten 2015 in Python 3.5 Einzug [1]. Damit lassen sich nahezu ohne Mehraufwand nebenläufige Programme entwickeln. Mit etwas Hintergrundwissen eröffnen sich darüber hinaus Möglichkeiten, komplexe nebenläufige Programme elegant zu entwerfen, ohne dafür komplizierte Threading-Mechanismen kennen oder gar anwenden zu müssen.

Eine asynchrone Funktions-Definition wird englisch als Coroutine – eine kooperative Routine – bezeichnet. Um sie zu verwenden, muss dem def-Schlüsselwort das async-Schlüsselwort vorangestellt sein:

async def af():
	i = await y()
	print(i)
	print(await y())

Listing 1: Definition einer Coroutine

Der Grundgedanke dabei ist, dass eine kooperative Funktion (Routine ist nur ein anderer Begriff für Funktion) von sich aus anderen Programmteilen anbietet, die eigene Ausführung zu unterbrechen, und dadurch Rechen- und Ressourcen-Kapazitäten zur Verfügung stellt.

Eine async def kann an all jenen Stellen definiert werden, an denen man eine standardmäßige def verwenden kann – beispielsweise auch innerhalb von Klassen:

class MyClass:
	async def caf(self):
		i = await y()
		print(i)
		print(await y())

Listing 2: Definition einer Coroutine als Klassen-Methode

Das await-Schlüsselwort darf, wie in den obigen Beispielen bereits zu sehen ist, ausschließlich innerhalb von Coroutines zum Einsatz kommen. An den durch await gekennzeichneten Stellen ist die Coroutine kooperativ: Hier lässt sich das Ausführen der Funktion unterbrechen. Sobald das Ergebnis der asynchronen Operation vorliegt (im Beispiel await y()), wird das Abarbeiten der Funktion fortgesetzt. Es kommt an dieser Stelle aber nicht zu einem Blockieren des Threads.

Innerhalb einer Coroutine darf es auch Anweisungen ohne await geben. Eine Coroutine ohne await ist technisch möglich, aber nicht sinnvoll.

Eine Coroutine kann nur innerhalb einer Event-Loop ausgeführt werden, die sich mit der Funktion asyncio.run aus dem Modul asyncio erzeugen lässt. Das empfohlene Vorgehen dabei ist es, eine asynchrone main-Funktion zu definieren und den gesamten Programmablauf direkt an die Event-Loop zu übergeben:

async def main():
	pass # non blocking program

if __name__ == "__main__":
	asyncio.run(main())

Listing 3: Main-Methode in einem Python-Programm, in dem async/await zum Einsatz kommt

Die Event-Loop endet, sobald keine asynchronen Aufgaben mehr vorliegen. Je Thread kann es nur eine Event-Loop geben. Entsprechend sollte man die Funktion asyncio.run nur einmal pro Thread aufrufen.

Klassische Programme haben ein wesentliches Problem: Sie blockieren, wenn sie auf Ressourcen wie Speicher oder Netzwerk zugreifen. Man betrachte dabei folgendes Beispiel:

def main():
	with open('file.txt', 'r') as file:
		s: str = file.readline()

Listing 4: Blockierendes Öffnen einer Datei

Was genau passiert hier? Das Python-Programm läuft innerhalb eines CPython-Prozesses auf dem Betriebssystem. Um eine Datei zu öffnen und aus ihr zu lesen, muss CPython Anfragen an das Betriebssystem senden. Jenes prüft unter anderem, ob die Datei existiert. Dabei kann etwas Zeit vergehen, beispielsweise, weil eine mechanische Festplatte erst anlaufen muss oder ein Netzlaufwerk die Gegenstelle kontaktiert. Der Aufruf an open kehrt erst zurück, nachdem die Operation vollständig ausgeführt wurde.

Konzeptuelle Darstellung des blockierenden Öffnens (Abb. 1)

Während der Thread aus dem CPython-Programm auf die Rückmeldung des Betriebssystems wartet, ist er blockiert – kann also keine weiteren Aktionen oder Berechnungen ausführen. In dieser Zeit stoppt der Programmfortschritt.

Um ein Blockieren bei Ressourcen-Zugriffen zu vermeiden, kann man konzeptuell entsprechend des Hollywood Principle "Don’t call us, we'll call you" eine Rückruf-Funktion angeben (Callback).

def on_opened(f):
	read_line_noblock(f, on_line_read)

def on_line_read(f, line):
	pass


open_nonblocking('file.txt', 'r', on_opened)

Listing 5: Nicht-blockierendes Öffnen einer Datei via Callback anhand eines Pseudo-Beispiels

In diesem Beispiel erhält die Pseudo-Funktion open_nonblocking die Anweisung, die Datei file.txt lesend zu öffnen und, nachdem die Datei erfolgreich geöffnet wurde, den Callback on_opened auszuführen.

Der Aufruf an open_nonblocking blockiert nicht, und open_nonblocking kehrt sofort zurück. Während im Hintergrund das Betriebssystem die Datei öffnet, läuft das Programm weiter. Sobald das Ereignis eintritt, dass die Datei geöffnet wurde, erfolgt das Ausführen des Callbacks on_opened. Der genaue Zeitpunkt, zu dem der Callback-Aufruf geschieht, ist unbekannt.

Die Blockade aus Listing 4 ließ sich eliminieren, indem nicht mehr das Programm das Betriebssystem ruft und auf die Antwort wartet, sondern ein Callback platziert wird. Das Betriebssystem meldet sich dann, sobald das Ergebnis vorliegt.

Allerdings sind auch in diesem Fall einige Nachteile auszumachen:

Mit async await lässt sich ein nicht-blockierender Dateizugriff umsetzen, der aber nicht die genannten Nachteile mit sich bringt. Die implizierte Reihenfolge des Quelltextes bleibt erhalten.

import asyncio
from aiofile import async_open

async def main():
	async with async_open("./hello.txt", 'r') as afp:
	line: str = await afp.readline()

asyncio.run(main())

Listing 6: Nicht-blockierendes Öffnen einer Datei mit async/await

Das Beispiel im vorangehenden Listing verwendet das async with-Konstrukt. Es verhält sich wie ein herkömmlicheswith – die Coroutine bietet zusätzlich an, das Ausführen an dieser Stelle zu unterbrechen.

Der Thread blockiert jedoch nicht, sondern die Kontrolle geht an die Event-Loop zurück. Sie kann nun andere Coroutines starten oder fortsetzen.

Konzeptuelle Darstellung nicht-blockierender I/O mittels Event-Loop (Abb. 2)

Sobald das Betriebssystem die Datei geöffnet hat, sendet es das Ereignis an den CPython-Prozess. Dort nimmt die Event-Loop das Ereignis an. Sobald eine laufende Coroutine beendet oder mit await eine mögliche Unterbrechung markiert ist, bewirkt die Event-Loop den Wiedereintritt in die Beispiel-Routine.

Das Listing 6 zeigt vor allem die Verwendung von async await. Der eigentliche Vorteil ergibt sich jedoch erst in einer echten Applikation wie einem Webserver, der pro Sekunde Tausende von Anfragen verarbeitet.

Der Aufruf einer Coroutine löst nicht deren Ausführung aus, sondern das Erzeugen eines Awaitable Object, das sich anschließend mit await verwenden lässt. Erst beim Verwenden erfolgt der Aufruf der Coroutine. Ein Awaitable Object lässt sich nur einmal mit await verwenden. Außerhalb einer Event-Loop ist es nicht verwendbar.

import asyncio


async def say_hello():
    print("Greetings from say hello!")


async def main():
    await say_hello_aw
    # await say_hello_aw => can not reuse
    local_awaitable = say_hello()
    await say_hello()


say_hello_aw = say_hello()

asyncio.run(main())

Listing 7: Wie oft wird “say hello” ausgeführt?

In Zeile 15 des Listings erzeugt der Aufruf von say_hello ein Awaitable Object. Es zeichnet sich durch eine "magische" __await__-Methode aus. In Zeile 9 wird das Awaitable Object innerhalb der Event-Loop mit await verwendet. Anschließend erfolgt das erstmalige Ausführen von say_hello.

Zeile 11 zeigt das Erstellen eines weiteren Awaitable Object und dessen Binden an eine lokale Variable. Diese wird jedoch nicht mit await eingesetzt und say_hello nicht durch das Awaitable Object ausgeführt. Beim Verlassen von main() gibt CPython daher am Terminal eine Warnung aus.

In Zeile 12 folgt das dritte Awaitable Object, das nun direkt mit await verwendet wird und anschließend das zweite Ausführen von say_hello auslöst.

Mit Tasks kann ein Programm in einen neuen nebenläufigen (concurrent) Ablaufzweig ausscheren. Die Funktion asyncio.create_task erzeugt Tasks. Als Argument übergibt man ein noch nicht verwendetes Awaitable Object.

Erzeugen eines nebenläufigen Ablaufs mit createTask (Abb. 3)

Übergibt man den Aufruf einer Coroutine an create_task, führt die Event-Loop die Coroutine so bald wie möglich nebenläufig aus – nicht erst beim Verwenden mit await. Es ist dabei unbekannt und unerheblich, ob das Ausführen tatsächlich parallel oder verzahnt erfolgt. Es handelt sich lediglich um ein Implementierungsdetail der Event-Loop. Wartet man mit await auf den Ablaufzweig, so wird der aktuelle Ablauf mit dem anderen Zweig synchronisiert: Dabei unterbricht der aktuelle Ablauf, bis der erwartete Task beendet ist. Konzeptuell ist das vergleichbar mit dem Thread.join(), technisch kommt es aber nicht zum Blockieren. Die Event-Loop stellt automatisch die erforderliche Synchronisierung her.

Mithilfe von Tasks lassen sich auch Werte aus nebenläufigen Vorgängen zurückgeben. Das ist eine Aufgabe, die mit Threads bereits einigen Aufwand erfordern würde, einen Lock, eine while-Schleife oder ein notifyAll().

t1 = asyncio.create_task(...)
i:int = await t1

Das folgende Listing macht die Unterschiede beim Verwenden von Tasks im Gegensatz zum direkten Aufrufen der Koroutinen deutlich.


import asyncio
import time


async def provide_after(delay_sec: float, to_say: str):
    await asyncio.sleep(delay_sec)
    print(to_say + f" said at {time.strftime('%X')}")
    return to_say


async def main():
    print(f"started at {time.strftime('%X')}")
    await provide_after(1, "1st awaitable")
    await provide_after(1, "2nd awaitable")
    print(f"finished at {time.strftime('%X')}")
    await asyncio.sleep(2)  # just to have some gap
    t1 = asyncio.create_task(provide_after(1., "1st task"))
    t2 = asyncio.create_task(provide_after(1., "2nd task"))
    print(f"started at {time.strftime('%X')}")
    await t1
    await t2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Listing 8: Einsatz von Tasks und Koroutinen im Vergleich

Zeile 14 des Codes wird eine Sekunde nach Zeile 13 ausgeführt. In der Coroutine provide_after erfolgt der Aufruf von asyncio.sleep, der die Kontrolle an die Event-Loop zurückgibt und einen Wiedereintritt nach der übergebenen Zeitspanne herbeiführt. Entsprechend wird Zeile 15 eine Sekunde nach Zeile 14 aufgerufen.

Dagegen kehren die Aufrufe an create_task in den Zeilen 17 und 18 sofort zurück, während nebenläufig die Funktion provide_after zur Ausführung kommt.

#Ausgabe von Listing 8:

started at 10:24:49
1st awaitable said at 10:24:50
2nd awaitable said at 10:24:51
finished at 10:24:51
started at 10:24:53
1st task said at 10:24:54
2nd task said at 10:24:54
finished at 10:24:54

Asynchrone Exceptions werden Task-übergreifend abgefangen, wie das folgende Listing zeigt. Dadurch ist das Abfangen von Exceptions zentral einfach realisierbar. Auch die Lesbarkeit ist dabei ausgezeichnet.


import asyncio


async def will_ex():
    raise Exception("This is unavoidable")


async def main():
    try:
        await asyncio.create_task(will_ex())
    except Exception as ex:
        print(ex)


asyncio.run(main())

Listing 9: Asynchrone Exceptions werden Task-übergreifend abgefangen

Tasks mehrfach auslösen (Abb. 4)

Im Gegensatz zu Awaitable Objects lassen sich Tasks mehrfach mit await verwenden. Wie im nächsten Listing dargestellt, lässt sich auf diese Weise eine Reihe von Abläufen als Reaktion auf einen Einzelablauf auslösen. Der in Zeile 10 erzeugte Task wird als Parameter an die drei Tasks übergeben, die in Zeile 12 innerhalb der Schleife erzeugt werden. Sobald der Task t aus Zeile 10 beendet ist, fahren alle Koroutinen fort, die diesen Task abgewartet haben. Das erinnert konzeptuell an ein "notify-all"-Muster.

import asyncio


async def trigger(t, id):
    s = await t
    print(s, id)


async def main():
    t = asyncio.create_task(asyncio.sleep(1, 'Hello'))
    for i in range(3):
        asyncio.create_task(trigger(t, i))
    await t


asyncio.run(main())

Listing 10: Ein Task wird mehrfach von verschiedenen anderen Tasks erwartet

Synchronisieren mehrerer Tasks (Abb. 5)

Häufig möchte man mehrere nebenläufige Abläufe wieder zu einem synchronisieren, wie es bei einem Task über das Anwenden von await möglich ist. Damit man mehrere Tasks nicht immer wieder manuell mit einer Schleife synchronisieren muss, steht die Funktion asyncio.wait zur Verfügung (siehe nachfolgendes Listing). Ohne diese nützliche Funktion würde man immer wieder gleichen oder sehr ähnlichen Quelltext erzeugen. Übergibt man nun asyncio.wait eine Coroutine, so wird sie automatisch in einen neu erzeugten Task gelegt. Das Ausführen geschieht dabei nebenläufig und startet unmittelbar. Mit dem zweiten Argument der Funktion kann man steuern, wie das Synchronisieren mit den anderen Tasks konkret auszuführen ist.


import asyncio
import time


async def x(i: float):
    await asyncio.sleep(i)
    return i


async def main():
    print(f"started at {time.strftime('%X')}")
    done, pending = await asyncio.wait([x(i) for i in range(10)], return_when="FIRST_COMPLETED")
    done, pending = await asyncio.wait([x(i) for i in range(10)], return_when="ALL_COMPLETED")
    done, pending = await asyncio.wait([x(i) for i in range(10)], return_when="FIRST_EXCEPTION")
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Listing 11: Anwendung von asyncio.wait

Übergibt man FIRST_COMPLETED, dann kehrt die Funktion zurück, sobald der erste der übergebenen Tasks zurückkehrt. Der zuerst zurückgekehrte Task ist dann in der Variable done abgelegt, alle anderen noch nicht beendeten Tasks !!!befinden sich zu diesem Zeitpunkt in der Variablen pending.

Übergibt man ALL_COMPLETED, dann kehrt das erwartete asyncio.wait erst zurück, wenn alle Tasks ihre Ausführung beendet haben. In dem Fall ist pending leer und alle übergebenen Tasks sind in done enthalten.

In Python steht eine besonders komfortable Funktion zur Verfügung: asyncio.as_completed. Damit kann man innerhalb einer Schleife die Tasks in der Reihenfolge bearbeiten, in der die Tasks ihre Ausführung beenden. Das ist im folgenden Listing dargestellt:

import asyncio
import time


async def x(i: float):
    await asyncio.sleep(i)
    return i


async def main():
    print(f"started at {time.strftime('%X')}")
    for f in asyncio.as_completed([x(i) for i in range(10)]):
        print(await f)
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Listing 12: Anwendung von asyncio.as_completed

Mit den async await-Schlüsselwörtern und dem asyncio-Modul bietet Python ein mächtiges Programmierwerkzeug. I/O-Blockaden finden nicht statt und nebenläufige Programme lassen sich ohne komplizierte Thread-Mechanismen beschreiben. Die Entwicklung ist einfacher bei zugleich sehr guter Lesbarkeit, besserem Debugging und effizienterer Ausführung.

In Zukunft dürfte die Beschränkung auf einen einzigen Thread und das Python-typische GIL-Problem (Global Interpreter Lock) keine Rolle mehr spielen. Man denke beispielsweise an die GraalVM oder spezialisierte Event-Loop-Implementierungen, wie sie schon im Internet frei zu finden sind. Ein heute noch mit async await entwickeltes Programm nutzt morgen bereits ohne Anpassungsbedarf die Fähigkeiten einer besseren Event-Loop.

Martin Meeser
ist freiberuflicher Diplominformatiker (Uni) und entwickelt seit seinem neunten Lebensjahr Software, seit 2004 professionell. In zahlreichen Projekten betreute er bisher Kunden unter anderem aus den Bereichen Automotive, Finance, Raumfahrt, Radioastronomie und Medizintechnik
.

(map [2])


URL dieses Artikels:
https://www.heise.de/-6193925

Links in diesem Artikel:
[1] https://www.heise.de/news/Programmiersprache-Python-3-5-erschienen-2811997.html
[2] mailto:map@ix.de