Benchmarking Spark: Wie sich unterschiedliche Hardware-Parameter auf Big-Data-Anwendungen auswirken

Seite 3: Dependencies

Inhaltsverzeichnis

Die Spark-Web-UI ist für die Analyse der Jobs der erste Ausgangspunkt. Hier lässt sich die Ausführung der Jobs, ihrer einzelnen Stages im Zusammenspiel mit dem Storage, der Spark-Umgebung und den Exekutoren untersuchen. Die Spark-API ermöglicht es Entwicklern, mithilfe der verschiedenen Transformations und Actions Daten auf verschiedenste Art und Weise zu verarbeiten. Je nach verwendeter Operation können die Abhängigkeiten der Daten zwischen den Partitionen der RDDs nah (Narrow Dependencies) oder weit (Wide Dependencies) sein.

Nahe und weite Abhängigkeiten von RDDs am Beispiel einiger Spark Actions (Abb. 4)

Von Datenlokalität (Narrow Dependencies) spricht man dabei nur, wenn die Daten als Partitions bereits auf dem Worker Node vorhanden sind. Oft ist die Ausführung von Operationen auf den Daten davon abhängig, welche auf anderen Worker Nodes gespeichert sind. Zwischenresultate bei Actions und Transformations mit weiten Abhängigkeiten werden auf dem Storage zwischengespeichert. Die Nutzung von Storage zur Zwischenspeicherung oder gar der Transfer der Daten von einem Worker Node auf die anderen bremsen die Datenverarbeitung von Spark extrem ab. Das lässt sich einfach verdeutlichen, macht man sich die unterschiedlichen Größenordnungen der Zugriffe bewusst:

Speicher Lese-/Schreibgeschwindigkeit Zugriffszeit
RAM 100 GByte/s 50,00 ns
SSD 500 GByte/s 0,05 ms
HDD 100 GByte/s 5,00 ms

Zusammenfassend heißt das: Der Zugriff auf den Hauptspeicher ist in der Regel 1000-mal schneller als der Zugriff auf SSDs. Letztere hingegen sind rund 100-mal schneller als der Einsatz von Festplatten. Natürlich sind das nur Richtwerte, die nicht in allen Fällen die Performance abbilden. Als Anhaltspunkt können die Werte aber nützlich sein.

Diese Frage sollte man sich als Erstes stellen, wenn man daran geht, die unterschiedlichen Cluster-Konfigurationen mit Hilfe einer Testumgebung in der Cloud zu analysieren. Trotz eines Spark-Clusters-on-Demand kostet jedes Hochfahren einer Konfiguration Geld. Da schadet es nicht, sich vorher den Speicherverbrauch und die Struktur der eigenen RDDs einmal anzuschauen.

Je nach Art der Anwendung muss man sich über die Hardware-Resourcen des Clusters Gedanken machen. Handelt es sich bei der eigenen Spark-Anwendung um eine CPU-lastige Maschine-Learning-Anwendung, sollten Instanzen mit mehr Kerne und mehr GHz genutzt werden. Sollen viele kleine Partitionen verarbeitet werden und nutzen viele RDDs Transformationen mit Narrow Dependencies, eignet sich ein Cluster aus vielen kleinen Instanzen vielleicht besser als wenige große Instanztypen.

Instanztyp vCPUs Arbeitsspeicher Instanz-Speicher Preis*
m3.xlarge 4 15 GByte 2x 40 GByte SSD $0,266/h
m3.2xlarge 8 30 GByte 2x 80 GByte SSD $0,532/h
c3.4xlarge 16 30 GByte 2x 160 GByte SSD $0,84/h
c3.8xlarge 32 60 GByte 2x 320 GByte SSD $1,68/h

Im Gegensatz zu Machine-Learning-Anwendungen, die viele Datentransformationen im Rahmen ihrer Algorithmen ausführen, sind Programme für die Extraktion, Transformation und sowie das Laden der Daten (ETL) in der Regel I/O-lastiger.

Die Ausführung zweier Anwendungen aus dem Fundus der mitgelieferten Spark-Beispiele sollen das illustrieren. Die erste nutzt die Machine-Learning-Bibliothek MLlib, um eine lineare Regression für eine Anzahl Werte zu erstellen, die sich innerhalb einer Textdatei befinden. Einstellen lässt sich bei dem Beispielprogramm die Anzahl der Iterationen. Es erzeugt dabei ein Trainingsmodell. Dieses Modell wird genutzt, den Fehler mit der mittleren quadratischen Abweichung zu bestimmen (Root Mean Squared Error). Um die Anwendung im Cluster ausführen zu können, müssen die Daten im HDFS liegen. Mit Hilfe von

