Apache Spark 2.0: Zweiter Akt einer Erfolgsgeschichte

Seite 2: Datasets

Inhaltsverzeichnis

Bereits im ersten Major-Release erhielt Spark mit der Spark SQL eine Art Schweizer Taschenmesser zur Verarbeitung strukturierter Daten, die in eine Tabellenform passen. Obwohl Daten in unstrukturierter Form rapide zunehmen, liegen sie in vielen Anwendungsfällen in strukturierter Form vor oder lassen sich in eine solche bringen. Spark SQL nutzt die zusätzlichen Strukturinformationen.

Neben dem Laden und Speichern solcher Daten sind auch Abfragen in SQL beziehungsweise HQL möglich, deren Ergebnisse als DataFrames vorliegen. Diese stellen einen höheren Abstraktionsgrad bereit als RDDs und ersetzten ab Version 1.3 die SchemaRDDs. Die DataFrame API bietet zusätzlich zu den Möglichkeiten der RDD API mehr Komfort bei der Abarbeitung typischer relationaler Berechnungen wie der Bestimmung von Mittelwerten, Summen oder Aggregationen. Außerdem kann Spark die zusätzlich verfügbaren Strukturinformationen beim Speichermanagement nutzen, Codeoptimierung durchführen und Abfragen vor dem Laden der zutreffenden Daten an die Datenquelle weiterreichen. Dazu kommt Sparks Catalyst Optimizer zum Einsatz. Daher sind DataFrames RDDs hinsichtlich der Performance deutlich überlegen.

Eine gravierende Schwachstelle beim Verwenden von DataFrames besteht allerdings darin, dass sie im Grunde RDDs von Row-Objekten sind, die wiederum einfach Arrays vom Typ Any (Java: Object) sind – also allgemeine, unspezifizierte Objekte. Kurzum: Trotz vorhandenem Datenschema ist die Typinformation zu den Spalten zur Compile-Zeit nicht bekannt. Damit sind die Entwickler dafür verantwortlich, die aus einer Row stammenden Werte bei der Verwendung im restlichen Java- beziehungsweise Scala-Code in den richtigen Typ der stark typisierten Sprachen umzuwandeln. Ansonsten kommt es im schlimmsten Fall zu einer ClassCastException, nachdem ein Job mehrere Stunden im Cluster gelaufen ist und bereits mehrere Gigabyte verarbeitet hat.

Dagegen helfen Datasets, die bereits in Version 1.6 experimentell Eingang in Spark hielten und ab Version 2.0 offizieller Bestandteil sind – inklusive einer erweiterten API. Sie vereinen die Vorteile von DataFrames (Performance und die Option, relational arbeiten zu können) mit den Vorzügen der von Java beziehungsweise Scala gewohnten Typsicherheit. Für die Python- und R-Programmierschnittstelle von Spark gibt es übrigens weiterhin nur DataFrames, da die beiden Sprachen dynamisch typisiert sind.

Datasets sind Instanzen der generischen Klasse Dataset[T], deren Typparameter T Entwickler gegenwärtig an primitive Typen wie int und String, Produkttypen wie Row oder – als Besonderheit – den Klassentyp einer Scala case class oder einer Java Bean binden können. Die Übersetzung der JVM-Objekte in die interne Repräsentation von Spark SQL geschieht anhand sogenannter Encoder. In Scala sind sie via implizitem Import verfügbar und in Java über statische Methoden.

Statt die strukturierte Daten als Felder in einem nicht weiter typisierten Array abzulegen, können Entwickler sie nun als Felder in einer Instanz einer speziellen case-Klasse ablegen, ohne die Typinformation aus der Struktur zu verlieren. Ein DataFrame ist einfach ein Dataset[Row] und bietet weiterhin eine untypisierte Sicht.

Die folgenden Beispiele verwenden eine CSV-Datei, die Nährwertangaben zu Lebensmitteln enthält. Das Einlesen erfolgt über das SparkSession-Objekt. In vielen Fällen kann Spark das Schema der Daten beim Einlesen ableiten. Gelegentlich müssen Entwickler aber nachhelfen und wie im folgenden Code das Schema explizit programmatisch angeben. Spark würde ansonsten alle Datentypen als String ermitteln, obwohl die Nährwertangaben als Double-Werte in den üblichen Einheiten vorliegen. Ähnliches gilt für den booleschen Wert isFruit, der angibt, ob es sich bei dem Lebensmittel um eine Frucht handelt.

val id = StructField("id", DataTypes.IntegerType) 
val name = StructField("name", DataTypes.StringType)
val energy = StructField("energy", DataTypes.DoubleType)
val calories = StructField("calories",
DataTypes.DoubleType)
val protein = StructField("protein", DataTypes.DoubleType)
val carbohydrate = StructField("carbohydrate",
DataTypes.DoubleType)
val fat = StructField("fat", DataTypes.DoubleType)
val dietaryFibres = StructField("dietaryFibres",
DataTypes.DoubleType)
val cholesterol = StructField("cholesterol",
DataTypes.DoubleType)
val waterContent = StructField("waterContent",
DataTypes.IntegerType)
val isFruit = StructField("isFruit", DataTypes.BooleanType)
val fields = Array(id, 
name,
energy,
calories,
protein,
carbohydrate,
fat,
dietaryFibres,
cholesterol,
waterContent,
isFruit)
val schema = StructType(fields)
val df = spark.read
.schema(schema)
.option("header", true)
.csv(/* Path to CSV file 'fruits.csv' */)
df.printSchema()

