C# Channelとは?System.Threading.Channelsの使い方を非同期キュー・スレッド間通信の実例で解説

はじめに

C#でバックグラウンド処理、非同期キュー、スレッド間通信を実装していると、次のような悩みにぶつかることがあります。

「複数の処理から安全にデータを追加したい」「キューに積まれたデータを非同期で順番に処理したい」「処理が追いつかないときにメモリが増え続けるのを防ぎたい」「lockThread.Sleepを使わず、自然なasync/awaitで書きたい」。

こうした場面で役立つのが、System.Threading.Channelsです。

C# Channelは、Producer/Consumerパターンを実装するための非同期データ受け渡し機構です。Microsoft Learnでも、System.Threading.Channels名前空間は、プロデューサーとコンシューマーの間で非同期的にデータを渡すための同期データ構造を提供すると説明されています。

この記事では、c# channelをテーマに、System.Threading.Channelsの基本、Channel<T>の使い方、非同期キューとしての実装例、スレッド間通信での活用、Bounded ChannelとUnbounded Channelの違い、実務でのベストプラクティスまで解説します。

1. C# Channelとは?System.Threading.Channelsでできること

1-1. C# Channelは非同期のProducer/Consumerを実装するための仕組み

C# Channelとは、簡単にいうと「非同期に使えるキュー」です。

ただし、単なるキューではありません。Channelは、データを作る側であるProducerと、データを処理する側であるConsumerを分離し、安全にデータを受け渡すための仕組みです。

たとえば、次のような構成を考えます。

C#
Producer 1 ─┐
Producer 2 ─┼─> Channel<T> > Consumer
Producer 3 ─┘

ProducerはChannelにデータを書き込み、ConsumerはChannelからデータを読み取ります。書き込み側と読み取り側は別々のタスクやスレッドで動かせます。

C# Channelの大きな特徴は、async/awaitと相性が良いことです。

C#
await channel.Writer.WriteAsync(item);
var item = await channel.Reader.ReadAsync();

キューが空なら読み取り側は非同期に待機し、キューが満杯なら書き込み側も非同期に待機できます。待機中にスレッドをブロックしないため、ASP.NET CoreやWorker Serviceのようなサーバーアプリケーションでも扱いやすいです。

1-2. QueueやConcurrentQueueとの違い

Queue<T>は通常のFIFOキューです。しかし、スレッドセーフではありません。複数スレッドから同時にアクセスする場合は、lockなどで保護する必要があります。

C#
private readonly Queue<string> _queue = new();
private readonly object _lock = new();

public void Enqueue(string item)
{
lock (_lock)
{
_queue.Enqueue(item);
}
}

一方、ConcurrentQueue<T>はスレッドセーフなキューです。複数スレッドから安全にEnqueueTryDequeueができます。

C#
private readonly ConcurrentQueue<string> _queue = new();

_queue.Enqueue("item");

if (_queue.TryDequeue(out var item))
{
Console.WriteLine(item);
}

ただし、ConcurrentQueue<T>には「データが来るまで非同期に待つ」という機能がありません。そのため、空のときにポーリングしたり、SemaphoreSlimと組み合わせたりする必要があります。

C#
while (!cancellationToken.IsCancellationRequested)
{
if (_queue.TryDequeue(out var item))
{
await ProcessAsync(item);
}
else
{
await Task.Delay(100, cancellationToken);
}
}

このようなポーリングは無駄が多く、遅延やCPU使用率の問題につながることがあります。

Channelを使うと、データがないときは自然に非同期待機できます。

C#
var item = await channel.Reader.ReadAsync(cancellationToken);
await ProcessAsync(item);

つまり、C# Channelは「スレッドセーフなキュー」に加えて、「非同期待機」「完了通知」「容量制限」「バックプレッシャー」まで扱える仕組みです。

1-3. スレッド間通信・非同期キュー・バックグラウンド処理で使われる理由

C# Channelは、次のような場面でよく使われます。

Web APIのリクエストを受けたあと、時間のかかる処理をバックグラウンドで実行したい場合。ログやイベントを非同期に蓄積して順番に処理したい場合。ファイル読み込み、変換、保存のようなパイプライン処理を作りたい場合。UIスレッドとバックグラウンドスレッドの間でデータを渡したい場合。

これらに共通しているのは、「データを作る処理」と「データを処理する処理」を分けたいという点です。

Channelを使うと、Producerは「Channelに入れる」ことだけに集中できます。Consumerは「Channelから取り出して処理する」ことだけに集中できます。両者の速度が違っても、Channelが間に入ることで、処理を疎結合にできます。

1-4. どんな悩みを解決できるのか

C# Channelを使うと、次のような悩みを解決しやすくなります。

lockを多用したくない。Thread.Sleepやポーリングを避けたい。ConcurrentQueue<T>だけでは非同期待機が書きづらい。処理の入口と出口を分離したい。一定数以上キューに積まれないようにしたい。アプリ終了時にキュー処理を安全に止めたい。複数Producer・複数Consumerの構成をシンプルに書きたい。

特に、C#で非同期キューを実装する場合、Channelは非常に有力な選択肢です。

2. System.Threading.Channelsの基本構成

2-1. Channel<T>・ChannelReader<T>・ChannelWriter<T>の役割

System.Threading.Channelsの中心になる型は、主に次の3つです。

Channel<T>は、読み書きできるChannel本体です。Microsoft Learnでは、Channel<T>は型Tの要素を読み書きできるチャネルの基底クラスと説明されています。

ChannelWriter<T>は、Channelにデータを書き込むためのオブジェクトです。

ChannelReader<T>は、Channelからデータを読み取るためのオブジェクトです。

基本的な作成コードは次のとおりです。

C#
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<string>();

ChannelWriter<string> writer = channel.Writer;
ChannelReader<string> reader = channel.Reader;

Producer側にはChannelWriter<T>だけを渡し、Consumer側にはChannelReader<T>だけを渡す設計にすると、責務を分離しやすくなります。

C#
public class Producer
{
private readonly ChannelWriter<string> _writer;

public Producer(ChannelWriter<string> writer)
{
_writer = writer;
}
}

public class Consumer
{
private readonly ChannelReader<string> _reader;

public Consumer(ChannelReader<string> reader)
{
_reader = reader;
}
}

このように分けることで、Producerが誤って読み取り処理を行ったり、Consumerが誤って書き込み処理を行ったりするのを防げます。

2-2. Writerでデータを書き込み、Readerでデータを読み取る流れ

Channelの基本的な流れはシンプルです。

C#
var channel = Channel.CreateUnbounded<int>();

await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);
await channel.Writer.WriteAsync(3);

var value1 = await channel.Reader.ReadAsync();
var value2 = await channel.Reader.ReadAsync();
var value3 = await channel.Reader.ReadAsync();

Console.WriteLine(value1); // 1
Console.WriteLine(value2); // 2
Console.WriteLine(value3); // 3

通常のFIFOキューと同じように、先に書き込まれた値から順番に読み取られます。

ただし、複数Consumerが同時に読み取る場合は、「取り出される順番」と「処理が完了する順番」は別です。この点は後ほど詳しく解説します。

2-3. WriteAsync・ReadAsync・WaitToReadAsync・ReadAllAsyncの使い分け

Channelを使うときによく使うAPIは、次の4つです。

WriteAsyncは、Channelにデータを非同期で書き込みます。

C#
await channel.Writer.WriteAsync(item, cancellationToken);

ReadAsyncは、Channelからデータを1件読み取ります。データがなければ、データが書き込まれるまで非同期で待ちます。

C#
var item = await channel.Reader.ReadAsync(cancellationToken);

WaitToReadAsyncは、読み取れるデータがあるか、またはChannelが完了するまで待ちます。whileループと組み合わせて使うことがあります。

C#
while (await channel.Reader.WaitToReadAsync(cancellationToken))
{
while (channel.Reader.TryRead(out var item))
{
await ProcessAsync(item, cancellationToken);
}
}

ReadAllAsyncは、Channelからすべてのデータを非同期列挙するためのIAsyncEnumerable<T>を作成します。Microsoft Learnでも、ReadAllAsyncはチャネルからのすべてのデータを読み取るためのIAsyncEnumerable<T>を作成すると説明されています。

C#
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
await ProcessAsync(item, cancellationToken);
}

実務では、Consumerの実装はReadAllAsyncawait foreachで書くと読みやすくなります。

2-4. CompleteとCompletionで終了を通知する仕組み

Channelで重要なのが、終了通知です。

Producerがすべてのデータを書き込んだら、CompleteまたはTryCompleteを呼び出して「もうデータは追加されない」ことをReader側に伝えます。Microsoft Learnでも、ChannelWriter<T>.CompleteはChannelを完了としてマークし、これ以上項目が書き込まれないことを表すAPIと説明されています。

C#
channel.Writer.Complete();

Reader側は、Channelに残っているデータを読み終えると終了できます。

C#
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}

Completionは、Reader側でChannelの完了を待つためのTaskです。

C#
await channel.Reader.Completion;

例外を通知したい場合は、Complete(exception)またはTryComplete(exception)を使えます。

C#
try
{
await channel.Writer.WriteAsync("data");
}
catch (Exception ex)
{
channel.Writer.TryComplete(ex);
}

Completeを呼ばないと、Reader側は「まだデータが来るかもしれない」と判断し、待ち続ける可能性があります。Channelを使う場合は、終了時に必ず完了通知を行う設計が重要です。

3. C# Channelの基本的な使い方

3-1. NuGetパッケージとusingの準備

C# Channelを使うには、次の名前空間を使います。

C#
using System.Threading.Channels;

System.Threading.ChannelsはNuGetパッケージとして提供されています。ただし、.NET Core 3.0以降を使っている場合は共有フレームワークに含まれています。Microsoft Learnでも、System.Threading.Channels NuGetパッケージとして入手でき、.NET Core 3.0以降ではフレームワークの一部として含まれると説明されています。

古いプロジェクトや.NET Standardライブラリで必要な場合は、次のようにパッケージを追加します。

Bash
dotnet add package System.Threading.Channels

3-2. CreateUnbounded<T>()で非同期キューを作成する

もっとも簡単なChannelの作成方法は、CreateUnbounded<T>()を使うことです。

C#
var channel = Channel.CreateUnbounded<string>();

Unbounded Channelは容量制限のないChannelです。書き込み側は、基本的に容量不足で待たされません。

C#
await channel.Writer.WriteAsync("message-1");
await channel.Writer.WriteAsync("message-2");
await channel.Writer.WriteAsync("message-3");

ただし、容量制限がないということは、ProducerがConsumerより速い場合にメモリ使用量が増え続ける可能性があるということです。実務では、まずBounded Channelを検討するのがおすすめです。

3-3. Producer側の実装例

Producerは、Channelにデータを書き込む側です。

C#
using System.Threading.Channels;

static async Task ProduceAsync(
ChannelWriter<string> writer,
CancellationToken cancellationToken)
{
try
{
for (int i = 1; i <= 5; i++)
{
var message = $"message-{i}";

await writer.WriteAsync(message, cancellationToken);

Console.WriteLine($"Produced: {message}");

await Task.Delay(500, cancellationToken);
}
}
finally
{
writer.Complete();
}
}

この例では、finallyCompleteを呼んでいます。途中で例外が起きても、Reader側に終了を通知できるようにするためです。

ただし、複数Producerがいる場合は、各Producerが個別にCompleteを呼ぶと問題になります。複数Producerの場合は、すべてのProducerが終わったあとに、代表して1回だけCompleteを呼ぶ設計にします。

3-4. Consumer側の実装例

