import json
import logging
import sys
from abc import ABCMeta, abstractmethod
from time import sleep
from typing import Dict, List
from ._base import Base
from .producer import Producer
sqspy_logger = logging.getLogger("sqspy")
[docs]class Consumer(Base):
"""
Message consumer/worker.
:param str queue_name: Optional queue name.
:param str queue_url: Optional queue url, according to AWS
guidelines.
:param SQS.Queue queue: Optional queue resource.
:param str visibility_timeout: Message visibility timeout in
seconds, but as a string value. Defaults to
:const:`~sqspy._base.Base.QUEUE_VISIBILITY_TIMEOUT`
:param str error_queue: Name for error queue, when messages were
not consumed successfully.
:param str error_queue_url: Queue url as per AWS guidelines, for
the error queue.
:param str error_visibility_timeout: Same as `visibility_timeout`
but for error queue.
:param bool create_queue: Set to `False` if the queue should not
be created in case it does not exist. The default is `True`.
:param bool create_error_queue: Same as `create_queue` but for
error queue. The default is `True`.
:param int poll_interval: Polling interval between messages.
Defaults to :attr:`poll_interval`.
:param list[str] message_attribute_names: List of attributes for
message to fetch. See :attr:`SQS.Message.message_attributes`.
:param int wait_time: Time to wait (in seconds) when fetching
messages. Defaults to :attr:`wait_time`.
:param bool force_delete: Whether to delete the message from queue
before handling or not. Defaults to False.
:param int max_messages_count: Maximum message count when fetching
from the queue. Defaults to :attr:`max_messages_count`.
:param list[str] attribute_names: Attributes to be retrieved along
with message when fetching. See more at:
:meth:`SQS.Queue.receive_messages`
:raises ValueError: At least one of `queue`, `queue_url` or
`queue_name` has to be provided.
"""
__metaclass__ = ABCMeta
#: Wait time when fetching message from queue (in seconds).
wait_time: int = 0
#: Time between continuous fetch from queue (in seconds).
poll_interval: int = 60
#: Upper limit of message count when fetching from queue.
max_messages_count: int = 1
def __init__(self, queue_name: str = None, queue_url: str = None, **kwargs):
queue = kwargs.get("queue")
if not any([queue, queue_name, queue_url]):
raise ValueError(
"One of `queue`, `queue_name` or `queue_url` should be provided"
)
super().__init__(**kwargs)
queue_data: Dict[str, str] = {
"name": queue_name,
"url": queue_url,
"visibility_timeout": kwargs.get("visibility_timeout"),
}
error_queue_data: Dict[str, str] = {
"name": kwargs.get("error_queue"),
"url": kwargs.get("error_queue_url"),
"visibility_timeout": kwargs.get("error_visibility_timeout"),
}
create_queue: bool = kwargs.get("create_queue", True)
create_error_queue: bool = kwargs.get("create_error_queue", True)
self.poll_interval: int = int(kwargs.get("interval", self.poll_interval))
self._message_attribute_names: List = kwargs.get("message_attribute_names", [])
self._attribute_names: List = kwargs.get("attribute_names", [])
self.wait_time: int = int(kwargs.get("wait_time", self.wait_time))
self.max_messages_count: int = int(
kwargs.get("max_messages_count", self.max_messages_count)
)
self._force_delete: bool = kwargs.get("force_delete", False)
self._queue = queue or self.get_or_create_queue(
queue_data,
create_queue=create_queue,
)
if self.queue is None:
raise ValueError(
"No queue found with name or URL provided, or "
"application did not have permission to create one."
)
self._queue_name = self._queue.url.split("/")[-1]
self._error_queue = None
if error_queue_data.get("name") or error_queue_data.get("url"):
self._error_queue = Producer(
queue_name=error_queue_data.get("name"),
queue_url=error_queue_data.get("url"),
queue=self.get_or_create_queue(
error_queue_data,
create_queue=create_error_queue,
),
)
self._error_queue_name = self.error_queue.queue.url.split("/")[-1]
@property
def queue(self):
"""
The connected Queue resource.
:rtype: SQS.Queue
"""
return self._queue
@property
def queue_name(self) -> str:
"""
Base name of the connected Queue resource.
:rtype: str
"""
return self._queue_name
@property
def error_queue(self):
"""
The Queue resource for when messages were not processed
correctly.
:rtype: SQS.Queue
"""
return self._error_queue
[docs] def poll_messages(self):
"""
Poll the queue for new messages.
The polling happens as per the `poll_interval` specified, and
the message fetch timeout is set as per the value in
`wait_time`.
:returns: A list of message resources.
:rtype: list[SQS.Message]
"""
while True:
messages = self._queue.receive_messages(
AttributeNames=self._attribute_names,
MessageAttributeNames=self._message_attribute_names,
WaitTimeSeconds=self.wait_time,
MaxNumberOfMessages=self.max_messages_count,
)
if not messages:
sqspy_logger.debug(
f"No messages were fetched for {self.queue_name}. "
f"Sleeping for {self.poll_interval} seconds."
)
sleep(self.poll_interval)
continue
sqspy_logger.info(
f"{len(messages)} messages received for {self.queue_name}"
)
break
return messages
def _start_listening(self):
while True:
messages = self.poll_messages()
for message in messages:
m_body = message.body
message_attribs = message.message_attributes
attribs: Dict = message.attributes
# catch problems with malformed JSON, usually a result
# of someone writing poor JSON directly in the AWS
# console
try:
body = json.loads(m_body)
except:
sqspy_logger.warning(
f"Unable to parse message - JSON is not formatted properly. "
f"Received message: {m_body}"
)
continue
try:
if self._force_delete:
message.delete()
self.handle_message(body, message_attribs, attribs)
else:
self.handle_message(body, message_attribs, attribs)
message.delete()
except Exception as ex:
# need exception logtype to log stack trace
sqspy_logger.exception(ex)
if self._error_queue:
exc_type, exc_obj, exc_tb = sys.exc_info()
sqspy_logger.info("Pushing exception to error queue")
self._error_queue.publish(
{
"exception_type": str(exc_type),
"error_message": str(ex.args),
}
)
[docs] def listen(self):
"""
Method that triggers listening for messages, and forwards to
:meth:`handle_message`.
This is a blocking call.
"""
sqspy_logger.info(f"Listening to queue {self.queue_name}")
if self.error_queue:
sqspy_logger.info(f"Using error queue {self._error_queue_name}")
self._start_listening()
[docs] @abstractmethod
def handle_message(self, body, attributes, messages_attributes):
"""
Method representing the handling of messages retrieved from
queue.
:param Any body: The body retrieved from the queue after
passing through a json deserialiser.
:param dict attributes: A map of the attributes requested from
queue when fetching messages.
See more at: :attr:`SQS.Message.attributes`
:param dict messages_attributes: Strucutred metadata as
retrieved from the queue.
See more at: :attr:`SQS.Message.message_attributes`
:rtype: None
:raises NotImplementedError: If not overridden in a subclass.
"""
raise NotImplementedError("Implement this function in subclass.")