Reactive Programming – vom Hype zum Praxiseinsatz

Seite 3: Rx-Operatoren

Inhaltsverzeichnis

Hierbei helfen die vielen in Rx auf dem Observable definierten Operatoren, die denen auf Iterable definierten in ihrer Verwendung ähnlich sind. Für einfache Transformationen der Werte, die in Observables stecken, gibt es beispielsweise den map-Operator. Er nimmt als Parameter eine Transformationsfunktion.

Transformieren von Observables via "map" (Abb. 1)

Hier ist map im Einsatz mit Hilfe von RxJava:

import rx.lang.scala.Observable
val chars = Observable("a", "b", "c")
chars.map(_.toUpperCase).subscribe(onNext = println)
// liefert "A", "B", "C"

Spannender werden Operatoren, sobald sie mehrere Observables kombinieren sollen. Wenn der Entwickler beispielsweise zwei Observables vom selben Typ hat, die er zu einem einzelnen zusammenfügen will, hat er dafür den merge-Operator:

Einfaches Zusammenfügen zweier Observables vom selben Typ (Abb. 2)
import rx.lang.scala.Observable
val chars = Observable("a", "b", "c")
val caps = Observable("A", "B", "C")
chars merge caps subscribe(onNext = println)
// liefert "a", "b", "c", "A", "B", "C"

Oft hat man mehrere unterschiedliche Observables, die kombiniert werden wollen. Im Videobeispiel wäre es etwa denkbar, dass man zu jedem Video jeweils Bewertungen und Kommentare kombinieren möchte. In so einem Fall kommt der zip-Operator ins Spiel. Er verwandelt mit einer Transformationsfunktion, die auf den einzelnen Observable-Werten arbeitet, zwei Eingabe- in ein Ausgabe-Observable. Dabei sorgt zip dafür, dass die Observables ordentlich verzahnt sind.

import rx.lang.scala.Observable
val chars = Observable("a", "b", "c")
val nums = Observable(1, 2, 3)
chars zip nums subscribe(onNext = println)
// liefert ("a", 1), ("b", 2), ("c", 3)

Will man die Verzahnung nicht, weil die Werte der beiden Eingabe-Observables nicht direkt zusammenhängen, gibt es dafür wieder andere ähnliche Operatoren wie combineLatest. Im Fall von Scala, wo es Tupel gibt, werden die Eingabe-Observables direkt zu einem Tupel kombiniert, statt eine Transformationsfunktion anzuwenden. Letzteres wäre mit map in einem weiteren nachgelagerten Schritt möglich, wenn der Entwickler es benötigt.

Zwei verschiedene Observables zu einem verzahnten umwandeln (Abb. 3)

Zu guter Letzt sei noch der Operator flatMap (auch ähnlich wie in .NET selectMany oder mapMany genannt) vorgestellt, der dazu dient, zwei geschachtelte Observables zu einem umzuwandeln. Er behandelt den Fall, dass der Entwickler zu jedem einzelnen Wert eines Eingabe-Observables eine Funktion aufrufen kann, die jeweils ein weiteres Observable zurückliefert. All diese Rückgabe-Observables werden vom flatMap-Operator in einem Ausgabe-Observable gesammelt.

Aufsammeln von Sekundär-Observables via "flatMap"/"mapMany" (Abb. 4)

Wie die Ergebnis-Observables von flatMap hintereinander gehängt werden, zeigt das folgende Listing:

import rx.lang.scala.Observable
val chars = Observable("a", "b", "c")
val addUpper = (s: String) => Observable(s, s.toUpperCase)
chars flatMap addUpper subscribe(onNext = println)
// liefert "a", "A", "b", "B", "c", "C"

Das Video-Beispiel ist ein typischer Fall für den flatMap-Operator: Die Videos werden als Observable geholt, und zu jedem Video wird eine Funktion aufgerufen, die asynchron die Bewertung holen soll. Diese Funktion liefert die Bewertung ebenfalls als Observable zurück. Das bedeutet, dass pro Video ein verschachteltes Observable mit Bewertungen zurückgeliefert wird (auch wenn im Beispiel je nur eine Bewertung zu erwarten ist). flatMap hilft, daraus ein einzelnes Observable zu erzeugen, das die Bewertungen der Videos nacheinander zurückliefert. Damit vereinfacht sich das Holen der Bewertungen im obigen Videoservice-Beispiel wie folgt, wenn man komplett auf Observables und RxJava umstellt:

import rx.lang.scala.Observable

case class User(name: String, videos: List[Video])
case class Video(id: String, rating: Rating)
case class Rating(rating: Int)

trait VideoService {
def getVideos(user: User): Observable[Video]
def getRating(video: Video): Observable[Rating]
}

object Videos extends VideoService {
def getVideos(user: User) =
Observable(user.videos: _*)
def getRating(video: Video) =
Observable(video.rating)
}

object Main extends App {
val videos = Videos.getVideos(
User("joe", List(Video("V1", Rating(9)),
Video("V2", Rating(3)))))

val ratings = videos flatMap Videos.getRating
ratings subscribe (onNext = println)
}