Channels in Go: Bequem parallel
Seite 2: Der Weg der Daten: Senden und empfangen
Ein Beispiel: Jemand muss häufig viele Bilder verkleinern; er ist ungeduldig und möchte gern die 16 CPU-Kerne seines Ryzen-Prozessors auslasten. Unter Linux eigentlich kein Problem mit xargs -P16
und dem ImageMagick-Befehl convert
. Allerdings stellt sich heraus, dass das Paket golang.org/x/image/draw die Bilder mit CatmullRom.Scale()
meist ohne erkennbaren Qualitätsverlust drei- bis fünfmal kleiner rechnet als mit ImageMagick. Damit ist die Entscheidung für ein eigenes Programm gefallen – eine Implementierung in Go könnte im einfachsten Fall so aussehen, wie es das folgende Listing zeigt:
func main() {
pics, _ := filepath.Glob("*.jpg")
for _, filename := range pics {
go do_scale_image(filename)
}
}
Dieser Code setzt eine primitive Parallelisierung über Go-Routinen um.
Das go
vor do_scale_imagefile()
startet die Funktion als Go-Routine, also nebenläufig. Go-Routinen sind mit Threads vergleichbar, sie sind aber je nach Runtime ressourcenschonender. Das Laufzeitsystem von Go kümmert sich um das Scheduling der Go-Routinen und die Auslastung der CPUs. Dieses primitive Vorgehen hat jedoch einige Nachteile:
- Fehlermeldungen geraten durcheinander und werden möglicherweise zerstückelt, also unlesbar.
- Jede Go-Routine alloziert für sich benötigten Speicher und gibt ihn nach Beendigung wieder frei.
Es hängt vom Laufzeitsystem ab, wie viel Speicher jede dieser Routinen bereits beim Aufruf oder erst dann reserviert, wenn eine CPU frei wird. Bei hunderten großen Bildern könnte das sehr viel Ressourcen benötigen. Außerdem eignet sich ein derart simples Programm nicht für Sonderwünsche – vielleicht soll es zum Beispiel noch für jedes Bild den Kompressionsgrad protokollieren. Paralleles Schreiben in eine Datei erzeugt Chaos, also senden die Go-Routinen ihre Ausgaben über einen Channel msgchan
, den das Programm zentral liest:
for msg := range msgchan {
fmt.Fprintln(fd, msg) // fd = Filedeskriptor
}
Die range
-Schleife würde mit dem Schließen des Channels verlassen (close(msgchan))
, was hier aber nicht passiert. Eine weitere Go-Routine und eine WaitGroup wg
können dabei helfen, wie sie im folgenden Listing zu sehen sind:
import (
"fmt"
"os"
"path/filepath"
"sync"
)
var wg sync.WaitGroup
var msgchan chan string
func main() {
msgchan = make(chan string)
pics,_ := filepath.Glob("*.jpg")
fd,_ := os.Create("protocol.txt")
for _, filename := range pics {
wg.Add(1)
go scale_image(filename)
}
go func() {
for msg := range msgchan {
fmt.Fprintln(fd, msg)
}
}()
wg.Wait()
close(msgchan)
// weitere Aktionen
}
func scale_image(fn string) {
defer wg.Done()
...
msgchan <- message
}
Auf diese Weise können Entwickler Ausgaben geordnet erfassen.
Dabei handelt es sich um ein Semaphor, den das Programm explizit vor jedem Start einer Go-Routine um 1 erhöht. Es führt das defer()
automatisch beim Verlassen einer Funktion aus und senkt den Zähler wieder. wg.Wait()
blockiert so lange, bis alle erfassten Go-Routinen beendet sind. Das close(msgchan)
beendet schließlich die Protokoll-Go-Routine. Allerdings hat auch diese Umsetzung noch Probleme:
- Die Zahl der Go-Routinen ist nach wie vor nicht begrenzt.
- Es ist aus technischen Gründen leider nicht möglich, über die
WaitGroup
die Zahl laufender Go-Routinen abzufragen –Wait()
blockiert den Ablauf. Die aufrufende Funktionmain()
muss also warten.
Ein erster Ansatz
Eine Standardumsetzung für diese Probleme findet sich im folgenden Listing:
import (
"fmt"
"os"
"path/filepath"
"runtime"
)
func main() {
fnchan := make(chan string)
msgchan := make(chan string)
done := make(chan bool)
pics,_ := filepath.Glob("*.jpg")
fd,_ := os.Create("protocol.txt")
defer fd.Close()
numcpu := runtime.NumCPU()
for i := 0; i < numcpu; i++ {
go scaler(fnchan, msgchan)
}
for _, filename := range pics {
fnchan <- filename
}
close(fnchan)
// Empfänger-Goroutine
go func() {
for i := numcpu; i > 0; {
for msg := range msgchan {
if len(msg) == 0 {
i--
continue
}
fmt.Fprintln(fd, msg)
}
}
done<- true
}()
close(msgchan)
<-done
// weitere Aktionen
}
func scaler(fnchan, msgchan chan string) {
for fn := range fnchan {
...
msgchan <- message
}
msgchan <- "" // end marker
}
Eine Standardlösung, die mit beschränkter Zahl von Routinen arbeitet.