Wird geladen...

Asynchrone Streams in C# (IAsyncEnumerable)

Lernen Sie asynchrone Streams in C# mit IAsyncEnumerable kennen, um Daten schrittweise effizient zu verarbeiten.

In modernen .NET-Anwendungen kommen Daten oft stückweise und über die Zeit: Netzwerkströme, Logzeilen, Sensordaten… IAsyncEnumerable<T> ermöglicht es, solche Datenströme asynchron und lazy (verzögert) zu verarbeiten. Auf der Produzentenseite werden async + yield return verwendet, auf der Konsumentenseite await foreach.


Was ist IAsyncEnumerable?

IAsyncEnumerable<T> ermöglicht es, eine Datenfolge asynchron zu erzeugen und zu konsumieren. Im Gegensatz zu IEnumerable<T> kann bei jedem Schritt ein Warten (z. B. aufgrund einer I/O-Verzögerung) auftreten. Zum Konsumieren wird await foreach verwendet; bei jedem Schritt wird gewartet, bis das nächste Element verfügbar ist.


// Einfache Verwendung
await foreach (var item in GetNumbersAsync())
{
    Console.WriteLine(item);
}

Asynchroner Iterator: async + yield return

Wenn Sie eine asynchrone Erzeugermethode schreiben, ist der Rückgabetyp IAsyncEnumerable<T>. Innerhalb der Methode können Sie await verwenden und mit yield return Elemente zurückgeben.


using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static async IAsyncEnumerable<int> GetNumbersAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        await Task.Delay(500); // I/O oder zeitaufwändige Operation simulieren
        yield return i;        // alle 500 ms eine neue Zahl
    }
}

// Konsumieren
await foreach (var n in GetNumbersAsync())
{
    Console.WriteLine($"Eingetroffen: {n}");
}

Konsumieren mit await foreach

await foreach liest Elemente aus einem asynchronen Stream, ohne die UI oder den Thread zu blockieren. In jedem Schritt wird gewartet, bis Daten verfügbar sind; danach wird die Schleife fortgesetzt.


await foreach (var eintrag in ReadLogEntriesAsync())
{
    // Verarbeite den Eintrag, sobald er verfügbar ist
    Process(eintrag);
}

Fehlerbehandlung und using

Bei der Verarbeitung asynchroner Streams können Sie ganz normal try/catch verwenden. Wenn im Stream oder beim Konsumieren ein Fehler auftritt, wird dieser während await foreach ausgelöst.


try
{
    await foreach (var item in GetNumbersAsync())
        Console.WriteLine(item);
}
catch (Exception ex)
{
    Console.WriteLine("Fehler: " + ex.Message);
}

Für asynchrone Ressourcen können Sie IAsyncDisposable und await using einsetzen.


await using var resource = await OpenResourceAsync();
await foreach (var x in resource.StreamAsync())
{
    // ...
}

Abbruchunterstützung (CancellationToken)

Um lang laufende Streams zu stoppen, kann der Konsum abgebrochen werden. Auf der Produzentenseite kann der Token mit [EnumeratorCancellation] erfasst und regelmäßig überprüft werden.


using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

static async IAsyncEnumerable<int> CountAsync(
    int start, int count, [EnumeratorCancellation] CancellationToken ct = default)
{
    for (int i = 0; i < count; i++)
    {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(300, ct);
        yield return start + i;
    }
}

// Konsumieren
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); // nach 1 Sekunde abbrechen
await foreach (var n in CountAsync(10, 100, cts.Token))
{
    Console.WriteLine(n);
}

Vergleich: IEnumerable<Task<T>> vs IAsyncEnumerable<T>


Asynchrones zeilenweises Lesen einer Datei (Wrapper-Beispiel)

Das folgende Beispiel erzeugt einen Stream, der eine Datei asynchron zeilenweise liest. Jede Zeile wird mit yield return an den Konsumenten weitergegeben, sobald sie verfügbar ist.


using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

static async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read,
                                  bufferSize: 4096, useAsync: true);
    using var sr = new StreamReader(fs);

    while (!sr.EndOfStream)
    {
        var line = await sr.ReadLineAsync(); // sobald eine Zeile verfügbar ist
        if (line is not null)
            yield return line;
    }
}

// Konsumieren
await foreach (var line in ReadLinesAsync("log.txt"))
{
    Console.WriteLine(line);
}

Einfache Filterung über einem Stream (Pipeline)

Asynchrone Streams können zu einer Pipeline verbunden werden: erzeugen → filtern → transformieren → konsumieren.


static async IAsyncEnumerable<int> FilterAsync(IAsyncEnumerable<int> quelle)
{
    await foreach (var n in quelle)
    {
        if (n % 2 == 0)          // gerade Zahlen durchlassen
            yield return n * 10; // transformieren
    }
}

// Verwendung
await foreach (var x in FilterAsync(GetNumbersAsync()))
{
    Console.WriteLine(x); // 20, 40, ...
}

UI-Szenario: await foreach in WPF

In WPF frieren lang laufende Streams die Benutzeroberfläche nicht ein; die UI kann bei jedem neuen Element aktualisiert werden. Wenn UI-Aktualisierungen erforderlich sind, verwenden Sie Dispatcher, um auf den UI-Thread zurückzukehren.


// Beispiel: Sensordaten aus einem Stream lesen und in die UI schreiben
await foreach (var v in ReadSensorAsync())
{
    Dispatcher.Invoke(() => txtStatus.Text = $"Letzter Wert: {v}");
}

Tipps und Hinweise


TL;DR

  • IAsyncEnumerable<T> dient zum Arbeiten mit asynchronen und lazy Datenströmen.
  • Produzent: async + yield return; Konsument: await foreach.
  • Verwenden Sie [EnumeratorCancellation] und CancellationToken für Abbruch.
  • IAsyncEnumerable<T> kann im Vergleich zu IEnumerable<Task<T>> Speicher- und Zeitvorteile bieten.
  • Wenn UI-Aktualisierungen erforderlich sind, kehren Sie mit Dispatcher zum UI-Thread zurück.

Ähnliche Artikel