Softwareentwicklung: Concurrency in Go

Seite 2: Problem: Data Race

Inhaltsverzeichnis

Jedoch zeigt sich bei diesem Beispiel ein anderes Problem, das bei nebenläufiger Programmierung vorkommen kann. Denn dort werden fünf Goroutinen gestartet, die den Zähler i ausgeben. Das Ergebnis ist nicht 0 bis 4, sondern meistens fünfmal die 5, wobei das je nach System variieren kann. Das liegt daran, dass die Goroutine von main die Variable i ändert und die fünf im for-Loop gestarteten Goroutinen darauf zugreifen. Denn Go unterstützt über Goroutinen hinweg Closures. Bis die erste Ausgabe von i erfolgt, ist der Loop bereits fertig und i hat den Wert 5. Das Problem wird Data Race genannt und tritt immer dann auf, wenn ein "unsynchronisierter lesender und schreibender Zugriff von zwei oder mehr Goroutinen auf einen Speicherbereich" stattfindet.

Listing 5: Data Race

func main() {
	var wg sync.WaitGroup
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			fmt.Println(i)
			wg.Done()
		}()
	}
	wg.Wait()
}
// Output:
// 5
// 5
// 5
// 5
// 5

In einer Produktivumgebung sind Data Races sehr gefährlich, da sie nicht immer offensichtlich im Code erkennbar sind und auch nur manchmal zu falschen Ergebnissen führen. Das macht das Debuggen in einigen Fällen fast unmöglich. Da es auch innerhalb des Go-Teams zu Problemen gekommen ist, wurde mit der Go-Version 1.1 der Race Detector eingeführt. Er prüft während des Ausführens, ob mehrere Goroutinen auf einen Speicherbereich schreibend zugreifen. Der Race Detector wird über das Flag -race aktiviert. Das Starten des Programmes aus Listing 4 mit go run -race main.go findet das Data Race. Die Befehle go test, go build oder go install unterstützen dieses Flag ebenfalls. Deshalb sollten Entwickler alle Tests immer auch mit dem Race Detector ausführen. Wichtig: Der Race Detector deckt lediglich die Data Race auf, die auch während des Ausführens zur Laufzeit auftreten. Nicht aktiv ausgeführter Code läuft nicht in die Analyse ein. In dem Kontext ist auch eine höhere Testabdeckung sinnvoll, da so auch mehr Code geprüft wird.

Listing 6 startet eine Goroutine als sogenannten Worker, der auf Daten aus einem Channel wartet. Hierfür kommt innerhalb einer Loop range zum Einsatz. In Verbindung mit einem Channel wird n der übermittelte Wert aus inc zugewiesen. Das passiert so lange, bis close() den Channel schließt. Der Sender sollte das Schließen übernehmen, da er erkennt, dass keine weiteren Daten mehr folgen und sich der Loop somit über den Channel beenden lässt. Ein range-Loop verarbeitet die Daten aus einem Channel besonders gut. counter schickt das Ergebnis am Ende über den result-Channel. Um Daten für die Goroutine zu produzieren, wird eine for-Schleife verwendet, über die fünf Goroutinen gestartet werden. Alle Goroutinen senden dabei in den Channel inc den Wert 1. Für die Synchronisierung des Abschlusses aller Goroutinen wird die sync.WaitGroup verwendet. Wenn mehrere Goroutinen Daten an einen Channel schicken, heißt dieses Pattern Fan-In. Das Verfahren stellt sicher, dass es zu keinem Data Race kommt, denn am Ende werden Daten, in diesem Fall counter, nur durch eine Goroutine geschrieben und gelesen.

Listing 6: Fan-In Pattern

func main() {
	inc := make(chan int)
	result := make(chan int)
    // Worker der auf Events lauscht
	go func() { 
		counter := 0
		for n := range inc {
			counter += n
		}
		result <- counter
	}()
	wg := sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			inc <- 1
			wg.Done()
		}()
	}
	wg.Wait()
    // Channel schließen, wenn keine Daten mehr gesendet werden
	close(inc) 
	fmt.Println(<-result)
}

Es ist möglich, mehrere Channels zu einen zusammenzufassen. In Listing 7 passiert genau das in der Funktion fanIn. Die Signatur fanIn(cs ...chan int) chan int definiert eine variadische Funktion (Funktion mit unbestimmter Arität) mit beliebig vielen Input-Channels. cs ist dabei ein Slice, also []chan int. Der Rückgabewert besteht lediglich aus einem Channel. Das erste Range über cs ist nun eine Loop über das Slice. Im Anschluss wird pro Input-Channel eine eigene Goroutine gestartet. Damit es nicht zu einem Data Race durch das Closure kommt, ist c direkter Input der Goroutine. Anschließend folgt ein Range über den Input-Channel. Der erhaltene Wert wird dann in out geschrieben.

Wenn klar ist, dass keine weiteren Daten mehr kommen, wird out geschlossen. Geschlossene Channel blockieren nicht mehr auf der lesenden Seite. Dadurch ist sichergestellt, dass alle Goroutinen, die auf Daten aus diesem Channel warten, sauber beendet werden. Die Funktion des Schließens erfolgt über eine zusätzliche Goroutine, denn der Zeitpunkt dafür liegt in der Zukunft, nachdem alle Input-Channels geschlossen wurden. Sie wartet, bis die WaitGroup nicht mehr blockiert. Dann wird out geschlossen. Da Goroutinen sehr leicht zu starten sind und intern kaum Ressourcen benötigen, sind Anwendungen mit vielen kleinen Goroutinen typisch für Go.

Listing 7: Beliebige Channel zusammenfassen

func fanIn(cs ...chan int) chan int {
	wg := sync.WaitGroup{}
	out := make(chan int)
	for _, c := range cs {
		wg.Add(1)
		go func(in chan int) {
			for n := range in {
				out <- n
			}
			wg.Done()
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

Die Anwendung der Concurrency-Patterns kann in Verbindung mit einem HTTP-Server erfolgen. Doch was hat er mit Concurrency zu tun? Goroutinen können innerhalb von Paketen versteckt sein. Im HTTP-Paket wird mit jedem Request eine Goroutine gestartet.

> Serve accepts incoming HTTP connections on the listener l, 
creating a new service goroutine for each. 
The service goroutines read requests and then call handler 
to reply to them.

Quelle: go.dev

Als Grundstruktur für den Server dient ein Pattern von Mat Ryer, das alle relevanten Daten und Funktionen innerhalb einer Struktur definiert. Dadurch ist es möglich, innerhalb der Requests auf interne Strukturen oder auch Connections des Servers zuzugreifen. In Listing 8 ist diese Struktur definiert. Die Initialisierung und den Start des Servers hat der Autor in diesem Beispiel in die main-Funktion gesteckt. In größeren Projekten ist es sinnvoll, hier die einzelnen logischen Teile in eigene Funktionen beziehungsweise Methoden zu schreiben. Die Serverstruktur hat neben dem Router und dem HTTP-Server noch einen Counter im Gepäck.

Listing 8: Grundstruktur für den HTTP-Server

type server struct {
	router  *http.ServeMux
	server  *http.Server
	counter int
}

func main() {
	addr := ":8080"
	s := &server{
		router: http.NewServeMux(),
		server: &http.Server{
			Addr:           addr,
			ReadTimeout:    3 * time.Second,
			WriteTimeout:   3 * time.Second,
			MaxHeaderBytes: 1 << 20,
		},
	}
	s.router.HandleFunc("/", s.handleCount())
	s.server.Handler = s.router
	fmt.Printf("Server started at %s\n", s.server.Addr)
	s.server.ListenAndServe()
}

Die Methode handleCount liefert die Handle-Funktion zurück. Dadurch entsteht ein Closure, wodurch es ganz einfach möglich ist, auf die Daten des Servers zuzugreifen. Hier wird der Counter pro Request erhöht und der neue Stand als Response zurückgeliefert. Listing 9 zeigt den zugehörigen Code.

Da jedoch jeder Request in einer eigenen Goroutine läuft, kann es hier zu einem Data Race kommen, wenn zwei Requests gleichzeitig an den Server gestellt werden. Das bedeutet, hier muss man sicherstellen, dass ein geordneter Zugriff von mehreren Goroutinen auf den Counter stattfindet.

Listing 9: Die Handle-Funktion für den Counter

func (s *server) handleCount() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		s.counter++
		io.WriteString(w, fmt.Sprintf("Counter: %03d", s.counter))
	}
}