輕量級消息隊列 RedisQueue#
Title: 輕量級消息隊列 RedisQueue
URL Source: https://newlifex.com/core/redisqueue
Markdown Content:
消息隊列(Message Queue)是分佈式系統必不可少的中間件,大部分消息隊列產品(如 RocketMQ/RabbitMQ/Kafka 等)要求團隊有比較強的技術實力,不適用於中小團隊,並且對.NET 技術的支持力度不夠。而 Redis 實現的輕量級消息隊列很簡單,僅有 Redis 常規操作,幾乎不需要開發團隊掌握額外的知識!
隨著強大的.NET5 發布,.NET 技術棧裡面怎可沒有最佳的消息隊列搭檔?
本文從高性能 Redis 組件 NewLife.Redis 出發,借用快遞業務場景,講解.NET 中如何使用 Redis 作為消息隊列,搭建企業級分佈式系統架構!
本文例程代碼需要新建控制台項目後,從 nuget 引用 NewLife.Redis 方能使用。
實踐場景:
中通快遞,RedisQueue 用於大數據計算,統計數據湊批上傳降低數據庫落盤壓力,消息量每日 10 億。
遞易智能,RedisQueue 用於分佈式系統解耦,消息 Topic 共 300 多個,涉及應用系統 50 多個,消息量每日 1000 萬。
什麼是消息隊列#
消息隊列就是消息在傳輸過程中保存消息的容器,其核心功用是削峰和解耦!
早高峰,快遞公司的貨車前來各驛站卸貨,多名站點工作人員使用 PDA 掃描到站,大量信息進入系統(1000tps),而通知快遞公司的接口只有 400tps 的處理能力。
通過增加 MQ 來保存消息,讓超過系統處理能力的消息滯留下來,等早高峰過後,系統即可完成處理。此為削峰!
在快遞櫃業務流程中,快遞員投櫃後需要經歷扣減系統費、短信通知用戶和推送通知快遞公司三個業務動作。傳統做法需要依次執行這些業務東西,如果其中某一步異常(例如用戶手機未開機或者快遞公司接口故障),將會延遲甚至中斷整個投櫃流程,嚴重影響用戶體驗。
如果接口層收到投櫃數據後,寫入消息到 MQ,後續三個子系統各自消費處理,將可以完美解決該問題,並且子系統故障不影響上游系統!此為解耦!
內存消息隊列#
最簡單的消息隊列,可以由阻塞集合 BlockingCollection 實現
public static void Start()
{
var queue = new BlockingCollection<Area>();
// 獨立線程消費
var thread = new Thread(s =\> Consume(queue));
thread.Start();
// 發布消息
Publish(queue);
}
private static void Publish(BlockingCollection<Area> queue)
{
var area = new Area {Code = 110000, Name = "北京市"};
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
area = new Area { Code = 310000, Name = "上海市" };
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
area = new Area { Code = 440100, Name = "廣州市" };
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
}
private static void Consume(BlockingCollection<Area> queue)
{
while (true)
{
var msg = queue.Take();
if (msg != null)
{
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
}
}
}
每秒鐘生產一個消息(1 號線程),都被獨立線程(9 好線程)消費到。
Redis 做消息隊列#
Redis 的 LIST 結構,具備左進右出的功能,再使用 BRPOP 的阻塞彈出,即可完成一個最基本的消息隊列 RedisQueue<T>。BRPOP 確保每個消息都被消費,且僅消費一次。
GetQueue 取得隊列後,Add 方法發布消息。
TakeOne 拉取消費一條消息,指定 10 秒阻塞,10 秒內有消息立馬返回,否則等到 10 秒超時後返回空。
注意:作為消費端實例化的 Redis,需要指定大於阻塞時間的超時時間,如 new FullRedis {Timeout = 11_000} 。
消費端實際上是大循環消費,建議每個 Topic 開 8 個線程或 Task 執行大循環消費,(8 線程可根據業務需要調整為其它數字,如 1/2/5/16),每個線程 GetQueue 後得到自己獨占的隊列實例,所有實例共同消費處理該 Topic 下的消息。那么消息 Topic 的總消費者數量就是部署實例數乘以線程數 8,並且可以按需增減,沒有分區傾斜問題,非常靈活。
public static void Start(FullRedis redis)
{
var topic = "EasyQueue";
// 獨立線程消費
var thread = new Thread(s =\> Consume(redis, topic));
thread.Start();
// 發布消息。實際上生產和消費位於不同伺服器
Publish(redis, topic);
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "廣州市" });
Thread.Sleep(1000);
}
private static void Consume(FullRedis redis, String topic)
{
var queue = redis.GetQueue<Area>(topic);
while (true)
{
var msg = queue.TakeOne(10);
if (msg != null)
{
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
}
}
}
LPUSH 生產消息(1 號線程插入列表),BRPOP 消費消息(11 號線程彈出列表),因此,消息被消費後就消失了!
注:NewLife.Redis 消息隊列默認對消息進行 Json 序列化,消費時以 String 消費得到消息,再反序列化為消息對象。
從日誌時間可以看到,生產與消費的時間差在 1~3ms 之間,延遲極低!
註釋消費代碼後重跑,可以在 Redis 中看到發布的消息
注意:RedisQueue 的缺點是消息被消費後即從 Redis 中刪除,如果消費端處理消息失敗,將有丟失消息的風險!重要業務請使用需要確認的 RedisReliableQueue 隊列。
需要確認的隊列#
如果通知快遞公司的物流推送子系統處理消息時出錯,消息丟失怎麼辦?顯然不可能讓上游再發一次!
這裡我們需要支持消費確認的可信隊列 RedisReliableQueue<T>。消費之後,除非程序主動確認消費,否則 Redis 不許刪除消息。
RedisReliableQueue 採用 Redis 的 LIST 結構,LPUSH 發布消息,再使用 BRPOPLPUSH 的阻塞彈出,同時備份消息到掛起列表。消費成功後確認時,再從掛起列表中刪除。如果消費處理失敗,消息將滯留在掛起列表,一定時間後自動轉移回去主隊列,重新分配消費。BRPOPLPUSH 確保每個消息都被消費,且僅消費一次。
GetReliableQueue 獲取隊列實例後,Add 發布消息,TakeOneAsync 異步消費一條消息,並指定 10 秒阻塞超時,處理完成後再通過 Acknowledge 確認。
注意:作為消費端實例化的 Redis,需要指定大於阻塞時間的超時時間,如 new FullRedis {Timeout = 11_000} 。
消費端實際上是大循環消費,建議每個 Topic 開 8 個線程或 Task 執行大循環消費,(8 線程可根據業務需要調整為其它數字,如 1/2/5/16),每個線程 GetReliableQueue 後得到自己獨占的隊列實例,所有實例共同消費處理該 Topic 下的消息。那么消息 Topic 的總消費者數量就是部署實例數乘以線程數 8,並且可以按需增減,沒有分區傾斜問題,非常靈活。
public static void Start(FullRedis redis)
{
var topic = "AckQueue";
// 獨立線程消費
var source = new CancellationTokenSource();
Task.Run(() =\> ConsumeAsync(redis, topic, source.Token));
// 發布消息。實際上生產和消費位於不同伺服器
Publish(redis, topic);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetReliableQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "廣州市" });
Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
var queue = redis.GetReliableQueue<String>(topic);
while (!token.IsCancellationRequested)
{
var mqMsg = await queue.TakeOneAsync(10);
if (mqMsg != null)
{
var msg = mqMsg.ToJsonEntity<Area\>();
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
queue.Acknowledge(mqMsg);
}
}
}
LPUSH 生產消息(1 號線程插入列表),BRPOPLPUSH 消費消息(6 號 5 號線程彈出列表並插入另一個 Ack 掛起列表),這是確保不丟消息的關鍵。 LREM 從 Ack 掛起列表刪除,用於消費完成後確認。
如果消費異常,就不會執行該確認操作,滯留在 Ack 掛起列表的消息,60 秒後重新回到主列表,再次分配消費。
腦筋急轉彎: 如果應用進程異常退出,未確認的消息該怎麼處理?
註釋消費代碼後重跑,可以在 Redis 中看到發布的消息,跟普通隊列一樣,使用了 LIST 結構
處理 “北京市” 消息時,如果沒有 Acknowledge 確認,Redis 裡面將會看到一個名為 AckQueue:Ack:* 的 LIST 結構,裡面保存這一條消息。
所以,可信隊列本質上就是在消費時,同步把消息備份到另一個 LIST 裡面,確認操作就是從待確認 LIST 裡面刪除。
自從有了這個可信隊列,基本上足夠滿足 90% 以上業務需求。
延遲隊列#
某一天,小馬哥說,快遞員投櫃一定時間時候,如果用戶沒有來取件,那麼系統需要收取超期取件費,需要一個延遲隊列。
於是想到了 Redis 的 ZSET,我們再來一個 RedisDelayQueue<T>,Add 生產消息時多了一個參數,指定若干秒後可以消費到該消息,消費用法跟可信隊列一樣。
消費端實際上是大循環消費,建議每個 Topic 開 8 個線程或 Task 執行大循環消費,(8 線程可根據業務需要調整為其它數字,如 1/2/5/16),每個線程 GetDelayQueue 後得到自己獨占的隊列實例,所有實例共同消費處理該 Topic 下的消息。那么消息 Topic 的總消費者數量就是部署實例數乘以線程數 8,並且可以按需增減,沒有分區傾斜問題,非常靈活。
public static void Start(FullRedis redis)
{
var topic = "DelayQueue";
// 獨立線程消費
var source = new CancellationTokenSource();
Task.Run(() =\> ConsumeAsync(redis, topic, source.Token));
// 發布消息。實際上生產和消費位於不同伺服器
Publish(redis, topic);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetDelayQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" }, 2);
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" }, 2);
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "廣州市" }, 2);
Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
var queue = redis.GetDelayQueue<String>(topic);
while (!token.IsCancellationRequested)
{
var mqMsg = await queue.TakeOneAsync(10);
if (mqMsg != null)
{
var msg = mqMsg.ToJsonEntity<Area\>();
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
queue.Acknowledge(mqMsg);
}
}
}
上圖可以看到,每秒生產一個消息,2 秒後消費到北京市,再過 1 秒消費到上海市(距離上海市的發布剛好 2 秒)。這裡少了廣州市,因為測試程序在生產廣州市後,只等了 1 秒就退出。
我們從 Redis 中可以看到廣州市這一條消息,存放在 ZSET 結構中。
多消費組 RedisStream#
又一天,數據中台的小夥伴想要消費訂單隊列,但是不能夠啊,LIST 結構做的隊列,每個消息只能被消費一次,如果數據中台的系統消費掉了,其它業務系統就會失去消息。
我們想到了 Redis5.0 開始新增的 STREAM 結構,再次封裝 RedisStream。
注意:作為消費端實例化的 Redis,需要指定大於阻塞時間的超時時間,如 new FullRedis {Timeout = 11_000} 。
消費端實際上是大循環消費,建議每個 Topic 開 8 個線程或 Task 執行大循環消費,(8 線程可根據業務需要調整為其它數字,如 1/2/5/16),每個線程 GetStream 後得到自己獨占的隊列實例,所有實例共同消費處理該 Topic 下的消息。那么消息 Topic 的總消費者數量就是部署實例數乘以線程數 8,並且可以按需增減,沒有分區傾斜問題,非常靈活。
public static void Start(FullRedis redis)
{
var topic = "FullQueue";
// 兩個消費組各自獨立消費
var source = new CancellationTokenSource();
{
var queue = redis.GetStream<Area\>(topic);
queue.Group = "Group1";
\_ = queue.ConsumeAsync(OnConsume, source.Token);
}
{
var queue = redis.GetStream<Area\>(topic);
queue.Group = "Group2";
\_ = queue.ConsumeAsync(OnConsume2, source.Token);
}
// 發布消息。實際上生產和消費位於不同伺服器
Publish(redis, topic);
Thread.Sleep(1000);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetStream<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "廣州市" });
Thread.Sleep(1000);
}
private static void OnConsume(Area area)
{
XTrace.WriteLine("Group1.Consume {0} {1}", area.Code, area.Name);
}
private static Task OnConsume2(Area area, Message message, CancellationToken token)
{
XTrace.WriteLine("Group2.Consume {0} {1} Id={2}", area.Code, area.Name, message.Id);
return Task.CompletedTask;
}
生產過程不變,消費大循環有點特別,主要是 STREAM 消費回來的消息,有它自己的 Id,只需要對這個 Id 確認就可以了。
上圖中,紅色框是生產,紫色框是消費。
再來看看 Redis 中,可以看到 STREAM 消息還在裡面。數據中台組只需要使用不同的消費組 Group,即可獨立消費,不用擔心搶其它系統消息啦。
最佳實踐#
RedisQueue 在中通大數據分析中,用於緩衝等待寫入 Oracle/MySql 的數據,多線程計算後寫入隊列,然後由專門線程定時拉取一批(500 行),執行批量 Insert/Update 操作。該系統隊列,每天 10 億條消息,Redis 內存分配 8G,實際使用小於 100M,除非消費端故障導致產生積壓。
注意:作為消費端實例化的 Redis,需要指定大於阻塞時間的超時時間,如 new FullRedis {Timeout = 11_000} 。
遞易智能科技全部使用可信隊列 RedisReliableQueue,約 300 多個隊列,按系統分布在各自的 Redis 實例,公有雲 2G 內存主從版。積壓消息小於 10 萬時,隊列專用的 Redis 實例內存占用小於 100M,幾乎不占內存空間。
公司業務每天帶來 100 萬多訂單,由此衍生的消息數約 1000 萬條,從未丟失消息!
例程代碼#
代碼:https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo