wsf

wsf

軽量級メッセージキューRedisQueue

軽量メッセージキュー RedisQueue#

Title: 軽量メッセージキュー RedisQueue

URL Source: https://newlifex.com/core/redisqueue

Markdown Content:
メッセージキュー(Message Queue)は分散システムに欠かせないミドルウェアであり、大部分のメッセージキュー製品(RocketMQ/RabbitMQ/Kafka など)は、チームに比較的強い技術力を要求し、中小チームには適しておらず、.NET 技術のサポートが不十分です。しかし、Redis によって実現された軽量メッセージキューは非常にシンプルで、Redis の通常の操作だけで済み、開発チームが追加の知識を習得する必要はほとんどありません!

強力な.NET5 のリリースに伴い、.NET 技術スタックには最高のメッセージキューのパートナーが必要です!

この記事では、高性能 Redis コンポーネント NewLife.Redis を基に、宅配便業務のシナリオを借りて、.NET で Redis をメッセージキューとして使用し、企業レベルの分散システムアーキテクチャを構築する方法を説明します!

この記事の例のコードは、コンソールプロジェクトを新規作成し、nuget から NewLife.Redis を参照する必要があります。

実践シナリオ:

中通快递、RedisQueue は大データ計算に使用され、統計データをバッチアップロードしてデータベースの落盤圧力を軽減し、メッセージ量は毎日 10 億です。

递易智能、RedisQueue は分散システムのデカップリングに使用され、メッセージトピックは 300 以上で、アプリケーションシステムは 50 以上、メッセージ量は毎日 1000 万です。

Image 27

メッセージキューとは#

メッセージキューとは、メッセージの伝送中にメッセージを保存するコンテナであり、その核心的な機能はピーク削減デカップリングです!

Image 28

朝のピーク時、宅配便会社のトラックが各ステーションに荷物を降ろし、多くのステーションのスタッフが PDA で到着をスキャンし、大量の情報がシステムに入ります(1000tps)、しかし宅配便会社への通知インターフェースは 400tps の処理能力しかありません。

MQ を追加してメッセージを保存し、システムの処理能力を超えたメッセージを滞留させ、朝のピークが過ぎた後にシステムが処理を完了できるようにします。これがピーク削減です!

Image 29

宅配ボックスの業務プロセスでは、配達員がボックスに投函した後、システム料金の減額、ユーザーへの SMS 通知、宅配便会社へのプッシュ通知の 3 つの業務アクションを経る必要があります。従来の方法では、これらの業務を順次実行する必要があり、その中のどれかのステップで異常が発生すると(例えば、ユーザーの携帯電話がオフになっている、または宅配便会社のインターフェースが故障している場合)、全体の投函プロセスが遅延または中断され、ユーザー体験に深刻な影響を与えます。

インターフェース層が投函データを受け取った後、MQ にメッセージを書き込み、後続の 3 つのサブシステムがそれぞれ消費処理を行うことで、この問題を完璧に解決でき、サブシステムの故障が上流システムに影響を与えません!これがデカップリングです!

メモリメッセージキュー#

最もシンプルなメッセージキューは、BlockingCollection を使用して実装できます。

public static void Start()
{
var queue = new BlockingCollection<Area>();

// 独立スレッドで消費
var thread = new Thread(s =\> Consume(queue));
thread.Start();

// メッセージを発行
Publish(queue);

}

private static void Publish(BlockingCollection<Area> queue)
{
var area = new Area {Code = 110000, Name = "北京市"};
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);

area = new Area { Code = 310000, Name = "上海市" };
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);

area = new Area { Code = 440100, Name = "广州市" };
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);

}

private static void Consume(BlockingCollection<Area> queue)
{
while (true)
{
var msg = queue.Take();
if (msg != null)
{
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
}
}
}

Image 30

毎秒 1 つのメッセージを生成(1 号スレッド)、独立したスレッド(9 号スレッド)が消費します。

Redis をメッセージキューとして使用#

Redis の LIST 構造は、左から入れて右から出す機能を持ち、BRPOP のブロッキングポップを使用することで、最も基本的なメッセージキュー RedisQueue<T> を完成させることができます。BRPOP は、各メッセージが消費され、かつ一度だけ消費されることを保証します。

GetQueue でキューを取得した後、Add メソッドでメッセージを発行します。

