Benchmarking Spark: Wie sich unterschiedliche Hardware-Parameter auf Big-Data-Anwendungen auswirken

Seite 2: Programmatisches

Inhaltsverzeichnis

Jede Spark-Anwendung besteht aus einem Driver-Programm und einem Set von Executors, verteilt über die Worker Nodes des Spark-Clusters. Das Driver-Programm kümmert sich dabei um die Konfiguration und den Kontrollfluss der Spark-Anwendung. Folgendes Beispiel soll das illustrieren.

val conf = new SparkConf()
.setMaster("master_url") // URL des Masterknoten
.setAppName("SparkExamplesMinimal") // Anwendungsname

val sc = new spark.SparkContext(conf) // SparkContext initialisieren
// ...

Im Beispiel initialisiert das Driver-Programm einen SparkContext, um mit dem Cluster interagieren zu können. Darüber hinaus enthält es die Definition der RDDs, die wiederum die einzelnen Actions und Transformationen auf den Daten definieren. Das Driver-Programm serialisiert dabei den entstehenden Graphen und schickt ihm zum Spark-Masterknoten.

Bei der Programmausführung wird die logische Transformation der Daten über die einzelen Operationen in einen Execution Plan überführt. Einzelne Executors führen die einzelnen Operationen auf einem Rechnerknoten aus. Ein Job enthält alle Tasks die für die Ausführung einer Action benötigt werden. Aufeinander folgende Tasks die von einem Job ausgeführt werden, nennt man in Spark Stages.

Das folgende Beispiel zählt die Wörter eines Texts. Die letzte Operation collect() konvertiert den resultierenden RDD in ein Scala-Array und liefert dieses zurück. RDDs lassen sich mit toDebugString() darstellen.

