Reactive Programming – vom Hype zum Praxiseinsatz

Seite 2: Implementierungen

Inhaltsverzeichnis

Hier kommen die sogenannten Monaden ins Spiel, die in der funktionalen Programmierung verwendet werden, um aus kleinen Bauteilen bequem größere Module zu komponieren. Hinter Monaden steckt trotz des abstrakten Namens keine Zauberei. Um aus den Futures Monaden zu machen, müssen nur wenige Forderungen erfüllt sein: Erstens braucht der Entwickler eine Methode, um Werte in Futures einzupacken, zweitens eine Methode, um mit Werten in Futures Berechnungen anzustellen, ohne diese via get() aus dem Kontext auspacken zu müssen. Diese heißt zum Beispiel bei Scala map und bei .NET/LINQ Select. Und drittens braucht er eine Methode, um asynchrone Aufrufe aneinanderketten zu können (was dem Schachteln von Callbacks entspricht). Diese heißt bei Scala flatMap und bei .NET/LINQ SelectMany.

So ein monadisches Future liefert Scala glücklicherweise gleich mit. Sieht das Beispiel von oben mit Scala-Futures nun besser aus?

import java.util.concurrent.{TimeUnit, Executors}
import Executors.newSingleThreadExecutor
import scala.concurrent.{ExecutionContext, Future}

case class User(name: String, videos: List[Video])
case class Video(id: String, rating: Rating)
case class Rating(rating: Int)

trait VideoService {
def getVideos(user: User): Future[List[Video]]
def getRating(video: Video): Future[Rating]
}

object Videos extends VideoService {
implicit val ctx = ExecutionContext
.fromExecutorService(newSingleThreadExecutor)

def getVideos(user: User) = async(user.videos)
def getRating(video: Video) = async(video.rating)

private def async[T](op: => T) = Future(op)
}

object Main extends App {
import Videos.ctx

val futureVideos = Videos.getVideos(
User("joe", List(Video("V1", Rating(9)),
Video("V2", Rating(3)))))

val futureRatings = futureVideos map { videos =>
videos map Videos.getRating
} // liefert Future[List[Future[Rating]]] - und nun?

// Callbacks für die Ausgabe
futureRatings onSuccess { case ratings =>
ratings foreach (_ onSuccess { case rating =>
println(rating)
})
}

ctx.awaitTermination(1L, TimeUnit.SECONDS)
ctx.shutdown
}

Ein sich daraus ergebender Vorteil ist, dass die Berechnung nun von der Ausgabe getrennt ist, ohne dass Variablen zu verändern wären. Damit kommen die Callbacks nur noch ganz am Ende zum Einsatz, um die Ergebnisse auf die Konsole auszugeben.

Aber das eigentliche Ziel, dass der Entwickler die Aufrufe möglichst schön zusammensetzen kann, ist leider nicht erreicht. Das Problem ist jetzt, dass Future und List zwei unterschiedliche Monaden sind, die gemischt verwendet werden. Das verkompliziert die Angelegenheit stark, sodass man von einer einfachen Benutzung der API noch weit entfernt sind. Und abgesehen davon hat die API eine weitere Schwäche: Sie liefert nämlich die Liste der Videos zwar asynchron, aber als ein großes Gesamtpaket. Schöner wäre es, die Videos asynchron jeweils zu bekommen, sobald sie zur Verfügung stehen, sprich: als Datenstrom.

Ab hier helfen die Future-Varianten nicht mehr weiter, denn Futures sind auf die Behandlung asynchroner Einzelwerte ausgelegt. Es liegt hier aber ein asynchroner Datenstrom vor, also eine asynchrone Folge von Einzelwerten, die man in ihrer Gesamtheit betrachten möchte. Dafür benötigt der Entwickler eine neue Abstraktion, die genau darauf ausgelegt ist: das "Observable". Dieses ist Grundlage für die sogenannten Reactive Extensions (Rx), die es für .NET schon länger gibt und die Microsoft im November 2012 als Open-Source-Software freigegeben hatte. Rx gibt es neben der .NET-Variante auch noch als RxJS für JavaScript. In der Java-Welt fand man leider lange Zeit nichts Vergleichbares – in jüngster Vergangenheit sind aber einige Projekte entstanden, die in unterschiedlichsten Varianten versuchen, Rx auf die Java Virtual Machine zu portieren. Eines der derzeit erfolgversprechendsten ist RxJava, das Netflix für die interne API in großem Umfang (mehr als zwei Milliarden API-Requests pro Tag) produktiv einsetzt.

Doch zurück zum Observable. Was ist das überhaupt? Zunächst einmal ist es einfach etwas, bei dem man einen Observer registrieren (und über eine Subscription auch deregistrieren) kann. Der Observer wiederum ist ein Interface, das aus drei Callback-Methoden besteht: einer für das Behandeln des jeweils nächsten Werts, einer für Fehler und einer für das Beenden:

trait Observable[+A] {
def subscribe(observer: Observer[A]): Subscription
}

trait Observer[-A] {
def onNext(value: A): Unit
def onError(error: Throwable): Unit
def onCompleted: Unit
}

trait Subscription {
def unsubscribe: Unit
}

Das liest sich zunächst unspektakulär. Parallelen zum synchronen Gegenstück des Iterable fallen auf: Es definiert im Wesentlichen einen Iterator. Und der wiederum ist dazu da, jeweils nächste Werte zu liefern. Fehler verursachen im synchronen Fall einfach Exceptions, und wann er fertig ist, kann man ihn auch fragen. Der Iterator ist dem Observer also sehr ähnlich. Tatsächlich sind das duale Konzepte. Doch so schön das in der Theorie sein mag, stellt sich die Frage, inwiefern diese einfache Definition, die im Wesentlichen ausgerechnet drei Callbacks bereitstellt, aus der Callback-Hölle führen soll?