Patterns in der Softwarearchitektur: Das Active Object

Das Design Pattern Active Object entkoppelt die Methodenausführung vom Methodenaufruf für Objekte, die sich jeweils in einem eigenen Thread befinden.

In Pocket speichern vorlesen Druckansicht

(Bild: Dmytro Zinkevych/Shutterstock.com)

Lesezeit: 9 Min.
Von
  • Rainer Grimm
Inhaltsverzeichnis

Patterns sind eine wichtige Abstraktion in der modernen Softwareentwicklung und Softwarearchitektur. Sie bieten eine klar definierte Terminologie, eine saubere Dokumentation und das Lernen von den Besten. Das zu den Concurrency Patterns gehörende Active Object beschreibt ein Wikipedia-Eintrag folgendermaßen: "The active object design pattern decouples method execution from method invocation for objects that each reside in their own thread of control.The goal is to introduce concurrency, by using asynchronous method invocation and a scheduler for handling requests."

Das Active Object entkoppelt den Methodenaufruf von der Methodenausführung. Der Methodenaufruf wird auf dem Client-Thread ausgeführt, die Methodenausführung jedoch auf dem Active Object. Das Active Object hat seinen Thread und eine Liste von Methodenanforderungsobjekten (kurze Anfragen), die ausgeführt werden sollen. Der Methodenaufruf des Clients reiht die Anfragen in der Liste des Active Objects ein. Die Anfragen werden an den Server weitergeleitet.

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

Das Active Object ist auch bekannt als Concurrency Objekt

Wenn viele Threads synchronisiert auf ein gemeinsames Objekt zugreifen, müssen die folgenden Herausforderungen gelöst werden:

  • Ein Thread, der eine verarbeitungsintensive Mitgliedsfunktion aufruft, sollte die anderen Threads, die das gleiche Objekt aufrufen, nicht zu lange blockieren.
  • Der Zugriff auf ein gemeinsames Objekt sollte einfach zu synchronisieren sein.
  • Die Concurrency-Eigenschaften der ausgeführten Anfragen sollten an die konkrete Hardware und Software anpassbar sein.
  • Der Methodenaufruf des Clients geht an einen Stellvertreter, der die Schnittstelle des aktiven Objekts repräsentiert.
  • Der Server implementiert diese Mitgliedsfunktionen und läuft im Thread des Active Objects. Zur Laufzeit wandelt der Stellvertreter den Aufruf in einen Methodenaufruf an den Server um. Diese Anfrage wird vom Scheduler in eine Aktivierungsliste eingetragen.
  • Die Ereignisschleife des Schedulers läuft im selben Thread wie der Server, entnimmt die Anfragen aus der Aktivierungsliste, entfernt sie und sendet sie an den Server.
  • Der Client erhält das Ergebnis des Methodenaufrufs über einen Future vom Stellvertreter.

Das Active Object Pattern besteht aus sechs Komponenten:

  • Der Stellvertreter (Proxy) bietet eine Schnittstelle für die zugänglichen Mitgliedsfunktionen des Active Objects. Der Stellvertreter stößt die Erstellung einer Anfrage in der Aktivierungsliste an. Der Stellvertreter wird im Client-Thread ausgeführt.
  • Die Methodenanforderungsklasse (method request) definiert die Schnittstelle für die Methode, die auf einem Active Object ausgeführt wird. Diese Schnittstelle enthält auch Guard-Methoden, die anzeigen, ob der Auftrag zur Ausführung bereit ist. Die Anfrage enthält alle Kontextinformationen, die später verwendet werden sollen.
  • Die Aktivierungsliste (activation list) verwaltet die anstehenden Anfragen. Die Aktivierungsliste entkoppelt den Thread des Clients vom Thread des Active Objects. Der Stellvertreter fügt das Anforderungsobjekt ein und der Scheduler entfernt es wieder. Daher muss der Zugriff auf die Aktivierungsliste serialisiert werden.
  • Der Scheduler läuft im Thread des Active Objects und entscheidet, welche Anfrage aus der Aktivierungsliste als nächstes ausgeführt wird. Der Scheduler wertet die Guards der Anfrage aus, um festzustellen, ob die Anfrage ausgeführt werden kann.
  • Der Server implementiert das Active Object und läuft im Thread des Active Objects. Der Server implementiert die Schnittstelle der Methodenanforderung, und der Scheduler ruft seine Mitgliedsfunktionen auf.
  • Der Stellvertreter erstellt den Future. Dieser ist nur notwendig, wenn die Anfrage ein Ergebnis liefert. Daher erhält der Client den Future und kann das Ergebnis des Methodenaufrufs auf dem Active Object abrufen. Der Client kann auf das Ergebnis warten oder es abfragen.

Das dynamische Verhalten des Active Objects besteht aus drei Phasen:

  1. Anfrageerstellung und Scheduling: Der Client ruft eine Methode des Stellvertreters auf. Der Stellvertreter erstellt eine Anfrage und gibt sie an den Scheduler weiter. Der Scheduler reiht die Anfrage in die Aktivierungsliste ein. Außerdem gibt der Stellvertreter einen Future an den Client zurück, wenn die Anfrage ein Ergebnis liefert.
  2. Ausführen der Mitgliedsfunktion: Der Scheduler bestimmt, welche Anfrage lauffähig wird, indem er die Guard-Methode der Anfrage auswertet. Er streicht die Anfrage aus der Aktivierungsliste und schickt sie an den Server.
  3. Fertigstellung: Wenn die Anfrage ein Ergebnis liefert, wird es im Future gespeichert. Der Client kann nach dem Ergebnis fragen. Wenn der Client das Ergebnis hat, können die Anfrage und der Future gelöscht werden.

Die folgende Abbildung zeigt die Abfolge der Nachrichten.

Bevor ich ein minimales Beispiel für das Active Object vorstelle, hier eine kurze Auflistung seiner Vor- und Nachteile:

  • Die Synchronisierung ist nur auf dem Thread des Active Objects erforderlich, nicht aber auf den Threads des Clients.
  • Klare Trennung zwischen dem Client (Benutzer) und dem Server (Implementierer). Die Herausforderungen der Synchronisierung liegen auf der Seite des Implementierers.
  • Erhöhter Durchsatz des Systems aufgrund der asynchronen Ausführung der Anfragen. Der Aufruf von verarbeitungsintensiven Mitgliedsfunktionen blockiert nicht das gesamte System.
  • Der Scheduler kann verschiedene Strategien anwenden, um die anstehenden Anfragen auszuführen. Wenn dies der Fall ist, können die Aufträge in einer anderen Reihenfolge ausgeführt werden als sie in der Warteschlange stehen.
  • Wenn die Anfragen zu feinkörnig sind, kann die Performanz (das Leistungsverhalten) des Active Object Pattern, wie etwa des Stellvertreters, der Aktivierungsliste und des Schedulers, übermäßig hoch sein.
  • Aufgrund der Scheduler-Strategie und des Schedulings des Betriebssystems ist die Fehlersuche im Active Object oft schwierig. Dies gilt vor allem, wenn die Reihenfolge der Auftragsausführung sich von der der Auftragserstellung unterscheidet.

Das folgende Beispiel zeigt eine vereinfachte Implementierung des Active Objects. Insbesondere definiere ich keine Schnittstelle für die Methodenanforderungen an das Active Object, die der Stellvertreter und der Server implementieren sollten. Außerdem führt der Scheduler den nächsten Job aus, wenn er dazu aufgefordert wird, und die run-Mitgliedsfunktion des Active Objects erstellt die Threads.

Die beteiligten Datentypen future<vector<future<pair<bool, int>>>> fallen oft recht lang aus. Um die Lesbarkeit zu verbessern, habe ich stark auf using-Deklarationen zurückgegriffen (Zeile 1). Dieses Beispiel setzt eine solide Kenntnis von Promises und Futures in C++ voraus. In meinen Artikeln über Tasks finden sich dazu weitere Details.

// activeObject.cpp

#include <algorithm>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>
#include <utility>
#include <vector>

using std::async;                                      // (1)
using std::boolalpha;
using std::cout;
using std::deque;
using std::distance;
using std::for_each;
using std::find_if;
using std::future;
using std::lock_guard;
using std::make_move_iterator;
using std::make_pair;
using std::move;
using std::mt19937;
using std::mutex;
using std::packaged_task;
using std::pair;
using std::random_device;
using std::sort;
using std::jthread;
using std::uniform_int_distribution;
using std::vector;

class IsPrime {                                         // (8)
 public:
  IsPrime(int num): numb{num} {} 
  pair<bool, int> operator()() {
    for (int j = 2; j * j <= numb; ++j){
      if (numb % j == 0) return make_pair(false, numb);
    }
    return make_pair(true, numb);
  }
 private:
    int numb;       
};

class ActiveObject {
 public:
    
  future<pair<bool, int>> enqueueTask(int i) {
    IsPrime isPrime(i);
    packaged_task<pair<bool, int>()> newJob(isPrime);
    auto isPrimeFuture = newJob.get_future();
    {
      lock_guard<mutex> lockGuard(activationListMutex);
      activationList.push_back(move(newJob));            // (6)
    }
    return isPrimeFuture;
  }

  void run() {
    std::jthread j([this] {                               // (12)
      while ( !runNextTask() );                           // (13)
    });
  }

 private:

  bool runNextTask() {                                     // (14)
    lock_guard<mutex> lockGuard(activationListMutex);
    auto empty = activationList.empty();
    if (!empty) {                                           // (15)
      auto myTask= std::move(activationList.front());
      activationList.pop_front();
      myTask();
    }
    return empty;
  }

  deque<packaged_task<pair<bool, int>()>> activationList;      //(7)
  mutex activationListMutex;
};

vector<int> getRandNumbers(int number) {
  random_device seed;
  mt19937 engine(seed());
  uniform_int_distribution<> dist(1'000'000, 1'000'000'000);  // (4)
  vector<int> numbers;
  for (long long i = 0 ; i < number; ++i) numbers.push_back(dist(engine)); 
  return numbers;
}

future<vector<future<pair<bool, int>>>> getFutures(ActiveObject& activeObject, 
                                                   int numberPrimes) {
  return async([&activeObject, numberPrimes] {
    vector<future<pair<bool, int>>> futures;
    auto randNumbers = getRandNumbers(numberPrimes);      // (3)
    for (auto numb: randNumbers){
      futures.push_back(activeObject.enqueueTask(numb));  // (5)
    }
    return futures;
  });
}
    

int main() {
    
  cout << boolalpha << '\n';
    
  ActiveObject activeObject;
        
  // a few clients enqueue work concurrently                  // (2)
  auto client1 = getFutures(activeObject, 1998);
  auto client2 = getFutures(activeObject, 2003);
  auto client3 = getFutures(activeObject, 2011);
  auto client4 = getFutures(activeObject, 2014);
  auto client5 = getFutures(activeObject, 2017);
    
  // give me the futures                                      // (9)
  auto futures = client1.get();
  auto futures2 = client2.get();
  auto futures3 = client3.get();
  auto futures4 = client4.get();
  auto futures5 = client5.get();
    
  // put all futures together                                 // (10)
  futures.insert(futures.end(),make_move_iterator(futures2.begin()), 
                               make_move_iterator(futures2.end()));
    
  futures.insert(futures.end(),make_move_iterator(futures3.begin()), 
                               make_move_iterator(futures3.end()));
    
  futures.insert(futures.end(),make_move_iterator(futures4.begin()), 
                               make_move_iterator(futures4.end()));
    
  futures.insert(futures.end(),make_move_iterator(futures5.begin()), 
                               make_move_iterator(futures5.end()));
        
  // run the promises                                         // (11)
  activeObject.run();
    
  // get the results from the futures
  vector<pair<bool, int>> futResults;
  futResults.reserve(futures.size());
  for (auto& fut: futures) futResults.push_back(fut.get());   // (16)
    
  sort(futResults.begin(), futResults.end());                 // (17)
    
  // separate the primes from the non-primes
  auto prIt = find_if(futResults.begin(), futResults.end(), 
                      [](pair<bool, int> pa){ return pa.first == true; });
 
  cout << "Number primes: " << distance(prIt, futResults.end()) << '\n';     // (19)
  cout << "Primes:" << '\n';
  for_each(prIt, futResults.end(), [](auto p){ cout << p.second << " ";} );  // (20)
    
  cout << "\n\n";
    
  cout << "Number no primes: " << distance(futResults.begin(), prIt) << '\n'; // (18)
  cout << "No primes:" << '\n';
  for_each(futResults.begin(), prIt, [](auto p){ cout << p.second << " ";} );
    
  cout << '\n';
    
}

Der Grundgedanke des Beispiels ist zunächst, dass die Clients gleichzeitig Aufträge in die Aktivierungsliste einreihen können. Der Server bestimmt, welche Zahlen Primzahlen sind und die Aktivierungsliste ist Teil des Active Objects. Das Active Object führt die Aufträge, die in der Aktivierungsliste stehen, in einem separaten Thread aus und die Clients können die Ergebnisse abfragen.

Die fünf Clients stellen die Arbeit (Zeile 2) über die Funktion getFutures in die Warteschlange des activeObjects. getFutures nimmt das activeObject und eine Zahl numberPrimes entgegen. numberPrimes generiert Zufallszahlen (Zeile 3) zwischen 1'000'000 und 1'000'000'000 (Zeile 4) und schiebt sie auf den Rückgabewert: vector<future<pair<bool, int>>. future<pair<bool, int> enthält ein bool und ein int. Das bool gibt an, ob die Zahl eine Primzahl ist. Schauen wir uns die Zeile (5) genauer an: futures.push_back(activeObject.enqueueTask(numb)). Dieser Aufruf löst aus, dass eine neue Aufgabe in die Aktivierungsliste eingetragen wird (Zeile 6). Alle Aufrufe auf der Aktivierungsliste müssen geschützt werden. Die Aktivierungsliste ist eine deque von Promises (Zeile 7): deque<packaged_task<pair<bool, int>()>>. Jedes Promise führt beim Aufruf das Funktionsobjekt IsPrime (Zeile 8) aus. Der Rückgabewert ist ein Paar aus einem bool und einem int. Das bool gibt an, ob die Zahl int eine Primzahl ist.

Jetzt sind die Arbeitspakete vorbereitet, die Berechnung kann beginnen. Alle Clients geben in Zeile (9) ihre Handles zu den zugehörigen Futures zurück. Das Zusammenfassen aller Futures (Zeile 10) erleichtert die Arbeit. Der Aufruf activeObject.run() in Zeile (11) startet die Ausführung. Die Mitgliedsfunktion run (Zeile 12) erstellt den Thread, um die Mitgliedsfunktion runNextTask (Zeile 13) auszuführen. runNextTask (Zeile 14) stellt fest, ob die deque nicht leer ist (Zeile 15) und erstellt die neue Aufgabe. Durch den Aufruf von futResults.push_back(fut.get()) (Zeile 16) auf jedem Future werden alle Ergebnisse angefordert und auf futResults geschoben. Zeile (17) sortiert den Vektor der Paare: vector<pair<bool, int>>. In den restlichen Zeilen wird die Berechnung dargestellt. Der Iterator prIt in Zeile (18) enthält den ersten Iterator zu einem Paar, das eine Primzahl hat.

Der folgende Screenshot zeigt die Anzahl der Primzahlen distance(prIt, futResults.end()) (Zeile 19) und die Primzahlen (Zeile 20). Es werden nur die ersten Nicht-Primzahlen angezeigt:

Das Active Object und das Monitor Object synchronisieren und planen den Aufruf von Mitgliedsfunktionen. Der Hauptunterschied besteht darin, dass das Active Object seine Mitgliedsfunktion in einem anderen Thread ausführt, während sich das Monitor Object im selben Thread wie der Client befindet. In meinem nächsten Artikel werde ich das Monitor Object genauer vorstellen. (map)