TakeOne で 1 つのメッセージを消費し、10 秒のブロッキングを指定し、10 秒以内にメッセージがあればすぐに返し、そうでなければ 10 秒のタイムアウト後に空を返します。

注意:消費端としてインスタンス化された Redis は、ブロッキング時間よりも大きいタイムアウト時間を指定する必要があります。例: new FullRedis {Timeout = 11_000} 。

消費端は実際には大ループで消費され、各トピックに対して 8 つのスレッドまたはタスクを開いて大ループ消費を実行することをお勧めします(8 スレッドはビジネスニーズに応じて他の数字に調整できます。例えば 1/2/5/16)。各スレッドは GetQueue を行った後、自分専用のキューインスタンスを取得し、すべてのインスタンスが共同でそのトピックのメッセージを消費処理します。したがって、メッセージトピックの総消費者数は、デプロイインスタンス数にスレッド数 8 を掛けたものであり、必要に応じて増減でき、パーティションの偏りの問題がなく、非常に柔軟です。

public static void Start(FullRedis redis)
{
var topic = "EasyQueue";

// 独立スレッドで消費
var thread = new Thread(s =\> Consume(redis, topic));
thread.Start();

// メッセージを発行。実際には生産と消費は異なるサーバーにあります。
Publish(redis, topic);

}

private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetQueue<Area>(topic);

queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "广州市" });
Thread.Sleep(1000);

}

private static void Consume(FullRedis redis, String topic)
{
var queue = redis.GetQueue<Area>(topic);

while (true)
{
    var msg = queue.TakeOne(10);
    if (msg != null)
    {
        XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
    }
}

}

LPUSH でメッセージを生成(1 号スレッドがリストに挿入)、BRPOP でメッセージを消費(11 号スレッドがリストからポップ)するため、メッセージが消費された後は消えます!

注:NewLife.Redis メッセージキューはデフォルトでメッセージを Json シリアライズし、消費時に String としてメッセージを取得し、再シリアライズしてメッセージオブジェクトに戻します。

Image 31

ログの時間から、生成と消費の時間差が 1〜3ms の間で、遅延が非常に低いことがわかります!

消費コードをコメントアウトして再実行すると、Redis で発行されたメッセージを見ることができます。

Image 32

注意:RedisQueue の欠点は、メッセージが消費された後に Redis から削除されるため、消費端がメッセージ処理に失敗した場合、メッセージが失われるリスクがあります!重要な業務には確認が必要な RedisReliableQueue キューを使用してください。

確認が必要なキュー#

宅配便会社の物流プッシュサブシステムがメッセージを処理する際にエラーが発生した場合、メッセージが失われたらどうしますか?明らかに上流に再度送信させることは不可能です!

ここで、消費確認をサポートする信頼できるキュー RedisReliableQueue<T> が必要です。消費後、プログラムが明示的に消費を確認しない限り、Redis はメッセージを削除しません。

RedisReliableQueue は Redis の LIST 構造を採用し、LPUSH でメッセージを発行し、BRPOPLPUSH のブロッキングポップを使用し、同時にメッセージを保留リストにバックアップします。消費が成功した後に確認すると、保留リストから削除されます。消費処理が失敗した場合、メッセージは保留リストに滞留し、一定時間後に自動的に主キューに戻され、再度消費が割り当てられます。BRPOPLPUSH は、各メッセージが消費され、かつ一度だけ消費されることを保証します。

GetReliableQueue でキューインスタンスを取得した後、Add でメッセージを発行し、TakeOneAsync で 1 つのメッセージを非同期に消費し、10 秒のブロッキングタイムアウトを指定し、処理が完了した後に Acknowledge で確認します。

注意:消費端としてインスタンス化された Redis は、ブロッキング時間よりも大きいタイムアウト時間を指定する必要があります。例: new FullRedis {Timeout = 11_000} 。

消費端は実際には大ループで消費され、各トピックに対して 8 つのスレッドまたはタスクを開いて大ループ消費を実行することをお勧めします(8 スレッドはビジネスニーズに応じて他の数字に調整できます。例えば 1/2/5/16)。各スレッドは GetReliableQueue を行った後、自分専用のキューインスタンスを取得し、すべてのインスタンスが共同でそのトピックのメッセージを消費処理します。したがって、メッセージトピックの総消費者数は、デプロイインスタンス数にスレッド数 8 を掛けたものであり、必要に応じて増減でき、パーティションの偏りの問題がなく、非常に柔軟です。