Consumerは、Channelからデータを読み取って処理する側です。

C#
static async Task ConsumeAsync(
ChannelReader<string> reader,
CancellationToken cancellationToken)
{
await foreach (var message in reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine($"Consumed: {message}");

await Task.Delay(1000, cancellationToken);
}
}

ReadAllAsyncを使うと、Channelが完了するまでデータを読み続け、完了したら自然にループを抜けます。

ProducerとConsumerを組み合わせると、次のようになります。

C#
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<string>();
using var cts = new CancellationTokenSource();

var producer = ProduceAsync(channel.Writer, cts.Token);
var consumer = ConsumeAsync(channel.Reader, cts.Token);

await Task.WhenAll(producer, consumer);

static async Task ProduceAsync(
ChannelWriter<string> writer,
CancellationToken cancellationToken)
{
try
{
for (int i = 1; i <= 5; i++)
{
var message = $"message-{i}";
await writer.WriteAsync(message, cancellationToken);
Console.WriteLine($"Produced: {message}");
await Task.Delay(500, cancellationToken);
}
}
finally
{
writer.Complete();
}
}

static async Task ConsumeAsync(
ChannelReader<string> reader,
CancellationToken cancellationToken)
{
await foreach (var message in reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine($"Consumed: {message}");
await Task.Delay(1000, cancellationToken);
}
}

Producerは0.5秒ごとにデータを作り、Consumerは1秒ごとに処理します。ProducerとConsumerの速度が違っても、Channelが間に入ることで安全に受け渡しできます。

3-5. await foreachで読み取り処理をシンプルに書く

Channelの読み取り処理は、await foreachと非常に相性が良いです。

C#
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
await ProcessAsync(item, cancellationToken);
}

従来のように、空チェック、待機、キャンセル判定、終了判定を自分で細かく書く必要がありません。

もちろん、より細かく制御したい場合は、WaitToReadAsyncTryReadを組み合わせる方法もあります。

C#
while (await reader.WaitToReadAsync(cancellationToken))
{
while (reader.TryRead(out var item))
{
await ProcessAsync(item, cancellationToken);
}
}

基本はReadAllAsync、必要に応じてWaitToReadAsyncを使う、と考えるとよいでしょう。

4. 非同期キューとして使う実装例

4-1. タスクをChannelに積んで順番に処理するサンプル

実務でよくあるのが、「処理したい仕事をキューに積み、バックグラウンドで順番に処理する」パターンです。

次の例では、Func<CancellationToken, ValueTask>をChannelに積みます。

C#
using System.Threading.Channels;

public sealed class BackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

public BackgroundTaskQueue(int capacity)
{
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
};

_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}

public ValueTask QueueAsync(
Func<CancellationToken, ValueTask> workItem,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(workItem);

return _queue.Writer.WriteAsync(workItem, cancellationToken);
}

public ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
return _queue.Reader.ReadAsync(cancellationToken);
}

public void Complete()
{
_queue.Writer.TryComplete();
}
}

Consumer側は、キューから取り出して順番に実行します。

C#
public sealed class QueueWorker
{
private readonly BackgroundTaskQueue _taskQueue;

public QueueWorker(BackgroundTaskQueue taskQueue)
{
_taskQueue = taskQueue;
}

public async Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var workItem = await _taskQueue.DequeueAsync(cancellationToken);

try
{
await workItem(cancellationToken);
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
}
}

このようにすると、Web APIやUIイベントなどから処理をChannelに積み、実際の重い処理はConsumer側で実行できます。

4-2. 複数Producer・単一Consumerの実装

複数のProducerから1つのConsumerへデータを渡す構成は、Channelの代表的な使い方です。

C#
var channel = Channel.CreateUnbounded<int>(
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false
});

var producers = Enumerable.Range(1, 3)
.Select(id => Task.Run(async () =>
{
for (int i = 1; i <= 5; i++)
{
var value = id * 100 + i;
await channel.Writer.WriteAsync(value);
Console.WriteLine($"Producer {id}: {value}");
}
}))
.ToArray();

var consumer = Task.Run(async () =>
{
await foreach (var value in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer: {value}");
}
});

await Task.WhenAll(producers);
channel.Writer.Complete();

await consumer;

この例では、Producerが3つ、Consumerが1つです。SingleReader = trueSingleWriter = falseを設定することで、読み取り側は1つ、書き込み側は複数であることをChannelに伝えています。

4-3. 単一Producer・複数Consumerの実装

次に、1つのProducerがデータを作り、複数のConsumerで並列処理する例です。

C#
var channel = Channel.CreateUnbounded<int>(
new UnboundedChannelOptions
{
SingleReader = false,
SingleWriter = true
});

var producer = Task.Run(async () =>
{
for (int i = 1; i <= 20; i++)
{
await channel.Writer.WriteAsync(i);
}

channel.Writer.Complete();
});

var consumers = Enumerable.Range(1, 3)
.Select(id => Task.Run(async () =>
{
await foreach (var value in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer {id}: {value}");
await Task.Delay(500);
}
}))
.ToArray();

await producer;
await Task.WhenAll(consumers);

複数Consumerにすると、キューに積まれたデータを複数の処理で分担できます。重い処理を並列化したい場合に便利です。

4-4. 複数Consumerで処理順序が変わる場合の注意点

複数Consumerを使うと、Channelから取り出される順序はFIFOでも、処理完了順は保証されません。

たとえば、1番をConsumer A、2番をConsumer Bが取り出したとしても、2番の処理が先に終わることがあります。

C#
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
await ProcessAsync(item, cancellationToken);
}

ProcessAsyncの処理時間がデータごとに異なる場合、完了順は簡単に入れ替わります。

そのため、順序が重要な処理では単一Consumerにするか、処理結果をシーケンス番号で並べ直す設計が必要です。

C#
public sealed record WorkItem(int Sequence, string Payload);

