Welcome to sqspy’s documentation!¶
Introduction¶
A more pythonic approach to SQS producer/consumer utilities. Heavily inspired from the the pySqsListener package.
Install¶
pip install sqspy
Usage¶
from sqspy import Consumer
class MyWorker(Consumer):
def handle_message(self, body, attributes, message_attributes):
print(body)
listener = MyWorker('Q1', error_queue='EQ1')
listener.listen()
More documentation coming soon.
Why¶
The mentioned project had a few issues which I faced while trying to
implement at my organisation. The local environment testing setup was
very flaky. The signatures for sqs_listener
and sqs_producer
were very different from each other.
This rewrite supports python 3.6+ versions only, and makes use of a lot of newer python features. It also makes use of service resources (for lazy calls) from the boto3 library instead of making calls via the low level client.
Changelog¶
1.0.0 (2021-03-27)¶
Add self-documentation using sphinx.
Setup pipeline for readthedocs.
Update contact email address.
Update related links for pypi release page.
Integrate with PyUp.
0.1.0 (2020-10-28)¶
Setup tests using Travis CI.
Add test coverage reports.
0.0.1 (2020-07-22)¶
Initial release.
Barebone working model of
Consumer
andProducer
.
sqspy package¶
Submodules¶
sqspy._base module¶
-
class
sqspy._base.
Base
(**kwargs)[source]¶ Bases:
object
Base class initialisation to setup aws credentials.
To make use of instance roles when deploying to AWS infrastructure, leave the aws_* keys blank (
None
).- Parameters
-
QUEUE_VISIBILITY_TIMEOUT
: str = '600'¶ Message’s visibility timeout in seconds. See Visibility Timeout in Amazon Simple Queue Service Developer Guide for more information.
-
create_queue
(name: str, attributes: Dict[str, Any])[source]¶ Create a Queue resource.
For more information, check
SQS.ServiceResource.create_queue()
-
get_or_create_queue
(queue_data: Dict[str, str], create_queue: bool = True)[source]¶ Fetch or create the sqs Queue resource from boto3.
Also tries to create the queue resource with the configured credentials as dictated by the create_queue parameter if the resource was not located.
- Parameters
Dictionary referencing parameters for the queue to be retrieved or created.
The keys for the data are: name, url and visibility_timeout. The visibility_timeout defaults to
QUEUE_VISIBILITY_TIMEOUT
.create_queue (bool) – Force creation of queue resource on AWS. Default is True
- Returns
An Queue resource on success, None otherwise.
- Return type
sqspy.consumer module¶
-
class
sqspy.consumer.
Consumer
(queue_name: Optional[str] = None, queue_url: Optional[str] = None, **kwargs)[source]¶ Bases:
sqspy._base.Base
Message consumer/worker.
- Parameters
queue_name (str) – Optional queue name.
queue_url (str) – Optional queue url, according to AWS guidelines.
queue (SQS.Queue) – Optional queue resource.
visibility_timeout (str) – Message visibility timeout in seconds, but as a string value. Defaults to
QUEUE_VISIBILITY_TIMEOUT
error_queue (str) – Name for error queue, when messages were not consumed successfully.
error_queue_url (str) – Queue url as per AWS guidelines, for the error queue.
error_visibility_timeout (str) – Same as visibility_timeout but for error queue.
create_queue (bool) – Set to False if the queue should not be created in case it does not exist. The default is True.
create_error_queue (bool) – Same as create_queue but for error queue. The default is True.
poll_interval (int) – Polling interval between messages. Defaults to
poll_interval
.message_attribute_names (list[str]) – List of attributes for message to fetch. See
SQS.Message.message_attributes
.wait_time (int) – Time to wait (in seconds) when fetching messages. Defaults to
wait_time
.force_delete (bool) – Whether to delete the message from queue before handling or not. Defaults to False.
max_messages_count (int) – Maximum message count when fetching from the queue. Defaults to
max_messages_count
.attribute_names (list[str]) – Attributes to be retrieved along with message when fetching. See more at:
SQS.Queue.receive_messages()
- Raises
ValueError – At least one of queue, queue_url or queue_name has to be provided.
-
property
error_queue
¶ The Queue resource for when messages were not processed correctly.
- Return type
-
abstract
handle_message
(body, attributes, messages_attributes)[source]¶ Method representing the handling of messages retrieved from queue.
- Parameters
body (Any) – The body retrieved from the queue after passing through a json deserialiser.
attributes (dict) –
A map of the attributes requested from queue when fetching messages.
See more at:
SQS.Message.attributes
messages_attributes (dict) –
Strucutred metadata as retrieved from the queue.
See more at:
SQS.Message.message_attributes
- Return type
- Raises
NotImplementedError – If not overridden in a subclass.
-
listen
()[source]¶ Method that triggers listening for messages, and forwards to
handle_message()
.This is a blocking call.
sqspy.producer module¶
-
class
sqspy.producer.
Producer
(queue_name: Optional[str] = None, queue_url: Optional[str] = None, **kwargs)[source]¶ Bases:
sqspy._base.Base
Message producer.
- Parameters
queue_name (str) – Optional queue name.
queue_url (str) – Optional queue url, according to AWS guidelines.
queue (SQS.Queue) – Optional queue resource.
visibility_timeout (str) – Message visibility timeout in seconds, but as a string value. Defaults to
QUEUE_VISIBILITY_TIMEOUT
create_queue (bool) – Set to False if the queue should not be created in case it does not exist. Default value is True.
- Raises
ValueError – At least one of queue, queue_url or queue_name has to be provided.
-
publish
(message: Any, **kwargs)[source]¶ Method to publish message to queue.
The message should be json serializable. The other arguments can be sent as named parameters. More information is available at
SQS.Queue.send_message()
.- Parameters
message (Any) – The message body.
- Returns
Dictionary of attributes as per AWS guidelines. Check:
send_message()
.- Return type
-
property
queue_name
¶ See
queue_name