public static void Start(FullRedis redis)
{
var topic = "AckQueue";

// 独立スレッドで消費
var source = new CancellationTokenSource();
Task.Run(() =\> ConsumeAsync(redis, topic, source.Token));

// メッセージを発行。実際には生産と消費は異なるサーバーにあります。
Publish(redis, topic);

source.Cancel();

}

private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetReliableQueue<Area>(topic);

queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "广州市" });
Thread.Sleep(1000);

}

private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
var queue = redis.GetReliableQueue<String>(topic);

while (!token.IsCancellationRequested)
{
    var mqMsg = await queue.TakeOneAsync(10);
    if (mqMsg != null)
    {
        var msg = mqMsg.ToJsonEntity<Area\>();
        XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
        queue.Acknowledge(mqMsg);
    }
}

}

LPUSH でメッセージを生成(1 号スレッドがリストに挿入)、BRPOPLPUSH でメッセージを消費(6 号 5 号スレッドがリストからポップして別の Ack 保留リストに挿入)することが、メッセージを失わないための重要なポイントです。LREM は Ack 保留リストから削除し、消費完了後の確認に使用されます。

消費に異常が発生した場合、その確認操作は実行されず、Ack 保留リストに滞留しているメッセージは、60 秒後に主リストに戻り、再度消費が割り当てられます。

脳トレ:アプリケーションプロセスが異常終了した場合、未確認のメッセージはどう処理しますか?

Image 33

消費コードをコメントアウトして再実行すると、Redis で発行されたメッセージを見ることができ、通常のキューと同様に LIST 構造を使用しています。

Image 34

「北京市」のメッセージを処理する際、Acknowledge 確認がなければ、Redis には AckQueue:Ack:* という LIST 構造が見え、このメッセージが保存されます。

したがって、信頼できるキューは本質的に、消費時にメッセージを別の LIST にバックアップし、確認操作は待機確認 LIST から削除することです。

Image 35

この信頼できるキューが登場して以来、基本的に 90% 以上のビジネスニーズを満たすことができます。

遅延キュー#

ある日、小馬さんが言いました。宅配便員がボックスに投函した後、一定時間内にユーザーが受け取らなかった場合、システムは超過受取手数料を請求する必要があり、遅延キューが必要です。

そこで Redis の ZSET を思いつき、RedisDelayQueue<T> を作成しました。Add でメッセージを生成する際に、何秒後にそのメッセージを消費できるかを指定するパラメータが追加され、消費の使い方は信頼できるキューと同様です。

消費端は実際には大ループで消費され、各トピックに対して 8 つのスレッドまたはタスクを開いて大ループ消費を実行することをお勧めします(8 スレッドはビジネスニーズに応じて他の数字に調整できます。例えば 1/2/5/16)。各スレッドは GetDelayQueue を行った後、自分専用のキューインスタンスを取得し、すべてのインスタンスが共同でそのトピックのメッセージを消費処理します。したがって、メッセージトピックの総消費者数は、デプロイインスタンス数にスレッド数 8 を掛けたものであり、必要に応じて増減でき、パーティションの偏りの問題がなく、非常に柔軟です。

public static void Start(FullRedis redis)
{
var topic = "DelayQueue";

// 独立スレッドで消費
var source = new CancellationTokenSource();
Task.Run(() =\> ConsumeAsync(redis, topic, source.Token));

// メッセージを発行。実際には生産と消費は異なるサーバーにあります。
Publish(redis, topic);

source.Cancel();

}

private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetDelayQueue<Area>(topic);

queue.Add(new Area { Code = 110000, Name = "北京市" }, 2);
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" }, 2);
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "广州市" }, 2);
Thread.Sleep(1000);

}

private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
var queue = redis.GetDelayQueue<String>(topic);

while (!token.IsCancellationRequested)
{
    var mqMsg = await queue.TakeOneAsync(10);
    if (mqMsg != null)
    {
        var msg = mqMsg.ToJsonEntity<Area\>();
        XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);

        queue.Acknowledge(mqMsg);
    }
}

}

Image 36

上の図から、毎秒 1 つのメッセージを生成し、2 秒後に北京市を消費し、さらに 1 秒後に上海市を消費することがわかります(上海市の発行からちょうど 2 秒後)。ここでは広州市が欠けています。なぜなら、テストプログラムが広州市を生成した後、1 秒待ってから終了したからです。

