
Sunday, September 15, 2024

Understanding Amazon SQS: Normal vs FIFO Queues - with Practical Examples

 This guide will walk you through the differences between Amazon SQS Standard and FIFO queues, demonstrating various functionalities with practical Python examples. Each example will be in its own file, with clear instructions on how to run and what to expect.

Repo for Code Reference
aws/sqs/normal-fifo-queue-examples

https://github.com/ankit630/unixcloudfusion-devops-solutions.git
https://gitlab.com/ankit630/unixcloudfusion-devops-solutions.git

Prerequisites

  1. An AWS account with appropriate permissions to create and manage SQS queues.
  2. Python 3.6 or later installed on your system.
  3. AWS CLI configured with your credentials.
  4. Boto3 library installed (pip install boto3).

Table of Contents

  1. Setting Up Your Environment
  2. Creating Standard and FIFO Queues
  3. Sending and Receiving Messages
  4. Visibility Timeout
  5. Message Deduplication and Grouping
  6. Dead-Letter Queues
  7. Long Polling
  8. Cleanup

Setting Up Your Environment

Create a new directory for this project and navigate into it:

mkdir sqs_examples
cd sqs_examples

Create a virtual environment and activate it:

python -m venv venv
source venv/bin/activate  # On Windows, use `venv\Scripts\activate`

Install the required library:

pip install boto3

Creating Standard and FIFO Queues

File: 01_create_queues.py

import boto3

sqs = boto3.client('sqs')

# Create a Standard queue
standard_queue = sqs.create_queue(
    QueueName='MyStandardQueue'
)
print(f"Standard Queue URL: {standard_queue['QueueUrl']}")

# Create a FIFO queue
fifo_queue = sqs.create_queue(
    QueueName='MyFifoQueue.fifo',
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true'
    }
)
print(f"FIFO Queue URL: {fifo_queue['QueueUrl']}")

# Save queue URLs to files for later use
with open('standard_queue_url.txt', 'w') as f:
    f.write(standard_queue['QueueUrl'])

with open('fifo_queue_url.txt', 'w') as f:
    f.write(fifo_queue['QueueUrl'])

To run:

python 01_create_queues.py

Expected output:

Standard Queue URL: https://sqs.{region}.amazonaws.com/{account-id}/MyStandardQueue
FIFO Queue URL: https://sqs.{region}.amazonaws.com/{account-id}/MyFifoQueue.fifo

This script creates both a Standard and a FIFO queue and saves their URLs to text files for use in subsequent examples.
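Before creating queues, it can help to validate names locally against the documented SQS naming rules: names may contain alphanumeric characters, hyphens, and underscores, are limited to 80 characters, and FIFO queue names must end with the `.fifo` suffix. The helper below is purely illustrative (it is not part of boto3 or the SQS API):

```python
import re

# Hypothetical helper (not part of the SQS API): checks the documented
# naming rules before calling create_queue.
def is_valid_queue_name(name: str, fifo: bool = False) -> bool:
    if fifo:
        if not name.endswith('.fifo'):
            return False
        base = name[:-len('.fifo')]  # the .fifo suffix counts toward the limit
    else:
        base = name
    # Alphanumerics, hyphens, and underscores only; at most 80 characters total
    return len(name) <= 80 and re.fullmatch(r'[A-Za-z0-9_-]+', base) is not None

print(is_valid_queue_name('MyStandardQueue'))              # True
print(is_valid_queue_name('MyFifoQueue.fifo', fifo=True))  # True
print(is_valid_queue_name('MyFifoQueue', fifo=True))       # False: missing .fifo
```

Catching a bad name locally saves a round trip; `create_queue` would otherwise reject it with an `InvalidParameterValue` error.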

Sending and Receiving Messages

File: 02_send_receive_messages.py

import boto3

sqs = boto3.client('sqs')

# Read the queue URLs
with open('standard_queue_url.txt', 'r') as f:
    standard_queue_url = f.read().strip()

with open('fifo_queue_url.txt', 'r') as f:
    fifo_queue_url = f.read().strip()

# Send messages
standard_response = sqs.send_message(
    QueueUrl=standard_queue_url,
    MessageBody='Hello from Standard Queue!'
)
print(f"Message sent to Standard Queue. MessageId: {standard_response['MessageId']}")

fifo_response = sqs.send_message(
    QueueUrl=fifo_queue_url,
    MessageBody='Hello from FIFO Queue!',
    MessageGroupId='MyMessageGroup',
    MessageDeduplicationId='UniqueDeduplicationId'
)
print(f"Message sent to FIFO Queue. MessageId: {fifo_response['MessageId']}")

# Receive and process messages
for queue_type, queue_url in [("Standard", standard_queue_url), ("FIFO", fifo_queue_url)]:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1
    )
    if 'Messages' in response:
        for message in response['Messages']:
            print(f"Received message from {queue_type} queue: {message['Body']}")
            # Delete the message
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            print(f"Deleted message from {queue_type} queue")
    else:
        print(f"No messages received from {queue_type} queue")

To run:

python 02_send_receive_messages.py

Expected output:

Message sent to Standard Queue. MessageId: {some-message-id}
Message sent to FIFO Queue. MessageId: {some-message-id}
Received message from Standard queue: Hello from Standard Queue!
Deleted message from Standard queue
Received message from FIFO queue: Hello from FIFO Queue!
Deleted message from FIFO queue


This script sends a message to both queue types, then receives and deletes the messages.

Visibility Timeout

Visibility timeout is the period during which Amazon SQS prevents other consumers from receiving and processing a message that has already been retrieved by another consumer but not yet deleted from the queue.

How it works:

  1. When a consumer receives a message from a queue, the message remains in the queue.
  2. To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, making the message temporarily invisible to other consumers.
  3. If the consumer processes and deletes the message within the visibility timeout period, the message is removed from the queue.
  4. If the visibility timeout expires before the message is deleted, the message becomes visible again, allowing other consumers to process it.

Use cases:

  1. Preventing duplicate processing: In distributed systems where multiple consumers process messages concurrently, visibility timeout ensures that each message is processed only once.
  2. Handling long-running tasks: For tasks that take a significant amount of time to process, you can set a longer visibility timeout to prevent other consumers from picking up the message while it's being processed.
  3. Implementing retry logic: If a consumer fails to process a message within the visibility timeout, the message becomes visible again, allowing for automatic retry by other consumers.

Example scenario: Imagine an e-commerce system processing orders. When an order is placed, a message is sent to an SQS queue. A worker picks up the message and starts processing the order (e.g., checking inventory, charging the customer, preparing for shipment). The visibility timeout ensures that no other worker tries to process the same order simultaneously, preventing duplicate charges or shipments.
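The four steps above can be sketched with a tiny in-memory model. This is purely an illustrative simulation of the semantics, not how SQS is implemented, and it takes an explicit `now` timestamp so the behavior is deterministic:

```python
# A minimal in-memory model of visibility-timeout semantics (no AWS calls).
class ToyQueue:
    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.messages = {}  # message_id -> (body, invisible_until)

    def send(self, message_id, body):
        self.messages[message_id] = (body, 0.0)  # visible immediately

    def receive(self, now):
        for message_id, (body, invisible_until) in self.messages.items():
            if now >= invisible_until:
                # Steps 1-2: the message stays in the queue but becomes
                # invisible to other consumers for the timeout period.
                self.messages[message_id] = (body, now + self.visibility_timeout)
                return message_id, body
        return None  # nothing currently visible

    def delete(self, message_id):
        # Step 3: a successful consumer deletes the message for good.
        self.messages.pop(message_id, None)

q = ToyQueue(visibility_timeout=10)
q.send('m1', 'order-123')
print(q.receive(now=0.0))   # ('m1', 'order-123') - first consumer gets it
print(q.receive(now=5.0))   # None - still invisible to other consumers
print(q.receive(now=11.0))  # ('m1', 'order-123') - step 4: timeout expired
```

If the consumer calls `delete('m1')` before the timeout expires, the later `receive` returns `None`: that is exactly the happy path of step 3.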

File: 03_visibility_timeout.py

import boto3
import time

sqs = boto3.client('sqs')

# Create a new queue with a custom visibility timeout
queue_with_timeout = sqs.create_queue(
    QueueName='QueueWithCustomTimeout',
    Attributes={
        'VisibilityTimeout': '10'  # 10 seconds
    }
)
queue_url = queue_with_timeout['QueueUrl']

# Send a message
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody='Test visibility timeout'
)

# Receive the message
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=1
)

if 'Messages' in response:
    message = response['Messages'][0]
    print(f"Received message: {message['Body']}")

    # Wait for 5 seconds
    print("Waiting for 5 seconds...")
    time.sleep(5)

    # Try to receive the message again (should not receive it)
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1
    )
    if 'Messages' in response:
        print("Message received again (unexpected)")
    else:
        print("Message not received (as expected due to visibility timeout)")

    # Wait another 6 seconds (11 seconds total, past the 10-second timeout)
    print("Waiting for another 6 seconds...")
    time.sleep(6)

    # Try to receive the message again (should receive it)
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1
    )
    if 'Messages' in response:
        print("Message received again (as expected after visibility timeout)")
        # Delete using the latest receipt handle; the handle from the first
        # receive is stale once the message has been received again
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=response['Messages'][0]['ReceiptHandle']
        )
    else:
        print("Message not received (unexpected)")

    # Clean up
    sqs.delete_queue(QueueUrl=queue_url)
else:
    print("No message received")

To run:

python 03_visibility_timeout.py

Expected output:

Received message: Test visibility timeout
Waiting for 5 seconds...
Message not received (as expected due to visibility timeout)
Waiting for another 6 seconds...
Message received again (as expected after visibility timeout)

This script demonstrates how visibility timeout works by creating a queue with a 10-second visibility timeout and showing how the message becomes visible again after the timeout period.

Message Deduplication and Grouping

Message deduplication and message grouping are FIFO-queue features. Deduplication ensures that a message sent more than once within the deduplication interval is delivered only once; message groups let a single queue carry multiple independently ordered streams of messages.

How it works:

  1. Every message sent to a FIFO queue has a deduplication ID, supplied explicitly via MessageDeduplicationId or derived automatically from a SHA-256 hash of the message body when ContentBasedDeduplication is enabled on the queue.
  2. If a message with the same deduplication ID is sent again within the 5-minute deduplication interval, the send call succeeds but the duplicate is not enqueued.
  3. Every message also requires a MessageGroupId; messages sharing a group ID are delivered in strict order, one at a time per group.
  4. Messages in different groups can be delivered and processed independently, allowing parallelism while preserving per-group ordering.

Use cases:

  1. Exactly-once processing: Deduplication protects against producers retrying a send (e.g., after a network timeout) and creating duplicate work.
  2. Per-entity ordering: Using an entity key (user ID, order ID) as the message group ID guarantees that events for the same entity are processed in order.
  3. Parallel ordered streams: Multiple message groups let many consumers work concurrently without losing ordering within any single group.

Example scenario: Imagine a banking system publishing account transactions to a FIFO queue. Using the account number as the MessageGroupId guarantees that each account's transactions are applied in order, while transactions for different accounts are processed in parallel. Deduplication ensures that a retried publish after a network glitch doesn't debit an account twice.
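When ContentBasedDeduplication is enabled and no explicit MessageDeduplicationId is supplied, SQS derives the deduplication ID from a SHA-256 hash of the message body (message attributes are not included). The local sketch below mimics that rule to show why two sends with identical bodies inside the deduplication window count as one message; the helper name is illustrative, not part of boto3:

```python
import hashlib

# Mimic SQS content-based deduplication: the dedup ID is the SHA-256
# hash of the message body (attributes are excluded from the hash).
def content_dedup_id(body: str) -> str:
    return hashlib.sha256(body.encode('utf-8')).hexdigest()

seen = set()
for body in ['order-123', 'order-123', 'order-456']:
    dedup_id = content_dedup_id(body)
    if dedup_id in seen:
        print(f'{body!r}: duplicate, would be dropped by the FIFO queue')
    else:
        seen.add(dedup_id)
        print(f'{body!r}: accepted')
```

The takeaway: with content-based deduplication, any difference in the body (even a timestamp field) produces a different hash, so messages you intend to deduplicate must have byte-identical bodies or carry an explicit MessageDeduplicationId.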

File: 04_deduplication_grouping.py

import boto3

sqs = boto3.client('sqs')

# Read the FIFO queue URL
with open('fifo_queue_url.txt', 'r') as f:
    fifo_queue_url = f.read().strip()

# Send two messages with the same deduplication ID. SQS accepts both
# calls but enqueues only the first message; the duplicate send returns
# the MessageId of the original message.
message_ids = []
for i in range(2):
    response = sqs.send_message(
        QueueUrl=fifo_queue_url,
        MessageBody=f'Deduplication test {i+1}',
        MessageGroupId='DeduplicationGroup',
        MessageDeduplicationId='UniqueDeduplicationId'
    )
    message_ids.append(response['MessageId'])
    print(f"Attempt {i+1} - MessageId: {response['MessageId']}")

if message_ids[0] == message_ids[1]:
    print("Second send was deduplicated (same MessageId returned)")

# Send messages to two different message groups
for i in range(2):
    for group in ['GroupA', 'GroupB']:
        sqs.send_message(
            QueueUrl=fifo_queue_url,
            MessageBody=f'Message {i+1} for {group}',
            MessageGroupId=group,
            MessageDeduplicationId=f'{group}-{i}'
        )

# Receive and process messages
received_messages = []
while True:
    response = sqs.receive_message(
        QueueUrl=fifo_queue_url,
        MaxNumberOfMessages=10
    )
    if 'Messages' not in response:
        break
    for message in response['Messages']:
        print(f"Received: {message['Body']}")
        received_messages.append(message['Body'])
        # Delete the message
        sqs.delete_message(
            QueueUrl=fifo_queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )

# Verify deduplication and per-group ordering. FIFO queues guarantee
# ordering within a message group, not across groups, so each group is
# checked independently.
dedup_ok = sum(1 for m in received_messages if m.startswith('Deduplication test')) == 1
ordering_ok = all(
    [m for m in received_messages if m.endswith(group)] ==
    [f'Message {i+1} for {group}' for i in range(2)]
    for group in ['GroupA', 'GroupB']
)
if dedup_ok and ordering_ok:
    print("Deduplication and per-group ordering worked as expected")
else:
    print("Unexpected message order or content")
    print(f"Received: {received_messages}")

To run:

python 04_deduplication_grouping.py

Expected output:

Attempt 1 - MessageId: {some-message-id}
Attempt 2 - MessageId: {some-message-id}
Second send was deduplicated (same MessageId returned)
Received: Deduplication test 1
Received: Message 1 for GroupA
Received: Message 2 for GroupA
Received: Message 1 for GroupB
Received: Message 2 for GroupB
Deduplication and per-group ordering worked as expected

This script demonstrates message deduplication and grouping in FIFO queues. Note that the interleaving of GroupA and GroupB messages may vary between runs; FIFO queues only guarantee ordering within a message group.

Dead-Letter Queues

A dead-letter queue (DLQ) is a queue that other (source) queues can target for messages that can't be processed successfully.

How it works:

  1. You create a dead-letter queue and associate it with a source queue.
  2. You set a maximum number of receives for messages in the source queue.
  3. If a message exceeds the maximum receives count without being successfully processed and deleted, it's moved to the dead-letter queue.

Use cases:

  1. Debugging and troubleshooting: DLQs help isolate problematic messages for analysis without blocking the processing of other messages.
  2. Handling poison messages: Messages that consistently fail processing (e.g., due to malformed data) are moved to the DLQ, preventing them from blocking the main queue.
  3. Implementing retry mechanisms: You can set up a process to periodically check the DLQ, attempt to reprocess messages, or notify developers of persistent issues.

Example scenario: In a data processing pipeline, messages representing data batches are sent to an SQS queue for processing. If a particular batch consistently fails processing (e.g., due to corrupt data), it will eventually be moved to a dead-letter queue. This allows the system to continue processing other batches while developers investigate and handle the problematic batch separately.
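The redrive rule above can be sketched locally. This is an illustrative simulation only; in real SQS the move is driven by the queue's RedrivePolicy and each message's ApproximateReceiveCount attribute:

```python
# A minimal local sketch of the DLQ redrive rule (no AWS calls): once a
# message's receive count exceeds maxReceiveCount without a successful
# delete, it is moved to the dead-letter queue.
def redrive(receive_counts, max_receive_count):
    """Split message ids into (still_in_source, moved_to_dlq)."""
    source, dlq = [], []
    for message_id, count in receive_counts.items():
        (dlq if count > max_receive_count else source).append(message_id)
    return source, dlq

# Receive counts after several processing rounds (illustrative values)
counts = {'msg-ok': 1, 'msg-flaky': 2, 'msg-poison': 3}
source, dlq = redrive(counts, max_receive_count=2)
print(source)  # ['msg-ok', 'msg-flaky'] - at or under the limit
print(dlq)     # ['msg-poison'] - exceeded maxReceiveCount, redriven
```

Note the strict inequality: with maxReceiveCount of 2, a message is redriven only after its third unsuccessful receive, which is exactly what the script below demonstrates.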

File: 05_dead_letter_queue.py

import boto3
import time

sqs = boto3.client('sqs')

# Create a dead-letter queue
dlq = sqs.create_queue(
    QueueName='MyDeadLetterQueue'
)
dlq_url = dlq['QueueUrl']

# Get the ARN of the dead-letter queue
dlq_attributes = sqs.get_queue_attributes(
    QueueUrl=dlq_url,
    AttributeNames=['QueueArn']
)
dlq_arn = dlq_attributes['Attributes']['QueueArn']

# Create a main queue that redrives to the dead-letter queue
main_queue = sqs.create_queue(
    QueueName='MainQueueWithDLQ',
    Attributes={
        'RedrivePolicy': f'{{"deadLetterTargetArn":"{dlq_arn}","maxReceiveCount":"2"}}'
    }
)
main_queue_url = main_queue['QueueUrl']

# Send a message to the main queue
sqs.send_message(
    QueueUrl=main_queue_url,
    MessageBody='Test message for DLQ'
)

# Simulate processing failures
for attempt in range(3):
    response = sqs.receive_message(
        QueueUrl=main_queue_url,
        MaxNumberOfMessages=1,
        VisibilityTimeout=5
    )
    if 'Messages' in response:
        message = response['Messages'][0]
        print(f"Attempt {attempt + 1}: Received message from main queue: {message['Body']}")
        print("Simulating processing failure (not deleting the message)")
        time.sleep(6)  # Wait for the visibility timeout to expire
    else:
        print(f"Attempt {attempt + 1}: No message received from main queue")

# Check the dead-letter queue
time.sleep(10)  # Give SQS time to move the message to the DLQ
response = sqs.receive_message(
    QueueUrl=dlq_url,
    MaxNumberOfMessages=1
)
if 'Messages' in response:
    message = response['Messages'][0]
    print(f"Message received in dead-letter queue: {message['Body']}")
    # Clean up
    sqs.delete_message(
        QueueUrl=dlq_url,
        ReceiptHandle=message['ReceiptHandle']
    )
else:
    print("No message received in dead-letter queue")

# Delete the queues
sqs.delete_queue(QueueUrl=main_queue_url)
sqs.delete_queue(QueueUrl=dlq_url)

To run:

python 05_dead_letter_queue.py

Expected output:

Attempt 1: Received message from main queue: Test message for DLQ
Simulating processing failure (not deleting the message)
Attempt 2: Received message from main queue: Test message for DLQ
Simulating processing failure (not deleting the message)
Attempt 3: No message received from main queue
Message received in dead-letter queue: Test message for DLQ

This script demonstrates how a message is moved to a dead-letter queue after multiple failed processing attempts.

Long Polling

Long polling is a method to retrieve messages from SQS queues that reduces both latency and cost by eliminating empty responses when no messages are available.

How it works:

  1. Instead of returning immediately if no messages are available, the SQS server keeps the connection open for up to 20 seconds.
  2. If messages arrive during this time, they are returned to the consumer immediately.
  3. If no messages arrive before the timeout, an empty response is returned.

Use cases:

  1. Reducing API calls and costs: Long polling significantly reduces the number of empty receive requests, lowering costs and API usage.
  2. Decreasing message latency: Messages are typically received faster with long polling compared to short polling, as there's no need to wait for the next polling interval.
  3. Efficient message processing in low-throughput queues: For queues that receive messages infrequently, long polling ensures that messages are processed promptly without constant polling.

Example scenario: Consider a system that processes user-generated content (e.g., comments, reviews). The rate of incoming content may vary greatly depending on the time of day or user activity. By using long polling, the content processing workers can efficiently wait for new content without constantly making API calls. This reduces costs during low-activity periods while ensuring that content is processed quickly when it arrives.
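The cost difference can be made concrete with a rough, local comparison of how many ReceiveMessage requests each strategy would issue while waiting for a message. The arrival time and polling interval are illustrative assumptions, not measured SQS behavior:

```python
# Back-of-the-envelope comparison of request counts (no AWS calls).
def short_polling_calls(arrival, poll_interval=1.0):
    """Requests issued when polling every poll_interval seconds."""
    calls, t = 0, 0.0
    while t < arrival:       # every call before arrival returns empty
        calls += 1
        t += poll_interval
    return calls + 1         # the final call returns the message

def long_polling_calls(arrival, wait_time=20.0):
    """Requests issued when each call waits up to wait_time seconds."""
    calls, t = 0, 0.0
    while True:              # each call holds the connection open
        calls += 1
        if t + wait_time >= arrival:
            return calls     # message arrives during this call
        t += wait_time       # empty response only after the full wait

print(short_polling_calls(arrival=10.0))  # 11 requests, 10 of them empty
print(long_polling_calls(arrival=10.0))   # 1 request, no empty responses
```

Since SQS bills per request, an order-of-magnitude reduction in empty receives translates directly into lower cost for low-throughput queues.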

These advanced features of Amazon SQS provide powerful tools for building robust, scalable, and efficient distributed systems. By understanding and leveraging these features, you can design systems that handle message processing reliably, maintain order when necessary, deal with problematic messages gracefully, and optimize for both performance and cost.

File: 06_long_polling.py

import boto3
import threading
import time

sqs = boto3.client('sqs')

# Create a queue with long polling enabled
long_poll_queue = sqs.create_queue(
    QueueName='LongPollQueue',
    Attributes={
        'ReceiveMessageWaitTimeSeconds': '20'  # Wait up to 20 seconds for messages
    }
)
queue_url = long_poll_queue['QueueUrl']

def delayed_send():
    time.sleep(10)  # Wait 10 seconds before sending the message
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody='Delayed message for long polling demo'
    )
    print("Message sent after 10 seconds delay")

# Start a thread that sends a delayed message
threading.Thread(target=delayed_send).start()

print("Starting long polling (waiting up to 20 seconds for a message)...")
start_time = time.time()

# Use long polling when receiving messages
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=1,
    WaitTimeSeconds=20
)

elapsed_time = time.time() - start_time

if 'Messages' in response:
    message = response['Messages'][0]
    print(f"Received message after {elapsed_time:.2f} seconds: {message['Body']}")
    # Delete the message
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message['ReceiptHandle']
    )
else:
    print(f"No messages received after {elapsed_time:.2f} seconds")

# Clean up
sqs.delete_queue(QueueUrl=queue_url)

To run:

python 06_long_polling.py

Expected output:

Starting long polling (waiting up to 20 seconds for a message)...
Message sent after 10 seconds delay
Received message after 10.XX seconds: Delayed message for long polling demo

This script demonstrates long polling by creating a queue with a 20-second receive wait time. It then starts a separate thread to send a message after a 10-second delay. The main thread uses long polling to wait for the message, showing how it can reduce the number of empty responses and latency in receiving messages.

Cleanup

To ensure you don't incur unnecessary AWS charges, it's important to clean up the resources created during these examples. Most of the scripts clean up after themselves, but here's a script to delete any remaining queues:

File: 07_cleanup.py

import os

import boto3

sqs = boto3.client('sqs')

# List all queues
response = sqs.list_queues()
if 'QueueUrls' in response:
    for queue_url in response['QueueUrls']:
        if any(name in queue_url for name in [
            'MyStandardQueue', 'MyFifoQueue', 'QueueWithCustomTimeout',
            'MyDeadLetterQueue', 'MainQueueWithDLQ', 'LongPollQueue'
        ]):
            sqs.delete_queue(QueueUrl=queue_url)
            print(f"Deleted queue: {queue_url}")
else:
    print("No queues found")

# Remove the queue URL files
for filename in ['standard_queue_url.txt', 'fifo_queue_url.txt']:
    if os.path.exists(filename):
        os.remove(filename)
        print(f"Removed file: {filename}")

To run:

python 07_cleanup.py

This script will delete all the queues created during the examples and remove the queue URL files.

Summary

These examples demonstrate the key features and differences between Amazon SQS Standard and FIFO queues:

  1. Creating Queues: We created both Standard and FIFO queues, showing the different attributes required for each.
  2. Sending and Receiving Messages: We sent messages to both queue types and demonstrated how to receive and delete messages.
  3. Visibility Timeout: We illustrated how visibility timeout works by creating a queue with a custom timeout and showing message visibility behavior.
  4. Message Deduplication and Grouping: We demonstrated FIFO queue features like message deduplication and message grouping, ensuring exactly-once processing and ordered message delivery within groups.
  5. Dead-Letter Queues: We set up a dead-letter queue and showed how messages are moved to it after multiple failed processing attempts.
  6. Long Polling: We demonstrated how long polling can be used to reduce empty responses and latency in receiving messages.




