zurück zum Artikel

Patterns in der Softwarearchitektur: Das Active Object

Rainer Grimm

(Bild: Dmytro Zinkevych/Shutterstock.com)

Das Design Pattern Active Object entkoppelt die Methodenausführung vom Methodenaufruf für Objekte, die sich jeweils in einem eigenen Thread befinden.

Patterns sind eine wichtige Abstraktion in der modernen Softwareentwicklung und Softwarearchitektur. Sie bieten eine klar definierte Terminologie, eine saubere Dokumentation und das Lernen von den Besten. Das zu den Concurrency Patterns gehörende Active Object beschreibt ein Wikipedia-Eintrag folgendermaßen [1]: "The active object design pattern decouples method execution from method invocation for objects that each reside in their own thread of control.The goal is to introduce concurrency, by using asynchronous method invocation and a scheduler for handling requests."

Das Active Object entkoppelt den Methodenaufruf von der Methodenausführung. Der Methodenaufruf wird auf dem Client-Thread ausgeführt, die Methodenausführung jedoch auf dem Active Object. Das Active Object hat seinen Thread und eine Liste von Methodenanforderungsobjekten (kurze Anfragen), die ausgeführt werden sollen. Der Methodenaufruf des Clients reiht die Anfragen in der Liste des Active Objects ein. Die Anfragen werden an den Server weitergeleitet.

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

Das Active Object ist auch bekannt als Concurrency Objekt

Wenn viele Threads synchronisiert auf ein gemeinsames Objekt zugreifen, müssen die folgenden Herausforderungen gelöst werden:

Das Active Object Pattern besteht aus sechs Komponenten:

Das dynamische Verhalten des Active Objects besteht aus drei Phasen:

  1. Anfrageerstellung und Scheduling: Der Client ruft eine Methode des Stellvertreters auf. Der Stellvertreter erstellt eine Anfrage und gibt sie an den Scheduler weiter. Der Scheduler reiht die Anfrage in die Aktivierungsliste ein. Außerdem gibt der Stellvertreter einen Future an den Client zurück, wenn die Anfrage ein Ergebnis liefert.
  2. Ausführen der Mitgliedsfunktion: Der Scheduler bestimmt, welche Anfrage lauffähig wird, indem er die Guard-Methode der Anfrage auswertet. Er streicht die Anfrage aus der Aktivierungsliste und schickt sie an den Server.
  3. Fertigstellung: Wenn die Anfrage ein Ergebnis liefert, wird es im Future gespeichert. Der Client kann nach dem Ergebnis fragen. Wenn der Client das Ergebnis hat, können die Anfrage und der Future gelöscht werden.

Die folgende Abbildung zeigt die Abfolge der Nachrichten.

Bevor ich ein minimales Beispiel für das Active Object vorstelle, hier eine kurze Auflistung seiner Vor- und Nachteile:

Das folgende Beispiel zeigt eine vereinfachte Implementierung des Active Objects. Insbesondere definiere ich keine Schnittstelle für die Methodenanforderungen an das Active Object, die der Stellvertreter und der Server implementieren sollten. Außerdem führt der Scheduler den nächsten Job aus, wenn er dazu aufgefordert wird, und die run-Mitgliedsfunktion des Active Objects erstellt die Threads.

Die beteiligten Datentypen future<vector<future<pair<bool, int>>>> fallen oft recht lang aus. Um die Lesbarkeit zu verbessern, habe ich stark auf using-Deklarationen zurückgegriffen (Zeile 1). Dieses Beispiel setzt eine solide Kenntnis von Promises und Futures in C++ voraus. In meinen Artikeln über Tasks [2] finden sich dazu weitere Details.

// activeObject.cpp

#include <algorithm>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>
#include <utility>
#include <vector>

using std::async;                                      // (1)
using std::boolalpha;
using std::cout;
using std::deque;
using std::distance;
using std::for_each;
using std::find_if;
using std::future;
using std::lock_guard;
using std::make_move_iterator;
using std::make_pair;
using std::move;
using std::mt19937;
using std::mutex;
using std::packaged_task;
using std::pair;
using std::random_device;
using std::sort;
using std::jthread;
using std::uniform_int_distribution;
using std::vector;

class IsPrime {                                         // (8)
 public:
  IsPrime(int num): numb{num} {} 
  pair<bool, int> operator()() {
    for (int j = 2; j * j <= numb; ++j){
      if (numb % j == 0) return make_pair(false, numb);
    }
    return make_pair(true, numb);
  }
 private:
    int numb;       
};

class ActiveObject {
 public:
    
  future<pair<bool, int>> enqueueTask(int i) {
    IsPrime isPrime(i);
    packaged_task<pair<bool, int>()> newJob(isPrime);
    auto isPrimeFuture = newJob.get_future();
    {
      lock_guard<mutex> lockGuard(activationListMutex);
      activationList.push_back(move(newJob));            // (6)
    }
    return isPrimeFuture;
  }

  void run() {
    std::jthread j([this] {                               // (12)
      while ( !runNextTask() );                           // (13)
    });
  }

 private:

  bool runNextTask() {                                     // (14)
    lock_guard<mutex> lockGuard(activationListMutex);
    auto empty = activationList.empty();
    if (!empty) {                                           // (15)
      auto myTask= std::move(activationList.front());
      activationList.pop_front();
      myTask();
    }
    return empty;
  }

  deque<packaged_task<pair<bool, int>()>> activationList;      //(7)
  mutex activationListMutex;
};

