This document describes the current stable version of Kombu (5.0).
Amazon SQS Transport - kombu.transport.SQS
Amazon SQS Transport.
Amazon SQS transport module for Kombu. This package implements an AMQP-like interface on top of Amazon's SQS service, with the goal of being optimized for high performance and reliability.
The default settings for this module now focus on high performance in task queue situations where tasks are small, idempotent and run very fast.
- SQS Features supported by this transport:
- Long Polling:
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
Long polling is enabled by setting the wait_time_seconds transport option to a number greater than 0. Amazon supports up to 20 seconds. It is enabled by default with a wait time of 10 seconds.
- Batch API Actions:
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api.html
The default behavior of the SQS Channel.drain_events() method is to request up to ‘prefetch_count’ messages on every request to SQS. These messages are stored locally in a deque object and passed back to the Transport until the deque is empty, before triggering a new API call to Amazon.
This behavior dramatically increases the rate at which you can pull tasks from SQS when you have short-running tasks (or a large number of workers).
When a Celery worker has multiple queues to monitor, it will pull down up to ‘prefetch_count’ messages from queueA and work on them all before moving on to queueB. If queueB is empty, it will wait until ‘polling_interval’ expires before moving back and checking on queueA.
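The batching behavior described above can be illustrated with a small toy model. This is only a sketch of the idea (a local deque that is refilled when empty), not Kombu's actual implementation; the `PrefetchBuffer` class and the `make_fake_queue` helper are hypothetical names introduced for the illustration, and no real SQS calls are made.

```python
from collections import deque
from queue import Empty


class PrefetchBuffer:
    """Toy model of batched receives; not Kombu's real Channel code."""

    def __init__(self, fetch_batch, prefetch_count):
        self.fetch_batch = fetch_batch        # hypothetical callable: n -> list of messages
        self.prefetch_count = prefetch_count
        self.buffer = deque()
        self.api_calls = 0                    # counts simulated requests to the service

    def get(self):
        # Only go back to the (simulated) API once the local buffer is empty.
        if not self.buffer:
            self.api_calls += 1
            self.buffer.extend(self.fetch_batch(self.prefetch_count))
        if not self.buffer:
            raise Empty()
        return self.buffer.popleft()


def make_fake_queue(messages):
    """Return a fetch function that hands out at most n pending messages per call."""
    pending = deque(messages)

    def fetch_batch(n):
        return [pending.popleft() for _ in range(min(n, len(pending)))]

    return fetch_batch


buf = PrefetchBuffer(make_fake_queue(range(25)), prefetch_count=10)
received = [buf.get() for _ in range(25)]
```

In this model, 25 messages are delivered one at a time while only three simulated requests are issued, which is the effect the batch behavior aims for with short-running tasks.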
- Other Features supported by this transport:
- Predefined Queues:
The default behavior of this transport is to use a single AWS credential pair in order to manage all SQS queues (e.g. listing queues, creating queues, polling queues, deleting messages).
If it is preferable for your environment to use multiple AWS credentials, you can use the ‘predefined_queues’ setting inside the ‘transport_options’ map. This setting allows you to specify the SQS queue URL and AWS credentials for each of your queues. For example, if you have two queues which both already exist in AWS, you can tell this transport about them as follows:
    transport_options = {
        'predefined_queues': {
            'queue-1': {
                'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
                'access_key_id': 'a',
                'secret_access_key': 'b',
            },
            'queue-2': {
                'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
                'access_key_id': 'c',
                'secret_access_key': 'd',
            },
        }
    }
- Client config:
In some cases you may need to override the botocore config. You can do it as follows:
    transport_options = {
        'client-config': {
            'connect_timeout': 5,
        },
    }
For a complete list of settings you can adjust using this option see https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
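The two options above can live in the same transport_options mapping. The following is a minimal sketch that assembles such a mapping while reading per-queue credentials from environment variables instead of hard-coding them. The `build_transport_options` helper and the `SQS_<NAME>_...` variable names are assumptions made for this example, not a Kombu convention.

```python
import os


def build_transport_options(queue_names, env=os.environ):
    """Assemble transport_options with per-queue credentials read from env.

    The SQS_<NAME>_URL / _ACCESS_KEY_ID / _SECRET_ACCESS_KEY variable names
    are hypothetical and exist only for this sketch.
    """
    predefined = {}
    for name in queue_names:
        prefix = 'SQS_' + name.upper().replace('-', '_')
        predefined[name] = {
            'url': env[prefix + '_URL'],
            'access_key_id': env[prefix + '_ACCESS_KEY_ID'],
            'secret_access_key': env[prefix + '_SECRET_ACCESS_KEY'],
        }
    return {
        'predefined_queues': predefined,
        # Same botocore override as in the client config example above.
        'client-config': {'connect_timeout': 5},
    }


# A plain dict stands in for os.environ so the example runs anywhere.
fake_env = {
    'SQS_QUEUE_1_URL': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
    'SQS_QUEUE_1_ACCESS_KEY_ID': 'a',
    'SQS_QUEUE_1_SECRET_ACCESS_KEY': 'b',
}
transport_options = build_transport_options(['queue-1'], env=fake_env)
```

The resulting mapping has the same shape as the hand-written examples and would be passed as transport_options when the connection is created.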
Transport
class kombu.transport.SQS.Transport(client, **kwargs)
SQS Transport.
Additional queue attributes can be supplied to SQS during queue creation by passing an sqs-creation-attributes key in transport_options. sqs-creation-attributes must be a dict whose key-value pairs correspond with Attributes in the CreateQueue SQS API.
For example, to have SQS queues created with server-side encryption enabled using the default Amazon Managed Customer Master Key, you can set the KmsMasterKeyId Attribute. When the queue is initially created by Kombu, encryption will be enabled.

    from kombu.transport.SQS import Transport

    transport = Transport(
        ...,
        transport_options={
            'sqs-creation-attributes': {
                'KmsMasterKeyId': 'alias/aws/sqs',
            },
        }
    )
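Other CreateQueue attributes can be supplied through the same key. The sketch below adds MessageRetentionPeriod next to KmsMasterKeyId. Because the CreateQueue API takes attribute values as strings, it uses a small `creation_attributes` helper to convert them; that helper is hypothetical and not part of Kombu.

```python
def creation_attributes(**attrs):
    """Convert keyword arguments to a string-valued attribute mapping."""
    return {key: str(value) for key, value in attrs.items()}


transport_options = {
    'sqs-creation-attributes': creation_attributes(
        KmsMasterKeyId='alias/aws/sqs',
        MessageRetentionPeriod=4 * 24 * 60 * 60,  # 4 days, expressed in seconds
    ),
}
```

As with the encryption example, these attributes would only take effect for queues that Kombu creates itself.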
- class Channel(*args, **kwargs)
  SQS Channel.
  - asynsqs(queue=None)
  - basic_ack(delivery_tag, multiple=False)
    Acknowledge message.
  - basic_cancel(consumer_tag)
    Cancel consumer by consumer tag.
  - basic_consume(queue, no_ack, *args, **kwargs)
    Consume from queue.
  - canonical_queue_name(queue_name)
  - close()
    Close channel.
    Cancel all consumers, and requeue unacked messages.
  - property conninfo
  - default_region = 'us-east-1'
  - default_visibility_timeout = 1800
  - default_wait_time_seconds = 10
  - domain_format = 'kombu%(vhost)s'
  - drain_events(timeout=None, callback=None, **kwargs)
    Return a single payload message from one of our queues.
    Raises: Queue.Empty – if no messages available.
  - endpoint_url
  - entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})
    Format AMQP queue name into a legal SQS queue name.
  - is_secure
  - new_sqs_client(region, access_key_id, secret_access_key)
  - port
  - predefined_queues
    Map of queue_name to predefined queue settings.
  - queue_name_prefix
  - region
  - regioninfo
  - sqs(queue=None)
  - supports_fanout
  - property transport_options
  - visibility_timeout
  - wait_time_seconds
- channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class 'kombu.asynchronous.aws.ext.BotoCoreError'>)
- connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.asynchronous.aws.ext.BotoCoreError'>, <class 'OSError'>)
- property default_connection_params
- default_port = None
- driver_name = 'sqs'
- driver_type = 'sqs'
- implements = {'asynchronous': True, 'exchange_type': frozenset({'direct'}), 'heartbeats': False}
- polling_interval = 1
- wait_time_seconds = 0
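The table default in the entity_name signature listed above is a mapping of code points of the kind accepted by str.translate: most punctuation maps to 95 (an underscore) and 46 (a dot) maps to 45 (a hyphen). The sketch below rebuilds an equivalent table and applies it to a sample name. It approximates the documented behavior only; `to_sqs_name` is a hypothetical stand-in and may not cover every case the real method handles.

```python
import string

# Punctuation other than '-', '_' and '.' maps to '_' (95); '.' (46) maps to '-' (45).
table = {ord(c): ord('_') for c in string.punctuation if c not in '-_.'}
table[ord('.')] = ord('-')


def to_sqs_name(name):
    """Approximate entity_name by applying the table with str.translate."""
    return name.translate(table)


sqs_name = to_sqs_name('celery.tasks@worker#1')
```

Letters, digits, hyphens and underscores pass through unchanged, so names that are already legal are left as they are.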
Channel
class kombu.transport.SQS.Channel(*args, **kwargs)
SQS Channel.
- property conninfo
- default_region = 'us-east-1'
- default_visibility_timeout = 1800
- default_wait_time_seconds = 10
- domain_format = 'kombu%(vhost)s'
- drain_events(timeout=None, callback=None, **kwargs)
  Return a single payload message from one of our queues.
  Raises: Queue.Empty – if no messages available.
- entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})
  Format AMQP queue name into a legal SQS queue name.
- property transport_options
-
property