Bereits bei den folgenden Schritten treten die genannten Fallstricke bei DataFrames auf:

// Typ bei schnellem Refactoring falsch angepasst; 
// Absturz zur Laufzeit:
val maxCalories = df.rdd.map(row =>
row.getAs[String]("calories")).max
// Kompiliert, stürzt aber zur Laufzeit ab:
df.select("calorie")

Sind die externen Daten korrekt eingelesen, können Entwickler wie im folgenden Listing durch Verwendung von as festlegen, dass die Spaltennamen eines DataFrames mit den Feldbezeichnern einer case-Klasse zur Deckung gebracht werden. Ab dem Punkt übernimmt wieder das statische Typsystem die Kontrolle. Zudem können in einigen Fällen für den Zugriff die Punktnotation und Lambda-Ausdrücke zum Einsatz kommen. Die IDE bietet dadurch Autovervollständigung an und erspart Entwicklern die manuelle Eingabe der korrekten Spaltennamen. Dass Datasets nicht alle Probleme beheben, die bei DataFrames auftreten, zeigt das Listing ebenfalls:

case class Food(id: Int,
name: String,
energy: Double,
calories: Double,
protein: Double,
carbohydrate: Double,
fat: Double,
dietaryFibres: Double,
cholesterol: Double,
waterContent: Int,
isFruit: Boolean)
// Import der für as benötigen Encoder:
import spark.implicits._
val ds: Dataset[Food] = df.as[Food]
// Liefert einen zu `df` inhaltsgleichen DataFrame zurück:
ds.toDF()
val maxCaloriesWithDs = ds.map(_.calories).rdd.max
// Immer noch fehleranfällig:
ds.select("calorie")
// Verbesserung mit Lambda (`map` statt `select`):
val calories: Dataset[Double] = ds.map(_.calories)
val lowCaloriesFood: Dataset[String] = 
ds.filter(_.calories < 55).map(_.name)
val dailyFatValues: Dataset[Double] =
ds.map(food => (food.fat / 65) * 100)

Der Einsatz von Lambdas greift bei filter, map (sowie mapPartitions) und flatMap. Um aber Spalten mit select auszulesen, müssen Entwickler immer noch den richtigen Spaltenamen als String übergeben. Außerdem empfiehlt sich weiterhin der Einsatz von SQL oder UDFs (nutzerdefinierte Funktionen), um beispielsweise die berechneten täglich empfohlenen Fettwerte als neue Spalte hinzufügen, wie das folgende Listing zeigt. Die Umsetzung mit case-Klassen ist ebenfalls möglich, aber der Artikel verzichtet aus Platzgründen darauf:

val dailyValueFat = udf {
(fat: Double) => (fat / 65) * 100
}
ds.withColumn("dvFat", dailyValueFat($"fat")).show()
ds.createOrReplaceTempView("foods")
spark.sql("SELECT *, (fat / 65) * 100 AS dvFat
FROM foods").show()

Sinnvoll ist der Einsatz von Lambdas und der Punktnotation zum Schreiben typsicherer Aggregationen mit der Dataset-API, wie das folgende Listing zeigt, das die Anzahl der Früchte beziehungsweise der anderen Lebensmittel, bei denen isFruit false ist, zählt und für jede Gruppe die jeweilige Gesamtsumme für Energie, Kalorien und Fett ermittelt:

import
org.apache.spark.sql.expressions.scalalang.typed.
{count => typedCount, sum => typedSum}
ds.groupByKey(food => food.isFruit)
.agg(typedCount[Food](_.id).name("count"),
typedSum[Food](_.energy).name("sum(energy)"),
typedSum[Food](_.calories).name("sum(calories)"),
typedSum[Food](_.fat).name("sum(fat)"))
.withColumnRenamed("value", "isFruit")
.show()
// Alternative Abfrage via String als Query; 
// Potential für Fehler
val query = s"""|SELECT
|isFruit, COUNT(id) AS count,
|SUM(energy), SUM(calories), SUM(fat)
|FROM foods
|GROUP BY isFruit
""".stripMargin
spark.sql(query).show()

Das ist vor allem bei komplizierten Abfragen hilfreich, da falsche Formulierungen bereits beim Kompilieren auffallen. Andererseits müssen Entwickler das Typsystem einigermaßen beherrschen, um beispielsweise Scalas Typinferenzalgorithmus durch die zusätzliche Angabe von Typargumenten zu helfen: Beim Ersetzen von typedSum[Food] durch typedSum im obigen Beispiel, kann Scala allein die benötigen Typen nicht ableiten, und das Kompilieren schlägt fehl.