轻量级消息队列 RedisQueue#
Title: 轻量级消息队列 RedisQueue
URL Source: https://newlifex.com/core/redisqueue
Markdown Content:
消息队列(Message Queue)是分布式系统必不可少的中间件,大部分消息队列产品(如 RocketMQ/RabbitMQ/Kafka 等)要求团队有比较强的技术实力,不适用于中小团队,并且对.NET 技术的支持力度不够。而 Redis 实现的轻量级消息队列很简单,仅有 Redis 常规操作,几乎不需要开发团队掌握额外的知识!
随着强大的.NET5 发布,.NET 技术栈里面怎可没有最佳的消息队列搭档?
本文从高性能 Redis 组件 NewLife.Redis 出发,借用快递业务场景,讲解.NET 中如何使用 Redis 作为消息队列,搭建企业级分布式系统架构!
本文例程代码需要新建控制台项目后,从 nuget 引用 NewLife.Redis 方能使用。
实践场景:
中通快递,RedisQueue 用于大数据计算,统计数据凑批上传降低数据库落盘压力,消息量每日 10 亿。
递易智能,RedisQueue 用于分布式系统解耦,消息 Topic 共 300 多个,涉及应用系统 50 多个,消息量每日 1000 万。
什么是消息队列#
消息队列就是消息在传输过程中保存消息的容器,其核心功用是削峰和解耦!
早高峰,快递公司的货车前来各驿站卸货,多名站点工作人员使用 PDA 扫描到站,大量信息进入系统(1000tps),而通知快递公司的接口只有 400tps 的处理能力。
通过增加 MQ 来保存消息,让超过系统处理能力的消息滞留下来,等早高峰过后,系统即可完成处理。此为削峰!
在快递柜业务流程中,快递员投柜后需要经历扣减系统费、短信通知用户和推送通知快递公司三个业务动作。传统做法需要依次执行这些业务东西,如果其中某一步异常(例如用户手机未开机或者快递公司接口故障),将会延迟甚至中断整个投柜流程,严重影响用户体验。
如果接口层收到投柜数据后,写入消息到 MQ,后续三个子系统各自消费处理,将可以完美解决该问题,并且子系统故障不影响上游系统!此为解耦!
内存消息队列#
最简单的消息队列,可以由阻塞集合 BlockingCollection 实现
public static void Start()
{
var queue = new BlockingCollection<Area>();
// 独立线程消费
var thread = new Thread(s =\> Consume(queue));
thread.Start();
// 发布消息
Publish(queue);
}
private static void Publish(BlockingCollection<Area> queue)
{
var area = new Area {Code = 110000, Name = "北京市"};
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
area = new Area { Code = 310000, Name = "上海市" };
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
area = new Area { Code = 440100, Name = "广州市" };
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
}
private static void Consume(BlockingCollection<Area> queue)
{
while (true)
{
var msg = queue.Take();
if (msg != null)
{
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
}
}
}
每秒钟生产一个消息(1 号线程),都被独立线程(9 好线程)消费到。
Redis 做消息队列#
Redis 的 LIST 结构,具备左进右出的功能,再使用 BRPOP 的阻塞弹出,即可完成一个最基本的消息队列 RedisQueue<T>。BRPOP 确保每个消息都被消费,且仅消费一次。
GetQueue 取得队列后,Add 方法发布消息。
TakeOne 拉取消费一条消息,指定 10 秒阻塞,10 秒内有消息立马返回,否则等到 10 秒超时后返回空。
注意:作为消费端实例化的 Redis,需要指定大于阻塞时间的超时时间,如 new FullRedis {Timeout = 11_000} 。
消费端实际上是大循环消费,建议每个 Topic 开 8 个线程或 Task 执行大循环消费,(8 线程可根据业务需要调整为其它数字,如 1/2/5/16),每个线程 GetQueue 后得到自己独占的队列实例,所有实例共同消费处理该 Topic 下的消息。那么消息 Topic 的总消费者数量就是部署实例数乘以线程数 8,并且可以按需增减,没有分区倾斜问题,非常灵活。
public static void Start(FullRedis redis)
{
var topic = "EasyQueue";
// 独立线程消费
var thread = new Thread(s =\> Consume(redis, topic));
thread.Start();
// 发布消息。实际上生产和消费位于不同服务器
Publish(redis, topic);
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "广州市" });
Thread.Sleep(1000);
}
private static void Consume(FullRedis redis, String topic)
{
var queue = redis.GetQueue<Area>(topic);
while (true)
{
var msg = queue.TakeOne(10);
if (msg != null)
{
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
}
}
}
LPUSH 生产消息(1 号线程插入列表),BRPOP 消费消息(11 号线程弹出列表),因此,消息被消费后就消失了!
注:NewLife.Redis 消息队列默认对消息进行 Json 序列化,消费时以 String 消费得到消息,再反序列化为消息对象。
从日志时间可以看到,生产与消费的时间差在 1~3ms 之间,延迟极低!
注释消费代码后重跑,可以在 Redis 中看到发布的消息
注意:RedisQueue 的缺点是消息被消费后即从 Redis 中删除,如果消费端处理消息失败,将有丢失消息的风险!重要业务请使用需要确认的 RedisReliableQueue 队列。
需要确认的队列#
如果通知快递公司的物流推送子系统处理消息时出错,消息丢失怎么办?显然不可能让上游再发一次!
这里我们需要支持消费确认的可信队列 RedisReliableQueue<T>。消费之后,除非程序主动确认消费,否则 Redis 不许删除消息。
RedisReliableQueue 采用 Redis 的 LIST 结构,LPUSH 发布消息,再使用 BRPOPLPUSH 的阻塞弹出,同时备份消息到挂起列表。消费成功后确认时,再从挂起列表中删除。如果消费处理失败,消息将滞留在挂起列表,一定时间后自动转移回去主队列,重新分配消费。BRPOPLPUSH 确保每个消息都被消费,且仅消费一次。
GetReliableQueue 获取队列实例后,Add 发布消息,TakeOneAsync 异步消费一条消息,并指定 10 秒阻塞超时,处理完成后再通过 Acknowledge 确认。
注意:作为消费端实例化的 Redis,需要指定大于阻塞时间的超时时间,如 new FullRedis {Timeout = 11_000} 。
消费端实际上是大循环消费,建议每个 Topic 开 8 个线程或 Task 执行大循环消费,(8 线程可根据业务需要调整为其它数字,如 1/2/5/16),每个线程 GetReliableQueue 后得到自己独占的队列实例,所有实例共同消费处理该 Topic 下的消息。那么消息 Topic 的总消费者数量就是部署实例数乘以线程数 8,并且可以按需增减,没有分区倾斜问题,非常灵活。
public static void Start(FullRedis redis)
{
var topic = "AckQueue";
// 独立线程消费
var source = new CancellationTokenSource();
Task.Run(() =\> ConsumeAsync(redis, topic, source.Token));
// 发布消息。实际上生产和消费位于不同服务器
Publish(redis, topic);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetReliableQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "广州市" });
Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
var queue = redis.GetReliableQueue<String>(topic);
while (!token.IsCancellationRequested)
{
var mqMsg = await queue.TakeOneAsync(10);
if (mqMsg != null)
{
var msg = mqMsg.ToJsonEntity<Area\>();
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
queue.Acknowledge(mqMsg);
}
}
}
LPUSH 生产消息(1 号线程插入列表),BRPOPLPUSH 消费消息(6 号 5 号线程弹出列表并插入另一个 Ack 挂起列表),这是确保不丢消息的关键。 LREM 从 Ack 挂起列表删除,用于消费完成后确认。
如果消费异常,就不会执行该确认操作,滞留在 Ack 挂起列表的消息,60 秒后重新回来主列表,再次分配消费。
脑筋急转弯: 如果应用进程异常退出,未确认的消息该怎么处理?
注释消费代码后重跑,可以在 Redis 中看到发布的消息,跟普通队列一样,使用了 LIST 结构
处理 “北京市” 消息时,如果没有 Acknowledge 确认,Redis 里面将会看到一个名为 AckQueue:Ack:* 的 LIST 结构,里面保存这这一条消息。
所以,可信队列本质上就是在消费时,同步把消息备份到另一个 LIST 里面,确认操作就是从待确认 LIST 里面删除。
自从有了这个可信队列,基本上足够满足 90% 以上业务需求。
延迟队列#
某一天,小马哥说,快递员投柜一定时间时候,如果用户没有来取件,那么系统需要收取超期取件费,需要一个延迟队列。
于是想到了 Redis 的 ZSET,我们再来一个 RedisDelayQueue<T>,Add 生产消息时多了一个参数,指定若干秒后可以消费到该消息,消费用法跟可信队列一样。
消费端实际上是大循环消费,建议每个 Topic 开 8 个线程或 Task 执行大循环消费,(8 线程可根据业务需要调整为其它数字,如 1/2/5/16),每个线程 GetDelayQueue 后得到自己独占的队列实例,所有实例共同消费处理该 Topic 下的消息。那么消息 Topic 的总消费者数量就是部署实例数乘以线程数 8,并且可以按需增减,没有分区倾斜问题,非常灵活。
public static void Start(FullRedis redis)
{
var topic = "DelayQueue";
// 独立线程消费
var source = new CancellationTokenSource();
Task.Run(() =\> ConsumeAsync(redis, topic, source.Token));
// 发布消息。实际上生产和消费位于不同服务器
Publish(redis, topic);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetDelayQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" }, 2);
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" }, 2);
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "广州市" }, 2);
Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
var queue = redis.GetDelayQueue<String>(topic);
while (!token.IsCancellationRequested)
{
var mqMsg = await queue.TakeOneAsync(10);
if (mqMsg != null)
{
var msg = mqMsg.ToJsonEntity<Area\>();
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
queue.Acknowledge(mqMsg);
}
}
}
上图可以看到,每秒生产一个消息,2 秒后消费到北京市,再过 1 秒消费到上海市(距离上海市的发布刚好 2 秒)。这里少了广州市,因为测试程序在生产广州市后,只等了 1 秒就退出。
我们从 Redis 中可以看到广州市这一条消息,存放在 ZSET 结构中。
多消费组 RedisStream#
又一天,数据中台的小伙伴想要消费订单队列,但是不能够啊,LIST 结构做的队列,每个消息只能被消费一次,如果数据中台的系统消费掉了,其它业务系统就会失去消息。
我们想到了 Redis5.0 开始新增的 STREAM 结构,再次封装 RedisStream。
注意:作为消费端实例化的 Redis,需要指定大于阻塞时间的超时时间,如 new FullRedis {Timeout = 11_000} 。
消费端实际上是大循环消费,建议每个 Topic 开 8 个线程或 Task 执行大循环消费,(8 线程可根据业务需要调整为其它数字,如 1/2/5/16),每个线程 GetStream 后得到自己独占的队列实例,所有实例共同消费处理该 Topic 下的消息。那么消息 Topic 的总消费者数量就是部署实例数乘以线程数 8,并且可以按需增减,没有分区倾斜问题,非常灵活。
public static void Start(FullRedis redis)
{
var topic = "FullQueue";
// 两个消费组各自独立消费
var source = new CancellationTokenSource();
{
var queue = redis.GetStream<Area\>(topic);
queue.Group = "Group1";
\_ = queue.ConsumeAsync(OnConsume, source.Token);
}
{
var queue = redis.GetStream<Area\>(topic);
queue.Group = "Group2";
\_ = queue.ConsumeAsync(OnConsume2, source.Token);
}
// 发布消息。实际上生产和消费位于不同服务器
Publish(redis, topic);
Thread.Sleep(1000);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetStream<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "广州市" });
Thread.Sleep(1000);
}
private static void OnConsume(Area area)
{
XTrace.WriteLine("Group1.Consume {0} {1}", area.Code, area.Name);
}
private static Task OnConsume2(Area area, Message message, CancellationToken token)
{
XTrace.WriteLine("Group2.Consume {0} {1} Id={2}", area.Code, area.Name, message.Id);
return Task.CompletedTask;
}
生产过程不变,消费大循环有点特别,主要是 STREAM 消费回来的消息,有它自己的 Id,只需要对这个 Id 确认就可以了。
上图中,红色框是生产,紫色框是消费。
再来看看 Redis 中,可以看到 STREAM 消息还在里面。数据中台组只需要使用不同的消费组 Group,即可独立消费,不用担心抢其它系统消息啦。
最佳实践#
RedisQueue 在中通大数据分析中,用于缓冲等待写入 Oracle/MySql 的数据,多线程计算后写入队列,然后由专门线程定时拉取一批(500 行),执行批量 Insert/Update 操作。该系统队列,每天 10 亿条消息,Redis 内存分配 8G,实际使用小于 100M,除非消费端故障导致产生积压。
注意:作为消费端实例化的 Redis,需要指定大于阻塞时间的超时时间,如 new FullRedis {Timeout = 11_000} 。
递易智能科技全部使用可信队列 RedisReliableQueue,约 300 多个队列,按系统分布在各自的 Redis 实例,公有云 2G 内存主从版。积压消息小于 10 万时,队列专用的 Redis 实例内存占用小于 100M,几乎不占内存空间。
公司业务每天带来 100 万多订单,由此衍生的消息数约 1000 万条,从未丢失消息!
例程代码#
代码:https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo