zurück zum Artikel

Softwareentwicklung: Concurrency in Go

Andreas Schröpfer
Gleisanlagen in Maschen im Gegenlicht

Während die Grundlagen zur Nebenläufigkeit einfach erscheinen, gibt es doch einige Hürden, die es zu überwinden gilt. Go weiß lästige Data Races zu vermeiden.

Eines der spannendsten Features von Go ist die Eleganz, mit der die Sprache Concurrency (Nebenläufigkeit) umsetzt. In nebenläufigen Programmen sind die Funktionen so definiert, dass sie sich unabhängig voneinander ausführen oder auch pausieren lassen. Somit kann die Go-Runtime Anweisungen auf mehrere Threads beziehungsweise CPU-Kerne verteilen. Rechner mit mehreren Kernen sind auf diese Weise besser ausgelastet – das ist auch einer der Gründe für die hohe Geschwindigkeit von Go-Programmen.

Die Basis für Concurrency in Go sind Goroutinen, die über Channels miteinander kommunizieren können. Die theoretische Grundlage bildet das CSP-Model (Communicating Sequential Processes) von Tony Hoare, das bereits über 40 Jahre alt ist. Rob Pike, einer der Erfinder von Go, hatte bereits Anfang der 80er-Jahre mit Squeak und Newsqueak Erfahrungen mit diesem Model gesammelt, die in die Umsetzung von Go eingeflossen sind.

Die Definition von Goroutinen ist denkbar einfach – ein go vor dem Funktionsaufruf reicht aus. Channels können Nachrichten eines beliebigen Typs übermitteln und bilden mit diesem zusammen jeweils einen eigenen Typ. Für die Kennzeichnung eines Channels wird chan gefolgt vom Namen des Typs verwendet. chan string bezeichnet den Typ Channel eines Strings beziehungsweise Channel of string. Da Channels im Arbeitsspeicher nach einer eigenen Struktur angelegt sind, gilt es, die Instanz immer mit make() zu erzeugen. Für den String-Channel ist das make(chan string).

Um die Koordination der jeweiligen Goroutinen – wann was auf welchem Thread ausgeführt wird – kümmert sich der Scheduler. Er ist Teil der Go-Runtime und dient als Schnittstelle zwischen den Threads des Systems und dem Go-Programm. Eine Goroutine benötigt somit kaum Systemressourcen, da der Scheduler die Threads sinnvoll verwaltet und im Anschluss die Goroutinen möglichst optimal verteilt. Nebenläufige Programme sind deshalb nicht zwingend auch parallel. Außerdem kann die Verteilung der Goroutinen auf die Threads mit jedem Programmlauf unterschiedlich ausfallen, weshalb das Thema der Synchronisierung von Goroutinen wichtig ist.

Goroutinen sind in Go sehr tief verankert. Jedes Go-Programm startet immer in mindestens einer Goroutine, in der die Main-Funktion läuft. Das lässt sich bei einem Programmabbruch sehr gut nachvollziehen. In Listing 1 bricht das Kommando panic das Programm hart ab. Bei der Ausführung wird zusätzlich der Stacktrace ausgegeben. Dabei zeigt sich, dass die Funktion main in gouroutine 1 ausgeführt wird bzw. wurde. Go ist somit von Grunde auf für Goroutinen ausgelegt.

Listing 1: Programmabbruch in Go

func main() {
	panic("Hello panic!")
}

// Output:
// panic: Hello panic!
// 
// goroutine 1 [running]:
// main.main()
// 	/tmp/sandbox1748122117/prog.go:8 +0x27
// 
// Program exited.

In Listing 2 wird die Funktion routine so definiert, dass sie sich als Goroutine ausführen lässt. Der Output erfolgt über einen Channel. Die Funktion schreibt einen einfachen Text "Hallo Goroutine!" in den Channel. Die Anweisung ist durch den Pfeil <- definiert. Zusätzlich zur Übermittlung von Daten synchronisiert der Channel die Ausführung des Codes der Goroutinen. Der Programmfluss wird beim Sender so lange angehalten, bis die Nachricht auf der anderen Seite ankommt.

make() erzeugt unter main einen Channel. Anschließend startet go die routine als Goroutine. Ab diesem Zeitpunkt läuft routine unabhängig zu main. Unter <-msg wird nun die Nachricht aus dem Channel ausgelesen und der Variable m zugewiesen. Der Programmfluss blockiert hier so lange, bis die Nachricht erfolgreich über den Channel verschickt wurde. Die Ausgabe erfolgt am Ende mit fmt.Println(m).

Listing 2: Einfache Definition einer Goroutine

func routine(out chan string) {
	out <- "Hallo Goroutine!"
}

func main() {
	msg := make(chan string)
	go routine(msg)
	m := <-msg
	fmt.Println(m)
}

Neben der einfachen Syntax gibt es ein paar Punkte bei der nebenläufigen Programmierung zu beachten. Da Channels so lange auf Sender- und Empfängerseite blockieren, bis die Nachricht übermittelt ist, kann der Programmfluss komplett zum Erliegen kommen. Das passiert, wenn eine Seite keine Nachricht schickt oder abholt. Listing 3 erweitert das letzte Beispiel (Listing 2) einfach um ein weiteres Auslesen aus msg. Das hat zur Folge, dass es hier zu einem sogenannten Deadlock kommt, da nur eine Nachricht geschickt wird. Ein fatal error bricht das Programm an diesem Punkt ab.

Listing 3: Deadlock

func main() {
	msg := make(chan string)
	go routine(msg)
	m := <-msg
	fmt.Println(m)
	m = <-msg
}

// Hallo Goroutine!
// fatal error: all goroutines are asleep - deadlock!

Ein weiteres Problem entsteht, wenn Goroutinen ohne Channels nicht aufeinander warten. Da sie nicht blockieren und parallel laufen können, sollte dieser Vorgang Entwicklerinnen und Entwicklern bewusst sein. In Listing 4 erfolgt keine Ausgabe, da main nicht wartet, bis routine mit der Ausgabe anfangen kann. Genau für diese notwendige Synchronisierung verwendet das erste Beispiel aus Listing 1 einen Channel.

Listing 4: Keine Ausgabe

func routine() {
	fmt.Println("Hallo zusammen")
}

func main() {
	go routine()
}

Wenn für das Problem aus Listing 4 kein Channel verwendet werden soll, können Entwickler auch auf das sync-Paket aus der Standardbibliothek zurückgreifen. Als elegante Lösung gibt es dafür die sync.WaitGroup. Sie besitzt die Methoden Add, Done und Wait, welche die Steuerung ermöglichen. Im Listing 5 wird die WaitGroup eingesetzt, deren Anwendung relativ selbsterklärend ist.

Jedoch zeigt sich bei diesem Beispiel ein anderes Problem, das bei nebenläufiger Programmierung vorkommen kann. Denn dort werden fünf Goroutinen gestartet, die den Zähler i ausgeben. Das Ergebnis ist nicht 0 bis 4, sondern meistens fünfmal die 5, wobei das je nach System variieren kann. Das liegt daran, dass die Goroutine von main die Variable i ändert und die fünf im for-Loop gestarteten Goroutinen darauf zugreifen. Denn Go unterstützt über Goroutinen hinweg Closures. Bis die erste Ausgabe von i erfolgt, ist der Loop bereits fertig und i hat den Wert 5. Das Problem wird Data Race genannt und tritt immer dann auf, wenn ein "unsynchronisierter lesender und schreibender Zugriff von zwei oder mehr Goroutinen auf einen Speicherbereich" stattfindet.

Listing 5: Data Race

func main() {
	var wg sync.WaitGroup
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			fmt.Println(i)
			wg.Done()
		}()
	}
	wg.Wait()
}
// Output:
// 5
// 5
// 5
// 5
// 5

In einer Produktivumgebung sind Data Races sehr gefährlich, da sie nicht immer offensichtlich im Code erkennbar sind und auch nur manchmal zu falschen Ergebnissen führen. Das macht das Debuggen in einigen Fällen fast unmöglich. Da es auch innerhalb des Go-Teams zu Problemen gekommen ist, wurde mit der Go-Version 1.1 der Race Detector eingeführt. Er prüft während des Ausführens, ob mehrere Goroutinen auf einen Speicherbereich schreibend zugreifen. Der Race Detector wird über das Flag -race aktiviert. Das Starten des Programmes aus Listing 4 mit go run -race main.go findet das Data Race. Die Befehle go test, go build oder go install unterstützen dieses Flag ebenfalls. Deshalb sollten Entwickler alle Tests immer auch mit dem Race Detector ausführen. Wichtig: Der Race Detector deckt lediglich die Data Race auf, die auch während des Ausführens zur Laufzeit auftreten. Nicht aktiv ausgeführter Code läuft nicht in die Analyse ein. In dem Kontext ist auch eine höhere Testabdeckung sinnvoll, da so auch mehr Code geprüft wird.

Listing 6 startet eine Goroutine als sogenannten Worker, der auf Daten aus einem Channel wartet. Hierfür kommt innerhalb einer Loop range zum Einsatz. In Verbindung mit einem Channel wird n der übermittelte Wert aus inc zugewiesen. Das passiert so lange, bis close() den Channel schließt. Der Sender sollte das Schließen übernehmen, da er erkennt, dass keine weiteren Daten mehr folgen und sich der Loop somit über den Channel beenden lässt. Ein range-Loop verarbeitet die Daten aus einem Channel besonders gut. counter schickt das Ergebnis am Ende über den result-Channel. Um Daten für die Goroutine zu produzieren, wird eine for-Schleife verwendet, über die fünf Goroutinen gestartet werden. Alle Goroutinen senden dabei in den Channel inc den Wert 1. Für die Synchronisierung des Abschlusses aller Goroutinen wird die sync.WaitGroup verwendet. Wenn mehrere Goroutinen Daten an einen Channel schicken, heißt dieses Pattern Fan-In. Das Verfahren stellt sicher, dass es zu keinem Data Race kommt, denn am Ende werden Daten, in diesem Fall counter, nur durch eine Goroutine geschrieben und gelesen.

Listing 6: Fan-In Pattern

func main() {
	inc := make(chan int)
	result := make(chan int)
    // Worker der auf Events lauscht
	go func() { 
		counter := 0
		for n := range inc {
			counter += n
		}
		result <- counter
	}()
	wg := sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			inc <- 1
			wg.Done()
		}()
	}
	wg.Wait()
    // Channel schließen, wenn keine Daten mehr gesendet werden
	close(inc) 
	fmt.Println(<-result)
}

Es ist möglich, mehrere Channels zu einen zusammenzufassen. In Listing 7 passiert genau das in der Funktion fanIn. Die Signatur fanIn(cs ...chan int) chan int definiert eine variadische Funktion (Funktion mit unbestimmter Arität) mit beliebig vielen Input-Channels. cs ist dabei ein Slice, also []chan int. Der Rückgabewert besteht lediglich aus einem Channel. Das erste Range über cs ist nun eine Loop über das Slice. Im Anschluss wird pro Input-Channel eine eigene Goroutine gestartet. Damit es nicht zu einem Data Race durch das Closure kommt, ist c direkter Input der Goroutine. Anschließend folgt ein Range über den Input-Channel. Der erhaltene Wert wird dann in out geschrieben.

Wenn klar ist, dass keine weiteren Daten mehr kommen, wird out geschlossen. Geschlossene Channel blockieren nicht mehr auf der lesenden Seite. Dadurch ist sichergestellt, dass alle Goroutinen, die auf Daten aus diesem Channel warten, sauber beendet werden. Die Funktion des Schließens erfolgt über eine zusätzliche Goroutine, denn der Zeitpunkt dafür liegt in der Zukunft, nachdem alle Input-Channels geschlossen wurden. Sie wartet, bis die WaitGroup nicht mehr blockiert. Dann wird out geschlossen. Da Goroutinen sehr leicht zu starten sind und intern kaum Ressourcen benötigen, sind Anwendungen mit vielen kleinen Goroutinen typisch für Go.

Listing 7: Beliebige Channel zusammenfassen

func fanIn(cs ...chan int) chan int {
	wg := sync.WaitGroup{}
	out := make(chan int)
	for _, c := range cs {
		wg.Add(1)
		go func(in chan int) {
			for n := range in {
				out <- n
			}
			wg.Done()
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

Die Anwendung der Concurrency-Patterns kann in Verbindung mit einem HTTP-Server erfolgen. Doch was hat er mit Concurrency zu tun? Goroutinen können innerhalb von Paketen versteckt sein. Im HTTP-Paket wird mit jedem Request eine Goroutine gestartet.

> Serve accepts incoming HTTP connections on the listener l, 
creating a new service goroutine for each. 
The service goroutines read requests and then call handler 
to reply to them.

Quelle: go.dev [1]

Als Grundstruktur für den Server dient ein Pattern von Mat Ryer, das alle relevanten Daten und Funktionen innerhalb einer Struktur definiert. Dadurch ist es möglich, innerhalb der Requests auf interne Strukturen oder auch Connections des Servers zuzugreifen. In Listing 8 ist diese Struktur definiert. Die Initialisierung und den Start des Servers hat der Autor in diesem Beispiel in die main-Funktion gesteckt. In größeren Projekten ist es sinnvoll, hier die einzelnen logischen Teile in eigene Funktionen beziehungsweise Methoden zu schreiben. Die Serverstruktur hat neben dem Router und dem HTTP-Server noch einen Counter im Gepäck.

Listing 8: Grundstruktur für den HTTP-Server

type server struct {
	router  *http.ServeMux
	server  *http.Server
	counter int
}

func main() {
	addr := ":8080"
	s := &server{
		router: http.NewServeMux(),
		server: &http.Server{
			Addr:           addr,
			ReadTimeout:    3 * time.Second,
			WriteTimeout:   3 * time.Second,
			MaxHeaderBytes: 1 << 20,
		},
	}
	s.router.HandleFunc("/", s.handleCount())
	s.server.Handler = s.router
	fmt.Printf("Server started at %s\n", s.server.Addr)
	s.server.ListenAndServe()
}

Die Methode handleCount liefert die Handle-Funktion zurück. Dadurch entsteht ein Closure, wodurch es ganz einfach möglich ist, auf die Daten des Servers zuzugreifen. Hier wird der Counter pro Request erhöht und der neue Stand als Response zurückgeliefert. Listing 9 zeigt den zugehörigen Code.

Da jedoch jeder Request in einer eigenen Goroutine läuft, kann es hier zu einem Data Race kommen, wenn zwei Requests gleichzeitig an den Server gestellt werden. Das bedeutet, hier muss man sicherstellen, dass ein geordneter Zugriff von mehreren Goroutinen auf den Counter stattfindet.

Listing 9: Die Handle-Funktion für den Counter

func (s *server) handleCount() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		s.counter++
		io.WriteString(w, fmt.Sprintf("Counter: %03d", s.counter))
	}
}

Diese Vermutung mit einem Browser von Hand zu prüfen, ist so gut wie unmöglich. Jedoch kann ein einfacher Unit-Test (Listing 10) Abhilfe schaffen. Er startet mehrere Goroutinen, die den Handler nebenläufig aufrufen. Dafür wird in einem Testfile main_test.go eine Testfunktion angelegt. ts initialisiert einen leeren Server. Da es in dem internen Test möglich ist, die zu prüfenden Handler-Funktionen direkt aufzurufen, braucht es dafür keinen HTTP-Server. Die Testfunktion lässt sich auf diese Weise möglichst einfach halten. Im nächsten Schritt wird unter handler die zu testende Handler-Funktion aus der Methode handleCount geholt.

Im Anschluss gilt es, die Hilfsfunktion getCounter zu definieren. Für das Ergebnis wird für den http.ResponseWriter der Recorder aus dem httptest-Paket geholt. Er speichert intern die Antwort des Handlers, wodurch das Ergebnis im Test auswertbar ist. Da der Handler keinen Request auswertet, lässt sich in diesem Test einfach ein leerer http.Request übergeben. Die Funktion handler wird anschließend ausgeführt und das Ergebnis aus dem Body in den out-Channel geschrieben.

Listing 10: Test des Handlers auf eine Data Race

func TestServerRace(t *testing.T) {
	ts := &server{}
	handler := ts.handleCount()
	getCounter := func(out chan []byte) {
		w := httptest.NewRecorder()
		handler(w, &http.Request{})
		resp := w.Result()
		body, _ := io.ReadAll(resp.Body)
		out <- body
	}
	out := make(chan []byte)
	go getCounter(out)
	go getCounter(out)
	<-out
	<-out
	want := 2
	if ts.counter != want {
		t.Errorf("Got: %d - Want: %d", ts.counter, want)
	}
}

Der nächste Schritt sieht das Initialisieren des out-Channels vor. Anschließend startet getCounter zweimal als Goroutine, um danach aus dem Channel gelesen zu werden. Die zwei Goroutinen simulieren zwei nahezu gleichzeitige Requests an den Server. Durch <-out wartet die Testfunktion, bis die beiden anderen Goroutinen fertig sind und ignoriert dabei die Werte aus dem Channel. Mit go test lässt sich der Test ausführen und siehe da – alles grün. Der Counter funktioniert anscheinend korrekt. Lediglich der Test des Codes auf ein Data Race steht noch aus. go test -race aktiviert den Race Detector – wie schon vermutet, kommt es bei dem Counter zu einem Data Race (Listing 11).

Listing 11: Test entdeckt Data Race

--- FAIL: TestServerRace (0.00s)
    testing.go:1312: race detected during execution of test
FAIL
exit status 1
FAIL    heise/concserver/raceserver     0.287s

Eine Lösung ist die Verwendung eines Worker, der über einen Channel das Ereignis zum Hochzählen bekommt. Dies entspricht dem vorher vorgestellten Fan-In-Pattern. Da der Handler am Ende den aktuellen Zählerstand ausgeben soll, sollte auch dieser über einen Channel zurückgesendet werden. In dem Fall kommt ein Channel of Channel zum Einsatz. runCounter startet den Worker. Die Goroutine wartet dabei auf Ereignisse aus dem Channel. In dem Fall werden nun Channels für das Ergebnis verschickt. Sobald ein Channel ankommt, erhöht sich der Counter. Der aktuelle Stand ist dem erhaltenen Channel zu entnehmen.

Jeder Request erzeugt innerhalb des Handlers einen Channel, der dann verschickt wird. Der Zählerstand des Counters wird anschließend aus diesem Channel wieder ausgelesen (Listing 12).

Um den Code besser zu strukturieren, wäre es möglich, die Kommunikation über die Channels in eine eigene Funktion auszulagern. Diese Funktion lässt sich direkt vom Handler aus aufrufen und stellt sicher, dass ein nebenläufiger Zugriff nicht zu Data Races führt. Um das zu prüfen, gilt es, den Test noch einmal aufzurufen, der dann auch mit Race Detector erfolgreich ist (Listing 13).

Listing 12: Server nach dem Refactoring

type server struct {
	router  *http.ServeMux
	server  *http.Server
	counter int
	inc     chan chan int
}

func (s *server) runCounter() {
	go func() {
		for resultChan := range s.inc {
			s.counter++
			resultChan <- s.counter
		}
	}()
}

func (s *server) handleCount() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		resultChan := make(chan int)
		s.inc <- resultChan
		io.WriteString(w, fmt.Sprintf("Counter: %03d", <-resultChan))
	}
}

