Processing high volume of unique messages exactly once while preserving order
Message grouping and deduplication using SQS and Ruby
In this article, let’s try to design a specific event-based queuing architecture. We will use the terms event and message interchangeably throughout the post.
Understanding Requirements
We are trying to build a user analytics system where every user action must be tracked and stored for later reporting and behavioral analysis. Think of something similar to Google Analytics or Segment that we are building in-house.
1. Every user is given a unique `user_id` that will help us identify the user across the board.
2. The sequence in which the user performs actions must be preserved. For example:
   a. the user clicks on the Contracts tab
   b. checks the latest Agreement as selected
   c. authorizes the Agreement by digitally signing it
3. The analytics is time-sensitive and we need to process the events in the order listed above. We can assume each user event has a unique `event_id` that we can track.
4. To avoid noise, we are interested in only unique events within a span of 5 minutes. If the user performs the same event more than once within that time period, we prefer to ignore the redundant events and capture them only once.
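To make this concrete, an event in such a system might look like the following sketch (the field names are illustrative, not prescribed by the requirements):

```ruby
event = {
  user_id: 'usr1',                    # identifies the user across the board
  event_id: 'evt42',                  # unique per event, used for deduplication
  action: 'agreement_signed',         # what the user did
  occurred_at: '2021-06-01T10:15:00Z' # when it happened, to preserve ordering
}
```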
So, let’s start talking in messaging jargon from here on and come up with a solution to the above requirements.
- First, to ensure the order of events is maintained, we must use a FIFO queue, which is short for First In First Out.
- We would like to group events in a specific way so that they are processed one at a time in the given order. Events across different groups can be processed in parallel to improve performance and reduce the wait time to process the events. Queues support this using a Message Group ID, which you can use to bucket events together. Messages will be received in the order they were sent within their group.
- Within a group, we want events to be unique for a span of 5 minutes. This is called message deduplication. For a queue, this can be defined using a Message Deduplication ID. For exactly-once processing, FIFO queues use the Message Deduplication ID to identify and reject duplicates sent within a 5-minute interval.
Now, for our use case, what do you think should be the group ID and the deduplication ID? You would have guessed it: the `user_id` can be the group ID and the `event_id` can be the deduplication ID.
In the following diagram, the message group ID is of the form `usrX` and the deduplication ID is of the form `evtY`. You can see that the messages are grouped by `user_id` (the group ID), and duplicates have been removed within each group based on `event_id` (the deduplication ID).
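The same idea as a quick sketch (group and dedup IDs follow the naming above):

```ruby
# Sent to the queue, in order:
#   { group: 'usr1', dedup: 'evt1' }
#   { group: 'usr1', dedup: 'evt1' }  # duplicate within 5 minutes -> rejected
#   { group: 'usr2', dedup: 'evt3' }
#   { group: 'usr1', dedup: 'evt2' }
#
# Delivered:
#   usr1 group: evt1, evt2  (strictly in order)
#   usr2 group: evt3        (can be processed in parallel with usr1's events)
```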
Proposed Solution
We are going to look at the solution using SQS FIFO as the preferred queue and Ruby on Rails as the message processor. But the ideas discussed can be extended to any queuing system and consumer.
Before we start exploring the application code, let’s review some of the definitions in the context of AWS SQS, straight from the reference docs:
Message deduplication ID
The token used for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren’t delivered during the 5-minute deduplication interval.
Message group ID
The tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group are always processed one by one, in a strict order relative to the message group (however, messages that belong to different message groups might be processed out of order).
For our purpose, we will use the Shoryuken gem, which is a wrapper on top of the AWS SDK for SQS. Why not use the SDK directly? Shoryuken empowers the SDK by adding support for continuous polling and thread-based processing; besides that, it also adds abstractions that simplify the integration with SQS, including Active Job support.
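Adding the gem is a one-liner in your Gemfile:

```ruby
# Gemfile
gem 'shoryuken'
```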
Once you have added the gem to your codebase, it comes with a CLI that you can use. Open the terminal and type the following command:
```
$ shoryuken sqs create User_Events.fifo
```
SQS mandates that FIFO queue names end with the `.fifo` suffix, as seen above.
The next step would be to create a worker that will consume the events from the queue.
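A minimal sketch of such a worker, assuming the class name `UserEventsProcessor` (to match the file we require when starting the worker later) and a JSON payload:

```ruby
class UserEventsProcessor
  include Shoryuken::Worker

  # auto_delete removes the message from SQS after successful processing;
  # batch: false hands messages to perform one at a time.
  shoryuken_options queue: 'User_Events.fifo',
                    auto_delete: true,
                    batch: false,
                    body_parser: :json

  def perform(sqs_msg, event)
    # sqs_msg is the raw SQS message (handy for debugging);
    # event is the parsed payload.
    Rails.logger.info("Processing #{event['event_id']} for #{event['user_id']}")
    # ... store the event for reporting and behavioral analysis ...
  end
end
```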
You see that any Ruby class can be made into a worker by including the `Shoryuken::Worker` module. That lets you call the `shoryuken_options` helper with the queue name and other options. We will discuss `auto_delete` and `batch` later in the post.
The class will implement one method called `perform`, which receives the SQS message (useful for debugging) and the actual payload.
Now, for the purpose of understanding, let’s look at the code that publishes the events. Remember that in reality, the system that emanates these events might live in a different universe altogether at the other end of the queue. But the following snippet will help us understand how the events are created in the first place.
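A sketch of what that publisher might look like, assuming the worker above and an illustrative event hash:

```ruby
event = {
  user_id: 'usr1',
  event_id: 'evt42',
  action: 'agreement_signed'
}

# Enqueue to SQS; Shoryuken passes these options through to SQS's
# send_message, so the queue can group and deduplicate the event.
UserEventsProcessor.perform_async(
  event.to_json,
  message_group_id: event[:user_id],
  message_deduplication_id: event[:event_id]
)
```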
The `perform_async` method of the worker adds these events to the SQS queue instead of processing them immediately. In addition to the actual event, which is the message payload, every entry has a `message_group_id` and a `message_deduplication_id`, the standard FIFO queue event identifiers discussed earlier.
Let’s go ahead and create the SQS queue using the AWS console. When creating the queue, make sure you select FIFO and refer to the configuration below:
Make sure `Content-Based Deduplication` is checked. We will talk about the other parameters in detail later.
Now, we are all set to start the worker to process the messages. Run the following command:
```
$ bundle exec shoryuken start --concurrency=4 --rails -r user_events_processor.rb
```
`concurrency` is the number of threads available for processing messages at a time. By default it is set to 25, and you can increase or decrease it.
Our config will let the worker process 4 messages at a time in parallel. But remember that for a specific user, we want to process events in sequence. We can configure that using the `max_number_of_messages` option at the queue level. This will force Shoryuken to receive and process messages one by one for a message group (a user, in our case):
```ruby
Shoryuken.sqs_client_receive_message_opts['User_Events.fifo'] = {
  max_number_of_messages: 1
}
```
To summarize what we have done: we remove duplicate events within a span of 5 minutes for a specific user; within a user’s group we process one event at a time; but across multiple users’ events in the queue, we process 4 events simultaneously.
Message Visibility Timeout
When we configured the queue, you may have noticed that we set `Default Visibility Timeout` to 3 minutes. This is an important configuration parameter with a lot of implications:
- Immediately after a message is received by the processor, it remains in the SQS queue. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consumers from receiving and processing the message.
- This way, even if we run multiple processes of the same worker on different machines (horizontal scaling), you can be sure the message is not processed more than once.
- That is the reason we have set `auto_delete: true` in the worker config: it makes sure Shoryuken deletes the message from SQS after successfully processing it.
- The visibility timeout matters even more when a message fails to be processed and we have to retry!
But to do that, we have to set up a dead-letter queue for the User_Events queue. A dead-letter queue is the destination for messages that can’t be processed (consumed) successfully. Dead-letter queues are useful for debugging your application or messaging system because they let you isolate problematic messages to determine why their processing doesn’t succeed. Shoryuken will take care of signaling to SQS that message processing failed, provided you raise an error in your application code.
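For example, a sketch of how a failure could be surfaced from `perform` (the `AnalyticsStore` service and `ProcessingError` class are hypothetical):

```ruby
def perform(sqs_msg, event)
  # Raising here means Shoryuken (with auto_delete: true) does NOT delete
  # the message; it becomes visible again after the visibility timeout,
  # is retried, and moves to the dead-letter queue after Maximum Receives.
  raise ProcessingError, "could not store #{event['event_id']}" unless AnalyticsStore.save(event)
end
```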
As can be seen in the config screenshot, we have created another queue called `User_Events_DL.fifo`, which will be the dead-letter queue for the `User_Events.fifo` queue. Now, what is more interesting is the `Maximum Receives` param set to 3. This implies that a failing message will be received up to three times before SQS gives up on it. But how soon will the message be processed again? That is where the visibility timeout of 3 minutes helps: SQS will wait 3 minutes before making that message visible again for processing. The cool thing about all this in the context of FIFO SQS is (straight from the AWS docs):
When you receive a message with a message group ID, no more messages for the same message group ID are returned unless you delete the message or it becomes visible.
This makes sure all other messages in that group wait before this message is retried and processed. What if the message still fails after the third receive? It will be moved to the dead-letter queue, and the rest of the messages in the group will continue to be delivered.
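If you prefer configuring this outside the console, the same redrive policy can be applied with the aws-sdk-sqs gem; a sketch (the queue URL and ARN below are placeholders):

```ruby
require 'aws-sdk-sqs'

sqs = Aws::SQS::Client.new

# Point User_Events.fifo at the dead-letter queue, with Maximum Receives = 3.
sqs.set_queue_attributes(
  queue_url: 'https://sqs.us-east-1.amazonaws.com/123456789012/User_Events.fifo',
  attributes: {
    'RedrivePolicy' => {
      deadLetterTargetArn: 'arn:aws:sqs:us-east-1:123456789012:User_Events_DL.fifo',
      maxReceiveCount: '3'
    }.to_json
  }
)
```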
It is normally a good practice to set up CloudWatch alarms that fire when a new event lands in the dead-letter queue, so that you can be alerted about it.
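One way to do this, sketched with the aws-sdk-cloudwatch gem (the alarm name and SNS topic ARN are placeholders):

```ruby
require 'aws-sdk-cloudwatch'

cloudwatch = Aws::CloudWatch::Client.new

# Alert whenever the dead-letter queue has any visible messages.
cloudwatch.put_metric_alarm(
  alarm_name: 'user-events-dlq-not-empty',
  namespace: 'AWS/SQS',
  metric_name: 'ApproximateNumberOfMessagesVisible',
  dimensions: [{ name: 'QueueName', value: 'User_Events_DL.fifo' }],
  statistic: 'Sum',
  period: 300,
  evaluation_periods: 1,
  threshold: 0,
  comparison_operator: 'GreaterThanThreshold',
  alarm_actions: ['arn:aws:sns:us-east-1:123456789012:dlq-alerts']
)
```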