// ...
// RDD to read input file
val textFile = sc.textFile("/Users/rwartala/Documents/Quellcodes/spark
/20000miles-under-the-sea.txt")
// RDD as a result of a map transformation
val lineLengths = textFile.map(s => s.length)
// persits lineLength in memory
lineLengths.persist()
// Split into words and remove empty lines
val tokenized = textFile.filter(line => line.size > 0).map(line
=> line.split(" "))
// Extract the first word from each line (the log level) and do a count
val counts = tokenized.map(words=>(words(0),1)).reduceByKey{(a,b)=>a+b}
val wordCount = counts.collect()

Ein scala> counts.toDebugString liefert folgende Ausgabe:

res16: String = 
(2) ShuffledRDD[6] at reduceByKey at :25 []
+-(2) MapPartitionsRDD[5] at map at :25 []
| MapPartitionsRDD[4] at map at :23 []
| MapPartitionsRDD[3] at filter at :23 []
| MapPartitionsRDD[1] at textFile at :21 []
| /Users/rwartala/Documents/Quellcodes/spark/
20000miles-under-the-sea.txt HadoopRDD[0] at textFile at :21 []

Über die Spark-Weboberfläche lässt sich sich der Job im Detail untersuchen und auch der DAG grafisch anzeigen.

Sobald innerhalb eines Driver-Programms eine Action aufgerufen wird, erzeugt Spark einen neuen Job. Je nach Execution Plan des zu verarbeitenden RDDs werden einzelne Stages erzeugt, die wiederum einzelne Task auf den Rechnerknoten mit den Executors gegebenenfalls parallel ausführen können. Damit sich diese Operationen parallel erledigen lassen, müssen sich die Daten auf den Rechnern zur Ausführung bereit befinden.

RDDs verwalten die Daten in Form sogenannter Partitions. Jede Partition enthält dabei einen Teil der Daten. Vorstellen lassen sie sich als Teile einer großen Liste. Die Daten innerhalb einer Partition werden sequenziell verarbeitet. Die Partitionen lassen sich hingegen über die Task der Executors parallel durchführen. Folgendes Beispiel soll das veranschaulichen: Der Befehl

val rdd = sc.textFile("hdfs://192.168.172.56/Users/rwartala/
Documents/Quellcodes/spark/20000miles-under-the-sea.txt")

erzeugt einen RDD aus der Textdatei, die sich im Hadoop-Dateisystem befindet. Ein

val tokenized = textFile.filter(line => line.size > 0).map(line 
=> line.split(" "))

splittet die Datei in einzelne Partitionen auf, damit sich diese parallel verarbeiten lassen. Wenn die Daten aus Hadoop kommen, ist die Partitionsgröße in der Regel gleich der Größe eines HDFS-Blocks.

Um die ideale Laufzeitumgebung für die eigene Spark-Anwendung zu finden und das Zusammenspiel von Hauptspeicherbedarf, Netzwerk- und Storagedurchsatz zu untersuchen, wäre eine Testumgebung nötig, die sich in diesen Hardwareparametern individuell konfigurieren lässt. Das wird erst seit dem Aufkommen von Cloud-Anbietern und der Virtualisierung von Hardware-Ressourcen möglich.

Dank Cloud-Anbietern muss man sich nicht selbst die teure Hardware anschaffen und installieren, um ein Rechen-Cluster betreiben zu können. Spark liefert mit spark-ec2 ein Skript, mit dem sich Amazons Elastic Compute Cloud als Cluster-System nutzen lässt.

Mit einem gültigen AWS-Konto kann man mit spark-ec2 vollständig automatisiert ein Cluster konfigurieren, die nötige Software installieren und seine Spark-Anwendungen ausführen. Dazu übergibt man dem Skript eine Reihe von Parametern. Mit ihnen lassen sich nicht nur die Region der Installation bestimmen (z.B. eu-central-1 für Frankfurt, ap-northeast-1 für Tokio oder us-west-1 für Kalifornien), sondern auch die Anzahl und Leistungsfähigkeit der zu startenden Computing-Hardware und des Storage.

Folgender Beispielaufruf soll die Anwendung illustrieren. Dem Skript übergibt man als Erstes die Zugangsschlüssel für den Login und wählt danach die Region und die Zone von Amazons Rechenzentrum aus. Die Parameter --instance-type und --slaves bestimmen dann den Typ der Hardware und die Anzahl der zu nutzenden Worker Nodes.

$ export AWS_ACCESS_KEY_ID=MEIN_AWS_ACCESS_KEY
$ export AWS_SECRET_ACCESS_KEY=MEIN_AWS_SECRET_ACCESS_KEY
$SPARK_HOME/ec2/spark-ec2
--key-pair=data2day-benchmark
--identity-file=data2day-benchmark.pem.txt
--region=eu-west-1 --zone=eu-west-1a
--instance-type=m1.large
--slaves=2
--hadoop-major-version=yarn
--spark-version=1.4.1 launch data2day-spark-cluster

Mit den letzten beiden Optionen --hadoop-major-version und --spark-version wählt man die zu installierende Hadoop- und Spark-Version aus. Der Befehl launch startet dann das Cluster mit dem angegebenen Namen data2day-spark-cluster. Je nach gewählter Anzahl Worker Nodes kann die Instanziierung des Clusters einige Minuten in Anspruch nehmen. Ab jetzt kostet die Nutzung von Amazons Cloud-Infrastruktur Geld und kann je nach gewählter Region und Instanztypen wenige bis einige Dollars kosten.

Ist das Skript fertig, lässt sich die Spark-Web-UI über http://ec2-52-18-92-74.eu-west-1.compute.amazonaws.com:8080 öffnen.

Neben dem Namen des Master Node führt die UI die Anzahl der Worker Nodes mit ihrer Hardwareausstattung auf (Abb. 2)

Gleichzeitig mit der Spark-Installation wurde ein vollständiges Hadoop Distributed File System (HDFS) auf dem gleichen Knoten gestartet. Es ist über die Portnummer 50070 zugänglich mit http://ec2-52-18-92-74.eu-west-1.compute.amazonaws.com:50070/dfshealth.html#tab-overview.

Um das Cluster nutzen zu können, muss man sich auf ihm mit dem login-Befehl anmelden.

spark-ec2 
--key-pair=data2day-benchmark
--identity-file=data2day-benchmark.pem.txt
--region=eu-west-1 --zone=eu-west-1a login data2day-spark-cluster

Nach dem Login findet man sich auf dem Master Node wieder und kann dort sowohl das Cluster-Dateisystem über den hdfs-Befehl als auch Spark-Anwendungen mithilfe von spark-submit verteilen und ausführen. So lässt sich das Beispielprogramm zur Berechnung von Pi mit dem folgenden Code starten:

./spark/bin/spark-submit 
--master spark://ec2-52-19-33-37.eu-west-1.compute.amazonaws.com:7077
--deploy-mode cluster
--name "SparkPi program"
--class org.apache.spark.examples.SparkPi
/root/spark/lib/spark-examples-*.jar 1000

Um das Cluster zu beenden, reicht die Ausführung des spark-ec2-Skripts via Aufruf von destroy.

spark-ec2 
--key-pair=data2day-benchmark
--identity-file=data2day-benchmark.pem.txt
--region=eu-west-1
--zone=eu-west-1a destroy data2day-spark-cluster

Dabei werden alle Worker Nodes terminiert, und es sind keine Gebühren mehr fällig.

Ansicht des HDFS-Dateisystems auf der EC2-Cluster von Spark (Abb. 3)

Mit spark-ec2 lässt sich eine Reihe unterschiedlicher Cluster-Konfigurationen auf einfache Art und Weise nutzen. Es eignet sich als günstige Testumgebung, um Maschinen mit mehr Hauptspeicher und Maschinen mit schnellerem Storage (z. B. SSDs) für die eigene Spark-Anwendung zu konfigurieren.