Recently, I have attempted to test microservices based architecture where the AWS Lambda function consumes messages from a microservices endpoint and posts them to the skype channel. I have made an attempt to add most of the validations for the flow. I have included this end-to-end test in the below blog.
Background:
The above architecture explains, microservices based architecture which posts messages on the Skype channel. This comprises of following components:
1. A microservice for messages
2. CloudWatch event triggers
3. qxf2-employee-messages Lambda
4. qxf2-bot-sender Lambda to call an endpoint to send Skype messages
5. A skype-sender SQS
6. A microservice to send skype messages
So the end-to-end flow goes this way, CloudWatch event triggers qxf2-employee-messages lambda which consumes messages microservice and then sends the message to the Skype-sender SQS queue. Then another lambda qxf2-bot-sender used to call microservice to send skype message on the channel.
Test Approach:
As there are not any testing frameworks available to test messaging architecture pipeline. We derived the following test strategy to cover end to end test. I know this approach can be modified to make it more robust, by adding more verifications such as logging into the skype channel and verify the message, tracing failures across the pipelines, etc. However, using the below approach, we have attempted to test the entire messaging pipeline.
We have divided the entire end to end flow into 3 steps:
1. Trigger, qxf2-employee-messages lambda: The lambda consumes messages microservice and then sends the message to the Skype-sender SQS queue.
2. Get the message from CloudWatch logs: Log record pointer used to fetch this message. As soon as the message is triggered, the CloudWatch log captures the message. Hence, I am fetching the same message from CloudWatch logs.
3. Validate the message retrieved from CloudWatch logs with the message from skype-listener-queue and source text file: From skype-listener SQS queue qxf2-bot-sender lambda picks up the message so this is the endpoint of the pipeline. Source text file, i.e. comments.txt file, is the starting point of the pipeline from which randomly messages picked up by daily-messages-microservices. The lambda, i.e. qxf2-daily-messages lambada consumes the same message when triggered. So eventually, we are validating the start and endpoint of the flow. (For more information, please refer above diagram)
The test case is organized into helper functions, configuration files, and main test file. Pytest framework is used for running test case. Helper functions are organized based on the AWS Services such as Lambda or module names.
1. Trigger, qxf2-employee-messages lambda:
Method trigger_cron_lambda
will trigger qxf2-employee-message lambda. Lambada helper file contents have been shown as below.
""" Lambda helper """ import os import sys import logging import boto3 import tests.conf.lambda_configuration_conf as lambda_conf sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) def trigger_cron_lambda(lambda_name: str): """ :return: The AWS response. Except a response """ _logger = logging.getLogger(__name__) _logger.setLevel(logging.DEBUG) client = boto3.client('lambda') Event = {} response = client.invoke(FunctionName=lambda_name,InvocationType='Event',\ LogType='None',Payload=b'{"endpoint": "/message","channel": "test"}') return response |
2.Get the message from Cloudwatch logs:
We have fetched log record pointer value, using method get_log_ptr
. We have stored, log group and query details are stored in the configuration file.
After fetching the log record pointer, we have used the get_message
method to get the message from the CloudWatch log record.
Both these methods and other methods used in these methods are located in the CloudWatch helper.
""" Helper module for CloudWatch log """ import os import sys import ast import collections from datetime import datetime, timedelta import time import boto3 import conf.cloudwatch_configuration_conf as cloudwatch_conf sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) def get_data_structure(data): """ Method used for converting nested dictionary/list to data similar to tabular form """ obj = collections.OrderedDict() def recurse(dataobject,parent_key=""): """ Method will recurse through object """ if isinstance(dataobject,list): # loop through list and call recurse() for i in range(len(dataobject)): recurse(dataobject[i],parent_key + "_" + str(i) if parent_key else str(i)) elif isinstance(dataobject,dict): # loop through dictionary and call recurse() for key,value in dataobject.items(): recurse(value,parent_key + "_" + key if parent_key else key) else: # use the parent_key and store the value to obj obj[parent_key] = dataobject recurse(data) return obj def get_response_log_daily_messages(request_id,log_group,query): """ getting response from daily message lambda """ client = boto3.client('logs') start_query_response = client.start_query(logGroupName=log_group,\ startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\ endTime=int(datetime.now().timestamp()),queryString=query) query_id = start_query_response['queryId'] response = None while response is None: time.sleep(1) response = client.get_query_results(queryId=query_id) return response.get('results') def get_response_log_skype_sender(log_group,query): """ getting log from skype_sender """ client = boto3.client('logs') start_query_response = client.start_query(logGroupName=log_group,\ startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\ endTime=int(datetime.now().timestamp()),queryString=query) query_id = start_query_response['queryId'] response = None while response is None: time.sleep(1) response = client.get_query_results(queryId=query_id) return response def get_ptr_value(log_group,query): """ getting ptr_value from response """ client = boto3.client('logs') start_query_response = client.start_query(logGroupName=log_group,\ startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\ endTime=int(datetime.now().timestamp()),queryString=query) query_id = start_query_response['queryId'] response = None ptr_value = None while response is None: time.sleep(1) response = client.get_query_results(queryId=query_id) response_dict = get_data_structure(response) if cloudwatch_conf.ptr_value in response_dict.keys(): ptr_value = response_dict[cloudwatch_conf.ptr_value] else: print(f'log pointer key could not be fetched from response dictionary.') return ptr_value def get_message_id(ptr_value): """ To get message id """ client = boto3.client('logs') response = client.get_log_record(logRecordPointer=ptr_value) response_dict = get_data_structure(response) request_id = response_dict[cloudwatch_conf.record_messageid] return request_id def get_full_message(ptr_value): """ To get full message """ client = boto3.client('logs') response = client.get_log_record(logRecordPointer=ptr_value) response_dict = get_data_structure(response) return response_dict def get_message(ptr_value): """ To get message """ client = boto3.client('logs') response = client.get_log_record(logRecordPointer=ptr_value) response_dict = get_data_structure(response) message = response_dict[cloudwatch_conf.record_body] message_dict = ast.literal_eval(message) return message_dict['msg'] |
3. Validate the message retrieved from CloudWatch logs with the message from skype-listener-queue and source text file:
Polling the SQS queue is an asynchronous process, we have created asyncio helper to poll the SQS queue. Method poll_message
used for polling the SQS queue. This method takes messages from CloudWatch logs as input. While polling we are achieving two validations(3a.1 and 3a.2 sections mentioned below) and exiting when the test passes/fails
""" Helper module for asyncio methods """ import sys import asyncio import logging import tests.helpers.filter_message_helper import tests.helpers.sqs_helper import tests.conf.sqs_utilities_conf as queue_url_conf # Declaring class Style class Style(): """ Declaring Style class """ BLACK = '\033[30m' RED = '\033[31m' GREEN = '\033[32m' YELLOW = '\033[33m' BLUE = '\033[34m' MAGENTA = '\033[35m' CYAN = '\033[36m' WHITE = '\033[37m' UNDERLINE = '\033[4m' RESET = '\033[0m' async def validate_message_with_sqs(queue_url, message_cloudwatch): """ Validates message from sqs queue with cloudwatch logs :param queue_url: URL of the SQS queue :message_cloudwatch: Message received from cloudwatch logs """ _logger = logging.getLogger(__name__) _logger.setLevel(logging.DEBUG) message = tests.helpers.sqs_helper.get_message_from_queue(queue_url) result_flag = tests.helpers.filter_message_helper.publish_compare_result\ (message,message_cloudwatch) if result_flag is True: sys.exit() return result_flag async def poll_message(message_cloudwatch): """ Schedule calls concurrently """ while True: tasks = [] for every_queue_url in queue_url_conf.QUEUE_URL_LIST: tasks.append(validate_message_with_sqs(every_queue_url,message_cloudwatch)) result = await asyncio.gather(*tasks) |
""" Helper module for sqs messages """ import os import sys import boto3 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) def get_sqs_client(): """ Return sqs_client object :param none :return sqs_client """ sqs_client = boto3.client('sqs') return sqs_client def get_sqs_queue(queue_url): """ Return queue object from queue_url :param queue_url :return queue """ queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_url) return queue def get_message_from_queue(queue_url): """ get messsage from queue_url """ sqs_client = get_sqs_client() queue = get_sqs_queue(queue_url) message = sqs_client.receive_message(QueueUrl=queue.url) return message def purge_sqs_queue(queue_url): """ Reteun status """ queue = get_sqs_queue(queue_url) client = get_sqs_client() client.purge_queue(QueueUrl=queue.url) |
While polling message, we are validating following:
Method validate_message_with_cloudwatch_logs
from filter message helper used for this validation.
def compare_message_cloudwatch_log(message_on_channel, message_cloudwatch): """ compare message with cloudwatch log message """ result_flag = False if message_on_channel == message_cloudwatch: result_flag = True else: result_flag = False return result_flag def validate_message_with_cloudwatch_logs(message_on_channel,message_cloudwatch): """ Asserting method on channels with cloudwatch logs """ result_flag = False if message_on_channel is not None: result_flag = compare_message_cloudwatch_log(message_on_channel,message_cloudwatch) if result_flag is True: print(Style.CYAN + '---------------\ ------------------------------------------------------------') print(Style.CYAN + 'Step 3a. Validating \ message with Skype listener SQS Queue-------------------') print(Style.GREEN + 'Message on channel \ does match with the message from cloudwatch logs') print(Style.CYAN + '----------------------\ -----------------------------------------------------') result_flag = validate_message_with_culture_file(message_on_channel) else: print(Style.CYAN + '-------------\ --------------------------------------------------------------') print(Style.RED + 'Message on channel does not match with \ the message from cloudwatch logs') print(Style.CYAN + '---------------------\ ------------------------------------------------------') else: print(Style.CYAN + '---------------------------\ ------------------------------------------------') print("No message on channel") print(Style.CYAN + '------------------------------\ ---------------------------------------------') return result_flag |
Note that, for filtering message from the message queue, we have used the sender user id and channel chat id as filter criteria’s in the filter_message
from the message queue helper.
def get_dict(body_string): """ Generates dict from message body :param string :return dict object """ body_string = json.dumps(body_string) body_string = body_string.replace("'", "\"") body_string = json.loads(body_string) message_body_obj = json.loads(body_string) return message_body_obj def get_message_body(message): """ This method will return message body """ msg_body = "" if 'Messages' in message: for message in message['Messages']: if 'Body' in message.keys(): message_body_obj = get_dict(message['Body']) if 'Message' in message_body_obj.keys(): msg_body = get_dict(message_body_obj['Message']) else: print("Message key is not present in the Message Body") sys.exit() else: print("Message does not contain Body") sys.exit() else: print("No messages are retrieved") with pytest.raises(SystemExit): sys.exit() return msg_body def filter_message(message,chat_id,user_id): """ Filter method based on chat_id and user_id return: Boolean value """ message_body = get_message_body(message) if message_body is not None: if "chat_id" in message_body and "user_id" in message_body: if message_body['chat_id']==chat_id and message_body['user_id']==user_id: print(f'message is from test channel and sender is skype sender lambda') else: print(f'Neither message is from test channel nor sender is skype sender lambda') else: print(f'Message does not contain required keys') else: print(f'Message body is not none') return True |
Method validate_message_with_culture_file
gives comparison results between source text file and CloudWatch log message.
def validate_message_with_culture_file(message_on_channel): """ Asserting message on channel with culture file """ result_flag = False if message_on_channel is not None: result_flag = compare_message_with_file(message_on_channel,CULTURE_FILE) if result_flag is True: print(Style.CYAN + '---------------------------\ ------------------------------------------------') print(Style.CYAN + 'Step 3b. Validating \ message with culture file------------------------------') print(Style.GREEN + 'Message \ on channel does match with culture file') print(Style.CYAN + '-----------\ ----------------------------------------------------------------') else: print(Style.CYAN + '------------\ ---------------------------------------------------------------') print(Style.GREEN + 'Message \ on channel does match with culture file') print(Style.CYAN + '---------\ ------------------------------------------------------------------') else: print(Style.CYAN + 'There is no message on channel') return result_flag |
The entire filter message helper code is available here:
4. Putting it all together in test:
""" This End to end test employee skype message covers the following: Setup- Purging SQS queue Step 1: Trigger employee message lambda Step 2: Print message from cloudwatch logs Step 3: Verify message with skype-listener sqs queue and culture file """ import os import sys import time import asyncio import logging import unittest import pytest from pythonjsonlogger import jsonlogger import tests.helpers.asyncio_helper import tests.helpers.cloudwatch_helper import tests.helpers.filter_message_helper import tests.helpers.lambda_helper import tests.conf.cloudwatch_configuration_conf as cloudwatch_conf import tests.conf.sqs_utilities_conf as queue_url_conf import tests.conf.lambda_configuration_conf as lambda_conf sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # logging log_handler = logging.StreamHandler() log_handler.setFormatter(jsonlogger.JsonFormatter()) logger = logging.getLogger() logger.setLevel(logging.INFO) logger.addHandler(log_handler) # Declaring class for test object class Skypemessagetest(): """ Class for test object """ logger = logging.getLogger(__name__) def __init__(self): """ Initilalise class """ def get_request_id(self): """ get the response from lambda """ request_id = tests.helpers.lambda_helper.get_request_id_from_lambda_response() return request_id def get_message_from_cloudwatch_log_ptr(self): """ Method to get message from cloudwatch log pointer """ message = None for i in range(1, 6): ptr_value = tests.helpers.cloudwatch_helper.get_ptr_value\ (cloudwatch_conf.log_group_bot_sender,cloudwatch_conf.query_skype_sender) if ptr_value: message = tests.helpers.cloudwatch_helper.get_message(ptr_value) break time.sleep(60) return message def clear_queues(self): """ Method to clear queues """ for every_queue_url in queue_url_conf.QUEUE_URL_LIST: tests.helpers.sqs_helper.purge_sqs_queue(every_queue_url) time.sleep(1) class TestEndtoEndSkypeMessage(unittest.TestCase): """ Test class """ def test_end_to_end_skype_message(self): """ Test case """ Skypemessagetest_obj = Skypemessagetest() logger.info("Setup- Purge SQS queue") logger.info("---------------------------------------------------------------------------") Skypemessagetest_obj.clear_queues() logger.info("Step 1: Trigger employee message lambda--------------------------------") tests.helpers.lambda_helper.trigger_cron_lambda(lambda_conf.daily_message_lambda) logger.info("---------------------------------------------------------------------------") logger.info("Step 2: Print message from cloudwatch logs------------------------------") message = Skypemessagetest_obj.get_message_from_cloudwatch_log_ptr() logger.info("---------------------------------------------------------------------------") logger.info(message) logger.info("-------------------------------------------------------------------------- ") logger.info("Step 3: Verify message with skype-listener sqs queue and culture file----") with pytest.raises(SystemExit) as system_exception: asyncio.run(tests.helpers.asyncio_helper.poll_message(message)) assert system_exception.type == SystemExit logger.info("-------------------------------------------------------------------------- ") |
This is how output looks on bash console:
Note that the same message can be seen on the skype channel. This validation, I have done manually. I will enhance the test to include this validation in the test flow.
I hope this blog would help any engineer, who would want to build an end-to-end test based on messaging architecture. The entire code is available at here.
I have around 15 years of experience in Software Testing. I like tackling Software Testing problems and explore various tools and solutions that can solve those problems. On a personal front, I enjoy reading books during my leisure time.