Parallele Programmierung: Pythons Multiprocessing-Modul vs. PLINQ

Seite 3: Python, TPL und PLINQ

Inhaltsverzeichnis

Für die parallele Programmierung verfügt Python über unterschiedliche Module. Dazu gehört unter anderem os, das die Prozesse mittels Betriebssystem forken und parallel ausführen kann. Leider gibt es die Option nur für Unix- und Linux-Systeme und nicht für Windows, sodass keine Plattformunabhängigkeit entstehen kann. Das Modul threading hingegen löst das Problem des Global Interpreter Lock (GIL) nicht. Ein Programm wird in mehreren Threads ausgeführt, was unter Umständen sogar länger dauern kann als die serielle Arbeitsweise, weil es einen Overhead zur internen Kommunikation zwischen einzelnen Threads generiert. Um dieses Problem umgehen zu können, kam für die Implementierung der parallelen Berechnungen das plattformunabhängige Multiprocessing-Modul zum Einsatz. Setzt man es ein, werden die Python-Prozesse unabhängig voneinander gestartet. Dadurch lassen sich Leistungsfähigkeit und Möglichkeiten der einzelnen CPUs vollständig ausnutzen.

Das Modul multiprocessing hat viele unterschiedliche Funktionen (siehe offizielle Python-Dokumentation). Im Praxisbeispiel kam die Klasse Pool zum Einsatz, mit der sich die Implementierung des Parallelismus vergleichsweise leicht abbilden lässt:

1  import multiprocessing
2
3 # number of workers for process Pool (default all CPUs)
4 count_worker = multiprocessing.cpu_count()
5
6 # ----------------------------------------
7 # Create Multiproceccing Pool
8 # ----------------------------------------
9 pool = multiprocessing.Pool(count_worker)
10
11 #-----------------------------------------
12 # -= Map =-
13 #-----------------------------------------
14 # Put mapper-Function in the multiproceccing pool.
15 # The returning data are already reduced for each
16 # process. It's necessary to pool all this data together
17 mapped_connections = pool.map(mapper, input_files, chunksize=1)
18
19 # Output
20 print '\n=========================================\n'
21 print 'mapped_connections', mapped_connections, '\n'
22 print '=========================================\n'

Entwickler müssen die Anzahl der Prozesse definieren (Zeile 4), einen multiprocessing-Pool für ihre Verwaltung instanziieren (Zeile 9) und eine Funktion (zum Beispiel die Mapper-Funktion aus Abbildung 4) für die Map-Phase implementieren. Diese Funktion und eine Liste der Eingabeparameter sind an den Pool zu übergeben (Zeile 17). Den Rest erledigt das Modul multiprocessing. Es liest die Liste der Parameter ein, nimmt einen, übergibt ihn an die Funktion und startet letztere als einen unabhängigen Python-Prozess, und so geht es weiter, bis der Pool mit den parallelen Prozessen voll ist.

Wenn ein Prozess im Pool fertig ist, übergibt das System den nächsten Parameter aus der Liste an die Funktion und startet einen neuen, bis die Liste der Parameter abgearbeitet ist. Das Ergebnis der parallelen Verarbeitung ist eine Liste der Rückgabewerte der benutzerdefinierten Funktion. Beim MapReduce-Verfahren wird diese Liste sortiert und danach an die Reduce-Funktion übergeben, die auch genauso parallel ausgeführt werden kann. Im Groben sieht die gesamte Verarbeitung wie folgt aus:

1  import multiprocessing
2
3 # number of workers for process Pool (default all CPUs)
4 count_worker = multiprocessing.cpu_count()
5
6 # ----------------------------------------
7 # Create Multiproceccing Pool
8 # ----------------------------------------
9 pool = multiprocessing.Pool(count_worker)
10
11 #-----------------------------------------
12 # -= Map =-
13 #-----------------------------------------
14 # Put mapper-Function in the multiproceccing pool.
15 # The returning data are already reduced for each
16 # process. It's necessary to pool all this data together
17 mapped_connections = pool.map(mapper, input_files, chunksize=1)
18
19 # Group the mapped data from the different workers
20 partitioned_connections =
partition(itertools.chain(*mapped_connections))
21
22 #-----------------------------------------
23 # -= Reduce =-
24 #-----------------------------------------
25 # aggregate the partitioned data from the previos step
26 reduced_connections = pool.map(reducer, partitioned_connections)
27
28 pool.close() # no more tasks
29 pool.join() # wrap up current tasks
30
31 # Sort data descending order
32 reduced_connections.sort( key=operator.itemgetter(1) )
33 reduced_connections.reverse()
34
35 # output :)
36 print '=========================================\n'
37 print 'Connections from Oracle Listener Log File\n'
38 print '=========================================\n'
39 for connection, count in reduced_connections:
40 print "{0}\t{1}".format(connection, count)

Wie zu Anfang erwähnt, wurde die 20 GByte große Log-Datei in acht Stücke à 2,5 GByte geteilt und jede Datei in einem unabhängigen Prozess analysiert. Allerdings waren die Zwischenergebnisse danach immer noch recht groß, was zu enormem Speicherverbrauch führte. Deswegen schloss man am Ende der Map-Phase zur Performanceversbesserung für jede Map einen Reduktionsgang an. Danach wurden die Ergebnisse gruppiert (Zeile 20) und erneut reduziert (Zeile 26). Um Aussagen über die Perfomance treffen zu können, formulierten die Entwickler den Algorithmus mit der Task Parallel Library zusätzlich in C#.

Die Task Parallel Library (TPL) ist Microsofts Kernkomponente für die parallele Programmierung mit dem .NET Framework und seit Version 4.0 fester Bestandteil von letzterem. Parallel LINQ (PLINQ) ermöglicht es LINQ (Language Integrated Query, eine allgemeine Abfragesprache, die mit der Version 3.5 von .NET eingeführt wurde), Abfragen parallel auszuführen. Für die Parallelisierung verwendet sie intern TPL. Der Vorgang erfolgt für Entwickler transparent und sie müssen sich nicht um die Verwaltung der Parallelisierung kümmern. Zudem können sie mit wenigen Handgriffen den Quellcode ändern, sodass sich Verarbeitungen im Programm parallel ausführen lassen.

Im Crashkurs für die parallele Programmierung mit Visual Studio [a] ist gut erklärt, wie die TPL arbeitet und welche Funktionen zur Parallelisierung vorhanden sind. In dem Zusammenhang sind dort unter anderem die Schleifen Parallel.For und Parallel.ForEach dargestellt, die for und foreach ersetzen, damit sich im Programm nicht sequenziell sondern parallel arbeiten lässt. Außerdem ist in dem Kurs das MapReduce-Entwurfmuster als eine Klasse abgebildet, die als Grundgerüst für die Analyse der Oracle TNSLISTENER-Log-Dateien verwendet wurde:

...
using System.Diagnostics;

namespace Oracle_Listener_PLINQ_MapReduce_Test_00
{
// MS-Suggestion MapReduce Class s. das Buch
// Parallele Programmierung mit Visual Studio 2010 – Crashkurs
// Microsoft Press Deutschland 2011
// Druck-ISBN 978-3-86645-555-9
static class PLINQ
{
public static ParallelQuery<TResult> MapReduce<TSource, TMapped,
TKey, TResult>(
this ParallelQuery<TSource> source,
Func<TSource, IEnumerable<TMapped>> map,
Func<TMapped, TKey> keySelector,
Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
{
return source.SelectMany(map)
.GroupBy(keySelector)
.SelectMany(reduce);
}
}
...

Sonst war die Implementierung in C# der in Python ähnlich.

Die Reduce-Phase wurde wie die Map-Phase für die parallele Ausführung programmiert und die kleine Applikation für die Windows-Konsole für einen 64-Bit-Prozessor kompiliert. Ohne diese Änderungen brach das Programm mit einer Out-of-Memory Exception ab.