Reactive Extensions (Rx)–Asynchrone Prozesse

by Gregor Biswanger 9. August 2011 12:45

Rx_Logo_small

Um einen Prozess asynchron auszuführen, werden diese in Threads gepackt. Ab dem .NET-Framework 4.0 kann man sogar seine Prozesse mittels Tasks-Klasse auf mehrere CPU-Kerne parallelisieren. Was allerdings nur bei wirklich komplexen und lang andauernden Prozessen sinnvoll ist. Dazu aber mehr bei einem eigenen Blog-Post.

Ein großes Problem bei Threads ist die gegenseitige Abhängigkeit der Prozesse. Zum Beispiel startet Prozess 1 den Prozess 2 in einem eigenen Thread. So kann Prozess 2 eigenständig vom System verarbeitet werden, während Prozess 1 sich auch erst um seine Details kümmert. Doch gegen Ende von Prozess 1 werden die Ergebnisse aus Prozess 2 benötigt. Jedoch braucht Prozess 2 noch um einiges länger. Das Problem hierbei ist, das Prozess 1 aber trotzdem weiterverarbeitet wird, ohne direkt auf den Prozess 2-Thread zu warten. Das wäre eine solche Abhängigkeit, wie zu Beginn erläutert.

Das folgende Beispiel zeigt, wie ein falsches Ergebnis angezeigt wird, weil die zu erwartenden Daten noch nicht von Prozess 2 geliefert wurden. Der braucht nämlich noch 2 Sekunden länger.

 

   1:  internal class Program
   2:  {
   3:      public static int ResultValue;
   4:   
   5:      private static void Main(string[] args)
   6:      {
   7:          Console.WriteLine("Prozess 1 läuft... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
   8:   
   9:          Thread prozess2Thread = new Thread(Calculate);
  10:          prozess2Thread.Start();
  11:   
  12:          Console.WriteLine(ResultValue + 5);
  13:   
  14:          Console.WriteLine("End...");
  15:          Console.ReadLine();
  16:      }
  17:   
  18:      private static void Calculate()
  19:      {
  20:          Console.WriteLine("Calculating... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
  21:          Thread.Sleep(TimeSpan.FromSeconds(2));
  22:          ResultValue = 5;
  23:      }
  24:  }

Listing 1 – Asynchrone Probleme durch Abhängigkeiten zu ResultValue

 

SNAGHTML14654812

Abbildung 1 – Falsches Ergebnis durch die Abhängigkeit zu Process 2

 

Um auf einen laufenden Thread zu warten, bietet die Thread-Klasse die Join-Methode. Wird diese nach dem Start aufgerufen, bleibt der Prozess 1 genau an dieser Stelle so lange stehen, bis Prozess 2 erfolgreich abgearbeitet wurde.

 

   1:  internal class Program
   2:  {
   3:      public static int ResultValue;
   4:   
   5:      private static void Main(string[] args)
   6:      {
   7:          Console.WriteLine("Prozess 1 läuft... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
   8:   
   9:          Thread prozess2Thread = new Thread(Calculate);
  10:          prozess2Thread.Start();
  11:          prozess2Thread.Join();
  12:   
  13:          Console.WriteLine(ResultValue + 5);
  14:   
  15:          Console.WriteLine("End...");
  16:          Console.ReadLine();
  17:      }
  18:   
  19:      private static void Calculate()
  20:      {
  21:          Console.WriteLine("Calculating... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
  22:          Thread.Sleep(TimeSpan.FromSeconds(2));
  23:          ResultValue = 5;
  24:      }
  25:  }

Listing 2 – Warten auf laufende Threads mittels Join-Methode

 

SNAGHTML14d99450

Abbildung 2 – Das richtige Ergebnis durch das Abwarten von Prozess 2

 

Wenn man allerdings möchte das Prozess 1 weiter macht, wenn nur eine Teilverarbeitung von Prozess 2 abgeschlossen ist, dann hilft die Klasse AutoResetEvent. Diese wird einmal Statisch gesetzt und bietet die Methode WaitOne. Auch hier wird der Prozess 1 so lange bei der Methode WaitOne gestoppt, bis eine Freigabe durch Prozess 2 explizit vergeben wird. Das geschieht über die Set-Methode der AutoResetEvent-Klasse.

 

   1:  internal class Program
   2:  {
   3:      public static AutoResetEvent ResetEvent = new AutoResetEvent(false);
   4:      public static int ResultValue;
   5:   
   6:      private static void Main(string[] args)
   7:      {
   8:          Console.WriteLine("Prozess 1 läuft... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
   9:   
  10:          Thread prozess2Thread = new Thread(Calculate);
  11:          prozess2Thread.Start();
  12:   
  13:          ResetEvent.WaitOne();
  14:   
  15:          Console.WriteLine(ResultValue + 5);
  16:   
  17:          Console.WriteLine("End...");
  18:          Console.ReadLine();
  19:      }
  20:   
  21:      private static void Calculate()
  22:      {
  23:          Console.WriteLine("Calculating... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
  24:          Thread.Sleep(TimeSpan.FromSeconds(2));
  25:          ResultValue = 5;
  26:   
  27:          ResetEvent.Set();
  28:      }
  29:  }

Listing 3 – Das Verwenden von AutoResetEvent

 

Jedoch ist das Problem mit der Abhängigkeit und der allgemeinen ResultValue-Variable noch gegeben. Umso komplexer die spätere Anforderung ist, desto wahrscheinlicher ist es hier, einen Fehler zu erhalten. Das Schlimme darin ist, bis man überhaupt auf den Fehler aufmerksam wird. Schließlich konnte Listing 1 auch ohne Probleme den Code ausführen. Es werden ja keine Exceptions ausgelöst. Wer mit TDD (Test-Driven Development) arbeitet, der kann zwar hier etwas schneller herausfinden „wo“ etwas gerade nicht stimmt, aber das Problem mit der Abhängigkeit wurde noch nicht gelöst.

Threads arbeiten mit keinen Methoden die einen Rückgabewert haben. Eher intern mit einem eigenen Datenstack. Um dennoch damit eine sichere Variante aufzubauen, müssen noch weitere Implementierungen dazu kommen. Der Source-Code wird dadurch aber nicht unbedingt angenehm lesbar. Für eine angenehmere und schlankere Lösung helfen uns die Reactive Extensions (Kurz Rx).

 

Rx – Prozesse im eigenen Thread ausführen

Mit Rx können genauso wie mit der Thread-Klasse alle Prozesse bei einem eigenen Thread ausgeführt werden. Das liegt daran, das Rx auch die Threading-Klasse verwendet. Dennoch bietet uns Rx eine „einfachere“ und „schönere“ Arbeit mit Threads. So auch mit der Start-Methode. Dieser gibt man mittels Lambda, oder direkt die auszuführende Instanz mit.

   1:  internal class Program
   2:  {
   3:      public static int ResultValue;
   4:   
   5:      private static void Main(string[] args)
   6:      {
   7:          Console.WriteLine("Prozess 1 läuft... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
   8:   
   9:          Observable.Start(Calculate);
  10:   
  11:          Console.WriteLine(ResultValue + 5);
  12:   
  13:          Console.WriteLine("End...");
  14:          Console.ReadLine();
  15:      }
  16:   
  17:      private static void Calculate()
  18:      {
  19:          Console.WriteLine("Calculating... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
  20:          Thread.Sleep(TimeSpan.FromSeconds(2));
  21:          ResultValue = 5;
  22:      }
  23:  }

Listing 4 – Prozess in einem eigenen Thread ausführen unter Rx

 

SNAGHTML14daa31f

Abbildung 3 – Mit Rx Prozesse in eigenen Threads ausführen

 

Möchte man nun das auch auf die Fertigstellung von Prozess 2 gewartet wird. Dann muss die First-Methode aufgerufen werden. Diese wird genauso wie die Join-Methode verwendet.

   1:  internal class Program
   2:  {
   3:      public static int ResultValue;
   4:   
   5:      private static void Main(string[] args)
   6:      {
   7:          Console.WriteLine("Prozess 1 läuft... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
   8:   
   9:          var observable = Observable.Start(Calculate);
  10:          observable.First();
  11:   
  12:          Console.WriteLine(ResultValue + 5);
  13:   
  14:          Console.WriteLine("End...");
  15:          Console.ReadLine();
  16:      }
  17:   
  18:      private static void Calculate()
  19:      {
  20:          Console.WriteLine("Calculating... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
  21:          Thread.Sleep(TimeSpan.FromSeconds(2));
  22:          ResultValue = 5;
  23:      }
  24:  }

Listing 5 - Warten auf laufende Threads mittels First-Methode

 

SNAGHTML14dcd97a

Abbildung 4 -  Das richtige Ergebnis durch das Abwarten von Prozess 2

 

Rx – Asynchrone Prozesse mittels ToAsync()-Methode

 

Wenn Listing 2 und Listing 5 verglichen werden, haben wir noch keinen wirklich Vorteil durch Rx gewinnen können. Das Ziel war die Abhängigkeit zur ResultValue-Variable zu lösen. In diesem Fall dient uns die ToAsync-Methode. Die Threads mit Return-Werten verarbeiten kann.

Es soll nochmal das Beispiel aus Listing 5 verwendet werden. Hier entfernen wir die ResultValue-Variable. Lassen uns eine IObservable<int> Instanz von der Observable-Klasse geben, die durch die ToAsync-Erweiterung erzeugt wird. Sobald ein Subscribe gesetzt wurde, wird automatisch von Rx die Calculate-Methode gefeuert. Das alles passiert auch in einem eigenständigen Thread. Auch hier können wir mittels First-Methode abwarten, bis Prozess 2 mit seiner Arbeit fertig ist.

   1:  internal class Program
   2:  {
   3:      private static void Main(string[] args)
   4:      {
   5:          Console.WriteLine("Prozess 1 läuft... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
   6:   
   7:          var observable = Observable.ToAsync<int>(Calculate)();
   8:          observable.Subscribe(resultValue => Console.WriteLine(resultValue + 5));
   9:          observable.First();
  10:   
  11:          Console.WriteLine("End...");
  12:          Console.ReadLine();
  13:      }
  14:   
  15:      private static int Calculate()
  16:      {
  17:          Console.WriteLine("Calculating... in Thread {0}", Thread.CurrentThread.ManagedThreadId);
  18:          Thread.Sleep(TimeSpan.FromSeconds(2));
  19:          return 5;
  20:      }
  21:  }

Listing 6 – Asynchronen Prozess mittels Rx regeln

 

SNAGHTML14deaaec

Abbildung 5 –  Das richtige Ergebnis durch das Abwarten von Prozess 2 mittels ToAsync

 

Fazit

Bei den Beispielen sieht man nicht unbedingt weniger Zeilen Code, jedoch umso umfangreicher das Projekt wird, desto mehr spürt man den Vorteil. Für die Datensynchronisierung kann auch eine eigenständige Methode ausgelagert werden. So läuft man nicht in Gefahr, dass eine Abhängigkeit öffentlich gehalten werden muss. Sehr Toll an Rx ist zudem, dass man mittels LINQ auch erst definieren kann, ab wann wirklich der Thread dann gestartet werden darf. Ist der Zustand erfüllt, dann wird Subscript.

 

Mein Tipp! Kostenloses Video-Training „Einführung in Reactive Extensions“.



Wenn ihnen der Artikel gefallen hat oder er für sie hilfreich war, bitten "kicken" sie ihn.
kick it on dotnet-kicks.de

Kommentare

Powered by BlogEngine.NET 1.4.5.0
Theme by Extensive SEO

Über den Autor

Gregor Biswanger

Microsoft MVP für Client App Dev
XING

Gregor Biswanger (Microsoft MVP für Client App Dev) ist freier Consultant, Trainer, Autor und Speaker.


Seine Schwerpunkte liegen im Bereich der .NET-Architektur, agilen Prozessen und XAML. Er veröffentlichte vor kurzem seine DVD´s mit Video-Trainings zum Thema „Meine erste Windows 8 App“, „Windows Store Apps mit XAML und C#“ und „WPF 4.5 und Silverlight 5“ bei Addison-Wesley von video2brain.


Biswanger ist auch im Auftrag von Intel GmbH als Technologieberater für die Intel Developer Zone aktiv und ist Leader bei der Ingolstädter .NET Developers Group (INdotNET). 

 

Video über mich:
http://www.youtube.com/watch?v=mx_6SiiLxjk


Basta! 2011 Speaker

CLIPer

MCTS
Windows SharePoint Services 3.0 – Application Development (MCTS)