Reactive Programming – vom Hype zum Praxiseinsatz

Das Thema "Reactive" ist en vogue. Doch was genau versteht man hinter diesem Programmierstil? Und wie entwickelt man etwas, sodass es als "reactive" gelten darf?

In Pocket speichern vorlesen Druckansicht 9 Kommentare lesen
Lesezeit: 14 Min.
Von
  • Joachim Hofer
Inhaltsverzeichnis

Das Thema "Reactive" ist en vogue. Doch was genau versteht man hinter diesem Programmierstil? Und wie entwickelt man etwas, sodass es als "reactive" gelten darf?

Mitte Juli 2013 wurde das "Reactive Manifesto" veröffentlicht. Mittlerweile haben es eüber 3000 Entwickler unterzeichnet – Tendenz stark steigend. Zunächst einmal soll dem Manifest zufolge eine Anwendung jederzeit auf Stimuli reaktionsfähig sein, damit man sie "reactive" nennen kann. Dazu gehört, dass die Anwendung interaktiv, fehlertolerant und skalierbar sein muss. Insbesondere bedeutet es, dass die Anwendung ereignisgesteuert ("event-driven") sein soll. Das ist nicht weiter überraschend. Jede Bedienoberfläche ist fast zwangsläufig ereignisgesteuert, wenn sie nicht komplett auf Polling setzen will. Die Frage ist allerdings, wie viele Bedienoberflächen auf ein Benutzerereignis tatsächlich jederzeit ohne eine vom Benutzer wahrnehmbare Verzögerung reagieren können. Allzu oft findet man hier leider Gegenbeispiele.

Denn zur echten ereignisgesteuerten Anwendung gehört neben dem einfachen Bearbeiten von Ereignissen eben auch, dass dies asynchron geschieht und die Anwendung niemals blockieren darf. Das sind zwei Aspekte, die in der Praxis schwierig umzusetzen sind. Wie kommt nun ein Entwickler schrittweise von einer synchronen, blockierenden API zu einer asynchronen, nicht blockierenden?

Als Beispiel kann eine einfache API zum Abfragen von Filmen dienen. Solange sie nur lokal genutzt wird, genĂĽgt es sicherlich, sie synchron zu formulieren (im Beispiel in Scala):

trait User
trait Video
trait Rating

trait VideoService {
def getVideos(user: User): List[Video]
def getRating(video: Video): Rating
})

Wenn die API fĂĽr Abfragen ĂĽber das Internet verwendet werden soll, kann die einzelne Abfrage etwas dauern. In dieser Zeit darf die Anwendung nicht blockieren. Ein erster naiver Ansatz hierfĂĽr ist die Verwendung von Futures:

trait User
trait Video
trait Rating

trait VideoService {
def getVideos(user: User): Future[List[Video]]
def getRating(video: Video): Future[Rating]
}

Dabei muss man sich fĂĽr ein geeignetes Future entscheiden. An Futures gibt es leider inzwischen viele zur Auswahl. Wer von Java kommt, ist vielleicht das Future aus der Java-Concurrency gewohnt. Zu diesem heiĂźt es oft: "Future.get() is the most important method." Diese Aussage leitet leider in die Irre: Wenn man das Future zusammen mit seiner get()-Methode verwendet, ist nicht viel gewonnen, denn diese Methode blockiert wieder den Thread. Und das ist genau das, was zu vermeiden ist ...

Mit beispielsweise der Guava-Bibliothek ist man besser aufgestellt: Die darin enthaltene Futures-Klasse liefert Callbacks, die aufgerufen werden, wenn der asynchrone Aufruf ein Ergebnis hat. Das ist angenehm, weil nirgendwo zu blockieren ist, wenn die Ergebnisbehandlung in den Callback verlegt wurde. Es wird allerdings problematisch, wenn der Entwickler asynchrone Aufrufe kombinieren und ineinander schachteln will, denn verschachtelte Callbacks werden schnell unübersichtlich und unverständlich. Das passiert aber recht schnell, wenn man etwa zunächst Übersichtsinformationen holt, um dann jeweils Detailinformationen nachzuholen, wie im folgenden Beispiel beim Holen von Video-Bewertungen zu sehen:

import java.util.concurrent._
import Executors._
import com.google.common.util.concurrent._

case class User(name: String, videos: List[Video])
case class Video(id: String, rating: Rating)
case class Rating(rating: Int)

trait VideoService {
def getVideos(user: User): Future[List[Video]]
def getRating(video: Video): Future[Rating]
}

object Videos extends VideoService {
def getVideos(user: User) = async(user.videos)
def getRating(video: Video) = async(video.rating)

private val executor = newSingleThreadExecutor

private def async[T](op: => T) = {
val task = ListenableFutureTask.create(
new Callable[T] { def call: T = op })
executor submit task
task
}

def shutdown = executor.shutdown
}

object Main extends App {
val videos = Videos.getVideos(
User("joe", List(Video("V1", Rating(9)),
Video("V2", Rating(3)))))

Futures addCallback (videos,
new FutureCallback[List[Video]] {

def onSuccess(videos: List[Video]): Unit = for {
video <- videos
rating = Videos.getRating(video)
} {
Futures addCallback (rating,
new FutureCallback[Rating] {

def onSuccess(rating: Rating): Unit =
println(s"Rating: $rating")

def onFailure(e: Throwable): Unit =
println("getRating: Failure!")
})
}

def onFailure(e: Throwable): Unit =
println("getVideos: Failure!")
})

Videos.shutdown
}

Es liegt in der Natur der Sache, dass asynchrone APIs viele asynchrone Aufrufe miteinander kombinieren wollen. Die sogenannte "Callback-Hölle" scheint vorprogrammiert zu sein, oder mit den Worten von Evan Czaplicki, dem Erfinder der (Reactive-)Programmiersprache Elm: "Callbacks are the modern 'goto'." Man braucht also etwas Besseres als Callbacks. Dabei ist die wesentliche Anforderung, dass es sich beliebig kombinieren und schachteln lassen muss.