Redis から広州市のメッセージを確認すると、ZSET 構造に保存されています。

Image 37

複数消費グループ RedisStream#

またある日、データプラットフォームの仲間が注文キューを消費したいと言いましたが、できません。LIST 構造で作られたキューでは、各メッセージは一度しか消費できないため、データプラットフォームのシステムが消費してしまうと、他のビジネスシステムはメッセージを失ってしまいます。

そこで、Redis5.0 から新たに追加された STREAM 構造を思いつき、RedisStream を再度ラップしました。

注意:消費端としてインスタンス化された Redis は、ブロッキング時間よりも大きいタイムアウト時間を指定する必要があります。例: new FullRedis {Timeout = 11_000} 。

消費端は実際には大ループで消費され、各トピックに対して 8 つのスレッドまたはタスクを開いて大ループ消費を実行することをお勧めします(8 スレッドはビジネスニーズに応じて他の数字に調整できます。例えば 1/2/5/16)。各スレッドは GetStream を行った後、自分専用のキューインスタンスを取得し、すべてのインスタンスが共同でそのトピックのメッセージを消費処理します。したがって、メッセージトピックの総消費者数は、デプロイインスタンス数にスレッド数 8 を掛けたものであり、必要に応じて増減でき、パーティションの偏りの問題がなく、非常に柔軟です。

public static void Start(FullRedis redis)
{
var topic = "FullQueue";

// 2つの消費グループがそれぞれ独立して消費
var source = new CancellationTokenSource();
{
    var queue = redis.GetStream<Area\>(topic);
    queue.Group = "Group1";

    \_ = queue.ConsumeAsync(OnConsume, source.Token);
}
{
    var queue = redis.GetStream<Area\>(topic);
    queue.Group = "Group2";

    \_ = queue.ConsumeAsync(OnConsume2, source.Token);
}

// メッセージを発行。実際には生産と消費は異なるサーバーにあります。
Publish(redis, topic);

Thread.Sleep(1000);
source.Cancel();

}

private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetStream<Area>(topic);

queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "广州市" });
Thread.Sleep(1000);

}

private static void OnConsume(Area area)
{
XTrace.WriteLine("Group1.Consume {0} {1}", area.Code, area.Name);
}

private static Task OnConsume2(Area area, Message message, CancellationToken token)
{
XTrace.WriteLine("Group2.Consume {0} {1} Id={2}", area.Code, area.Name, message.Id);

return Task.CompletedTask;

}

生産プロセスは変わらず、消費の大ループは少し特別で、主に STREAM から消費されたメッセージには独自の Id があり、その Id を確認するだけで済みます。

Image 38

上の図で、赤い枠が生産、紫の枠が消費です。

Redis の中を見てみると、STREAM メッセージがまだ残っていることがわかります。データプラットフォームグループは、異なる消費グループ Group を使用するだけで独立して消費でき、他のシステムのメッセージを奪う心配はありません。

Image 39

ベストプラクティス#

RedisQueue は中通の大データ分析で、Oracle/MySql に書き込むデータをバッファリングするために使用され、多スレッド計算の後にキューに書き込み、専用スレッドが定期的に一括(500 行)でプルし、バッチ Insert/Update 操作を実行します。このシステムキューは、毎日 10 億件のメッセージを処理し、Redis のメモリ割り当ては 8G で、実際の使用量は 100M 未満です。消費端の故障が原因で蓄積が発生しない限り、ほとんどメモリを占有しません。

注意:消費端としてインスタンス化された Redis は、ブロッキング時間よりも大きいタイムアウト時間を指定する必要があります。例: new FullRedis {Timeout = 11_000} 。

递易智能科技はすべて信頼できるキュー RedisReliableQueue を使用しており、約 300 以上のキューがあり、システムごとにそれぞれの Redis インスタンスに分散しています。パブリッククラウドの 2G メモリのマスタースレーブ版です。蓄積されたメッセージが 10 万件未満の場合、キュー専用の Redis インスタンスのメモリ使用量は 100M 未満で、ほとんどメモリを占有しません。

会社の業務は毎日 100 万件以上の注文をもたらし、それに伴うメッセージ数は約 1000 万件で、メッセージが失われたことは一度もありません!

例のコード#

コード:https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo

📎.NET 最愛 Redis メッセージキュー.mp4

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。