「投入順に必ず完了させたい」なら単一Consumer、「順序よりスループットが重要」なら複数Consumer、という基準で選ぶとよいでしょう。

4-5. キュー処理のキャンセルと例外処理

Channel処理では、CancellationTokenを必ず意識しましょう。

C#
await channel.Writer.WriteAsync(item, cancellationToken);
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
await ProcessAsync(item, cancellationToken);
}

Consumer側では、個別の処理で例外が起きてもループ全体が止まらないようにする設計がよく使われます。

C#
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
try
{
await ProcessAsync(item, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex}");
}
}

ただし、例外時に処理を継続するべきか、Channel自体を停止するべきかは要件によって異なります。

停止すべき重大な例外なら、Writer側でTryComplete(ex)を呼び、Reader側ではCompletionを待って例外を受け取る設計もあります。

C#
channel.Writer.TryComplete(exception);

try
{
await channel.Reader.Completion;
}
catch (Exception ex)
{
Console.WriteLine($"Channel failed: {ex}");
}

5. スレッド間通信で使う実装例

5-1. UIスレッドとバックグラウンドスレッド間でデータを渡す

WPF、WinForms、MAUIなどのUIアプリでは、UIスレッドをブロックしないことが重要です。重い処理をバックグラウンドで行い、その結果をUI側に渡すときにもChannelが使えます。

たとえば、バックグラウンド処理で進捗メッセージを作り、UI側で受け取る例です。

C#
var channel = Channel.CreateUnbounded<string>();

var backgroundTask = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await channel.Writer.WriteAsync($"Progress: {i * 10}%");
await Task.Delay(500);
}

channel.Writer.Complete();
});

await foreach (var message in channel.Reader.ReadAllAsync())
{
// WPFならDispatcher.InvokeAsyncなどでUI更新する
Console.WriteLine(message);
}

await backgroundTask;

実際のUIアプリでは、Channelから読み取ったあとにUIスレッドへディスパッチして画面を更新します。

C#
await foreach (var message in channel.Reader.ReadAllAsync(cancellationToken))
{
await Dispatcher.InvokeAsync(() =>
{
StatusTextBlock.Text = message;
});
}

Channel自体はUI更新の仕組みではありません。あくまで、バックグラウンド処理とUI側の受け渡しを安全にするためのキューとして使います。

5-2. Task.RunとChannelを組み合わせる

Task.Runでバックグラウンド処理を起動し、その結果をChannelに流す構成はシンプルです。

C#
var channel = Channel.CreateUnbounded<int>();

var producer = Task.Run(async () =>
{
try
{
for (int i = 1; i <= 100; i++)
{
await channel.Writer.WriteAsync(i);
await Task.Delay(50);
}
}
finally
{
channel.Writer.Complete();
}
});

var consumer = Task.Run(async () =>
{
await foreach (var value in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Received: {value}");
}
});

await Task.WhenAll(producer, consumer);

この構成では、ProducerとConsumerが別々のタスクとして動きます。データの受け渡しはChannelが担当するため、共有リストやlockを直接扱う必要がありません。

5-3. lockやBlockingCollectionを使わずに安全にデータを受け渡す

従来、スレッド間通信ではBlockingCollection<T>が使われることもありました。

C#
var collection = new BlockingCollection<string>();

Task.Run(() =>
{
collection.Add("data");
collection.CompleteAdding();
});

foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine(item);
}

BlockingCollection<T>は便利ですが、基本的にはブロッキングAPIです。非同期メソッド中心のコードでは、awaitと組み合わせづらい場面があります。

Channelなら、読み書きを非同期メソッドとして扱えます。

C#
await writer.WriteAsync("data", cancellationToken);
var item = await reader.ReadAsync(cancellationToken);

これにより、スレッドをブロックせずにデータ到着を待てます。ASP.NET CoreやWorker Serviceのように、多数の非同期処理を扱う環境では特に重要です。

5-4. CancellationTokenで停止できるスレッド間通信にする

スレッド間通信で忘れてはいけないのが停止処理です。アプリ終了時や画面クローズ時に、Consumerが永久に待ち続けないようにする必要があります。

C#
using var cts = new CancellationTokenSource();

var channel = Channel.CreateUnbounded<string>();

var consumer = Task.Run(async () =>
{
try
{
await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
{
Console.WriteLine(item);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Consumer canceled.");
}
});

await channel.Writer.WriteAsync("hello");

cts.Cancel();

try
{
await consumer;
}
catch (OperationCanceledException)
{
}

通常は、キャンセルだけでなくCompleteも組み合わせます。

C#
channel.Writer.TryComplete();
cts.Cancel();

「もうデータが来ない」ことを伝えるのがCompleteで、「処理を中断してほしい」ことを伝えるのがCancellationTokenです。この2つは役割が違うため、適切に使い分けることが大切です。

6. Bounded ChannelとUnbounded Channelの違い

6-1. Unbounded Channelの特徴と向いているケース

Unbounded Channelは、容量制限のないChannelです。

C#
var channel = Channel.CreateUnbounded<MyMessage>();

Producerは基本的に書き込みで待たされません。Consumerが遅くても、データはChannel内部に蓄積されます。

向いているのは、データ量が少ない、Producerの速度が制御されている、短時間で処理が終わる、メモリ増加のリスクが小さい、といったケースです。

たとえば、アプリ内の小規模なイベント通知や、件数が限定されている一時的な非同期処理ではUnbounded Channelでも問題になりにくいでしょう。

ただし、Web APIのリクエストを大量に受け付ける場合や、ログ・イベントが高頻度で発生する場合にUnbounded Channelを使うと、Consumerが追いつかないときにメモリを圧迫するリスクがあります。

6-2. Bounded Channelの特徴とバックプレッシャー

Bounded Channelは、容量制限のあるChannelです。

C#
var channel = Channel.CreateBounded<MyMessage>(capacity: 100);

容量が100なら、Channel内に保持できる項目数は最大100です。満杯になったときの動作は、FullModeで制御できます。

もっとも基本的なのは、空きが出るまで書き込みを待つ設定です。

C#
var options = new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait
};