$ ./ephemeral-hdfs/bin/hdfs dfs -mkdir /data
$ ./ephemeral-hdfs/bin/hdfs dfs -put /root/spark/data/mllib
/sample_linear_regression_data.txt /data/.

lassen sich die lokalen Beispieldaten in Hadoops Dateisystem kopieren. Die Anwendung selbst lässt sich dann wie folgt starten:

$ ./spark/bin/spark-submit 
--class org.apache.spark.examples.mllib.LinearRegression
--name "LinearRegressionTest" spark/lib/spark-examples-*.jar
--numIterations 10000
hdfs://ec2-52-19-43-176.eu-west-1.compute.amazonaws.com:9000
/data sample_linear_regression_data.txt

Folgende Tabelle illustriert die Testergebnisse bei 1000 Iterationen.

Test Nr. Instanztyp Cores Gesamt RAM pro Node Anzahl WorkerNode Laufzeit
1 m1.large 2 2 GByte 3 4,60 Min.
2 m1.large 6 6 GByte 3 4,50 Min.
3 m3.large 6 5,8 GByte 3 0,43 Min.
4 c3.2xlarge 24 13,6 GByte 3 0,29 Min.

Die Testergenisse zeigen, dass im Beispiel zur Berechung der linearen Regression, die Leistung der CPU stärker wiegt als der I/O-Durchsatz des angeschlossenen Storage. Das liegt daran, dass die m3-Instanzen Intels neueste Xeon-Prozessorgeneration nutzen.

Eine Anwendung, bei der es stärker auf I/O ankommt, ist die sogenannte Logdateianalyse. Für den Test soll ein Beispielanwendung von Databricks zum Einsatz kommen, welche die Logdaten eines Apache-Webservers analysieren kann. Die eigentlichen Anwendung, lässt sich über folgende Kommandos installieren:

$ git clone https://github.com/databricks/reference-apps.git
$ cd reference-apps/logs_analyzer/chapter1/scala/

Ist das Scala Build Tool (sbt) noch nicht installiert, kann man das über

$ curl https://bintray.com/sbt/rpm/rpm | sudo tee 
/etc/yum.repos.d/bintray-sbt-rpm.repo
$ sudo yum install sbt

nachholen. Danach benötigt man noch access.log-Daten im HDFS. Für den Test wurde eine Dateigröße von drei Gigabyte gewählt.

$./ephemeral-hdfs/bin/hdfs dfs -put access.log /data/

Den DAG der RDDs zeigt folgende Abbildung:

Überblick über die Spark Worker Nodes (Abb. 5)
Test Nr. Instanztyp Cores Gesamt RAM pro Node Anzahl WorkerNode Laufzeit
5 m1.large 6 5,8 GByte 3 3,60 Min.
6 c3.2xlarge 24 13,6 GByte 3 0,2 Min.

Selbstverständlich lässt sich nicht nur die nötige Hardware auf die Anforderungen der eigenen Spark-Anwendung optimieren. Neben der Spark-UI gibt es weitere Tools, mit deren Hilfe sich das Laufzeitverhalten einer Spark-Anwendung untersuchen lässt. Beispiele dafür sind:

  • YourKit (Java-Profiler)
  • Ganglia (Monitoring-System)
  • jmap (in Java enthaltenes Tool zur Untersuchung von Prozessspeicher)
  • jstat (in Java enthaltenes Tool für Statistiken der JVM)
  • jconsole (grafische Oberfläche für die Analyse von JMX)

Es gibt eine Reihe von Möglichkeiten, Spark-Anwendungen zu tunen. Im Standard nutzt Spark MEMORY-ONLY-Caches zur Deserialisierung von Objekten. Die Nutzung von MEMORYANDDISK-Cache kann vor teuren Neuberechnung bewahren. Stellt die CPU-Leitung im Cluster keinen Flaschenhals dar, wohl aber die I/O bei zu vielen Shuffle-Operationen, lässt sich die Geschwindigkeit mit der LZF-Kompression verbessern (conf.set("spark.io.compression.codec", "lzf")). Alternativ können in solchen Fällen auch mehr Disks die Performance erhöhen.