Listing 13: Eigene Funktion für Kommunikation mit Server

func (s *server) incCounter() int {
	resultChan := make(chan int)
	s.inc <- resultChan
	return <-resultChan
}

Eine andere Möglichkeit ist die Verwendung eines Mutex. Damit wird ein Bereich gesperrt, sobald eine Goroutine darauf zugreift. Der Mutex wird zum einem dem server hinzugefügt. Zum anderen lässt sich die incCounter-Methode umschreiben. Diese Option hat den großen Vorteil, dass der Code einfacher zu verstehen ist und auch ohne das Konstrukt eines Channels auskommt (Listing 14).

Für einfache Operationen, wie das Hochzählen eines Counters, gibt es noch eine weitere Option. Dabei bietet sich das atomic-Paket an. Mit der Funktion AddInt32() lässt sich ein passender Integer-Wert mit einem int32 addieren. Dadurch wird die Methode incCounter um einiges einfacher. Auch hier ist der Test erfolgreich und zeigt kein Data Race an (Listing 15).

Listing 14: Verwendung eines mutex

type server struct {
	router  *http.ServeMux
	server  *http.Server
	counter int
	mtx     *sync.RWMutex
}

func (s *server) incCounter() int {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.counter++
	return s.counter
}

Listing 15: Sicherer Counter mit dem atomic-Paket

func (s *server) incCounter() int32 {
	return atomic.AddInt32(&s.counter, 1)
}

Diese kleinen Beispiele haben einen Einblick in die Verwendung von Concurrency in Go gegeben. Die Grundlagen des CSP-Models sind dabei einfach, jedoch gibt es genügend Fallstricke. Besonders bei Server-Anwendungen kann es bei hoher Last zu parallelen Anfragen kommen. Hier ist es wichtig, durch Tests zu prüfen, ob es dadurch zu Data Races kommen kann. Dafür bietet Go mit den integrierten Tests und dem Race Detector gute Methoden, diese Bugs schon während der Entwicklung zu finden. Für die Behebung der Data Races können Pakete aus der Standardbibliothek oder Patterns mit Channels zum Einsatz kommen. Die optimale Lösung ist dabei vom jeweiligen Fall abhängig. Zu den jeweiligen angesprochenen Themen gibt es auf der Webseite von Go weitere spannende und hilfreiche Beiträge [2].

Andreas Schröpfer
ist seit über zehn Jahren in der IT-Beratung tätig und seit 2015 begeisterter Gopher. Er ist Contributor bei mehreren Open-Source-Projekten; darunter Go Buffalo. Er gibt Workshops zu Go, ist Mentor bei excercism.io, unterrichtet auch auf Udemy und ist Buchautor zu Go.

(map [3])


URL dieses Artikels:
https://www.heise.de/-7152320

Links in diesem Artikel:
[1] https://pkg.go.dev/net/http#Serve
[2] https://go.dev
[3] mailto:map@ix.de