Processing a high volume of unique messages exactly once while preserving order

RC · Apr 17, 2020 · 7 min read

Message grouping and deduplication using SQS and Ruby

In this article, let’s design a specific event-based queuing architecture. We will use the terms event and message interchangeably throughout the post.

Understanding Requirements

We are trying to build a user analytics system where every user action must be tracked and stored for later reporting and behavioral analysis. Think of something similar to Google Analytics or Segment, built in-house.

  1. Every user is given a unique user_id that identifies the user across the board.
  2. The sequence in which the user performs actions must be preserved. For example, the user:

a. clicks on the Contracts tab

b. selects the latest Agreement

c. authorizes the Agreement by digitally signing it

  3. The analytics are time-sensitive, and we need to process the events in the order listed above. We can assume each user event has a unique event_id that we can track.

  4. To avoid noise, we are interested only in unique events within a span of 5 minutes. If the user performs the same event more than once within that window, we want to capture it once and ignore the redundant occurrences.

Now, let’s translate these requirements into messaging terms and work toward a solution.

  1. First, to ensure the order of events is maintained, we must use a FIFO (First In, First Out) queue.
  2. We would like to group events in a specific way so that they are processed one at a time in the given order. Events across different groups can be processed in parallel to improve performance and reduce the wait time to process the events. Queues support this using a Message Group ID, which you can use to bucket events together. The messages will be received in the order they were sent within their group.
  3. Within a group, we want events to be unique for a span of 5 minutes. This is called message deduplication. For a queue, this can be defined using a Message Deduplication ID. For exactly-once processing, FIFO queues use the Message Deduplication ID to identify and reject duplicates sent within a 5-minute interval.

Now, for our use case, what should the group ID and deduplication ID be? You may have guessed it: the user_id can be the group ID and the event_id can be the deduplication ID.

In the following diagram, the message group ID is of the form usrX and the deduplication ID is of the form evtY. You can see that the messages are grouped by the user_id (the group ID), and duplicates have been removed within each group based on the event_id (the deduplication ID).

Message Grouping & Deduplication

Proposed Solution

We are going to look at a solution using an SQS FIFO queue and a Ruby on Rails message processor. But the ideas discussed can be extended to any queuing system and consumer.

Before we start exploring the application code, let’s review some of the definitions in the context of AWS SQS, straight from the reference docs:

Message deduplication ID

The token used for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren’t delivered during the 5-minute deduplication interval.

Message group ID

The tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group are always processed one by one, in a strict order relative to the message group (however, messages that belong to different message groups might be processed out of order).

For our purpose, we will use the Shoryuken gem, which is a wrapper on top of the AWS SDK for SQS. Why not use the SDK directly? Shoryuken builds on the SDK by adding support for continuous polling and thread-based processing; besides that, it also adds abstractions that simplify the integration with SQS, including Active Job support.
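As a quick sketch (assuming a standard Rails app using Bundler), adding Shoryuken to the Gemfile might look like this; recent Shoryuken versions also expect the aws-sdk-sqs gem to be present:

# Gemfile
gem 'shoryuken'
gem 'aws-sdk-sqs' # Shoryuken's SQS dependency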

Once you have added the gem to your codebase, it provides a CLI that you can use. Open the terminal and type the following command:

$ shoryuken sqs create User_Events.fifo

SQS mandates that FIFO queue names end with the .fifo suffix, as seen above.

The next step is to create a worker that will consume the events from the queue.

user_events_processor.rb
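The original post embeds this file as a gist; here is a minimal sketch of what it might contain. The class name, JSON payload shape, and logging are assumptions for illustration:

class UserEventsProcessor
  include Shoryuken::Worker

  # queue must match the FIFO queue created above;
  # auto_delete and batch are discussed later in the post
  shoryuken_options queue: 'User_Events.fifo', auto_delete: true, batch: false

  # sqs_msg is the raw SQS message (handy for debugging),
  # body is the actual payload that was published
  def perform(sqs_msg, body)
    event = JSON.parse(body)
    Rails.logger.info "Processing #{event['event_id']} for #{event['user_id']}"
    # ... persist the event for reporting and analysis ...
  end
end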

You see that any Ruby class can be made into a worker by including the Shoryuken::Worker module. That lets you call the shoryuken_options helper with the queue name and other options. We will discuss auto_delete and batch later in the post.

The class implements one method called perform, which receives the raw SQS message (useful for debugging) and the actual payload.

Now, for the purpose of understanding, let’s look at the code that publishes the events. Remember that in reality, the system that emits these events might be in a different universe altogether, at the other end of the queue. But the following snippet will help us understand how the events are created in the first place.

Publishing events
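Again, the original embeds a gist here; a minimal sketch of the publishing side might look like the following. The event hashes are hypothetical:

events = [
  { user_id: 'usr1', event_id: 'evt1', action: 'clicked_contracts_tab' },
  { user_id: 'usr1', event_id: 'evt2', action: 'selected_agreement' },
  { user_id: 'usr2', event_id: 'evt3', action: 'signed_agreement' }
]

events.each do |event|
  UserEventsProcessor.perform_async(
    event.to_json,
    message_group_id: event[:user_id],           # group by user
    message_deduplication_id: event[:event_id]   # dedupe by event
  )
end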

The perform_async method of the worker will add these events to the SQS queue instead of processing them immediately. In addition to the actual event, which is the message payload, every entry has a message_group_id and a message_deduplication_id, the standard FIFO queue message identifiers discussed earlier.

Let’s go ahead and create the SQS queue using the AWS console. When creating the queue, make sure you select FIFO, and refer to the configuration below:

Make sure Content-Based Deduplication is checked. We will talk about other parameters in detail later.
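If you prefer the CLI over the console, an equivalent command might look like this (a sketch, assuming the AWS CLI is configured; note that FifoQueue can only be set at creation time, and VisibilityTimeout is in seconds):

$ aws sqs create-queue --queue-name User_Events.fifo \
    --attributes FifoQueue=true,ContentBasedDeduplication=true,VisibilityTimeout=180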

Now, we are all set to start the worker to process the messages. Run the following command:

bundle exec shoryuken start --concurrency=4 --rails -r user_events_processor.rb

concurrency is the number of threads available for processing messages at a time. By default, it is set to 25, and you can increase or decrease it.

Our config will let the worker process 4 messages at a time in parallel. But remember that for a specific user, we want to process events in sequence. We can configure that using the max_number_of_messages option at the queue level. This will force Shoryuken to receive and process messages one by one for a message group (a user, in our case):

Shoryuken.sqs_client_receive_message_opts['User_Events.fifo'] = {
  max_number_of_messages: 1
}

To summarize, what we have done is remove duplicate events within a span of 5 minutes for a specific user, and within a user’s group we will process one event at a time. But for events from multiple users in the queue, we will process 4 events simultaneously.

Message Visibility Timeout

When we configured the queue, you may have noticed that we set the Default Visibility Timeout to 3 minutes. This is an important configuration parameter with a lot of implications:

  1. Immediately after a message is received by the processor, it remains in the SQS queue. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consumers from receiving and processing the message.
  2. This way, even if we run multiple processes of the same worker on different machines (horizontal scaling), we can be sure the message is not processed more than once.
  3. That is the reason we have set auto_delete: true in the worker config, to make sure Shoryuken deletes the message from SQS after successfully processing it.
  4. The visibility timeout matters even more when the message fails to be processed and has to be retried!

But to do that, we have to set up a dead-letter queue for the User_Events queue. A dead-letter queue can be the destination for messages that can’t be processed (consumed) successfully. Dead-letter queues are useful for debugging your application or messaging system because they let you isolate problematic messages to determine why their processing doesn’t succeed. Shoryuken will take care of signaling SQS that message processing failed, provided you raise an error in your application code, as shown in the sketch below.
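With auto_delete: true, Shoryuken only deletes the message when perform returns without raising, so letting an exception bubble up leaves the message in the queue to be retried. A sketch (persist_event! and the rescued error class are hypothetical):

def perform(sqs_msg, body)
  event = JSON.parse(body)
  persist_event!(event) # hypothetical persistence call
rescue ActiveRecord::RecordInvalid
  # re-raise so Shoryuken does not delete the message;
  # SQS will redeliver it once the visibility timeout expires
  raise
end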

As can be seen in the config screenshot, we have created another queue called User_Events_DL.fifo, which will be the dead-letter queue for the User_Events.fifo queue. What is more interesting is the Maximum Receives parameter, set to 3. This implies that if a message fails to be processed, it will be retried up to three times. But how soon will the message be processed again? That is where the 3-minute visibility timeout helps: SQS will wait for 3 minutes before making that message visible again for processing. The cool thing about all this in the context of FIFO SQS is (straight from the AWS docs):

When you receive a message with a message group ID, no more messages for the same message group ID are returned unless you delete the message or it becomes visible.

This makes sure all other messages in that group wait before this message is retried and processed. What if the message fails to be processed after the third try? It will be moved to the dead-letter queue, and the rest of the messages will continue to be delivered.
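For reference, the same redrive configuration can be applied outside the console by setting a redrive policy on the source queue. A sketch (the region, account ID, and queue URL are placeholders):

$ aws sqs set-queue-attributes \
    --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/User_Events.fifo \
    --attributes '{"RedrivePolicy":"{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:User_Events_DL.fifo\",\"maxReceiveCount\":\"3\"}"}'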

It is normally good practice to set up CloudWatch alarms that fire when a new event is added to a dead-letter queue, so that you are alerted about it.
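As a sketch of what such an alarm could look like (the SNS topic ARN and account details are placeholders), you could alarm on the ApproximateNumberOfMessagesVisible metric of the dead-letter queue:

$ aws cloudwatch put-metric-alarm --alarm-name user-events-dlq-not-empty \
    --namespace AWS/SQS --metric-name ApproximateNumberOfMessagesVisible \
    --dimensions Name=QueueName,Value=User_Events_DL.fifo \
    --statistic Maximum --period 300 --evaluation-periods 1 \
    --threshold 0 --comparison-operator GreaterThanThreshold \
    --alarm-actions arn:aws:sns:us-east-1:123456789012:ops-alerts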

RC

Full Stack Engineer @ Pro.com , Ex-ThoughtWorker • Ruby,JavaScript, iOS, Frontend, Devops https://rcdexta.com