Scala: Microservices mit dem Actor-Modell serialisieren

Seite 2: Das Actor-Modell als Alternative zum REST-Endpoint-Ansatz

Inhaltsverzeichnis

Ein Actor besteht aus einer Thread-sicheren Message Queue und einer Actor-Funktion, die die Messages der Queue nacheinander bearbeitet. Die Interaktion innerhalb der Anwendung erfolgt ausschließlich über Messages, die in Message-Queues abgelegt werden.

Sobald ein Actor eine Message in seiner Queue vorfindet, ist er bereit zur Ausführung. Der Scheduler des Actor-Systems weist den Actor einem Thread zu. Dieser führt die Funktion mit der Message als Parameter aus. Die Rückgabe der Funktion kann wiederum auch nur über Messages erfolgen. Parallelität wird durch gleichzeitiges Ausführen mehrerer Actors erreicht. Ein einzelner HTTP-Request kann mehrere Actors gleichzeitig beschäftigen. Wobei ein einzelner Actor zu einer Zeit immer nur von einem Thread ausgeführt wird. Das heißt, auch bei zwei Messages in der Queue wird eine nach der anderen bearbeitet.

Die Unterscheidung der einzelnen Actors erfolgt anhand ihrer ActorRef – vereinfacht gesagt der Eingabeseite der Message-Queue. Erwartet ein Actor eine Antwort auf seine Message, so ist ein Bestandteil der abgesendeten Message stets seine eigene ActorRef. Damit weiß der aufgerufene Actor, wohin er die Antwort senden soll.

Aus der Anfangszeit der Microservices-Ära hat sich die provokante und viel diskutierte Forderung gehalten, dass Microservices nicht mehr als 100 Zeilen Code enthalten sollen. Da Actors meist mit weniger als 100 Zeilen Code auskommen, könnten sie als die "wahren Microservices" gelten.

Beispiel eines Actors:

sealed trait Command // 1

final case class GetUsers(replyTo: ActorRef[Users]) extends Command // 2
final case class CreateUser(user: User, replyTo: ActorRef[ActionPerformed]) extends Command

final case class Users(users: Seq[User]) // 3
final case class ActionPerformed(description: String)

def apply(): Behavior[Command] = registry(Set.empty)

private def registry(users: Set[User]): Behavior[Command] = // 4
Behaviors.receiveMessage {
case GetUsers(replyTo) =>
replyTo ! Users(users.toSeq) // 5
Behaviors.same // 6
case reateUser(user, replyTo) =>
replyTo ! ActionPerformed(s"User ${user.name} created.")
registry(users + user)
    case GetUser(name, replyTo) =>
replyTo ! GetUserResponse(users.find(_.name == name))
Behaviors.same
    case DeleteUser(name, replyTo) =>
replyTo ! ActionPerformed(s"User $name deleted.")
registry(users.filterNot(_.name == name))
}

Da der Actor unter Umständen mehr als eine Art von Messages verarbeiten kann, sind alle in einem Trait (Interface) zusammengefasst (1) . Abschnitt (2) beschreibt die Message, die der Actor empfängt. Da es eine reine Abfrage ist, taucht lediglich die Antwortadresse auf. Die Klasse, die der aufrufende Code als Antwort erhält, ist in Abschnitt (3) zu sehen. Die Funktion des Actors besteht aus einem Pattern Match, der die verarbeitbaren Messages trennt (4). In Abschnitt (5) wird die Antwort an den Absender zurückgegeben. Dass der Actor sein Verhalten nicht ändert, beschreibt Abschnitt (6) (siehe hierzu auch das Kapitel "Finite-state Machine").

Da auf einen HTTP-Request auch eine Antwort erfolgen muss, kommt das Ask-Pattern zum Einsatz. Wie im Beispiel oben gezeigt, wird dazu die ActorRef des Actors mitgegeben, der die Antwort erhalten soll. Das muss nicht zwingend der Actor sein, der die Anfrage gestellt hat. Auch Weiterleiten und Delegieren der Bearbeitung sind möglich. Allerdings sind lange Verkettungen von Weiterleitungen oft schwer zu warten, sodass die ActorRef meistens auf den Absender zeigt. Wichtig dabei ist, dass die Antwort nicht der Rückgabewert der Funktion ist, sondern auch mittels Message übermittelt wird.

