This guide will walk you through the differences between Amazon SQS Standard and FIFO queues, demonstrating various functionalities with practical Python examples. Each example will be in its own file, with clear instructions on how to run and what to expect.
Repo for Code Reference
aws/sqs/normal-fifo-queue-examples
https://github.com/ankit630/unixcloudfusion-devops-solutions.git
https://gitlab.com/ankit630/unixcloudfusion-devops-solutions.git
Prerequisites
- An AWS account with appropriate permissions to create and manage SQS queues.
- Python 3.6 or later installed on your system.
- AWS CLI configured with your credentials.
- Boto3 library installed (`pip install boto3`).
Table of Contents
- Setting Up Your Environment
- Creating Standard and FIFO Queues
- Sending and Receiving Messages
- Visibility Timeout
- Message Deduplication and Grouping
- Dead-Letter Queues
- Long Polling
- Cleanup
Setting Up Your Environment
Create a new directory for this project and navigate into it:
mkdir sqs_examples
cd sqs_examples
Create a virtual environment and activate it:
python -m venv venv
source venv/bin/activate # On Windows, use `venv\Scripts\activate`
Install the required library:
pip install boto3
Creating Standard and FIFO Queues
File: 01_create_queues.py
import boto3
sqs = boto3.client('sqs')
# Create a Standard queue
standard_queue = sqs.create_queue(
    QueueName='MyStandardQueue'
)
print(f"Standard Queue URL: {standard_queue['QueueUrl']}")
# Create a FIFO queue (the name must end in .fifo)
fifo_queue = sqs.create_queue(
    QueueName='MyFifoQueue.fifo',
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true'
    }
)
print(f"FIFO Queue URL: {fifo_queue['QueueUrl']}")
# Save queue URLs to files for later use
with open('standard_queue_url.txt', 'w') as f:
    f.write(standard_queue['QueueUrl'])
with open('fifo_queue_url.txt', 'w') as f:
    f.write(fifo_queue['QueueUrl'])
To run:
python 01_create_queues.py
Expected output:
Standard Queue URL: https://sqs.{region}.amazonaws.com/{account-id}/MyStandardQueue
FIFO Queue URL: https://sqs.{region}.amazonaws.com/{account-id}/MyFifoQueue.fifo
This script creates both a Standard and a FIFO queue and saves their URLs to text files for use in subsequent examples.
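The FIFO queue above only works because its name ends in `.fifo`. A name can be checked locally before calling `create_queue`. The following is a minimal sketch: the helper `validate_queue_name` is hypothetical (it is not part of boto3), and the rules it encodes (at most 80 characters; letters, digits, hyphens, and underscores; a `.fifo` suffix for FIFO queues) are the SQS naming constraints as documented by AWS at the time of writing.

```python
import re

# Hypothetical helper, not part of boto3: checks a queue name locally.
_NAME_RE = re.compile(r'^[A-Za-z0-9_-]+$')


def validate_queue_name(name: str, fifo: bool = False) -> bool:
    """Return True if `name` looks like a valid SQS queue name."""
    if len(name) == 0 or len(name) > 80:
        return False
    if fifo:
        # FIFO queue names must end with the .fifo suffix
        if not name.endswith('.fifo'):
            return False
        base = name[:-len('.fifo')]
    else:
        base = name
    # The remaining part may only contain letters, digits, '-' and '_'
    return bool(_NAME_RE.match(base))


print(validate_queue_name('MyStandardQueue'))              # True
print(validate_queue_name('MyFifoQueue.fifo', fifo=True))  # True
print(validate_queue_name('MyFifoQueue', fifo=True))       # False: missing suffix
```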
Sending and Receiving Messages
File: 02_send_receive_messages.py
import boto3
sqs = boto3.client('sqs')
# Read queue URLs
with open('standard_queue_url.txt', 'r') as f:
    standard_queue_url = f.read().strip()
with open('fifo_queue_url.txt', 'r') as f:
    fifo_queue_url = f.read().strip()
# Send messages
standard_response = sqs.send_message(
    QueueUrl=standard_queue_url,
    MessageBody='Hello from Standard Queue!'
)
print(f"Message sent to Standard Queue. MessageId: {standard_response['MessageId']}")
# FIFO queues require a MessageGroupId on every message
fifo_response = sqs.send_message(
    QueueUrl=fifo_queue_url,
    MessageBody='Hello from FIFO Queue!',
    MessageGroupId='MyMessageGroup',
    MessageDeduplicationId='UniqueDeduplicationId'
)
print(f"Message sent to FIFO Queue. MessageId: {fifo_response['MessageId']}")
# Receive and process messages
for queue_type, queue_url in [("Standard", standard_queue_url), ("FIFO", fifo_queue_url)]:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=5  # wait briefly so a just-sent message is not missed
    )
    if 'Messages' in response:
        for message in response['Messages']:
            print(f"Received message from {queue_type} queue: {message['Body']}")
            # Delete the message
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
            print(f"Deleted message from {queue_type} queue")
    else:
        print(f"No messages received from {queue_type} queue")
To run:
python 02_send_receive_messages.py
Expected output:
Message sent to Standard Queue. MessageId: {some-message-id}
Message sent to FIFO Queue. MessageId: {some-message-id}
Received message from Standard queue: Hello from Standard Queue!
Deleted message from Standard queue
Received message from FIFO queue: Hello from FIFO Queue!
Deleted message from FIFO queue
This script sends a message to both queue types, then receives and deletes the messages.
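In practice the receive and delete steps usually wrap some processing, and a message should only be deleted once that processing succeeds. The sketch below shows one way to structure this. The function `process_once` and the `handler` callable are hypothetical names introduced for illustration; `sqs` is assumed to be a client created with `boto3.client('sqs')`, and only `receive_message` and `delete_message` are called on it.

```python
def process_once(sqs, queue_url, handler, max_messages=10):
    """Receive one batch and delete only the messages the handler accepts.

    `sqs` is assumed to be a boto3 SQS client.
    `handler` is a hypothetical callable taking the message body and
    returning True on success.
    Returns the number of messages deleted.
    """
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
    )
    deleted = 0
    for message in response.get('Messages', []):
        try:
            ok = handler(message['Body'])
        except Exception:
            ok = False  # leave the message in the queue so it can be retried
        if ok:
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'],
            )
            deleted += 1
    return deleted
```

Messages that are not deleted reappear once their visibility timeout expires, which is what makes this pattern retry automatically.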
Visibility Timeout
Visibility timeout is the period during which Amazon SQS prevents other consumers from receiving and processing a message that has already been retrieved by another consumer but not yet deleted from the queue.
How it works:
- When a consumer receives a message from a queue, the message remains in the queue.
- To prevent other consumers from processing the message again, Amazon SQS sets a visibility timeout, making the message temporarily invisible to other consumers.
- If the consumer processes and deletes the message within the visibility timeout period, the message is removed from the queue.
- If the visibility timeout expires before the message is deleted, the message becomes visible again, allowing other consumers to process it.
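The four steps above can be illustrated with a toy in-memory model. This is only a sketch of the behaviour, not how SQS is implemented: the class `ToyQueue` and its methods are hypothetical, and time is passed in explicitly as `now` so the example runs instantly.

```python
class ToyQueue:
    """In-memory model of visibility timeout (illustration only)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.messages = {}         # message id -> body
        self.invisible_until = {}  # message id -> time it becomes visible again
        self.next_id = 0

    def send(self, body):
        self.next_id += 1
        self.messages[self.next_id] = body
        self.invisible_until[self.next_id] = 0
        return self.next_id

    def receive(self, now):
        """Return (id, body) of the first visible message, or None."""
        for msg_id, body in self.messages.items():
            if now >= self.invisible_until[msg_id]:
                # The message stays in the queue but is hidden for a while
                self.invisible_until[msg_id] = now + self.visibility_timeout
                return msg_id, body
        return None

    def delete(self, msg_id):
        self.messages.pop(msg_id, None)
        self.invisible_until.pop(msg_id, None)


queue = ToyQueue(visibility_timeout=10)
queue.send('order-1')
print(queue.receive(now=0))   # (1, 'order-1'): first consumer gets the message
print(queue.receive(now=5))   # None: still inside the visibility timeout
print(queue.receive(now=11))  # (1, 'order-1'): timeout expired, visible again
queue.delete(1)
print(queue.receive(now=30))  # None: the message was deleted
```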
Use cases:
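- Preventing duplicate processing: In distributed systems where multiple consumers process messages concurrently, visibility timeout keeps a message that one consumer is working on from being handed to another consumer. It does not guarantee exactly-once processing: Standard queues deliver at least once, so consumers should still be idempotent.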
- Handling long-running tasks: For tasks that take a significant amount of time to process, you can set a longer visibility timeout to prevent other consumers from picking up the message while it's being processed.
- Implementing retry logic: If a consumer fails to process a message within the visibility timeout, the message becomes visible again, allowing for automatic retry by other consumers.
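For long-running tasks, a consumer can also reset the timeout while it works instead of choosing one large value up front. The sketch below assumes `sqs` is a client created with `boto3.client('sqs')` and uses its `change_message_visibility` call. The function name `process_with_heartbeat` and the `steps` list of callables are hypothetical, introduced only for this illustration.

```python
MAX_VISIBILITY_SECONDS = 43200  # 12 hours, the largest value SQS accepts


def process_with_heartbeat(sqs, queue_url, message, steps, timeout=30):
    """Run each step, resetting the visibility timeout before it starts.

    `message` is one entry from response['Messages'].
    `steps` is a hypothetical list of callables that each take the body.
    """
    if not 0 <= timeout <= MAX_VISIBILITY_SECONDS:
        raise ValueError('timeout must be between 0 and 43200 seconds')
    for step in steps:
        # The new timeout counts from the moment of this call
        sqs.change_message_visibility(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle'],
            VisibilityTimeout=timeout,
        )
        step(message['Body'])
    # All steps succeeded, so the message can be removed
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message['ReceiptHandle'],
    )
```

If a step raises an exception, the message is not deleted and becomes visible again after the last timeout that was set, so another consumer can retry it.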
Example scenario:
Imagine an e-commerce system processing orders. When an order is placed, a message is sent to an SQS queue. A worker picks up the message and starts processing the order (e.g., checking inventory, charging the customer, preparing for shipment). While the visibility timeout is running, no other worker can receive the same order, which helps avoid duplicate charges or shipments as long as processing finishes before the timeout expires.
File: 03_visibility_timeout.py
import boto3
import time
sqs = boto3.client('sqs')
# Create a new queue with custom visibility timeout
queue_with_timeout = sqs.create_queue(
    QueueName='QueueWithCustomTimeout',
    Attributes={
        'VisibilityTimeout': '10'  # 10 seconds
    }
)
queue_url = queue_with_timeout['QueueUrl']
# Send a message
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody='Test visibility timeout'
)
# Receive the message
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=1
)
if 'Messages' in response:
    message = response['Messages'][0]
    receipt_handle = message['ReceiptHandle']
    print(f"Received message: {message['Body']}")
    # Wait for 5 seconds
    print("Waiting for 5 seconds...")
    time.sleep(5)
    # Try to receive the message again (should not receive)
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1
    )
    if 'Messages' in response:
        print("Message received again (unexpected)")
    else:
        print("Message not received (as expected due to visibility timeout)")
    # Wait for another 6 seconds (total 11 seconds)
    print("Waiting for another 6 seconds...")
    time.sleep(6)
    # Try to receive the message again (should receive)
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1
    )
    if 'Messages' in response:
        print("Message received again (as expected after visibility timeout)")
        # Each receive returns a new receipt handle; delete with the latest one
        receipt_handle = response['Messages'][0]['ReceiptHandle']
    else:
        print("Message not received (unexpected)")
    # Clean up
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle
    )
    sqs.delete_queue(QueueUrl=queue_url)
else:
    print("No message received")
To run:
python 03_visibility_timeout.py
Expected output:
Received message: Test visibility timeout
Waiting for 5 seconds...
Message not received (as expected due to visibility timeout)
Waiting for another 6 seconds...
Message received again (as expected after visibility timeout)
This script demonstrates how visibility timeout works by creating a queue with a 10-second visibility timeout and showing how the message becomes visible again after the timeout period.
Message Deduplication and Grouping
File: 04_deduplication_grouping.py
import boto3
import uuid
sqs = boto3.client('sqs')
# Read FIFO queue URL
with open('fifo_queue_url.txt', 'r') as f:
    fifo_queue_url = f.read().strip()
# Use a fresh ID prefix per run so that earlier runs (or 02_send_receive_messages.py)
# inside the 5-minute deduplication interval do not interfere with this one
run_id = uuid.uuid4().hex
# Send two messages with the same deduplication ID.
# SQS accepts both calls but delivers only the first message.
for i in range(2):
    response = sqs.send_message(
        QueueUrl=fifo_queue_url,
        MessageBody=f'Deduplication test {i+1}',
        MessageGroupId='DeduplicationGroup',
        MessageDeduplicationId=f'{run_id}-dedup'
    )
    print(f"Attempt {i+1} - MessageId: {response['MessageId']}")
# Send messages to different message groups
for i in range(2):
    for group in ['GroupA', 'GroupB']:
        sqs.send_message(
            QueueUrl=fifo_queue_url,
            MessageBody=f'Message {i+1} for {group}',
            MessageGroupId=group,
            MessageDeduplicationId=f'{run_id}-{group}-{i}'
        )
# Receive and process messages
received_messages = []
while True:
    response = sqs.receive_message(
        QueueUrl=fifo_queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=2
    )
    if 'Messages' not in response:
        break
    for message in response['Messages']:
        print(f"Received: {message['Body']}")
        received_messages.append(message['Body'])
        # Delete the message
        sqs.delete_message(
            QueueUrl=fifo_queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
# Verify deduplication: the second attempt must not be delivered
deduplicated = (
    received_messages.count('Deduplication test 1') == 1
    and 'Deduplication test 2' not in received_messages
)
# Verify ordering: FIFO queues guarantee order within a group, not across groups
ordered = all(
    [m for m in received_messages if m.endswith(group)]
    == [f'Message 1 for {group}', f'Message 2 for {group}']
    for group in ['GroupA', 'GroupB']
)
if deduplicated and ordered:
    print("Deduplication and ordering worked as expected")
else:
    print("Unexpected message order or content")
    print(f"Received: {received_messages}")
To run:
python 04_deduplication_grouping.py
Expected output:
Attempt 1 - MessageId: {some-message-id}
Attempt 2 - MessageId: {some-message-id}
Received: Deduplication test 1
Received: Message 1 for GroupA
Received: Message 1 for GroupB
Received: Message 2 for GroupA
Received: Message 2 for GroupB
Deduplication and ordering worked as expected
The second send_message call succeeds and returns a MessageId, but SQS does not deliver the duplicate, so "Deduplication test 2" never appears. Messages from different groups may be interleaved differently on your run; only the order within each group is guaranteed.
This script demonstrates message deduplication and grouping in FIFO queues.
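To make the deduplication behaviour concrete, the sketch below models it in memory. It rests on two documented facts: content-based deduplication derives the deduplication ID from a SHA-256 hash of the message body, and a deduplication ID is remembered for a 5-minute interval. Everything else is an assumption for illustration: the names `content_dedup_id` and `ToyDeduplicator` are hypothetical, time is passed in explicitly as `now`, and this is not how SQS is implemented.

```python
import hashlib

DEDUP_WINDOW_SECONDS = 300  # the 5-minute deduplication interval


def content_dedup_id(body: str) -> str:
    """Derive a deduplication ID from the body, as content-based deduplication does."""
    return hashlib.sha256(body.encode('utf-8')).hexdigest()


class ToyDeduplicator:
    """In-memory model of the deduplication interval (illustration only)."""

    def __init__(self):
        self.seen = {}  # deduplication id -> time the message was first accepted

    def accept(self, dedup_id, now):
        """Return True if the message would be delivered, False if it is a duplicate."""
        first_seen = self.seen.get(dedup_id)
        if first_seen is not None and now - first_seen < DEDUP_WINDOW_SECONDS:
            return False
        self.seen[dedup_id] = now
        return True


dedup = ToyDeduplicator()
dedup_id = content_dedup_id('Deduplication test')
print(dedup.accept(dedup_id, now=0))    # True: first time this ID is seen
print(dedup.accept(dedup_id, now=60))   # False: duplicate inside the interval
print(dedup.accept(dedup_id, now=301))  # True: the interval has passed
```

This is also why two messages with identical bodies sent to `MyFifoQueue.fifo` without an explicit `MessageDeduplicationId` are treated as duplicates: the queue was created with `ContentBasedDeduplication` enabled.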
Dead-Letter Queues
A dead-letter queue (DLQ) is a queue that other (source) queues can target for messages that can't be processed successfully.
How it works:
- You create a dead-letter queue and associate it with a source queue.
- You set a maximum number of receives for messages in the source queue.
- If a message exceeds the maximum receives count without being successfully processed and deleted, it's moved to the dead-letter queue.
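The association between the two queues is expressed as a `RedrivePolicy` attribute, which is a JSON string. Building it with `json.dumps` avoids hand-escaping braces and quotes. This is a minimal sketch: the helper name `build_redrive_policy` is hypothetical, and the ARN in the usage lines is a made-up placeholder.

```python
import json


def build_redrive_policy(dlq_arn: str, max_receive_count: int) -> str:
    """Return the JSON string expected by the RedrivePolicy queue attribute."""
    if max_receive_count < 1:
        raise ValueError('max_receive_count must be at least 1')
    return json.dumps({
        'deadLetterTargetArn': dlq_arn,
        'maxReceiveCount': str(max_receive_count),
    })


# Placeholder ARN for illustration only
policy = build_redrive_policy(
    'arn:aws:sqs:us-east-1:123456789012:MyDeadLetterQueue', 2
)
print(policy)
```

The resulting string can be passed as `Attributes={'RedrivePolicy': policy}` when creating the source queue.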
Use cases:
- Debugging and troubleshooting: DLQs help isolate problematic messages for analysis without blocking the processing of other messages.
- Handling poison messages: Messages that consistently fail processing (e.g., due to malformed data) are moved to the DLQ, preventing them from blocking the main queue.
- Implementing retry mechanisms: You can set up a process to periodically check the DLQ, attempt to reprocess messages, or notify developers of persistent issues.
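The retry mechanism in the last use case can be sketched as a function that moves messages from the DLQ back to the source queue. This is an illustration under stated assumptions, not a production tool: the function name `redrive` is hypothetical, `sqs` is assumed to be a client created with `boto3.client('sqs')`, and the sketch targets Standard queues (a FIFO source queue would also need a `MessageGroupId` on each send).

```python
def redrive(sqs, dlq_url, source_url, max_messages=10):
    """Move one batch of messages from the DLQ back to the source queue.

    Returns the number of messages moved.
    """
    response = sqs.receive_message(
        QueueUrl=dlq_url,
        MaxNumberOfMessages=max_messages,
    )
    moved = 0
    for message in response.get('Messages', []):
        # Re-send first, then delete, so a failure never loses the message
        sqs.send_message(
            QueueUrl=source_url,
            MessageBody=message['Body'],
        )
        sqs.delete_message(
            QueueUrl=dlq_url,
            ReceiptHandle=message['ReceiptHandle'],
        )
        moved += 1
    return moved
```

Sending before deleting means a crash between the two calls can produce a duplicate in the source queue, but never a lost message.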
Example scenario:
In a data processing pipeline, messages representing data batches are sent to an SQS queue for processing. If a particular batch consistently fails processing (e.g., due to corrupt data), it will eventually be moved to a dead-letter queue. This allows the system to continue processing other batches while developers investigate and handle the problematic batch separately.
File: 05_dead_letter_queue.py
import boto3
import time
sqs = boto3.client('sqs')
# Create a dead-letter queue
dlq = sqs.create_queue(
    QueueName='MyDeadLetterQueue'
)
dlq_url = dlq['QueueUrl']
# Get the ARN of the dead-letter queue
dlq_attributes = sqs.get_queue_attributes(
    QueueUrl=dlq_url,
    AttributeNames=['QueueArn']
)
dlq_arn = dlq_attributes['Attributes']['QueueArn']
# Create a main queue with the dead-letter queue
main_queue = sqs.create_queue(
    QueueName='MainQueueWithDLQ',
    Attributes={
        'RedrivePolicy': f'{{"deadLetterTargetArn":"{dlq_arn}","maxReceiveCount":"2"}}'
    }
)
main_queue_url = main_queue['QueueUrl']
# Send a message to the main queue
sqs.send_message(
    QueueUrl=main_queue_url,
    MessageBody='Test message for DLQ'
)
# Simulate processing failure
for attempt in range(3):
    response = sqs.receive_message(
        QueueUrl=main_queue_url,
        MaxNumberOfMessages=1,
        VisibilityTimeout=5
    )
    if 'Messages' in response:
        message = response['Messages'][0]
        print(f"Attempt {attempt + 1}: Received message from main queue: {message['Body']}")
        print("Simulating processing failure (not deleting the message)")
        time.sleep(6)  # Wait for visibility timeout to expire
    else:
        print(f"Attempt {attempt + 1}: No message received from main queue")
# Check the dead-letter queue
time.sleep(10)  # Wait for the message to be moved to DLQ
response = sqs.receive_message(
    QueueUrl=dlq_url,
    MaxNumberOfMessages=1
)
if 'Messages' in response:
    message = response['Messages'][0]
    print(f"Message received in dead-letter queue: {message['Body']}")
    # Clean up
    sqs.delete_message(
        QueueUrl=dlq_url,
        ReceiptHandle=message['ReceiptHandle']
    )
else:
    print("No message received in dead-letter queue")
# Delete queues
sqs.delete_queue(QueueUrl=main_queue_url)
sqs.delete_queue(QueueUrl=dlq_url)
To run:
python 05_dead_letter_queue.py
Expected output:
Attempt 1: Received message from main queue: Test message for DLQ
Simulating processing failure (not deleting the message)
Attempt 2: Received message from main queue: Test message for DLQ
Simulating processing failure (not deleting the message)
Attempt 3: No message received from main queue
Message received in dead-letter queue: Test message for DLQ
This script demonstrates how a message is moved to a dead-letter queue after multiple failed processing attempts.
Long Polling
Long polling is a way to retrieve messages from SQS queues that reduces both latency and cost by cutting down the number of empty responses returned when no messages are available.
How it works:
- Instead of returning immediately if no messages are available, the SQS server keeps the connection open for up to 20 seconds.
- If messages arrive during this time, they are returned to the consumer immediately.
- If no messages arrive before the timeout, an empty response is returned.
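A consumer built on this behaviour is typically a loop around `receive_message` with `WaitTimeSeconds` set. The sketch below assumes `sqs` is a client created with `boto3.client('sqs')`. The function name `poll_messages` and the `max_polls` limit are hypothetical, added so the loop terminates in an example.

```python
def poll_messages(sqs, queue_url, wait_seconds=20, max_polls=3):
    """Long-poll `queue_url` up to `max_polls` times and return the message bodies."""
    if not 0 <= wait_seconds <= 20:
        raise ValueError('WaitTimeSeconds must be between 0 and 20')
    bodies = []
    for _ in range(max_polls):
        # Each call blocks for up to `wait_seconds` if the queue is empty
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=wait_seconds,
        )
        for message in response.get('Messages', []):
            bodies.append(message['Body'])
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'],
            )
    return bodies
```

Setting `wait_seconds=0` turns the same loop into short polling, which makes the two modes easy to compare.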
Use cases:
- Reducing API calls and costs: Long polling significantly reduces the number of empty receive requests, lowering costs and API usage.
- Decreasing message latency: Messages are typically received faster with long polling compared to short polling, as there's no need to wait for the next polling interval.
- Efficient message processing in low-throughput queues: For queues that receive messages infrequently, long polling ensures that messages are processed promptly without constant polling.
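The reduction in API calls is easy to estimate for an idle queue. The figures below are a back-of-the-envelope sketch under stated assumptions: a single consumer, no messages arriving, a short-polling loop that issues one request per second, and a long-polling loop that waits the full 20 seconds per request. The function name `receive_calls_per_hour` is hypothetical.

```python
def receive_calls_per_hour(seconds_per_call: float) -> int:
    """Number of receive requests one consumer makes per hour on an idle queue."""
    return int(3600 // seconds_per_call)


short_polling = receive_calls_per_hour(1)   # one request per second
long_polling = receive_calls_per_hour(20)   # one request per 20-second wait

print(short_polling)                  # 3600
print(long_polling)                   # 180
print(short_polling // long_polling)  # 20 times fewer requests
```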
Example scenario:
Consider a system that processes user-generated content (e.g., comments, reviews). The rate of incoming content may vary greatly depending on the time of day or user activity. By using long polling, the content processing workers can efficiently wait for new content without constantly making API calls. This reduces costs during low-activity periods while ensuring that content is processed quickly when it arrives.
These advanced features of Amazon SQS provide powerful tools for building robust, scalable, and efficient distributed systems. By understanding and leveraging these features, you can design systems that handle message processing reliably, maintain order when necessary, deal with problematic messages gracefully, and optimize for both performance and cost.
File: 06_long_polling.py
import boto3
import threading
import time
sqs = boto3.client('sqs')
# Create a queue with long polling enabled
long_poll_queue = sqs.create_queue(
    QueueName='LongPollQueue',
    Attributes={
        'ReceiveMessageWaitTimeSeconds': '20'  # Wait up to 20 seconds for messages
    }
)
queue_url = long_poll_queue['QueueUrl']
def delayed_send():
    time.sleep(10)  # Wait for 10 seconds before sending the message
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody='Delayed message for long polling demo'
    )
    print("Message sent after 10 seconds delay")
# Start a thread to send a delayed message
threading.Thread(target=delayed_send).start()
print("Starting long polling (waiting up to 20 seconds for a message)...")
start_time = time.time()
# Use long polling when receiving messages
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=1,
    WaitTimeSeconds=20
)
end_time = time.time()
elapsed_time = end_time - start_time
if 'Messages' in response:
    message = response['Messages'][0]
    print(f"Received message after {elapsed_time:.2f} seconds: {message['Body']}")
    # Delete the message
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message['ReceiptHandle']
    )
else:
    print(f"No messages received after {elapsed_time:.2f} seconds")
# Clean up
sqs.delete_queue(QueueUrl=queue_url)
To run:
python 06_long_polling.py
Expected output:
Starting long polling (waiting up to 20 seconds for a message)...
Message sent after 10 seconds delay
Received message after 10.XX seconds: Delayed message for long polling demo
This script demonstrates long polling by creating a queue with a 20-second receive wait time. It then starts a separate thread to send a message after a 10-second delay. The main thread uses long polling to wait for the message, showing how it can reduce the number of empty responses and latency in receiving messages.
Cleanup
To ensure you don't incur unnecessary AWS charges, it's important to clean up the resources created during these examples. Most of the scripts clean up after themselves, but here's a script to delete any remaining queues:
File: 07_cleanup.py
import boto3
import os
sqs = boto3.client('sqs')
# Names of the queues created in this guide
queue_names = ['MyStandardQueue', 'MyFifoQueue.fifo', 'QueueWithCustomTimeout', 'MyDeadLetterQueue', 'MainQueueWithDLQ', 'LongPollQueue']
# List all queues
response = sqs.list_queues()
if 'QueueUrls' in response:
    for queue_url in response['QueueUrls']:
        # Match the exact queue name at the end of the URL so unrelated queues are not deleted
        if queue_url.rsplit('/', 1)[-1] in queue_names:
            sqs.delete_queue(QueueUrl=queue_url)
            print(f"Deleted queue: {queue_url}")
else:
    print("No queues found")
# Remove queue URL files
for filename in ['standard_queue_url.txt', 'fifo_queue_url.txt']:
    if os.path.exists(filename):
        os.remove(filename)
        print(f"Removed file: {filename}")
To run:
python 07_cleanup.py
This script deletes the queues created during the examples and removes the queue URL files.
Summary
These examples demonstrate the key features and differences between Amazon SQS Standard and FIFO queues:
- Creating Queues: We created both Standard and FIFO queues, showing the different attributes required for each.
- Sending and Receiving Messages: We sent messages to both queue types and demonstrated how to receive and delete messages.
- Visibility Timeout: We illustrated how visibility timeout works by creating a queue with a custom timeout and showing message visibility behavior.
- Message Deduplication and Grouping: We demonstrated FIFO queue features like message deduplication and message grouping, which prevent duplicate delivery within the 5-minute deduplication interval and preserve message order within each group.
- Dead-Letter Queues: We set up a dead-letter queue and showed how messages are moved to it after multiple failed processing attempts.
- Long Polling: We demonstrated how long polling can be used to reduce empty responses and latency in receiving messages.