{"id":12893,"date":"2020-08-13T02:09:55","date_gmt":"2020-08-13T06:09:55","guid":{"rendered":"https:\/\/qxf2.com\/blog\/?p=12893"},"modified":"2020-08-17T03:34:41","modified_gmt":"2020-08-17T07:34:41","slug":"filter-messages-from-the-aws-sqs-queue-using-python-and-asyncio","status":"publish","type":"post","link":"https:\/\/qxf2.com\/blog\/filter-messages-from-the-aws-sqs-queue-using-python-and-asyncio\/","title":{"rendered":"Filter Messages from the AWS SQS Queue using Python and asyncio"},"content":{"rendered":"<p><span style=\"font-weight: 400;\">In message-oriented architectures, it is cumbersome for QA to gain visibility into where their test messages are flowing. Visibility usually involves looking at some browser-based monitor (AWS SQS Queue) and\/or monitoring logs that might not even have the data QA wishes to check. <span style=\"font-weight: 400;\">Amazon Simple Queue Service (SQS) is a message queuing service that allows users to send, receive, store, and delete messages among AWS microservice components. When I started learning SQS functionality using Python, I had to search through a lot of material available on the official<\/span><a href=\"https:\/\/aws.amazon.com\/sqs\/\"> <span style=\"font-weight: 400;\">Amazon SQS website<\/span><\/a><span style=\"font-weight: 400;\"> as well as other Internet resources.<\/span><\/p>\n<p>I recently had to test a pipeline (AWS stack) that consisted of several lambdas, SQS, and SNS. The messages could flow to different parts of the pipeline based on the logic within the lambdas. I was new to the product and didn&#8217;t really know all the different paths that a message could take through the pipeline. Tracing my initial input message during the exploratory phase required me to check several different SQS queues and CloudWatch logs. This was very inefficient.<\/p>\n<p>While I couldn&#8217;t solve this problem during my stay with the client, I have worked on it since then. 
If you are a QA facing a similar problem at work, I hope this blog helps you. The diagram below will help you relate the subsequent sections of the blog.<\/p>\n<p><a href=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/08\/aws-sqs-test-system.png\" data-rel=\"lightbox-image-0\" data-rl_title=\"\" data-rl_caption=\"\" title=\"\"><img decoding=\"async\" class=\"aligncenter\" src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/08\/aws-sqs-test-system.png\" alt=\"\" \/><\/a><\/p>\n<p>As shown in the above diagram, the lambda function will route messages based on the logic written in it. We will not delve into that logic; for the purpose of this blog, we will assume that, based on certain filter criteria, a message will go to queue 1, 2, or 3. As a tester, I should be able to find which queue has received the message. To mimic the real-world scenario, I have created a setup using a local AWS instance with pre-populated SQS queues and messages in them. I then demonstrate filtering messages using <code>Python<\/code> and <code>asyncio<\/code>. The steps mentioned below include the pre-requisites as well as code snippets.<\/span><\/p>\n<hr \/>\n<h3><b>Pre-requisites:<\/b><\/h3>\n<p>1. An AWS login is required. Store the user-specific details <code>AWS_ACCOUNT_ID<\/code>, <code>AWS_DEFAULT_REGION<\/code>, <code>AWS_ACCESS_KEY_ID<\/code>, and <code>AWS_SECRET_ACCESS_KEY<\/code> in the <code>aws_configuration_conf.py<\/code> file.<\/p>\n<p>2. I have created 3 SQS queues to demonstrate the code. Their names are stored in the <code>sqs_utilities_conf.py<\/code> file.<\/p>\n<p>3. I have stored the filter key in the <code>key_conf.py<\/code> file.<\/p>\n<p>4. I have pre-populated messages in the SQS queues manually. In case you want to know how to populate a message in an SQS queue manually, you can find help <a href=\"https:\/\/docs.aws.amazon.com\/AWSSimpleQueueService\/latest\/SQSDeveloperGuide\/sqs-receive-delete-message.html\">here<\/a>.<\/p>\n<p>5. 
I have used <code>sample_message.json<\/code>\u00a0as a template. The source code is available <a href=\"https:\/\/github.com\/rahul-bhave\/sqs-utilities\">here<\/a>.<\/p>\n<p>6. I have used <code>Python 3.8.2<\/code> to run this source code. You can download this version from <a href=\"https:\/\/www.python.org\/downloads\/release\/python-382\/\">here<\/a>.<\/p>\n<p>7. I have used the <code>asyncio<\/code> module, which has been part of the Python standard library since Python 3.4, so no separate installation is needed. It provides infrastructure for writing single-threaded concurrent code using coroutines. More details about the <code>asyncio<\/code> module can be found <a href=\"https:\/\/pypi.org\/project\/asyncio\/\">here<\/a>.<\/p>\n<hr \/>\n<h3><b>Summary:<\/b><\/h3>\n<p>In the following sections, I will discuss how to:<\/p>\n<p>1. Filter messages from one queue.<br \/>\n2. Filter messages from multiple queues.<br \/>\n3. Filter messages based on filter criteria.<\/p>\n<hr \/>\n<h3><b>Steps to filter the message:<\/b><\/h3>\n<p><strong>1<\/strong>. Let us start by filtering messages polled from a single queue using the code below. (Note: only the required code snippets are shown in each step.)<\/p>\n<pre lang=\"Python\">\r\nimport boto3\r\nimport json\r\nimport logging\r\nimport os,sys\r\nsys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\r\nimport conf.aws_configuration_conf as aws_conf\r\nimport conf.sqs_utilities_conf as conf\r\nimport conf.key_conf as key_conf\r\nfrom pythonjsonlogger import jsonlogger\r\n\r\n# logging\r\nlog_handler = logging.StreamHandler()\r\nlog_handler.setFormatter(jsonlogger.JsonFormatter())\r\nlogger = logging.getLogger()\r\nlogger.setLevel(logging.INFO)\r\nlogger.addHandler(log_handler)\r\n\r\n#setting environment variables\r\nos.environ[\"AWS_ACCOUNT_ID\"] = aws_conf.AWS_ACCOUNT_ID\r\nos.environ['AWS_DEFAULT_REGION'] = aws_conf.AWS_DEFAULT_REGION\r\nos.environ['AWS_ACCESS_KEY_ID'] = aws_conf.AWS_ACCESS_KEY_ID\r\nos.environ['AWS_SECRET_ACCESS_KEY'] = 
aws_conf.AWS_SECRET_ACCESS_KEY\r\n\r\nclass Sqsmessage():\r\n    # class for all sqs utility methods\r\n    logger = logging.getLogger(__name__)\r\n\r\n    def get_messages_from_queue(self,queue_url):\r\n        \"\"\"\r\n        Generates messages from an SQS queue.\r\n        :param queue_url: name of the SQS queue to drain.\r\n        \"\"\"\r\n        sqs_client = self.get_sqs_client()\r\n        queue = self.get_sqs_queue(queue_url)\r\n        messages = sqs_client.receive_message(QueueUrl=queue.url)\r\n        if 'Messages' in messages:\r\n            for message in messages['Messages']:\r\n                self.logger.info(message['Body'])\r\n        else:\r\n            self.logger.info(\"No messages polled from the queue at this moment\")\r\n\r\n    def get_sqs_client(self):\r\n        \"\"\"\r\n        Return sqs_client object\r\n        :param none\r\n        :return sqs_client\r\n        \"\"\"\r\n        sqs_client = boto3.client('sqs')\r\n        self.logger.info(sqs_client)\r\n\r\n        return sqs_client\r\n\r\n    def get_sqs_queue(self,queue_url):\r\n        \"\"\"\r\n        Return queue object from queue name\r\n        :param queue_url\r\n        :return queue\r\n        \"\"\"\r\n        queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_url)\r\n        self.logger.info(queue)\r\n\r\n        return queue\r\n\r\nif __name__=='__main__':\r\n    _logger = logging.getLogger(__name__)\r\n    _logger.setLevel(logging.DEBUG)\r\n    #creating an instance of the class and calling the necessary method\r\n    sqsmessage_obj = Sqsmessage()\r\n    sqsmessage_obj.get_messages_from_queue('admin-filter')\r\nelse:\r\n    print('ERROR: Received incorrect command line input arguments')\r\n<\/pre>\n<p>After running the above code, the following output will be shown.<\/p>\n<p><a href=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/08\/raw_message.png\" data-rel=\"lightbox-image-1\" data-rl_title=\"\" 
data-rl_caption=\"\" title=\"\"><img decoding=\"async\" class=\"aligncenter\" src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/08\/raw_message.png\" alt=\"\" \/><\/a><\/p>\n<hr \/>\n<p><strong>2<\/strong>. My next step is to access multiple queues and get all the messages from them. To achieve this, I used <code>asyncio<\/code>. I added <code>import asyncio<\/code> to the import statements and changed the method definition of <code>get_messages_from_queue<\/code> to <code>async def get_messages_from_queue<\/code>. I also defined one new coroutine, <code>async def main<\/code>, which is used to access the queues concurrently. The changed code snippets are shown below, along with the message outputs:<\/p>\n<pre lang=\"Python\">\r\nimport asyncio\r\nimport boto3\r\nimport json\r\nimport logging\r\nimport os,sys\r\nsys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\r\nimport conf.aws_configuration_conf as aws_conf\r\nimport conf.sqs_utilities_conf as conf\r\nimport conf.key_conf as key_conf\r\nfrom pythonjsonlogger import jsonlogger\r\n\r\n# logging\r\nlog_handler = logging.StreamHandler()\r\nlog_handler.setFormatter(jsonlogger.JsonFormatter())\r\nlogger = logging.getLogger()\r\nlogger.setLevel(logging.INFO)\r\nlogger.addHandler(log_handler)\r\n\r\n#setting environment variables\r\nos.environ[\"AWS_ACCOUNT_ID\"] = aws_conf.AWS_ACCOUNT_ID\r\nos.environ['AWS_DEFAULT_REGION'] = aws_conf.AWS_DEFAULT_REGION\r\nos.environ['AWS_ACCESS_KEY_ID'] = aws_conf.AWS_ACCESS_KEY_ID\r\nos.environ['AWS_SECRET_ACCESS_KEY'] = aws_conf.AWS_SECRET_ACCESS_KEY\r\n\r\nclass Sqsmessage():\r\n    # class for all sqs utility methods\r\n    logger = logging.getLogger(__name__)\r\n\r\n    async def get_messages_from_queue(self,queue_url):\r\n        \"\"\"\r\n        Generates messages from an SQS queue.\r\n        :param queue_url: name of the SQS queue to drain.\r\n        \"\"\"\r\n        sqs_client = self.get_sqs_client()\r\n    
    queue = self.get_sqs_queue(queue_url)\r\n        messages = sqs_client.receive_message(QueueUrl=queue.url)\r\n        if 'Messages' in messages:\r\n            for message in messages['Messages']:\r\n                self.logger.info(message['Body'])\r\n        else:\r\n            self.logger.info(\"No messages polled from the queue at this moment\")\r\n\r\n    def get_sqs_client(self):\r\n        \"\"\"\r\n        Return sqs_client object\r\n        :param none\r\n        :return sqs_client\r\n        \"\"\"\r\n        sqs_client = boto3.client('sqs')\r\n        self.logger.info(sqs_client)\r\n\r\n        return sqs_client\r\n\r\n    def get_sqs_queue(self,queue_url):\r\n        \"\"\"\r\n        Return queue object from queue name\r\n        :param queue_url\r\n        :return queue\r\n        \"\"\"\r\n        queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_url)\r\n        self.logger.info(queue)\r\n\r\n        return queue\r\n\r\nasync def main():\r\n    \"\"\"\r\n    Schedule calls concurrently\r\n    \"\"\"\r\n    sqsmessage_obj = Sqsmessage()\r\n    while True:\r\n        tasks = []\r\n        for every_queue_url in conf.QUEUE_URL_LIST:\r\n            tasks.append(sqsmessage_obj.get_messages_from_queue(every_queue_url))\r\n        result = await asyncio.gather(*tasks)\r\n\r\nif __name__=='__main__':\r\n    #Running asyncio main\r\n    _logger = logging.getLogger(__name__)\r\n    _logger.setLevel(logging.DEBUG)\r\n    asyncio.run(main())\r\nelse:\r\n    print('ERROR: Received incorrect command line input arguments')\r\n<\/pre>\n<p>After running the above code, we will get all the messages from the queues.<\/p>\n<p><a href=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/07\/sqs-step2-3.png\" data-rel=\"lightbox-image-2\" data-rl_title=\"\" data-rl_caption=\"\" title=\"\"><img decoding=\"async\" class=\"aligncenter\" 
src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/07\/sqs-step2-3.png\" alt=\"\" \/><\/a><\/p>\n<hr\/>\n<p><strong>3<\/strong>. In the next step, we will filter only those messages which have <code>Value &gt; 70<\/code>. I have defined a new method, <code>filter_message<\/code>, so that messages where <code>Value &lt;= 70<\/code> (the messages highlighted by a red bracket in the above image) will not be fetched after applying the filter. I have hard-coded <code>filter_key<\/code>, <code>filter_value<\/code>, and <code>filter_criteria<\/code> where <code>get_messages_from_queue<\/code> is called in the <code>main<\/code> method. In a future blog, I will let the user define the data structure and filter for the message. The code snippets will now look like this:<\/p>\n<pre lang=\"Python\">\r\n    # Note: this step also needs 'import operator' in the import block\r\n    def filter_message(self,message,filter_key,filter_value,filter_criteria):\r\n        \"\"\"\r\n        Fetches filtered message from sqs queue\r\n        :param message: message\r\n        :filter_key: dict key\r\n        :filter_value: dict value\r\n        :filter_criteria: filter criteria greater than,equal to,less than\r\n        :return: print filtered message\r\n        \"\"\"\r\n        if 'Body' in message.keys():\r\n            # get_dict (see the full source) parses the JSON body into a dict\r\n            message_body_obj = self.get_dict(message['Body'])\r\n            message_body_obj_key_list, message_body_obj_value_list = \\\r\n            self.get_value_key_list(message_body_obj)\r\n            if filter_key in message_body_obj_key_list and filter_criteria \\\r\n            == 'greater than':\r\n                if any(operator.gt(int(ele), int(filter_value)) \\\r\n                       for ele in message_body_obj_value_list):\r\n                    self.logger.info(message_body_obj)\r\n                else:\r\n                    self.logger.info(\"Filter value not found in the message value list\")\r\n            else:\r\n                self.logger.info(\"Filter key not found in message key list or Filter criteria not defined\")\r\n        else:\r\n            self.logger.info(\"Message does not have body attribute\")\r\n\r\n    def get_recursive_items(self, dictionary):\r\n        \"\"\"\r\n        This method will be used to get keys and values\r\n        param: dict\r\n        return : key,value\r\n        \"\"\"\r\n        for key, value in dictionary.items():\r\n            if isinstance(value, dict):\r\n                yield (key, value)\r\n                yield from self.get_recursive_items(value)\r\n            else:\r\n                yield (key, value)\r\n\r\n    def get_value_key_list(self, dictionary):\r\n        \"\"\"\r\n        Method to get key and value list for any dict\r\n        param: dict object\r\n        return: key_list, value_list\r\n        \"\"\"\r\n        key_list = []\r\n        value_list = []\r\n        for key,value in self.get_recursive_items(dictionary):\r\n            key_list = key_list + [key]\r\n            value_list = value_list + [value]\r\n\r\n        return (key_list, value_list)\r\n\r\n<\/pre>\n<pre lang=\"Python\">\r\n   async def get_messages_from_queue(self,queue_url,filter_key,filter_value,filter_criteria):\r\n      \"\"\"\r\n      Generates messages from an SQS queue.\r\n      :param queue_url: name of the SQS queue to drain.\r\n      :param filter_key: dict key (hard-coded in the main method for this blog)\r\n      :param filter_value: dict value (hard-coded in the main method for this blog)\r\n      :param filter_criteria: filter criteria greater than,equal to,less than\r\n      :(filter_criteria is hard-coded in the main method for this blog)\r\n      \"\"\"\r\n      sqs_client = self.get_sqs_client()\r\n      queue = self.get_sqs_queue(queue_url)\r\n      messages = sqs_client.receive_message(QueueUrl=queue.url)\r\n      if 'Messages' in messages:\r\n         for message in messages['Messages']:\r\n             
self.filter_message(message,filter_key,filter_value,filter_criteria)\r\n      else:\r\n          self.logger.info(\"No messages polled from the queue at this moment\")\r\n<\/pre>\n<pre lang=\"Python\">\r\nasync def main():\r\n    \"\"\"\r\n    Schedule calls concurrently\r\n    # https:\/\/www.educative.io\/blog\/python-concurrency-making-sense-of-asyncio\r\n    # https:\/\/www.integralist.co.uk\/posts\/python-asyncio\/\r\n    \"\"\"\r\n    sqsmessage_obj = Sqsmessage()\r\n    while True:\r\n        tasks = []\r\n        for every_queue_url in conf.QUEUE_URL_LIST:\r\n            tasks.append(sqsmessage_obj.get_messages_from_queue(every_queue_url, \\\r\n            filter_key='quantity',filter_value='70', filter_criteria='greater than'))\r\n        result = await asyncio.gather(*tasks)\r\n\r\nif __name__=='__main__':\r\n    #Running asyncio main\r\n    _logger = logging.getLogger(__name__)\r\n    _logger.setLevel(logging.DEBUG)\r\n    asyncio.run(main())\r\nelse:\r\n    print('ERROR: Received incorrect command line input arguments')\r\n<\/pre>\n<p>When you run the code, it will filter only those messages where <code>Value &gt; 70<\/code>.<\/p>\n<p><a href=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/07\/sqs-step3-1.png\" data-rel=\"lightbox-image-3\" data-rl_title=\"\" data-rl_caption=\"\" title=\"\"><img decoding=\"async\" class=\"aligncenter\" src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/07\/sqs-step3-1.png\" alt=\"\" \/><\/a><\/p>\n<hr\/>\n<p>I hope you have liked the blog.\u00a0The source code is available <a href=\"https:\/\/github.com\/rahul-bhave\/sqs-utilities\">here<\/a>. I have attempted to capture the most commonly used functionalities you can automate using <code>Python<\/code> and <code>boto3<\/code> resources. 
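Stripped of the boto3 and asyncio plumbing, the filtering idea from the three steps above boils down to: parse each message body as JSON, walk its keys recursively, and keep the message when the filter key's value beats the threshold. The sketch below is an AWS-free illustration of that logic only; the names <code>recursive_items<\/code> and <code>matches_filter<\/code> are illustrative stand-ins for the blog's <code>get_recursive_items<\/code> and <code>filter_message<\/code> helpers, and the sample bodies mimic the pre-populated "quantity" messages, not real queue output.

```python
import json

def recursive_items(dictionary):
    """Yield (key, value) pairs, descending into nested dicts."""
    for key, value in dictionary.items():
        if isinstance(value, dict):
            yield from recursive_items(value)
        else:
            yield key, value

def matches_filter(body, filter_key, filter_value, filter_criteria="greater than"):
    """Return True when the JSON message body contains filter_key and its
    value satisfies filter_criteria against filter_value."""
    data = json.loads(body) if isinstance(body, str) else body
    for key, value in recursive_items(data):
        if key == filter_key and filter_criteria == "greater than":
            try:
                if int(value) > int(filter_value):
                    return True
            except (TypeError, ValueError):
                continue  # non-numeric value: cannot satisfy a numeric filter
    return False

# Sample bodies standing in for messages polled from the queues
bodies = [
    '{"item": "widget", "order": {"quantity": 90}}',
    '{"item": "gadget", "order": {"quantity": 40}}',
]
kept = [b for b in bodies if matches_filter(b, "quantity", "70")]
print(len(kept))  # 1
```

Only the widget message survives the filter, matching the behaviour shown in the step-3 screenshot; the full version in the repository adds the SQS polling and concurrency around this core.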
You can find some useful documentation about <code>boto3<\/code> <a href=\"https:\/\/boto3.amazonaws.com\/v1\/documentation\/api\/latest\/guide\/sqs.html#sqs\">here<\/a>.<\/p>\n<hr\/>\n","protected":false},"excerpt":{"rendered":"<p>In message-oriented architectures, it is cumbersome for QA to gain visibility into where their test messages are flowing. Visibility usually involves looking at some browser-based monitor (AWS SQS Queue) and\/or monitoring logs that might not even have the data QA wishes to check. Amazon Simple Queue Service (SQS) is a message queuing service that allows users to send, receive, store and delete [&hellip;]<\/p>\n","protected":false},"author":28,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[240],"tags":[237,241],"class_list":["post-12893","post","type-post","status-publish","format-standard","hentry","category-aws-sqs","tag-amazon-simple-queue-servicesqs","tag-python-asyncio"],"_links":{"self":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/12893","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/users\/28"}],"replies":[{"embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/comments?post=12893"}],"version-history":[{"count":148,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/12893\/revisions"}],"predecessor-version":[{"id":13489,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/12893\/revisions\/13489"}],"wp:attachment":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/media?parent=12893"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/categories?post=12893"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/qxf2.c
om\/blog\/wp-json\/wp\/v2\/tags?post=12893"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}