vector<int> getRandNumbers(int number) {
  random_device seed;
  mt19937 engine(seed());
  uniform_int_distribution<> dist(1'000'000, 1'000'000'000);  // (4)
  vector<int> numbers;
  for (long long i = 0 ; i < number; ++i) numbers.push_back(dist(engine)); 
  return numbers;
}

future<vector<future<pair<bool, int>>>> getFutures(ActiveObject& activeObject, 
                                                   int numberPrimes) {
  return async([&activeObject, numberPrimes] {
    vector<future<pair<bool, int>>> futures;
    auto randNumbers = getRandNumbers(numberPrimes);      // (3)
    for (auto numb: randNumbers){
      futures.push_back(activeObject.enqueueTask(numb));  // (5)
    }
    return futures;
  });
}
    

int main() {
    
  cout << boolalpha << '\n';
    
  ActiveObject activeObject;
        
  // a few clients enqueue work concurrently                  // (2)
  auto client1 = getFutures(activeObject, 1998);
  auto client2 = getFutures(activeObject, 2003);
  auto client3 = getFutures(activeObject, 2011);
  auto client4 = getFutures(activeObject, 2014);
  auto client5 = getFutures(activeObject, 2017);
    
  // give me the futures                                      // (9)
  auto futures = client1.get();
  auto futures2 = client2.get();
  auto futures3 = client3.get();
  auto futures4 = client4.get();
  auto futures5 = client5.get();
    
  // put all futures together                                 // (10)
  futures.insert(futures.end(),make_move_iterator(futures2.begin()), 
                               make_move_iterator(futures2.end()));
    
  futures.insert(futures.end(),make_move_iterator(futures3.begin()), 
                               make_move_iterator(futures3.end()));
    
  futures.insert(futures.end(),make_move_iterator(futures4.begin()), 
                               make_move_iterator(futures4.end()));
    
  futures.insert(futures.end(),make_move_iterator(futures5.begin()), 
                               make_move_iterator(futures5.end()));
        
  // run the promises                                         // (11)
  activeObject.run();
    
  // get the results from the futures
  vector<pair<bool, int>> futResults;
  futResults.reserve(futures.size());
  for (auto& fut: futures) futResults.push_back(fut.get());   // (16)
    
  sort(futResults.begin(), futResults.end());                 // (17)
    
  // separate the primes from the non-primes
  auto prIt = find_if(futResults.begin(), futResults.end(), 
                      [](pair<bool, int> pa){ return pa.first == true; });
 
  cout << "Number primes: " << distance(prIt, futResults.end()) << '\n';     // (19)
  cout << "Primes:" << '\n';
  for_each(prIt, futResults.end(), [](auto p){ cout << p.second << " ";} );  // (20)
    
  cout << "\n\n";
    
  cout << "Number no primes: " << distance(futResults.begin(), prIt) << '\n'; // (18)
  cout << "No primes:" << '\n';
  for_each(futResults.begin(), prIt, [](auto p){ cout << p.second << " ";} );
    
  cout << '\n';
    
}

Der Grundgedanke des Beispiels ist zunächst, dass die Clients gleichzeitig Aufträge in die Aktivierungsliste einreihen können. Der Server bestimmt, welche Zahlen Primzahlen sind und die Aktivierungsliste ist Teil des Active Objects. Das Active Object führt die Aufträge, die in der Aktivierungsliste stehen, in einem separaten Thread aus und die Clients können die Ergebnisse abfragen.

Die fünf Clients stellen die Arbeit (Zeile 2) über die Funktion getFutures in die Warteschlange des activeObjects. getFutures nimmt das activeObject und eine Zahl numberPrimes entgegen. numberPrimes generiert Zufallszahlen (Zeile 3) zwischen 1'000'000 und 1'000'000'000 (Zeile 4) und schiebt sie auf den Rückgabewert: vector<future<pair<bool, int>>. future<pair<bool, int> enthält ein bool und ein int. Das bool gibt an, ob die Zahl eine Primzahl ist. Schauen wir uns die Zeile (5) genauer an: futures.push_back(activeObject.enqueueTask(numb)). Dieser Aufruf löst aus, dass eine neue Aufgabe in die Aktivierungsliste eingetragen wird (Zeile 6). Alle Aufrufe auf der Aktivierungsliste müssen geschützt werden. Die Aktivierungsliste ist eine deque von Promises (Zeile 7): deque<packaged_task<pair<bool, int>()>>. Jedes Promise führt beim Aufruf das Funktionsobjekt IsPrime (Zeile 8) aus. Der Rückgabewert ist ein Paar aus einem bool und einem int. Das bool gibt an, ob die Zahl int eine Primzahl ist.

Jetzt sind die Arbeitspakete vorbereitet, die Berechnung kann beginnen. Alle Clients geben in Zeile (9) ihre Handles zu den zugehörigen Futures zurück. Das Zusammenfassen aller Futures (Zeile 10) erleichtert die Arbeit. Der Aufruf activeObject.run() in Zeile (11) startet die Ausführung. Die Mitgliedsfunktion run (Zeile 12) erstellt den Thread, um die Mitgliedsfunktion runNextTask (Zeile 13) auszuführen. runNextTask (Zeile 14) stellt fest, ob die deque nicht leer ist (Zeile 15) und erstellt die neue Aufgabe. Durch den Aufruf von futResults.push_back(fut.get()) (Zeile 16) auf jedem Future werden alle Ergebnisse angefordert und auf futResults geschoben. Zeile (17) sortiert den Vektor der Paare: vector<pair<bool, int>>. In den restlichen Zeilen wird die Berechnung dargestellt. Der Iterator prIt in Zeile (18) enthält den ersten Iterator zu einem Paar, das eine Primzahl hat.

Der folgende Screenshot zeigt die Anzahl der Primzahlen distance(prIt, futResults.end()) (Zeile 19) und die Primzahlen (Zeile 20). Es werden nur die ersten Nicht-Primzahlen angezeigt:

Das Active Object und das Monitor Object synchronisieren und planen den Aufruf von Mitgliedsfunktionen. Der Hauptunterschied besteht darin, dass das Active Object seine Mitgliedsfunktion in einem anderen Thread ausführt, während sich das Monitor Object im selben Thread wie der Client befindet. In meinem nächsten Artikel werde ich das Monitor Object genauer vorstellen. (map [3])


URL dieses Artikels:
https://www.heise.de/-9186720

Links in diesem Artikel:
[1] https://en.wikipedia.org/wiki/Active_object
[2] https://www.grimm-jaud.de/index.php/blog/tag/tasks
[3] mailto:map@ix.de