Channels in Go: Bequem parallel

Seite 2: Der Weg der Daten: Senden und empfangen

Inhaltsverzeichnis

Ein Beispiel: Jemand muss häufig viele Bilder verkleinern; er ist ungeduldig und möchte gern die 16 CPU-Kerne seines Ryzen-Prozessors auslasten. Unter Linux eigentlich kein Problem mit xargs -P16 und dem ImageMagick-Befehl convert. Allerdings stellt sich heraus, dass das Paket golang.org/x/image/draw die Bilder mit CatmullRom.Scale() meist ohne erkennbaren Qualitätsverlust drei- bis fünfmal kleiner rechnet als mit ImageMagick. Damit ist die Entscheidung für ein eigenes Programm gefallen – eine Implementierung in Go könnte im einfachsten Fall so aussehen, wie es das folgende Listing zeigt:

func main() {
	pics, _ := filepath.Glob("*.jpg")
	for _, filename := range pics {
		go do_scale_image(filename)
	}
}

Dieser Code setzt eine primitive Parallelisierung über Go-Routinen um.

Das go vor do_scale_imagefile() startet die Funktion als Go-Routine, also nebenläufig. Go-Routinen sind mit Threads vergleichbar, sie sind aber je nach Runtime ressourcenschonender. Das Laufzeitsystem von Go kümmert sich um das Scheduling der Go-Routinen und die Auslastung der CPUs. Dieses primitive Vorgehen hat jedoch einige Nachteile:

  • Fehlermeldungen geraten durcheinander und werden möglicherweise zerstückelt, also unlesbar.
  • Jede Go-Routine alloziert für sich benötigten Speicher und gibt ihn nach Beendigung wieder frei.

Es hängt vom Laufzeitsystem ab, wie viel Speicher jede dieser Routinen bereits beim Aufruf oder erst dann reserviert, wenn eine CPU frei wird. Bei hunderten großen Bildern könnte das sehr viel Ressourcen benötigen. Außerdem eignet sich ein derart simples Programm nicht für Sonderwünsche – vielleicht soll es zum Beispiel noch für jedes Bild den Kompressionsgrad protokollieren. Paralleles Schreiben in eine Datei erzeugt Chaos, also senden die Go-Routinen ihre Ausgaben über einen Channel msgchan, den das Programm zentral liest:

for msg := range msgchan {
fmt.Fprintln(fd, msg) // fd = Filedeskriptor
}

Die range-Schleife würde mit dem Schließen des Channels verlassen (close(msgchan)), was hier aber nicht passiert. Eine weitere Go-Routine und eine WaitGroup wg können dabei helfen, wie sie im folgenden Listing zu sehen sind:

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

var wg sync.WaitGroup
var msgchan chan string

func main() {
	msgchan = make(chan string)
	pics,_ := filepath.Glob("*.jpg")
	fd,_ := os.Create("protocol.txt")

	for _, filename := range pics {
		wg.Add(1)
		go scale_image(filename)
	}

	go func() {
		for msg := range msgchan {
			fmt.Fprintln(fd, msg)
		}
	}()

	wg.Wait()
	close(msgchan)
	// weitere Aktionen
}

func scale_image(fn string) {
	defer wg.Done()
	...
	msgchan <- message
}

Auf diese Weise können Entwickler Ausgaben geordnet erfassen.

Dabei handelt es sich um ein Semaphor, den das Programm explizit vor jedem Start einer Go-Routine um 1 erhöht. Es führt das defer() automatisch beim Verlassen einer Funktion aus und senkt den Zähler wieder. wg.Wait() blockiert so lange, bis alle erfassten Go-Routinen beendet sind. Das close(msgchan) beendet schließlich die Protokoll-Go-Routine. Allerdings hat auch diese Umsetzung noch Probleme:

  • Die Zahl der Go-Routinen ist nach wie vor nicht begrenzt.
  • Es ist aus technischen Gründen leider nicht möglich, über die WaitGroup die Zahl laufender Go-Routinen abzufragen – Wait() blockiert den Ablauf. Die aufrufende Funktion main() muss also warten.

Eine Standardumsetzung für diese Probleme findet sich im folgenden Listing:

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime"
)

func main() {
	fnchan := make(chan string)
	msgchan := make(chan string)
	done := make(chan bool)

	pics,_ := filepath.Glob("*.jpg")
	fd,_ := os.Create("protocol.txt")
	defer fd.Close()

	numcpu := runtime.NumCPU()

	for i := 0; i < numcpu; i++ {
		go scaler(fnchan, msgchan)
	}

	for _, filename := range pics {
		fnchan <- filename
	}

	close(fnchan)

	// Empfänger-Goroutine
	go func() {
		for i := numcpu; i > 0; {
			for msg := range msgchan {
				if len(msg) == 0 {
					i--
					continue
				}
				fmt.Fprintln(fd, msg)
			}
		}
		done<- true
	}()

	close(msgchan)
	<-done
	// weitere Aktionen
}

func scaler(fnchan, msgchan chan string) {
	for fn := range fnchan {
		...
		msgchan <- message
	}
	msgchan <- "" // end marker
}

Eine Standardlösung, die mit beschränkter Zahl von Routinen arbeitet.