var channel = Channel.CreateBounded<MyMessage>(options);

このとき、Consumerが遅くてChannelが満杯になると、ProducerのWriteAsyncが待機します。これをバックプレッシャーと呼びます。

バックプレッシャーがあると、Producerが無制限にデータを積み続けるのを防げます。実務では非常に重要な考え方です。

6-3. BoundedChannelOptionsの設定項目

BoundedChannelOptionsでは、Bounded Channelの動作を細かく設定できます。

C#
var options = new BoundedChannelOptions(capacity: 100)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
};

var channel = Channel.CreateBounded<string>(options);

主な設定項目は次のとおりです。

CapacityはChannelの容量です。コンストラクタで指定します。

FullModeは、Channelが満杯のときの動作です。

SingleReaderは、Readerが1つだけであることを示します。

SingleWriterは、Writerが1つだけであることを示します。

AllowSynchronousContinuationsは、継続処理を同期的に実行することを許可するかどうかの設定です。通常は明確な理由がなければfalseのままでよいでしょう。

SingleReaderSingleWriterを実態に合わせて設定すると、Channel内部の最適化に役立ちます。Microsoft Learnでも、SingleReaderは同時に最大1つの読み取り操作だけであることを保証する設定として説明されています。

6-4. FullModeのWait・DropOldest・DropNewest・DropWriteの違い

Bounded Channelで重要なのが、BoundedChannelFullModeです。

Microsoft Learnでは、Waitは空き領域が利用可能になるまで待機、DropNewestは最新の項目を削除、DropOldestは最も古い項目を削除、DropWriteは書き込まれる項目を削除すると説明されています。

それぞれの使い分けは次のように考えます。

Waitは、データを失いたくない場合に使います。バックグラウンドジョブ、注文処理、メール送信キューなど、原則としてすべて処理すべきデータに向いています。

C#
FullMode = BoundedChannelFullMode.Wait

DropOldestは、古いデータより新しいデータが重要な場合に使います。センサー値、進捗状況、リアルタイム通知などで使いやすいです。

C#
FullMode = BoundedChannelFullMode.DropOldest

DropNewestは、新しいデータを捨てて既存のキューを優先したい場合に使います。

C#
FullMode = BoundedChannelFullMode.DropNewest

DropWriteは、満杯時に今回の書き込みを捨てる設定です。ログやメトリクスなど、「落としても致命的ではない」データで検討できます。

C#
FullMode = BoundedChannelFullMode.DropWrite

業務データでは、まずWaitを検討するのが安全です。データを捨てるモードを使う場合は、「捨ててよいデータか」「捨てたことを記録する必要があるか」を必ず確認しましょう。

6-5. メモリ使用量を抑えるならBounded Channelを選ぶべき理由

Unbounded Channelは便利ですが、ProducerがConsumerより速い状態が続くと、Channel内の項目が増え続けます。

たとえば、1秒に1,000件のデータが書き込まれ、Consumerが1秒に100件しか処理できない場合、差分の900件が毎秒蓄積されます。

短時間なら問題にならなくても、長時間稼働するサービスではメモリ使用量が増え続け、最終的にパフォーマンス劣化やプロセス停止につながる可能性があります。

Bounded Channelなら、容量上限を設定できます。

C#
var channel = Channel.CreateBounded<Job>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});

この場合、Channel内に保持されるジョブ数は最大1000件です。満杯になったらProducerが待機するため、システム全体に自然な制御がかかります。

実務のバックグラウンドキューでは、基本的にBounded Channelを第一候補にするのがおすすめです。

7. 実務でよくあるC# Channelの活用パターン

7-1. ASP.NET Coreのバックグラウンドジョブキュー

ASP.NET Coreでよくあるのが、HTTPリクエストではすぐレスポンスを返し、重い処理はバックグラウンドで行うパターンです。

たとえば、ユーザー登録後のメール送信、画像変換、外部API通知、集計処理などです。

C#
public interface IBackgroundTaskQueue
{
ValueTask QueueAsync(
Func<CancellationToken, ValueTask> workItem,
CancellationToken cancellationToken = default);

ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}

実装はChannelで行います。

C#
public sealed class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

public BackgroundTaskQueue()
{
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(
new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
});
}

public ValueTask QueueAsync(
Func<CancellationToken, ValueTask> workItem,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(workItem);

return _queue.Writer.WriteAsync(workItem, cancellationToken);
}

public ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
return _queue.Reader.ReadAsync(cancellationToken);
}
}

BackgroundServiceでConsumerを動かします。

C#
public sealed class QueuedHostedService : BackgroundService
{
private readonly IBackgroundTaskQueue _taskQueue;
private readonly ILogger<QueuedHostedService> _logger;

public QueuedHostedService(
IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger)
{
_taskQueue = taskQueue;
_logger = logger;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem = await _taskQueue.DequeueAsync(stoppingToken);

try
{
await workItem(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Background task failed.");
}
}
}
}

DIに登録します。

C#
builder.Services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
builder.Services.AddHostedService<QueuedHostedService>();

ControllerやMinimal APIからキューに積みます。

C#
app.MapPost("/jobs", async (
IBackgroundTaskQueue queue,
CancellationToken cancellationToken) =>
{
await queue.QueueAsync(async token =>
{
await Task.Delay(5000, token);
Console.WriteLine("Heavy job completed.");
}, cancellationToken);

return Results.Accepted();
});

この構成にすると、リクエスト処理とバックグラウンド処理を分離できます。

7-2. ログ・イベント・通知処理の非同期化

ログ、イベント、通知処理は、アプリケーション本体の処理を遅くしないよう非同期化したいことがあります。

C#
public sealed record AppEvent(DateTimeOffset Timestamp, string Message);

var channel = Channel.CreateBounded<AppEvent>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.DropWrite
});

Producer側はイベントをChannelに書き込みます。

C#
await channel.Writer.WriteAsync(
new AppEvent(DateTimeOffset.UtcNow, "User logged in"),
cancellationToken);

Consumer側はまとめて処理します。

C#
await foreach (var appEvent in channel.Reader.ReadAllAsync(cancellationToken))
{
await SaveEventAsync(appEvent, cancellationToken);
}

ログやメトリクスでは、アプリケーション本体を守るために、満杯時に一部を捨てる設計もあります。ただし、監査ログのように失ってはいけないデータではDropWriteを使うべきではありません。

7-3. ファイル処理やバッチ処理のパイプライン化

Channelは、パイプライン処理にも向いています。

たとえば、ファイルを読み込む、変換する、保存するという3段階の処理を考えます。

C#
Channel<string> filePathChannel = Channel.CreateBounded<string>(100);
Channel<ParsedData> parsedChannel = Channel.CreateBounded<ParsedData>(100);

ステージ1ではファイルパスを流します。

C#
async Task ReadFilesAsync(CancellationToken cancellationToken)
{
try
{
foreach (var path in Directory.EnumerateFiles("input"))
{
await filePathChannel.Writer.WriteAsync(path, cancellationToken);
}
}
finally
{
filePathChannel.Writer.Complete();
}
}

ステージ2ではファイルを読み、変換結果を次のChannelに流します。

C#
async Task ParseFilesAsync(CancellationToken cancellationToken)
{
try
{
await foreach (var path in filePathChannel.Reader.ReadAllAsync(cancellationToken))
{
var text = await File.ReadAllTextAsync(path, cancellationToken);
var data = new ParsedData(path, text.Length);

await parsedChannel.Writer.WriteAsync(data, cancellationToken);
}
}
finally
{
parsedChannel.Writer.Complete();
}
}

ステージ3では結果を保存します。

C#
async Task SaveResultsAsync(CancellationToken cancellationToken)
{
await foreach (var data in parsedChannel.Reader.ReadAllAsync(cancellationToken))
{
await SaveAsync(data, cancellationToken);
}
}

public sealed record ParsedData(string Path, int Length);

Channelを段階ごとに分けることで、処理をパイプライン化できます。

7-4. Web APIリクエスト後の重い処理を非同期に逃がす

Web APIでは、リクエスト中に重い処理をすべて実行するとレスポンスが遅くなります。たとえば、画像変換、メール送信、外部API連携などです。

Channelを使うと、APIはジョブをキューに積んで202 Acceptedを返し、実処理はバックグラウンドで行えます。

C#
app.MapPost("/send-mail", async (
SendMailRequest request,
IBackgroundTaskQueue queue,
CancellationToken cancellationToken) =>
{
await queue.QueueAsync(async token =>
{
await SendMailAsync(request.Email, request.Subject, request.Body, token);
}, cancellationToken);

return Results.Accepted();
});

public sealed record SendMailRequest(
string Email,
string Subject,
string Body);

注意点として、この方式では「キューに積んだあとに処理が失敗する」可能性があります。重要なジョブでは、Channelだけで完結させず、データベースにジョブ状態を保存する設計も検討しましょう。

7-5. Worker ServiceやHostedServiceでの活用

Worker ServiceやHostedServiceでは、Channelを使って常駐処理をきれいに書けます。

C#
public sealed class EventWorker : BackgroundService
{
private readonly ChannelReader<AppEvent> _reader;
private readonly ILogger<EventWorker> _logger;

public EventWorker(
ChannelReader<AppEvent> reader,
ILogger<EventWorker> logger)
{
_reader = reader;
_logger = logger;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var appEvent in _reader.ReadAllAsync(stoppingToken))
{
try
{
await HandleEventAsync(appEvent, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to handle event.");
}
}
}

private static ValueTask HandleEventAsync(
AppEvent appEvent,
CancellationToken cancellationToken)
{
Console.WriteLine(appEvent.Message);
return ValueTask.CompletedTask;
}
}

HostedServiceでChannelを使う場合は、アプリ終了時にCancellationTokenが通知されるため、Consumer側のループに必ず渡しましょう。

8. C# Channelを使うときの注意点

8-1. Completeを呼ばないと読み取り側が終了できない

Channelの代表的な落とし穴は、Completeを呼び忘れることです。

C#
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}

このコードは、Channelが完了するまで終わりません。Writer側がもうデータを書かないなら、必ず完了通知を行います。

C#
channel.Writer.Complete();

例外が起きても完了通知できるように、Producer側ではtry/finallyを使うのがおすすめです。

C#
try
{
await ProduceAsync(channel.Writer, cancellationToken);
}
finally
{
channel.Writer.TryComplete();
}

8-2. Unbounded Channelでメモリが増え続けるリスク

Unbounded Channelは便利ですが、容量制限がありません。

C#
var channel = Channel.CreateUnbounded<Job>();

Consumerが遅い場合、Channel内にデータが溜まり続けます。特にWeb APIやログ処理など、外部入力が多いシステムでは注意が必要です。

実務では、まず次のようなBounded Channelを検討しましょう。

C#
var channel = Channel.CreateBounded<Job>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});

メモリを守るには、キューの最大数を決めることが重要です。

8-3. 複数Consumerでは処理完了順が保証されない

複数Consumerで処理すると、処理完了順は保証されません。

C#
var consumers = Enumerable.Range(1, 4)
.Select(_ => ConsumeAsync(channel.Reader, cancellationToken))
.ToArray();

この構成はスループットを上げるには有効ですが、順序が重要な処理には向きません。

たとえば、銀行取引、在庫更新、連番処理、ファイルの順序依存処理などでは、単一Consumerにするか、順序制御を別途実装する必要があります。

8-4. 例外をChannelの外にどう伝えるか

Channel内の処理で例外が起きた場合、その例外をどう扱うかを決めておく必要があります。

Consumer内でログだけ出して継続する場合は、次のように書けます。

C#
try
{
await ProcessAsync(item, cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to process item.");
}

一方、重大な例外で処理全体を止めたい場合は、Channelを例外付きで完了させます。

C#
channel.Writer.TryComplete(ex);

Reader側では、Completionを待つことで例外を検知できます。

C#
try
{
await channel.Reader.Completion;
}
catch (Exception ex)
{
logger.LogError(ex, "Channel completed with error.");
}

Channelはデータを運ぶ仕組みであり、業務エラーの管理方法まで自動で決めてくれるわけではありません。リトライ、失敗ジョブの保存、デッドレターキューなどが必要な場合は、別途設計しましょう。

8-5. CancellationTokenを渡し忘れない

ChannelのAPIには、CancellationTokenを渡せるものが多くあります。

C#
await writer.WriteAsync(item, cancellationToken);
var item = await reader.ReadAsync(cancellationToken);
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
await ProcessAsync(item, cancellationToken);
}

CancellationTokenを渡し忘れると、アプリ終了時やリクエスト中断時に、処理が想定より長く残ることがあります。

特にHostedService、Worker Service、UIアプリでは、停止処理が重要です。CancellationTokenはChannelの読み書きだけでなく、実際の処理メソッドにも渡しましょう。

9. Channelと他のキュー・並行処理手段の比較

9-1. ChannelとConcurrentQueueの違い

ConcurrentQueue<T>は、スレッドセーフなFIFOキューです。軽量で使いやすく、複数スレッドから安全にEnqueue/Dequeueできます。

しかし、データが来るまで非同期に待つ仕組みはありません。

Channelは、スレッドセーフなデータ受け渡しに加えて、非同期待機、完了通知、容量制限、バックプレッシャーを扱えます。

単純に「複数スレッドから安全にキュー操作したい」だけならConcurrentQueue<T>で十分です。一方、「Consumerがデータ到着をawaitしたい」「バックグラウンド処理キューを作りたい」ならChannelが向いています。

9-2. ChannelとBlockingCollectionの違い

BlockingCollection<T>は、Producer/Consumerパターンを実装するための古くからある便利なクラスです。

C#
var collection = new BlockingCollection<string>();

TakeGetConsumingEnumerableでデータが来るまで待てます。ただし、基本的にはブロッキングモデルです。

Channelは、async/awaitを前提にした非同期モデルで使えます。

C#
var item = await channel.Reader.ReadAsync(cancellationToken);

同期的なコンソールアプリや既存のスレッドベース処理ではBlockingCollection<T>が合うこともあります。非同期処理中心の現代的なC#アプリでは、Channelのほうが自然に書ける場面が多いでしょう。

9-3. ChannelとTPL Dataflowの違い

TPL Dataflowは、より高機能なデータフロー処理ライブラリです。BufferBlockTransformBlockActionBlockなどを組み合わせて、複雑なパイプラインを作れます。

Channelは、より低レベルでシンプルなProducer/Consumerの仕組みです。

複雑な変換、分岐、結合、並列度制御、ブロック間リンクなどを豊富に使いたいならTPL Dataflowが向いています。一方、単純な非同期キュー、バックグラウンドジョブ、スレッド間通信ならChannelのほうが軽量で理解しやすいです。

9-4. ChannelとTask.WhenAll・SemaphoreSlimの使い分け

Task.WhenAllは、複数のTaskをまとめて待つためのAPIです。

C#
await Task.WhenAll(tasks);

すでに処理対象がすべて揃っていて、それらを並列に実行したい場合に向いています。

SemaphoreSlimは、同時実行数を制御するためによく使われます。

C#
await semaphore.WaitAsync(cancellationToken);
try
{
await ProcessAsync(item, cancellationToken);
}
finally
{
semaphore.Release();
}

Channelは、処理対象が継続的に発生する場合に向いています。Producerが随時データを投入し、Consumerが継続的に処理するような構成です。

つまり、次のように使い分けるとよいでしょう。

処理対象が最初から全部あるならTask.WhenAll。同時実行数だけ制限したいならSemaphoreSlim。継続的に発生するデータをキューとして処理したいならChannelです。

9-5. どのケースでChannelを選ぶべきか

Channelを選ぶべきなのは、次のようなケースです。

ProducerとConsumerを分離したい。データ到着まで非同期に待ちたい。バックグラウンドジョブキューを作りたい。容量制限とバックプレッシャーを使いたい。複数Producer・複数Consumerを安全に扱いたい。lockやポーリングを減らしたい。async/await中心の設計にしたい。

逆に、単純なリスト操作だけならList<T>Queue<T>で十分です。スレッドセーフな即時キュー操作だけならConcurrentQueue<T>でもよいでしょう。複雑なデータフロー処理ならTPL Dataflowも候補です。

Channelは、「非同期Producer/Consumerをシンプルに作りたい」ときにもっとも力を発揮します。

10. C# Channelのベストプラクティス

10-1. 基本はBounded Channelを検討する

実務では、まずBounded Channelを検討しましょう。

C#
var channel = Channel.CreateBounded<Job>(new BoundedChannelOptions(500)
{
FullMode = BoundedChannelFullMode.Wait
});

Unbounded Channelは簡単ですが、負荷が高いとメモリ使用量が増え続けるリスクがあります。

Bounded Channelなら、システムが抱えられるキュー数を明示できます。満杯時に待つ、古いデータを捨てる、新しいデータを捨てるといった方針も設定できます。

業務データを扱うならWait、リアルタイム性が重要で古い値が不要ならDropOldest、ログやメトリクスで欠損を許容できるならDropWriteなど、要件に合わせて選びましょう。

10-2. Producer/Consumerの責務を分ける

Channel本体をあちこちに渡すより、ProducerにはChannelWriter<T>、ConsumerにはChannelReader<T>を渡す設計がおすすめです。

C#
public sealed class MessageProducer
{
private readonly ChannelWriter<string> _writer;

public MessageProducer(ChannelWriter<string> writer)
{
_writer = writer;
}
}

public sealed class MessageConsumer
{
private readonly ChannelReader<string> _reader;

public MessageConsumer(ChannelReader<string> reader)
{
_reader = reader;
}
}

これにより、書き込み側と読み取り側の責務が明確になります。

さらに、アプリケーション層では独自インターフェースで包むと、テストしやすくなります。

C#
public interface IMessageQueue
{
ValueTask EnqueueAsync(string message, CancellationToken cancellationToken);
IAsyncEnumerable<string> ReadAllAsync(CancellationToken cancellationToken);
}

Channelを直接公開しすぎないことで、将来的に実装を変えやすくなります。

10-3. Complete・Completion・try/finallyで終了処理を安全にする

Channelでは、終了処理を明確に設計することが重要です。

Producerが終わったらCompleteします。

C#
try
{
await ProduceAsync(writer, cancellationToken);
}
finally
{
writer.TryComplete();
}

Consumer側は、ReadAllAsyncで最後まで読みます。

C#
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
await ProcessAsync(item, cancellationToken);
}

必要に応じてCompletionも待ちます。

C#
await reader.Completion;

例外をChannelの完了に含めたい場合は、TryComplete(ex)を使います。

C#
catch (Exception ex)
{
writer.TryComplete(ex);
}

「誰がCompleteを呼ぶのか」「複数Producerの場合はいつCompleteするのか」「例外時にどう完了させるのか」を事前に決めておきましょう。

10-4. SingleReader・SingleWriterを設定して性能を最適化する

Channel作成時には、読み取り側と書き込み側の数が決まっているなら、SingleReaderSingleWriterを設定しましょう。

C#
var channel = Channel.CreateBounded<Job>(new BoundedChannelOptions(100)
{
SingleReader = true,
SingleWriter = false,
FullMode = BoundedChannelFullMode.Wait
});

単一ConsumerならSingleReader = true、単一ProducerならSingleWriter = trueです。

C#
var channel = Channel.CreateUnbounded<Job>(new UnboundedChannelOptions
{
SingleReader = false,
SingleWriter = true
});

これらの設定は必須ではありませんが、実態に合わせて指定することで、Channel側がより効率的に動作できます。

10-5. テストしやすいChannel処理の設計

Channelを使った処理は、設計次第でテストしやすくなります。

まず、Channelを直接あちこちで作らず、キューを抽象化します。

C#
public interface IJobQueue
{
ValueTask EnqueueAsync(Job job, CancellationToken cancellationToken);
IAsyncEnumerable<Job> ReadAllAsync(CancellationToken cancellationToken);
}

実装ではChannelを使います。

C#
public sealed class JobQueue : IJobQueue
{
private readonly Channel<Job> _channel;

public JobQueue()
{
_channel = Channel.CreateBounded<Job>(100);
}

public ValueTask EnqueueAsync(Job job, CancellationToken cancellationToken)
{
return _channel.Writer.WriteAsync(job, cancellationToken);
}

public IAsyncEnumerable<Job> ReadAllAsync(CancellationToken cancellationToken)
{
return _channel.Reader.ReadAllAsync(cancellationToken);
}

public void Complete()
{
_channel.Writer.TryComplete();
}
}

public sealed record Job(int Id, string Name);

テストでは、実際にChannelにデータを流してConsumerの動作を確認できます。

C#
[Fact]
public async Task Consumer_Should_Process_Jobs()
{
var channel = Channel.CreateUnbounded<Job>();

await channel.Writer.WriteAsync(new Job(1, "test"));
channel.Writer.Complete();

var processed = new List<Job>();

await foreach (var job in channel.Reader.ReadAllAsync())
{
processed.Add(job);
}

Assert.Single(processed);
Assert.Equal(1, processed[0].Id);
}

また、Consumerの処理ロジックをChannelから切り離すことも重要です。

C#
public sealed class JobProcessor
{
public ValueTask ProcessAsync(Job job, CancellationToken cancellationToken)
{
Console.WriteLine($"Processed: {job.Name}");
return ValueTask.CompletedTask;
}
}

Channelはあくまでデータを受け渡す仕組みです。業務ロジックをChannelの読み取りループに直接詰め込みすぎないようにすると、テストや保守がしやすくなります。

まとめ

C# Channelは、System.Threading.Channelsで提供される非同期Producer/Consumer向けの仕組みです。非同期キュー、スレッド間通信、バックグラウンドジョブ、ログ処理、イベント処理、パイプライン処理など、さまざまな場面で活用できます。

Queue<T>ConcurrentQueue<T>と違い、Channelはデータ到着まで非同期に待てます。BlockingCollection<T>と違い、async/await中心のコードに自然に組み込めます。

基本的な使い方は、Channel.CreateUnbounded<T>()またはChannel.CreateBounded<T>()でChannelを作り、ProducerがWriter.WriteAsyncで書き込み、ConsumerがReader.ReadAllAsyncReadAsyncで読み取る、という流れです。

実務では、メモリ使用量を制御しやすいBounded Channelをまず検討しましょう。容量を決め、FullModeで満杯時の動作を明確にし、CancellationTokenCompleteを適切に使うことが重要です。

また、複数Consumerでは処理完了順が保証されない点、Unbounded Channelではメモリが増え続ける可能性がある点、例外をどのように外へ伝えるかを設計する点にも注意が必要です。

C#で非同期キューやスレッド間通信を実装するなら、Channelは非常に強力な選択肢です。lockやポーリングに頼らず、ProducerとConsumerをきれいに分離できるため、保守性の高い非同期処理を実装できます。