Filter Messages from the AWS SQS Queue using Python and asyncio

In message-oriented architectures, it is cumbersome for QA to gain visibility into where their test messages are flowing. Gaining that visibility usually means watching a browser-based monitor (the AWS SQS console) and/or trawling logs that may not even contain the data QA wants to check. Amazon Simple Queue Service (SQS) is a message queuing service that lets users send, receive, store, and delete messages between AWS microservice components. When I started learning SQS functionality using Python, I had to search through a lot of material on the official Amazon SQS website as well as other Internet resources.

I recently had to test a pipeline (an AWS stack) that consisted of several Lambdas, SQS queues, and SNS topics. Messages could flow to different parts of the pipeline based on the logic within the Lambdas. I was new to the product and didn't really know all the different paths a message could take through the pipeline. Tracing my initial input message during the exploratory phase required me to check several different SQS queues and CloudWatch logs, which was very inefficient.

While I couldn't solve this problem during my engagement with the client, I have worked on it since then. If you are a QA facing a similar problem at work, I hope this blog helps you. The diagram below will help you relate the subsequent sections of the blog.

As shown in the diagram above, the Lambda function routes messages based on the logic written in it. We will not delve into that logic; for the purposes of this blog, assume that, based on certain filter criteria, a message goes to queue 1, 2, or 3. As a tester, I should be able to find out which queue received the message. To mimic the real-world scenario, I created a setup using a local AWS instance with pre-populated SQS queues and messages. I then filtered messages using Python and asyncio. The steps below cover the prerequisites as well as the code snippets.


Prerequisites:

1. An AWS login is required; store AWS_ACCOUNT_ID, AWS_DEFAULT_REGION, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY in the aws_configuration_conf.py file. These are user-specific details.

2. I have created 3 SQS queues to demonstrate the code. Their names are stored in sqs_utilities_conf.py.

3. I have stored the filter key in the key_conf.py file.

4. I have manually pre-populated messages in the SQS queues. If you want to know how to populate a message in an SQS queue manually, you can find help here.

5. I have used sample_message.json as a template. The source code is available here.

6. I have used Python 3.8.2 to run this source code. You can download this version from here

7. I have used the asyncio module (part of the Python standard library) to run this source code. It provides infrastructure for writing single-threaded concurrent code using coroutines. More details about the asyncio module can be found here.
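The three configuration modules mentioned above could look like the minimal sketch below. The queue names, region, and the 'quantity' filter key are illustrative assumptions of mine, not values from the original repository; substitute your own.

```python
# aws_configuration_conf.py -- user-specific AWS details (placeholder values)
AWS_ACCOUNT_ID = "123456789012"        # assumption: replace with your account id
AWS_DEFAULT_REGION = "us-east-1"       # assumption: replace with your region
AWS_ACCESS_KEY_ID = "YOUR_ACCESS_KEY"
AWS_SECRET_ACCESS_KEY = "YOUR_SECRET_KEY"

# sqs_utilities_conf.py -- the queues the scripts will poll (assumed names)
QUEUE_URL_LIST = ["admin-filter", "user-filter", "report-filter"]

# key_conf.py -- the key the filter step looks for in the message body
# (assumption, matching the filter used later in this blog)
FILTER_KEY = "quantity"
```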


Summary:

In the following sections, I will cover:

1. Filter message from one queue.
2. Filter messages from multiple queues.
3. Filter messages based on filter criteria.


Steps to filter messages:

1. The code below filters messages polled from a single queue. (Note: only the required code snippets are shown in each step.)

import boto3
import json
import logging
import os, sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import conf.aws_configuration_conf as aws_conf
import conf.sqs_utilities_conf as conf
import conf.key_conf as key_conf
from pythonjsonlogger import jsonlogger

# logging
log_handler = logging.StreamHandler()
log_handler.setFormatter(jsonlogger.JsonFormatter())
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)

# setting environment variables
os.environ['AWS_ACCOUNT_ID'] = aws_conf.AWS_ACCOUNT_ID
os.environ['AWS_DEFAULT_REGION'] = aws_conf.AWS_DEFAULT_REGION
os.environ['AWS_ACCESS_KEY_ID'] = aws_conf.AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_conf.AWS_SECRET_ACCESS_KEY

class Sqsmessage():
    # class for all SQS utility methods
    logger = logging.getLogger(__name__)

    def get_messages_from_queue(self, queue_url):
        """
        Polls messages from an SQS queue and logs their bodies.
        :param queue_url: name of the SQS queue to drain.
        """
        sqs_client = self.get_sqs_client()
        queue = self.get_sqs_queue(queue_url)
        messages = sqs_client.receive_message(QueueUrl=queue.url)
        if 'Messages' in messages:
            for message in messages['Messages']:
                self.logger.info(message['Body'])
        else:
            self.logger.info("No messages polled from the queue at this moment")

    def get_sqs_client(self):
        """
        Return an SQS client object.
        :return: sqs_client
        """
        sqs_client = boto3.client('sqs')
        self.logger.info(sqs_client)

        return sqs_client

    def get_sqs_queue(self, queue_name):
        """
        Return a queue object for the given queue name.
        :param queue_name
        :return: queue
        """
        queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_name)
        self.logger.info(queue)

        return queue

if __name__ == '__main__':
    _logger = logging.getLogger(__name__)
    _logger.setLevel(logging.DEBUG)
    # creating an instance and calling the necessary method
    sqsmessage_obj = Sqsmessage()
    sqsmessage_obj.get_messages_from_queue('admin-filter')

After running the above code, the following output (the polled message bodies) will be shown.


2. My next step is to access multiple queues and fetch all the messages from them. To demo this, I used asyncio (admittedly as something of a quick hack, since the underlying boto3 calls are still blocking). I added "import asyncio" to the import statements, changed the method definition from "def get_messages_from_queue" to "async def get_messages_from_queue", and defined a new coroutine, "async def main", which polls the queues concurrently. The changed code snippets, along with the message outputs, are shown below:

import asyncio
import boto3
import json
import logging
import os, sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import conf.aws_configuration_conf as aws_conf
import conf.sqs_utilities_conf as conf
import conf.key_conf as key_conf
from pythonjsonlogger import jsonlogger

# logging
log_handler = logging.StreamHandler()
log_handler.setFormatter(jsonlogger.JsonFormatter())
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)

# setting environment variables
os.environ['AWS_ACCOUNT_ID'] = aws_conf.AWS_ACCOUNT_ID
os.environ['AWS_DEFAULT_REGION'] = aws_conf.AWS_DEFAULT_REGION
os.environ['AWS_ACCESS_KEY_ID'] = aws_conf.AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_conf.AWS_SECRET_ACCESS_KEY

class Sqsmessage():
    # class for all SQS utility methods
    logger = logging.getLogger(__name__)

    async def get_messages_from_queue(self, queue_url):
        """
        Polls messages from an SQS queue and logs their bodies.
        :param queue_url: name of the SQS queue to drain.
        """
        sqs_client = self.get_sqs_client()
        queue = self.get_sqs_queue(queue_url)
        messages = sqs_client.receive_message(QueueUrl=queue.url)
        if 'Messages' in messages:
            for message in messages['Messages']:
                self.logger.info(message['Body'])
        else:
            self.logger.info("No messages polled from the queue at this moment")

    def get_sqs_client(self):
        """
        Return an SQS client object.
        :return: sqs_client
        """
        sqs_client = boto3.client('sqs')
        self.logger.info(sqs_client)

        return sqs_client

    def get_sqs_queue(self, queue_name):
        """
        Return a queue object for the given queue name.
        :param queue_name
        :return: queue
        """
        queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_name)
        self.logger.info(queue)

        return queue

async def main():
    """
    Schedule calls concurrently
    """
    sqsmessage_obj = Sqsmessage()
    while True:  # keep polling all queues
        tasks = []
        for every_queue_url in conf.QUEUE_URL_LIST:
            tasks.append(sqsmessage_obj.get_messages_from_queue(every_queue_url))
        result = await asyncio.gather(*tasks)

if __name__ == '__main__':
    # Running the asyncio main
    _logger = logging.getLogger(__name__)
    _logger.setLevel(logging.DEBUG)
    asyncio.run(main())

After running the above code, we will get all the messages from all of the queues.
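One caveat: boto3's receive_message is a blocking call, so awaiting it inside a coroutine does not by itself make the polling parallel; the calls still run one after another. A common workaround on Python 3.8 is to push the blocking call onto a worker thread with run_in_executor. The sketch below uses a simulated blocking poll (a time.sleep stand-in) instead of a real SQS call, so it runs without AWS credentials; the queue names are the same assumed ones used throughout this blog.

```python
import asyncio
import time

def blocking_poll(queue_name):
    """Stand-in for sqs_client.receive_message -- a blocking network call."""
    time.sleep(0.2)  # simulate network latency
    return f"message from {queue_name}"

async def poll_queue(queue_name):
    # run_in_executor moves the blocking call to a worker thread,
    # so several polls can genuinely overlap
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_poll, queue_name)

async def main():
    queue_names = ["admin-filter", "user-filter", "report-filter"]  # assumed names
    start = time.monotonic()
    results = await asyncio.gather(*(poll_queue(q) for q in queue_names))
    elapsed = time.monotonic() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)
    # three 0.2 s polls overlap, so the total is well under 0.6 s
    print(f"elapsed: {elapsed:.2f}s")
```

With plain awaits on blocking calls, the three polls would take roughly 0.6 seconds in sequence; with the executor they overlap and finish in about 0.2 seconds.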


3. In the next step, we will keep only those messages where the value is greater than 70. I have defined a new method, filter_message, so messages where the value is less than or equal to 70 will not be fetched (the messages highlighted by a red bracket in the image above). I have hard-coded filter_key, filter_value, and filter_criteria where get_messages_from_queue is called in the main method; in a future blog I will let the user define the data structure and filter for the message. The code snippets now look like this:

# (add "import operator" to the import statements at the top of the file)

    def filter_message(self, message, filter_key, filter_value, filter_criteria):
        """
        Logs a message from the SQS queue only if it passes the filter.
        :param message: message
        :param filter_key: dict key
        :param filter_value: dict value
        :param filter_criteria: filter criteria: greater than, equal to, less than
        :return: logs the filtered message
        """
        if 'Body' in message.keys():
            # get_dict (defined elsewhere in the full source) parses the JSON body into a dict
            message_body_obj = self.get_dict(message['Body'])
            message_body_obj_key_list, message_body_obj_value_list = \
                self.get_value_key_list(message_body_obj)
            if filter_key in message_body_obj_key_list and filter_criteria == 'greater than':
                # skip non-numeric values so int() does not raise on strings or nested dicts
                if any(str(ele).isdigit() and operator.gt(int(ele), int(filter_value))
                       for ele in message_body_obj_value_list):
                    self.logger.info(message_body_obj)
                else:
                    self.logger.info("Filter value not found in the message value list")
            else:
                self.logger.info(
                    "Filter key not found in message key list or filter criteria not defined")
        else:
            self.logger.info("Message does not have a body attribute")

    def get_recursive_items(self, dictionary):
        """
        Yield every (key, value) pair, descending into nested dicts.
        :param dictionary: dict
        :return: key, value pairs
        """
        for key, value in dictionary.items():
            if type(value) is dict:
                yield (key, value)
                yield from self.get_recursive_items(value)
            else:
                yield (key, value)

    def get_value_key_list(self, dictionary):
        """
        Return the key list and value list for any dict.
        :param dictionary: dict object
        :return: key_list, value_list
        """
        key_list = []
        value_list = []
        for key, value in self.get_recursive_items(dictionary):
            key_list = key_list + [key]
            value_list = value_list + [value]  # fixed: was an assignment, not an append

        return key_list, value_list

    async def get_messages_from_queue(self, queue_url, filter_key, filter_value, filter_criteria):
        """
        Polls messages from an SQS queue and applies the filter.
        :param queue_url: name of the SQS queue to drain.
        :param filter_key: dict key (hard-coded in the main method for this blog)
        :param filter_value: dict value (hard-coded in the main method for this blog)
        :param filter_criteria: greater than, equal to, less than
          (hard-coded in the main method for this blog)
        """
        sqs_client = self.get_sqs_client()
        queue = self.get_sqs_queue(queue_url)
        messages = sqs_client.receive_message(QueueUrl=queue.url)
        if 'Messages' in messages:
            for message in messages['Messages']:
                self.filter_message(message, filter_key, filter_value, filter_criteria)
        else:
            self.logger.info("No messages polled from the queue at this moment")

async def main():
    """
    Schedule calls concurrently
    # https://www.educative.io/blog/python-concurrency-making-sense-of-asyncio
    # https://www.integralist.co.uk/posts/python-asyncio/
    """
    sqsmessage_obj = Sqsmessage()
    while True:  # keep polling all queues
        tasks = []
        for every_queue_url in conf.QUEUE_URL_LIST:
            tasks.append(sqsmessage_obj.get_messages_from_queue(
                every_queue_url, filter_key='quantity',
                filter_value='70', filter_criteria='greater than'))
        result = await asyncio.gather(*tasks)

if __name__ == '__main__':
    # Running the asyncio main
    _logger = logging.getLogger(__name__)
    _logger.setLevel(logging.DEBUG)
    asyncio.run(main())

When you run the code, it will log only those messages where the value is greater than 70.
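The filtering logic can be exercised on its own, without AWS. The sketch below is my own minimal standalone re-creation of it, using an assumed message structure with a 'quantity' field (matching the filter key hard-coded above); it applies the same 'greater than' comparison to a list of JSON message bodies:

```python
import json
import operator

def flatten_items(dictionary):
    """Yield every (key, value) pair, descending into nested dicts."""
    for key, value in dictionary.items():
        if isinstance(value, dict):
            yield key, value
            yield from flatten_items(value)
        else:
            yield key, value

def passes_filter(body, filter_key, filter_value, filter_criteria='greater than'):
    """Return True if the body contains filter_key and any numeric value beats filter_value."""
    message = json.loads(body)
    items = list(flatten_items(message))
    keys = [k for k, _ in items]
    values = [v for _, v in items]
    if filter_key in keys and filter_criteria == 'greater than':
        # skip non-numeric values so int() does not raise on strings
        return any(str(v).isdigit() and operator.gt(int(v), int(filter_value))
                   for v in values)
    return False

# assumed sample message bodies, shaped like sample_message.json might be
bodies = [
    json.dumps({"item": "books", "quantity": 95}),
    json.dumps({"item": "pens", "quantity": 40}),
]
kept = [b for b in bodies if passes_filter(b, 'quantity', '70')]
print(kept)  # only the quantity-95 message survives the filter
```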


I hope you have liked the blog. The source code is available here. I have attempted to capture the most commonly used functionality you can automate using Python and boto3. You can find some useful documentation about boto3 here.


2 thoughts on “Filter Messages from the AWS SQS Queue using Python and asyncio”

  1. Hi,
    I saw you put ‘sqs_client.receive_message’ in ‘async def get_messages_from_queue’.
    I am not sure that putting a blocking IO method in a coroutine means it can be executed in parallel?
    If the answer is yes, do we need aioboto3?

    1. Hi Yehudi,
      Based on your query, I understand that you want to know whether aioboto3 can be used instead of asyncio and boto3. If you refer to the aioboto3 documentation (https://pypi.org/project/aioboto3/), SQS is not listed in the tested-services table, so we cannot confirm that replacing asyncio + boto3 with aioboto3 will work for the example in this blog. You may have to try it and check. While going over aioboto3, I came across a sample from the aiobotocore library (https://github.com/aio-libs/aiobotocore/blob/master/tests/test_sqs.py). Hope this helps.
      Regards,