Der Rückgabewert der Funktion ist typischerweise ein Behavior[Command]. Ein Aktor kann nach jeder Message sein Verhalten ändern, also die Funktion, die die Messages verarbeitet oder auch die Parameter der Funktion. Eine Finite-state Machine hat immer einen Zustand und interpretiert abhängig davon das nächste Command. Über das Behaviour[Command] lässt sich somit auch ein volatiler Zustand halten. Dabei wird eine Liste von User-Objekten als innerer Zustand von einem Aufruf zum nächsten über das Behaviour übergeben. Die Liste existiert allerdings nur so lange, wie es den Actor gibt. Ein Neustart des Actors oder des gesamten Actor-Systems führt zum Datenverlust – ein Verhalten, das beispielsweise beim Zwischenspeichern von Daten (Cache) sinnvoll ist.

Um den Zustand eines Actors auch über einen Neustart hinweg zu erhalten, müssen Entwicklerinnen und Entwickler auf die Actor-Persistenz zurückgreifen. Dieses optionale, zusätzliche Framework unterstützt beim Sichern des Zustands in einer Datenbank, das mit Hilfe von Event Sourcing erfolgt. Beim Event Sourcing werden nicht der aktuelle Zustand gespeichert, sondern alle Events, die den Zustand verändern. Dadurch lässt sich nicht nur der aktuelle Zustand, sondern auch jeder frühere wiederherstellen. Damit die Zeit für das Wiederherstellen nicht ins Unendliche wächst, empfehlen sich regelmäßige Snapshots. Der jeweils letzte dient dann als Grundlage, und anschließend müssen nur noch alle späteren Events erneut bearbeitet werden.

Beispiel eines persistenten Actors:

sealed trait Command // 1 
 
final case class GetUsers(replyTo: ActorRef[Users]) extends Command // 2 
final case class CreateUser(user: User, replyTo: ActorRef[ActionPerformed])
  extends Command 

final case class Users(users: Seq[User]) // 3 
final case class ActionPerformed(description: String) 

// actor events 
sealed trait Event extends CborSerializable // 4
final case class CreateUserEvent(user: User) extends Event 
final case class DeleteUserEvent(name: String) extends Event 
 
// actor state 
final case class State(users: Set[User]) // 5
 
def apply(entityId: String): Behavior[Command] = 
  EventSourcedBehavior.withEnforcedReplies[Command, Event, State](
    persistenceId = PersistenceId.of("UserRegitry", entityId), 
    emptyState = State(Set.empty), 
    commandHandler = commandHandler, 
    eventHandler = eventHandler) 
 
val commandHandler: (State, Command) => ReplyEffect[Event, State] = { // 6
    (state, command) => command match { 
  case GetUsers(replyTo) =>  
    Effect.none.thenReply(replyTo)(state => Users(state.users.toSeq)) 
  case CreateUser(user, replyTo) => // 7
    Effect.persist(CreateUserEvent(user)).thenReply(replyTo)(_ => 
      ActionPerformed(s"User ${user.name} created.")) // 8
  case GetUser(name, replyTo) => 
    Effect.none.thenReply(replyTo)(state => 
      GetUserResponse(state.users.find(_.name == name)))
  case DeleteUser(name, replyTo) => 
    Effect.persist(DeleteUserEvent(name)).thenReply(replyTo)(_ => 
      ActionPerformed(s"User $name deleted."))
  }
}
 
val eventHandler: (State, Event) => State = { (state, event) => // 9
  event match { 
    case CreateUserEvent(user) => State(state.users + user) // 10
    case DeleteUserEvent(name) => State(state.users.filterNot(_.name == name)) 
  } 
}

