Lightweight Message Queue RedisQueue#
A message queue is an essential middleware for distributed systems. Most message queue products (such as RocketMQ, RabbitMQ, and Kafka) demand relatively strong technical capabilities from a team, which makes them a poor fit for small and medium-sized teams, and their support for .NET is limited. A lightweight message queue built on Redis is very simple: it relies only on ordinary Redis operations and requires almost no additional knowledge from the development team!

With the powerful release of .NET 5, how could the .NET technology stack do without a great message queue companion?

This article starts from the high-performance Redis component NewLife.Redis and uses the express delivery business scenario to explain how to use Redis as a message queue in .NET to build an enterprise-level distributed system architecture!

The example code in this article requires creating a console project and referencing the NewLife.Redis NuGet package.

Practical Scenarios:

ZTO Express: RedisQueue is used in big data computing to batch statistics and reduce database write pressure, with a daily message volume of 1 billion.

Delivery Intelligent: RedisQueue is used to decouple distributed systems, with over 300 message topics spanning more than 50 application systems and a daily message volume of 10 million.

Image 27

What is a Message Queue#

A message queue is a container that holds messages during transmission, with its core functions being peak shaving and decoupling!

Image 28

During the morning rush hour, delivery trucks arrive at various stations to unload goods, and multiple station staff use PDAs to scan arrivals, resulting in a large amount of information entering the system (1,000 tps), while the interface notifying the delivery company has a processing capacity of only 400 tps.

By adding an MQ to hold messages, messages that exceed the system's processing capacity can be retained until after the morning rush hour, allowing the system to complete processing. This is peak shaving!

Image 29

In the express cabinet business process, after the courier deposits the package, three business actions need to be executed: deducting system fees, notifying users via SMS, and pushing notifications to the delivery company. The traditional approach requires executing these business actions sequentially, and if any step encounters an exception (for example, the user's phone is off or the delivery company's interface fails), it will delay or even interrupt the entire deposit process, severely affecting user experience.

If the interface layer writes the deposit data to the MQ after receiving it, the subsequent three subsystems can each consume and process independently, perfectly solving the problem, and subsystem failures do not affect upstream systems! This is decoupling!

In-Memory Message Queue#

The simplest message queue can be implemented using a BlockingCollection.
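The demos in this article use a simple Area message entity and the XTrace logger from NewLife.Core. The Area class itself is not shown in the original text; a minimal definition, assumed from how the demos use it, could be:

public class Area
{
    // Minimal message entity assumed from the demos' usage: an area code plus a name.
    public Int32 Code { get; set; }
    public String Name { get; set; }
}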

public static void Start()
{
    var queue = new BlockingCollection<Area>();

    // Independent thread consumption
    var thread = new Thread(s => Consume(queue));
    thread.Start();

    // Publish messages
    Publish(queue);
}

private static void Publish(BlockingCollection<Area> queue)
{
    var area = new Area { Code = 110000, Name = "Beijing" };
    XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
    queue.Add(area);
    Thread.Sleep(1000);

    area = new Area { Code = 310000, Name = "Shanghai" };
    XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
    queue.Add(area);
    Thread.Sleep(1000);

    area = new Area { Code = 440100, Name = "Guangzhou" };
    XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
    queue.Add(area);
    Thread.Sleep(1000);
}

private static void Consume(BlockingCollection<Area> queue)
{
    while (true)
    {
        var msg = queue.Take();
        if (msg != null)
        {
            XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
        }
    }
}

A message is produced every second (Thread 1) and consumed by an independent thread (Thread 9).

Using Redis as a Message Queue#

Redis's LIST structure supports pushing on the left and popping on the right. Using BRPOP for a blocking pop, a basic message queue, RedisQueue, can be built. BRPOP ensures that each message is consumed once and only once.

GetQueue retrieves the queue, and the Add method publishes messages.

TakeOne pulls and consumes a message, specifying a 10-second blocking timeout. If there is a message within 10 seconds, it returns immediately; otherwise, it returns null after 10 seconds.

Note: When instantiating Redis as a consumer, a timeout greater than the blocking time must be specified, such as new FullRedis { Timeout = 11_000 }.
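For reference, constructing such a client might look like the following sketch, assuming the FullRedis(server, password, db) constructor; the connection settings are placeholders to adjust for your environment.

// Sketch: create the Redis client used by the demos below.
// Timeout (milliseconds) must exceed the 10-second blocking pop used by TakeOne.
var redis = new FullRedis("127.0.0.1:6379", null, 3)
{
    Timeout = 11_000
};

Start(redis);   // run the publish/consume demo below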

The consumer is essentially one big loop. It is recommended to start 8 threads or tasks per topic, each running this consumption loop (the thread count can be adjusted as needed, e.g. 1/2/5/16). Each thread calls GetQueue and gets its own exclusive queue instance, and all instances jointly consume the messages under that topic. The total number of consumers for a topic is therefore the number of deployed application instances multiplied by the thread count (8); it can be scaled up or down at will, with no partition skew issues, making it very flexible.
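The demo below starts a single consumer thread for clarity; scaling out to the recommended multi-threaded pattern might look like this sketch (the thread count and background-thread choice are illustrative):

// Sketch: start N consumer threads for one topic; each thread calls Consume,
// which obtains its own queue instance via GetQueue.
private static void StartConsumers(FullRedis redis, String topic, Int32 threads = 8)
{
    for (var i = 0; i < threads; i++)
    {
        new Thread(s => Consume(redis, topic)) { IsBackground = true }.Start();
    }
}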

public static void Start(FullRedis redis)
{
    var topic = "EasyQueue";

    // Independent thread consumption
    var thread = new Thread(s => Consume(redis, topic));
    thread.Start();

    // Publish messages. In reality, production and consumption are on different servers
    Publish(redis, topic);
}

private static void Publish(FullRedis redis, String topic)
{
    var queue = redis.GetQueue<Area>(topic);

    queue.Add(new Area { Code = 110000, Name = "Beijing" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 310000, Name = "Shanghai" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 440100, Name = "Guangzhou" });
    Thread.Sleep(1000);
}

private static void Consume(FullRedis redis, String topic)
{
    var queue = redis.GetQueue<Area>(topic);

    while (true)
    {
        var msg = queue.TakeOne(10);
        if (msg != null)
        {
            XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
        }
    }
}

LPUSH produces messages (Thread 1 inserts into the list), and BRPOP consumes messages (Thread 11 pops from the list), so the message disappears after being consumed!

Note: NewLife.Redis serializes messages to JSON by default; on the consumer side the message is read as a string and then deserialized back into the message object.
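To illustrate, a String-typed queue over the same topic sees the raw JSON text, which the ToJsonEntity extension (from NewLife.Serialization) turns back into an Area; this is only a sketch reusing the topic name from the demo above.

// Sketch: read one message in its raw JSON form and deserialize it manually.
var raw = redis.GetQueue<String>("EasyQueue");
var json = raw.TakeOne(1);               // e.g. {"Code":110000,"Name":"Beijing"}
var area = json?.ToJsonEntity<Area>();   // back into the message object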

Image 31

From the log timestamps, it can be seen that the time difference between production and consumption is between 1 and 3 ms, with very low latency!

After commenting out the consumption code and rerunning, the published messages can be seen in Redis.

Image 32

Note: The disadvantage of RedisQueue is that messages are deleted from Redis after being consumed. If the consumer fails to process the message, there is a risk of message loss! For important business, please use the confirmed RedisReliableQueue.

Confirmed Queue#

What if the logistics push subsystem that notifies the delivery company hits an error while processing a message and the message is lost? Clearly, we cannot ask the upstream to send it again!

Here we need a trusted queue, RedisReliableQueue, which supports consumption acknowledgement: once a message has been taken, it is not deleted from Redis until the program explicitly confirms consumption.

RedisReliableQueue uses Redis's LIST structure, LPUSH to publish messages, and then uses BRPOPLPUSH for blocking pop, while backing up messages to a pending list. After confirming successful consumption, the message is deleted from the pending list. If the consumption fails, the message will remain in the pending list and will automatically return to the main queue after a certain time, ready for reallocation for consumption. BRPOPLPUSH ensures that each message is consumed and only consumed once.

GetReliableQueue retrieves the queue instance, Add publishes messages, and TakeOneAsync asynchronously consumes a message with a 10-second blocking timeout; Acknowledge confirms the message after processing completes.

Note: When instantiating Redis as a consumer, a timeout greater than the blocking time must be specified, such as new FullRedis { Timeout = 11_000 }.

The consumer is essentially one big loop. It is recommended to start 8 threads or tasks per topic, each running this consumption loop (the thread count can be adjusted as needed, e.g. 1/2/5/16). Each thread calls GetReliableQueue and gets its own exclusive queue instance, and all instances jointly consume the messages under that topic. The total number of consumers for a topic is therefore the number of deployed application instances multiplied by the thread count (8); it can be scaled up or down at will, with no partition skew issues, making it very flexible.

public static void Start(FullRedis redis)
{
    var topic = "AckQueue";

    // Independent thread consumption
    var source = new CancellationTokenSource();
    Task.Run(() => ConsumeAsync(redis, topic, source.Token));

    // Publish messages. In reality, production and consumption are on different servers
    Publish(redis, topic);

    source.Cancel();
}

private static void Publish(FullRedis redis, String topic)
{
    var queue = redis.GetReliableQueue<Area>(topic);

    queue.Add(new Area { Code = 110000, Name = "Beijing" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 310000, Name = "Shanghai" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 440100, Name = "Guangzhou" });
    Thread.Sleep(1000);
}

private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
    var queue = redis.GetReliableQueue<String>(topic);

    while (!token.IsCancellationRequested)
    {
        var mqMsg = await queue.TakeOneAsync(10);
        if (mqMsg != null)
        {
            var msg = mqMsg.ToJsonEntity<Area>();
            XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
            queue.Acknowledge(mqMsg);
        }
    }
}

LPUSH produces messages (Thread 1 inserts into the list), and BRPOPLPUSH consumes messages (Threads 6 and 5 pop from the list and insert into another Ack pending list), which is key to ensuring no message loss. LREM deletes from the Ack pending list for confirmation after consumption.

If there is an exception during consumption, the confirmation operation will not be executed, and the messages left in the Ack pending list will return to the main list after 60 seconds for reallocation for consumption.
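A defensive consumer body might therefore look like the following sketch; Process is a hypothetical business handler, and everything else mirrors the demo above.

// Sketch: acknowledge only after successful processing; on exception the message
// stays in the Ack pending list and is redelivered after the retry window.
private static async Task ConsumeOneSafelyAsync(FullRedis redis, String topic)
{
    var queue = redis.GetReliableQueue<String>(topic);

    var mqMsg = await queue.TakeOneAsync(10);
    if (mqMsg == null) return;

    try
    {
        var area = mqMsg.ToJsonEntity<Area>();
        Process(area);               // hypothetical handler; may throw
        queue.Acknowledge(mqMsg);    // confirm only after success
    }
    catch (Exception ex)
    {
        XTrace.WriteException(ex);   // no Acknowledge: the message will be redelivered
    }
}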

Question: how should unconfirmed messages be handled if the application process exits abnormally?

Image 33

After commenting out the consumption code and rerunning, the published messages can be seen in Redis, just like a normal queue, using the LIST structure.

Image 34

When processing the "Beijing" message, if there is no Acknowledge confirmation, Redis will show a LIST structure named AckQueue:Ack:* containing this message.

Thus, the trusted queue essentially synchronously backs up the message to another LIST during consumption, and the confirmation operation is to delete from the pending confirmation LIST.

Image 35

Since the introduction of this trusted queue, it has basically been sufficient to meet over 90% of business needs.

Delayed Queue#

One day, Brother Ma said that if a package sits in the cabinet for a certain period without the user picking it up, the system needs to charge a late pickup fee, which requires a delayed queue.

So we turned to Redis's ZSET and built RedisDelayQueue. When producing a message, an extra parameter specifies how many seconds later the message becomes available for consumption; on the consumer side it is used much like the trusted queue.
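For the late-pickup scenario, the producer simply sets the delay when the package is deposited. A sketch under stated assumptions (LockerOrder, the LateFeeCheck topic name, and the 4-hour window are all illustrative):

// Hypothetical order entity for the late-fee scenario.
public class LockerOrder
{
    public String OrderId { get; set; }
    public String Cabinet { get; set; }
}

// Schedule a "late pickup fee" check 4 hours after the package is deposited.
// The consumer will not see this message until 4 * 3600 seconds have elapsed.
var queue = redis.GetDelayQueue<LockerOrder>("LateFeeCheck");
queue.Add(new LockerOrder { OrderId = "SF123456", Cabinet = "A-07" }, 4 * 3600);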

The consumer is essentially one big loop. It is recommended to start 8 threads or tasks per topic, each running this consumption loop (the thread count can be adjusted as needed, e.g. 1/2/5/16). Each thread calls GetDelayQueue and gets its own exclusive queue instance, and all instances jointly consume the messages under that topic. The total number of consumers for a topic is therefore the number of deployed application instances multiplied by the thread count (8); it can be scaled up or down at will, with no partition skew issues, making it very flexible.

public static void Start(FullRedis redis)
{
    var topic = "DelayQueue";

    // Independent thread consumption
    var source = new CancellationTokenSource();
    Task.Run(() => ConsumeAsync(redis, topic, source.Token));

    // Publish messages. In reality, production and consumption are on different servers
    Publish(redis, topic);

    source.Cancel();
}

private static void Publish(FullRedis redis, String topic)
{
    var queue = redis.GetDelayQueue<Area>(topic);

    queue.Add(new Area { Code = 110000, Name = "Beijing" }, 2);
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 310000, Name = "Shanghai" }, 2);
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 440100, Name = "Guangzhou" }, 2);
    Thread.Sleep(1000);
}

private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
    var queue = redis.GetDelayQueue<String>(topic);

    while (!token.IsCancellationRequested)
    {
        var mqMsg = await queue.TakeOneAsync(10);
        if (mqMsg != null)
        {
            var msg = mqMsg.ToJsonEntity<Area>();
            XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);

            queue.Acknowledge(mqMsg);
        }
    }
}

Image 36

As shown in the figure, a message is produced every second. The Beijing message is consumed 2 seconds after it was published, and the Shanghai message one second later (also exactly 2 seconds after its own publication). The Guangzhou message is missing from the log because the test program exited just 1 second after producing it.

We can see the Guangzhou message stored in the ZSET structure in Redis.

Image 37

Multiple Consumer Groups RedisStream#

Another day, colleagues from the data middle platform wanted to consume the order queue, but they couldn't: a queue built on the LIST structure allows each message to be consumed only once, so if the data middle platform consumed a message, the other business systems would lose it.

So we turned to the STREAM structure introduced in Redis 5.0 and wrapped it as RedisStream.

Note: When instantiating Redis as a consumer, a timeout greater than the blocking time must be specified, such as new FullRedis { Timeout = 11_000 }.

The consumer is essentially one big loop. It is recommended to start 8 threads or tasks per topic, each running this consumption loop (the thread count can be adjusted as needed, e.g. 1/2/5/16). Each thread calls GetStream and gets its own exclusive queue instance, and all instances jointly consume the messages under that topic. The total number of consumers for a topic is therefore the number of deployed application instances multiplied by the thread count (8); it can be scaled up or down at will, with no partition skew issues, making it very flexible.

public static void Start(FullRedis redis)
{
    var topic = "FullQueue";

    // Two consumer groups consume independently
    var source = new CancellationTokenSource();
    {
        var queue = redis.GetStream<Area>(topic);
        queue.Group = "Group1";

        _ = queue.ConsumeAsync(OnConsume, source.Token);
    }
    {
        var queue = redis.GetStream<Area>(topic);
        queue.Group = "Group2";

        _ = queue.ConsumeAsync(OnConsume2, source.Token);
    }

    // Publish messages. In reality, production and consumption are on different servers
    Publish(redis, topic);

    Thread.Sleep(1000);
    source.Cancel();
}

private static void Publish(FullRedis redis, String topic)
{
    var queue = redis.GetStream<Area>(topic);

    queue.Add(new Area { Code = 110000, Name = "Beijing" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 310000, Name = "Shanghai" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 440100, Name = "Guangzhou" });
    Thread.Sleep(1000);
}

private static void OnConsume(Area area)
{
    XTrace.WriteLine("Group1.Consume {0} {1}", area.Code, area.Name);
}

private static Task OnConsume2(Area area, Message message, CancellationToken token)
{
    XTrace.WriteLine("Group2.Consume {0} {1} Id={2}", area.Code, area.Name, message.Id);

    return Task.CompletedTask;
}

The production side is unchanged, but the consumption loop is a little different: messages consumed from a STREAM carry their own Id, and it is this Id that gets acknowledged.

Image 38

In the figure, the red box represents production, and the purple box represents consumption.

Looking at Redis, we can see that STREAM messages are still there. The data middle platform group can independently consume using different consumer groups, without worrying about other systems losing messages.

Image 39

Best Practices#

RedisQueue is used in ZTO's big data analysis to buffer data waiting to be written to Oracle/MySQL. After multi-threaded computation, data is written to the queue, and a dedicated thread periodically pulls a batch (500 rows) and performs a batch Insert/Update. This system's queues handle 1 billion messages daily; Redis is allocated 8 GB of memory, but actual usage stays under 100 MB unless consumer failures cause a backlog.
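A batch-writing consumer along those lines might look like this sketch; the Order type, the OrderStat topic and SaveBatch are illustrative, and Take(count) is assumed to return up to that many pending messages.

// Sketch: drain the queue in batches of up to 500 and bulk-write them to the database.
private static void FlushLoop(FullRedis redis)
{
    var queue = redis.GetQueue<Order>("OrderStat");
    while (true)
    {
        var batch = queue.Take(500).ToList();   // up to 500 messages at once
        if (batch.Count > 0)
            SaveBatch(batch);                   // hypothetical batch Insert/Update
        else
            Thread.Sleep(1000);                 // idle wait when the queue is empty
    }
}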

Note: When instantiating Redis as a consumer, a timeout greater than the blocking time must be specified, such as new FullRedis { Timeout = 11_000 }.

Delivery Intelligent Technology uses the trusted queue RedisReliableQueue for everything, with about 300 queues spread across dedicated Redis instances (2 GB master-slave instances on a public cloud). With a backlog of fewer than 100,000 messages, the memory usage of a queue's dedicated Redis instance stays below 100 MB, taking up almost no memory.

The company's business generates over 1 million orders daily, resulting in approximately 10 million messages, with no messages ever lost!

Example Code#

Code: https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo
