RxSwift – Grundlagen und praktische Beispiele

Viele Anwendungsfälle erfordern die Implementierung asynchroner Tasks. Eine eigenständige Entwicklung ist häufig fehleranfällig, also muss ein Framework her. ReactiveX definiert eine API, die wiederum RxSwift implementiert, um die asynchronen Aufgaben durch reaktive Implementierung leichter lesbar, testbar und robuster zu machen.

In Pocket speichern vorlesen Druckansicht 3 Kommentare lesen
RxSwift – Grundlagen und praktische Beispiele
Lesezeit: 15 Min.
Von
  • Mattes Wieben
Inhaltsverzeichnis

Reaktive Programmierung bietet Entwicklern die Möglichkeit, Datenströme zu verarbeiten, die asynchrone Tasks erzeugen. Viel zu häufig wird Reaktivität jedoch durch die Verkettung von Callback-Funktionen realisiert. Das endet schnell in einer Pyramid of Doom.

ReactiveX verspricht, reaktive Programmierung zu vereinfachen und die Entstehung solcher Pyramiden zu verhindern. Die Popularität der API lässt sich durch die Reichweite seiner Nutzer zum Ausdruck bringen. Microsoft, Netflix, Github und AirBnBhaben die Vorteile erkannt und setzen auf ReactiveX.

Bevor jedoch RxSwift zur Sprache kommt, erläutert der Artikel die theoretischen Grundlagen von ReactiveX. Dieses ist eine technologieagnostische API-Spezifikation, und RxSwift implementiert sie mit kleineren Änderungen und Erweiterungen. Neben Swift gibt es diverse weitere Sprachen, die ebenfalls ReactiveX implementieren. Dazu gehören Java mit RxJava, JavaScript mit RxJS und Python mit RxPy. Sind die Konzepte von ReactiveX einmal verinnerlicht, lassen sie sich nahezu überall analog anwenden.

Seine Entwickler beschreiben ReactiveX als "An API for asynchronous programming with observables". Die Definition setzt sich hauptsächlich aus drei Elementen zusammen, neben einer API aus

  • asynchroner Programmierung: eine Art der Programmierung, die es erlaubt, langläufige Tasks unabhängig vom primären Programmfluss auszuführen.
  • Observables: eine der fundamentalen Säulen von ReactiveX.

Ein Observable verkörpert eine Reihe von Events, die asynchron erzeugt werden. Häufig vergleicht man Observables mit Iterables. Der einzige Unterschied im Verständnis liegt darin, dass ein Observable im Laufe der Zeit, also asynchron, zusätzliche Elemente erhalten kann. Von diesen Events kann ein Observable beliebig viele erzeugen, von null bis unendlich. Folglich kann auch ein Observable endlich oder unendlich sein. Außer den Events emittiert es Benachrichtigungen für einen Fehlerfall beziehungsweise für das Ende der Event-Erzeugung. Die Benachrichtigungen signalisieren allen Observern, dass es keine weiteren Events von jenem Observable mehr geben wird.

Um auf die Events eines Observables zu reagieren, ist ein Observer zu erstellen. Er entsteht durch die Subscription auf einem Observable. Das bedeutet, eine Funktion wird beim Observable angemeldet und bei dem Erzeugen jedes Events mit eben jenem als Parameter aufgerufen. Die Benachrichtigungen des Observables, die am Ende oder im Fehlerfall verschickt werden, kann ein Observer separat behandeln.

Die Abbildung zeigt ein Observable mit den Elementen Stern, Dreieck et cetera. Sie werden durch den flip-Operator transformiert. Bei der Behandlung des Kreises entsteht ein Fehler, der zum Abbruch des Streams führt (Abb. 1).

(Bild: http://reactivex.io/documentation/observable.html)

Auf diesem Konzept baut der spannende Teil von ReactiveX auf, die Extensions – daher das X im Namen. Sie werden durch Operatoren implementiert und dienen dazu, den vom Observable erzeugten Datenstrom zu transformieren oder zu manipulieren.

Eine Übersicht aller Operatoren ist in der hervorragenden Dokumentation von ReactiveX zu finden, die jeden Operator beschreibt und veranschaulicht. Außerdem ist auf der Seite ein Entscheidungsbaum dokumentiert, der bei der Suche nach dem passenden Operator für einen Anwendungsfall unterstützt. Grob zusammengefasst lassen sich die Operatoren in die folgenden Kategorien einteilen:

  1. erzeugend
  2. transformierend
  3. filternd
  4. kombinierend
  5. fehlerbehandelnd
  6. aggregierend

Auf die Beschreibung weiterer Operatoren verzichtet der Autor an dieser Stelle, die wichtigsten stellt er später anhand praktischer Beispielen dar. Auch weiterführende Konzepte wie Scheduler und Subjects werden hier nicht weiter betrachtet.

RxSwift wurde 2015 in Version 1.0 veröffentlicht – drei Monate nach der Vorstellung von Swift selbst. Seither hat es stets an Beliebtheit in der Entwickler-Community gewonnen und ist mittlerweile ein fester Bestandteil vieler Anwendungen.

RxSwift besteht aus fünf Modulen. Sie sind inklusive ihrer Abhängigkeiten in der folgenden Abbildung dargestellt:

RxSwift und RxTest werden in den folgenden Beispielen praktisch verwendet (Abb. 2).

(Bild: https://github.com/ReactiveX/RxSwift)

  • RxSwift: Kernmodul, stellt die Standards bereit, die ReactiveX definiert.
  • RxCocoa: enthält einige Erweiterungen, die die Entwicklung von Apps mit RxSwift erleichtern.
  • RxRelay: Wrapper um Subjects.
  • RxTest und RxBlocking: ermöglichen die Testbarkeit von RxSwift-Code.

Laut der Dokumentation von RxSwift versuchen die Entwickler die Vorgaben von ReactiveX so genau wie möglich abzubilden. Es gibt jedoch einige Unterschiede und Erweiterungen. Ein elementarer Unterschied sind Traits. Das sind praktische Zusätze in RxSwift, obwohl es dazu keine Spezifikation in ReactiveX gibt. Der Grund dafür ist das starke Typsystem von Swift. Durch dieses lassen sich die Korrektheit und die Stabilität von Anwendungen erhöhen, die RxSwift nutzen. Das erreichen Traits, indem sie die Eigenschaften, die ein Observable haben kann, über Schnittstellen hinweg sicherstellen. Außerdem werden die Observables durch den Einsatz von Traits deskriptiv. Es existieren die drei folgenden Traits:

  • Single: erzeugt keine Folge von Events, sondern genau ein Event oder einen Fehler. Ein sinnvolles Beispiel sind HTTP-Requests.
  • Completeable: erzeugt keine Folge von Events, sondern ein onComplete- oder onError-Event. Ein sinnvolles Beispiel sind langläufige Tasks, bei denen nur der erfolgreiche Abschluss, nicht aber das Ergebnis interessiert.
  • Maybe: kombiniert Single sowie Completeable und gibt entweder genau ein Event, ein onError oder ein onComplete aus.

Traits sind optional, und der Aufruf von .asObservable() auf einem Trait löst die Konvertierung in ein normales Observable aus.

Nach den theoretischen Grundlagen von ReactiveX und den Hintergründen zu RxSwift geht es nun an erste Code-Beispiele. Um diese ausführen zu können, ist ein neuer Xcode-Workspace anzulegen und der Pod RxSwift im Podfile hinzuzufügen. Alle Beispiele werden als Tests implementiert. Die Code-Snippets des Artikels sind auf GitHub zu finden.

Das HelloWorld-Beispiel für RxSwift zeigt das Erzeugen eines Observables sowie einer Subscription darauf, die die Events in der Konsole ausgibt:

func testHelloRxSwift() {
let observable = Observable.from([1, 2, 3])
observable.subscribe { event in print(event)}
}

Die Ausführung des Tests gibt unter anderem folgende Werte in der Konsole aus:

next(1)
next(2)
next(3)
completed

Die ersten drei Zeilen geben die erwarteten Events wieder. Das Array, das das Observable erzeugt hat, enthält die Integers 1, 2 und 3. In derselben Reihenfolge ruft das Observable die next()-Methode auf dem Observer auf und übergibt jeweils das entsprechende Integer des Arrays als Argument.

Die vierte Zeile gibt ein weiteres Event aus – completed. Der From-Operator, der das Observable erzeugt, erhält ein endliches Array als Eingabeparameter. Nachdem das Observable alle Elemente des Arrays emittiert hat, kann es keine weiteren Events geben. Deswegen wird das completed-Event an die Observer übermittelt. Daraus lässt sich schließen, dass die Quelle der Events versiegt ist und das Observable keine weiteren Events mehr erzeugen wird.

Eine etwas erweiterte Version des HelloWorld-Beispiels sieht wie folgt aus:

let disposeBag = DisposeBag()
unc testHelloRxSwiftExtended() {
let observable = Observable.from([1, 2, 3])
observable.subscribe { event in
switch event {
case .next(let value):
print(value)
case .error(let error):
print(error)
case .completed:
print("Completed")
}
}
}.disposed(by: disposeBag)

Die Ausführung dieses Tests erzeugt unter anderem die folgende Ausgabe in der Konsole:

1
2
3
Completed

Neu ist im Vergleich zum vorherigen Beispiel, dass der Observer ein switch-Statement implementiert. Das verdeutlicht noch einmal, dass ein Observable drei unterschiedliche Events emittieren kann:

  • Next
  • Error
  • Completed

Das switch-Statement implementiert für jedes dieser Events eine Funktion, die entweder den übergebenen Parameter (1, 2 oder 3) oder den übermittelten Fehler ausgibt. Beim Completed-Event wird ein entsprechender String ausgegeben.

Ein weiterer Unterschied ist, dass auf dem Observer .disposed(by: disposeBag) aufgerufen wird. Das verhindert die Entstehung eines Speicherlecks.

Auf der Basis des Beispiels geht es nun an die wichtigsten Operatoren:

Map: Der Map-Operator transformiert den Parameter jedes Events mit einer angegebenen Funktion und gibt ein Observable zurück, das die transformierten Werte enthält.

Der Map-Operator transformiert die Werte 1, 2 und 3 eines Observables in die Werte 10, 20 und 30 (Abb. 3).

(Bild: http://reactivex.io/documentation/operators/map.html)

Eine RxSwift-Implementierung dieses Operators sieht wie folgt aus:

func testMap() {
let observable = Observable.from([1, 2, 3])
observable
.map{value in value * 10}
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
}

und führt zu folgender Ausgabe in der Konsole:

10
20
30

In der Praxis kommt der Operator zum Beispiel zum Einsatz, wenn nur eine Eigenschaft der emittierten Objekte von Interesse ist.

Scan: Dieser Operator arbeitet ebenfallsauf einem Observable. Bei seinem Aufruf ist eine Funktion zu übergeben, die zwei Argumente erhält. Das erste ist das Ergebnis des vorherigen Aufrufs und das zweite ist das Event des Observables. Für den ersten Aufruf des Operators ist ein Default-Wert anzugeben. Die Ergebnisse der Funktion werden wiederum als neues Observable ausgegeben.

Der [code]Scan[/code]-Operator summiert alle Werte des Observables und gibt jeweils den kumulierten Wert aus (Abb. 4).

(Bild: http://reactivex.io/documentation/operators/scan.html)

Eine RxSwift-Implementierung dieses Operators mit einer Funktion, die die Werte aller Events summiert, sieht wie folgt aus:

func testScan() {
let observable = Observable.from([1, 2, 3, 4, 5])
observable
.scan(0) {seed, value in seed + value}
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
}

und führt zu folgender Ausgabe in der Konsole:

1 
3
6
10
15

In der Praxis wäre das gezeigte Beispiel auch das gängigste. Mit der Verwendung des Scan-Operators lässt sich stets die Summe aller Elemente des Streams ausgeben.

Filter: Auch dieser Operator arbeitet auf einem Observable. Er gibt eines zurück, das nur diejenigen Events des Input-Streams enthält, die die übergebene Prädikatsfunktion als wahr evaluiert.

Der Filter-Operator filtert alle Werte aus dem Observable, die größer als 10 sind (Abb. 5).

(Bild: http://reactivex.io/documentation/operators/filter.html)

Eine RxSwift-Implementierung dieses Operators sieht wie folgt aus:

func testFilter() {
let observable = Observable.from([2, 30, 22, 5, 60, 1])
observable
.filter {$0 > 10}
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
}

und führt zu folgender Ausgabe in der Konsole:

30
22
60

Zip: Dieser Operator arbeitet auf mehreren Observables. Sobald er von jedem der Observables, auf denen er arbeitet, ein Event erhalten hat, kombiniert er sie und gibt sie als ein Event in seinem Ausgabe-Observable aus. Anschließend wartet er auf das zweite Event jedes Observables und so weiter.

Der Zip-Operator kombiniert die Werte von zwei Observables und erstellt ein neues, über das die neuen Werte emittiert werden (Abb. 6).

Eine RxSwift-Implementierung dieses Operators sieht wie folgt aus:

func testZip() {
let numbers = Observable.from([1, 2, 3, 4, 5])
let letters = Observable.from(["A", "B", "C", "D"])
Observable.zip(numbers, letters) { return ($0, $1) }
.subscribe(onNext: {print($0)})
.disposed(by: disposeBag)
}

und führt zu folgender Ausgabe in der Konsole:

(1, "A")
(2, "B")
(3, "C")
(4, "D")

Ein in der Praxis üblicher Anwendungsfall ist, dass man eine Aktion ausführen soll, sobald zwei oder mehr HTTP-Requests eine Antwort geliefert haben.

Wie im theoretischen Teil erwähnt, spezifiziert ReactiveX für nahezu jeden Anwendungsfall einen Operator. Nach der Implementierung der ersten Operatoren ist erkennbar, dass ihre Benennung äußerst deskriptiv ist. Hierbei handelt es sich um einen der größten Vorteile von RxSwift. Das Framework lässt asynchronen Code und reaktive Programmierung leserlich und wartbar werden.

Abschließend fasst ein etwas komplexerer Beispiel alle Konzepte zusammen. Zur Ausführung ist der Umgebung des vorherigen Kapitels noch der Pod RxTest hinzuzufügen. Es soll folgende Fachlichkeit abgebildet werden: Coding Kid ist einer der Superhelden aus Coding Valley. Weil er ein vielbeschäftigter Held ist, möchte er sich ein System bauen, das die Nachrichten aus mehreren Kanälen integriert und ihn auf relevante Gefahren hinweist. Als geeignete Input-Kanäle identifiziert er Polizeifunkmeldungen und Tweets.

Erstere wird modelliert als:

struct Funkspruch {
var desc: String;
var severity: Int;
var requiresHero: Bool;
}

Und ein Tweet als:

struct Tweet {
var location: String;
var text: String;
}

Das Ziel von Coding Kit ist es, die beiden Event-Streams zusammenzufassen und als Event auszugeben:

struct Event : Equatable{
var desc: String;
// Hilfsmethode zur Evaluierung der Tests
static func ==(lhs: Event, rhs: Event) -> Bool {
return lhs.desc == rhs.desc
}
}

Allerdings möchte er nur eine Information erhalten, wenn die Severity eines Funkspruchs größer als sieben ist und explizit ein Held angefordert wird. Über Tweets möchte er nur informiert werden, wenn sie in Coding Valley erstellt wurden und das Wort "Bug" enthalten. Das Ergebnis ist folgendes Warnsystem:

struct Warnsystem {
let eventStream: Observable<Event>

init(polizeiFunk: Observable<Funkspruch>, twitterFeed: Observable<Tweet>) {
eventStream = Observable.of(
polizeiFunk
.filter {$0.requiresHero && $0.severity > 7}
.map {funkspruch in return Event(desc: funkspruch.desc)},
twitterFeed
.filter {$0.location == "Coding Valley" && $0.text.contains("Bug")}
.map {tweet in return Event(desc: tweet.text)}
).merge()
// merge führt zwei Streams zusammen, in dem jedes Event der beiden Streams
// als eigenes Event in einem neuen Observable ausgegeben werden.
}
}

Um zu überprüfen, ob sein System funktioniert, baut er einen Test mit RxTest und sendet ein paar Tweets und Funksprüche an sein System:

func testWarnsystem(){
// Der scheduler wird die Test-Events zu definierten Zeitpunkten simulieren
let scheduler = TestScheduler(initialClock: 0)

// Definition der Funksprüche, die emittiert werden
let funkspruch1 = Funkspruch(desc: "Writing a callback-pyramid",
severity: 10,
requiresHero: true)
let funkspruch2 = Funkspruch(desc: "Not testing your Rx-Code",
severity: 6,
requiresHero: false)

// Erstellen eines Observables mit den definierten Funksprüchen
let funkObservable = scheduler.createHotObservable([
Recorded.next(1, funkspruch1),
Recorded.next(4, funkspruch2),
Recorded.completed(6)
]).asObservable();

// Definition der Tweets, die emittiert werden
let tweet1 = Tweet(location: "Coding Valley",
text: "Let's go and deploy.")
let tweet2 = Tweet(location: "Coding Valley",
text: "Bug in Production. Please send help!")
let tweet3 = Tweet(location: "Somewhere else",
text: "Boring day.")

// Erstellen eines Observables mit den definierten Tweets
let tweetObservable = scheduler.createHotObservable([
Recorded.next(2, tweet1),
Recorded.next(3, tweet2),
Recorded.next(5, tweet3),
Recorded.completed(8)
]).asObservable();

// Definition der Events, die als Ausgabe des Warnsystems erwartet werden.
let expectedEvents = [
Recorded.next(1, Event(desc: "Writing a callback-pyramid")),
Recorded.next(3, Event(desc: "Bug in Production. Please send help!")),
Recorded.completed(8)
]

// Initialisierung des Warnsystems
let warnsystem = Warnsystem(polizeiFunk: funkObservable, twitterFeed: tweetObservable)

// Zuweisung des Ausgabe-Streams zu einem testObserver, sodass die
// Ergebnisse mit den erwarteten Ergebnissen verglichen werden können
let testObserver = scheduler.createObserver(Event.self)
warnsystem.eventStream
.subscribe(testObserver)
.disposed(by: disposeBag)

// Start der Erzeugung der Events
scheduler.start()

// Vergleich der Ausgabe mit der erwarteten Ausgabe
XCTAssertEqual(expectedEvents, testObserver.events)
}

Der Test ist erfolgreich und alles funktioniert. Aus dem Beispiel lässt sich eine weitere Erkenntnis gewinnen, die für die reaktive Entwicklung sehr wichtig ist: Statt des üblichen Informations-Pulls der imperativen Entwicklung übernimmt RxSwift den Informations-Push, auf den Entwickler reagieren. Imperative Programme müssten sich darum kümmern, regelmäßig nach neuen Tweets beziehungsweise Funksprüchen zu fragen, die letzten Ergebnisse vorzuhalten, Deltas zu identifizieren et cetera. Mit RxSwift lässt sich eine Funktion definieren, die über neue Events informiert wird und diese entsprechend verarbeitet. Asynchronität, Nebenläufigkeit und Zustände sind dabei vollständig abstrahiert.

Das abschließende Beispiel zeigt die Leistungsstärke von RxSwift. Mit wenigen Zeilen Code lassen sich komplexe Funktionen auf einem oder mehreren Streams ausführen. Ganz ohne Pyramid of Doom. Der Code, der so entsteht, ist für jeden Entwickler, der sich mit synchronen Iterables auskennt, leicht zu verstehen. Ein weiteres großes Plus gibt es für die Testbarkeit von Observables mit RxTest. Außerdem werden sich plattformübergreifende Entwickler freuen, dass sich ihre RxSwift-Kenntnisse leicht in Java-, JavaScript-, Python- und diverse andere Projekte übertragen lassen.

Mattes Wieben
befasst sich seit vielen Jahren mit der Entwicklung von Apps. Aktuell liegt sein Fokus auf der Implementierung von Web-Anwendungen und plattformübergreifenden Apps. Mehr Tutorials und Informationen zu Coding Kid und seinen Freunden werden bald im Blog des Autors zu finden sein.
(ane)