Die Unterschiede zum beispielhaften Actor weiter oben zeigen sich ab Abschnitt (4) in den serialisierbaren Events, die den inneren Zustand des Actors verändern. Der innere Zustand des Actors wird über die Klasse State gespeichert (5). Die Abschnitte (6) und (7) beschreiben die Funktion des Actors für Commands sowie das Pattern zum Erstellen eines Users. In Abschnitt (8) werden das Event zum Wiederherstellen des Actors gespeichert und die Antwort gesendet. Die Funktion des Actors zur Wiederherstellung des inneren Zustands zeigt sich in Abschnitt (9), das Ändern des inneren Zustands erfolgt in Abschnitt (10).

Beim Erstellen eines Actors mit Actor-Persistenz ist eine PersistenceID notwendig. Ist diese ID schon in der Datenbank vorhanden, wird der frühere Zustand des Actors wiederhergestellt (siehe dazu ein Beispiel des Autors, das die gesamte Liste aller User-Objekte als inneren Zustand verwendet).

Ein weiteres optionales Framework ermöglicht den Betrieb im Cluster. In der ActorRef wird jetzt zusätzlich die IP des Hosts des jeweiligen Actor-Systems gespeichert. Um das Zusammenfinden und den Status der einzelnen Knoten im Cluster kümmert sich das Framework ebenfalls. Darüber hinaus sind Cluster-Singletons und Distributed Publish Subscribe (Pub/Sub) möglich. Für den Betrieb im Cluster ist Actor-Persistenz auch eine Voraussetzung – außer für den seltenen Fall, dass ausschließlich zustandslose Actors vorliegen. Da die Events in einer Datenbank liegen, lässt sich ein persistierter Actor in jedem beliebigen Knoten des Clusters neu erstellen.

Cluster-Singletons sind ein mächtiges Werkzeug. Mit ihnen lässt sich sicherstellen, dass nur ein einziger Actor eines bestimmten Typs im gesamten Cluster erstellt wird. Der Actor wird auf dem ältesten Knoten im Cluster erstellt. Mit diesem Pattern lassen sich Cluster-weit verteilte Actors steuern. Die verteilten Actors sind dann üblicherweise Node Coordinator, die wiederum für verschiedene Child Actors zuständig sind. Über die Baumstruktur aus Cluster-Singleton, Node Coordinator und Childs lassen sich Child Actors im Cluster verteilen und einfach wiederfinden. Auch ein Rebalancing nach dem Hinzufügen oder Entfernen von Knoten ist schnell implementiert:

  • Der Singleton verteilt die Zuständigkeiten im Cluster neu (z. B. Node 1: User A-G, Node 2: User H-N ...)
  • Die Node Coordinator beenden die Actors, für die sie nicht mehr zuständig sind
  • Die Node Coordinator starten die Actors, für die sie jetzt zuständig sind

Ein einzelner Actor kann so einen einzelnen Vorgang abbilden, etwa wie im Beispielcode das Anlegen und Bearbeiten eines Nutzers oder ein Online-Warenkorb vom ersten Artikel bis zum Bezahlen an der Kasse. Über einen eindeutigen Identifier lässt sich der bearbeitende Actor in einem beliebig großen Cluster finden und ihm dann die Messages zuschicken.

Veranschaulichen lässt sich das am Beispiel eines Actors in einer Prozessstrecke beim Beantragen eines Onlinekredites. Bei einem Kredit hält ein Actor alle Daten des Vorgangs. Wird der Antrag eine gewisse Zeit nicht mehr weiterbearbeitet, kann man den Actor löschen, um Speicher freizugeben. Kommen später doch wieder neue Daten hinzu, lässt sich der Vorgang mit den gespeicherten Events einfach wiederherstellen – und der Actor ist erneut bereit, den Vorgang fortsetzen zu können. Auch ein Timeout ist leicht zu implementieren: Stellen Entwickler nach Erstellen des Actors fest, dass die letzten Eingaben zu lange zurückliegen, können sie die erneute Dateneingabe einfach mit einer entsprechenden Fehlermeldung quittieren. Der Zustand des Actors wird entsprechend gesetzt und damit sind keine weiteren Änderungen möglich.