Writing an end-to-end test for a microservices-based architecture that posts Skype messages

Recently, I tested a microservices-based architecture in which an AWS Lambda function consumes messages from a microservice endpoint and posts them to a Skype channel. I tried to add validations for most of the flow. This blog post walks through that end-to-end test.


Background:


The architecture diagram above shows a microservices-based architecture that posts messages to a Skype channel. It comprises the following components:

1. A microservice for messages
2. CloudWatch event triggers
3. qxf2-employee-messages Lambda
4. qxf2-bot-sender Lambda to call an endpoint to send Skype messages
5. A skype-sender SQS
6. A microservice to send skype messages

The end-to-end flow goes like this: a CloudWatch event triggers the qxf2-employee-messages lambda, which consumes the messages microservice and then sends the message to the skype-sender SQS queue. Another lambda, qxf2-bot-sender, then calls the microservice that sends the Skype message to the channel.


Test Approach:

As we did not find a testing framework suited to testing a messaging pipeline like this one, we derived the following strategy for an end-to-end test. This approach can be made more robust by adding verifications such as logging into the Skype channel to verify the message, tracing failures across the pipeline, and so on. Even so, the approach below tests the entire messaging pipeline.

We divided the entire end-to-end flow into 3 steps:

1. Trigger the qxf2-employee-messages lambda: The lambda consumes the messages microservice and then sends the message to the skype-sender SQS queue.

2. Get the message from CloudWatch logs: As soon as the lambda is triggered, the CloudWatch log captures the message. A log record pointer is used to fetch that message from the logs.

3. Validate the message retrieved from CloudWatch logs against the message from the skype-listener queue and the source text file: The qxf2-bot-sender lambda picks up the message from the skype-listener SQS queue, so this queue is the end point of the pipeline. The source text file, comments.txt, is the starting point of the pipeline: the daily-messages microservice picks messages from it at random, and the qxf2-daily-messages lambda consumes the same message when triggered. So, in effect, we validate both the start and the end of the flow. (For more information, please refer to the diagram above.)

The test case is organized into helper functions, configuration files, and a main test file. The pytest framework is used to run the test case. Helper functions are grouped by AWS service (such as Lambda) or by module name.


1. Trigger the qxf2-employee-messages lambda:

The trigger_cron_lambda method triggers the qxf2-employee-messages lambda. The contents of the Lambda helper file are shown below.

"""
Lambda helper
"""
import os
import sys
import logging
import boto3
import tests.conf.lambda_configuration_conf as lambda_conf
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
def trigger_cron_lambda(lambda_name: str):
    """
    Trigger the given lambda asynchronously.
    :param lambda_name: Name of the lambda function to invoke
    :return: The AWS response.
    """
    _logger = logging.getLogger(__name__)
    _logger.setLevel(logging.DEBUG)
    client = boto3.client('lambda')
    # InvocationType='Event' invokes the lambda asynchronously
    response = client.invoke(FunctionName=lambda_name,InvocationType='Event',
        LogType='None',Payload=b'{"endpoint": "/message","channel": "test"}')
 
    return response
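
The helper above hard-codes the payload as bytes. As a minimal sketch, the payload could instead be built with json.dumps, and the response checked to confirm that the asynchronous invocation was accepted. The names build_payload and invoke_async are hypothetical and not part of the original helper; the client is passed in as a parameter so that a stub can stand in for boto3.client('lambda') in a unit test.

```python
import json


def build_payload(endpoint: str, channel: str) -> bytes:
    """Serialize the event sent to the lambda into JSON bytes."""
    return json.dumps({"endpoint": endpoint, "channel": channel}).encode("utf-8")


def invoke_async(client, lambda_name: str, endpoint: str = "/message",
                 channel: str = "test") -> bool:
    """
    Invoke the lambda asynchronously and report whether AWS accepted the request.
    `client` is expected to behave like boto3.client('lambda').
    """
    response = client.invoke(
        FunctionName=lambda_name,
        InvocationType="Event",
        Payload=build_payload(endpoint, channel),
    )
    # Asynchronous ('Event') invocations answer with status code 202 when accepted
    return response.get("StatusCode") == 202
```

With a real client this would be called as invoke_async(boto3.client('lambda'), 'qxf2-employee-messages'). A status of 202 only says that the event was queued, not that the lambda ran successfully, which is why the later steps still read the CloudWatch logs.
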

2. Get the message from CloudWatch logs:

2.a. Get log record pointer value:

We fetch the log record pointer value using the get_ptr_value method. The log group and query details are stored in the configuration file.
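
A CloudWatch Logs Insights query runs asynchronously: start_query returns a query id, and get_query_results reports a status that stays Scheduled or Running until the query finishes. The sketch below shows one way to wait for completion with a timeout. The function name wait_for_query_results and its parameters are assumptions made for illustration; the client is passed in so that a stub can replace boto3.client('logs') in a unit test.

```python
import time


def wait_for_query_results(client, query_id, timeout_seconds=60,
                           poll_interval=1, sleep=time.sleep):
    """
    Poll get_query_results until the query leaves the Scheduled/Running states.
    `client` is expected to behave like boto3.client('logs').
    Returns the final response, or None if the timeout is reached first.
    """
    waited = 0
    while waited <= timeout_seconds:
        response = client.get_query_results(queryId=query_id)
        if response.get("status") not in ("Scheduled", "Running"):
            return response
        sleep(poll_interval)
        waited += poll_interval
    return None
```

The caller would still need to check that the final status is Complete, since a query can also end as Failed, Cancelled, or Timeout.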

2.b. Retrieve message using log record pointer:

After fetching the log record pointer, we have used the get_message method to get the message from the CloudWatch log record.

Both methods, along with the functions they rely on, are located in the CloudWatch helper.

"""
Helper module for CloudWatch log
"""
 
import os
import sys
import ast
import collections
from datetime import datetime, timedelta
import time
import boto3
# Extend sys.path before importing the conf module that relies on it
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import conf.cloudwatch_configuration_conf as cloudwatch_conf
 
def get_data_structure(data):
    """
    Method used for converting nested dictionary/list to data similar to tabular form
    """
    obj = collections.OrderedDict()
    def recurse(dataobject,parent_key=""):
        """
        Method will recurse through object
        """
        if isinstance(dataobject,list):
            # loop through list and call recurse()
            for i in range(len(dataobject)):
                recurse(dataobject[i],parent_key + "_" + str(i) if parent_key else str(i))
        elif isinstance(dataobject,dict):
            # loop through dictionary and call recurse()
            for key,value in dataobject.items():
                recurse(value,parent_key + "_" + key if parent_key else key)
        else:
            # use the parent_key and store the value to obj
            obj[parent_key] = dataobject
 
    recurse(data)
 
    return obj
 
def get_response_log_daily_messages(request_id,log_group,query):
    """
    getting response from daily message lambda
    """
    client = boto3.client('logs')
    start_query_response = client.start_query(logGroupName=log_group,\
        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\
            endTime=int(datetime.now().timestamp()),queryString=query)
    query_id = start_query_response['queryId']
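    # get_query_results returns immediately, so poll until the query finishes
    response = client.get_query_results(queryId=query_id)
    while response['status'] in ('Scheduled', 'Running'):
        time.sleep(1)
        response = client.get_query_results(queryId=query_id)
 
    return response.get('results')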
 
def get_response_log_skype_sender(log_group,query):
    """
    getting log from skype_sender
    """
    client = boto3.client('logs')
    start_query_response = client.start_query(logGroupName=log_group,\
        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\
            endTime=int(datetime.now().timestamp()),queryString=query)
    query_id = start_query_response['queryId']
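    # get_query_results returns immediately, so poll until the query finishes
    response = client.get_query_results(queryId=query_id)
    while response['status'] in ('Scheduled', 'Running'):
        time.sleep(1)
        response = client.get_query_results(queryId=query_id)
 
    return response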
 
def get_ptr_value(log_group,query):
    """
    getting ptr_value from response
    """
    client = boto3.client('logs')
    start_query_response = client.start_query(logGroupName=log_group,\
        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\
            endTime=int(datetime.now().timestamp()),queryString=query)
    query_id = start_query_response['queryId']
    # get_query_results returns immediately, so poll until the query finishes
    response = client.get_query_results(queryId=query_id)
    while response['status'] in ('Scheduled', 'Running'):
        time.sleep(1)
        response = client.get_query_results(queryId=query_id)
    ptr_value = None
    response_dict = get_data_structure(response)
    if cloudwatch_conf.ptr_value in response_dict:
        ptr_value = response_dict[cloudwatch_conf.ptr_value]
    else:
        print('log pointer key could not be fetched from response dictionary.')
 
    return ptr_value
 
def get_message_id(ptr_value):
    """
    To get message id
    """
    client = boto3.client('logs')
    response = client.get_log_record(logRecordPointer=ptr_value)
    response_dict = get_data_structure(response)
    request_id = response_dict[cloudwatch_conf.record_messageid]
 
    return request_id
 
def get_full_message(ptr_value):
    """
    To get full message
    """
    client = boto3.client('logs')
    response = client.get_log_record(logRecordPointer=ptr_value)
    response_dict = get_data_structure(response)
 
    return response_dict
 
def get_message(ptr_value):
    """
    To get message
    """
    client = boto3.client('logs')
    response = client.get_log_record(logRecordPointer=ptr_value)
    response_dict = get_data_structure(response)
    message = response_dict[cloudwatch_conf.record_body]
    message_dict = ast.literal_eval(message)
 
    return message_dict['msg']
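
To make the flattened keys concrete, here is a small worked example. It uses a compact re-implementation of get_data_structure (named flatten here) and two hand-written responses. The sample values, the field order, and the keys results_0_1_value and logRecord_@message are assumptions for illustration only; the real keys come from the configuration file and depend on the query.

```python
import ast
import collections


def flatten(data, parent_key=""):
    """Flatten nested dicts/lists into one ordered dict with '_'-joined keys."""
    flat = collections.OrderedDict()
    if isinstance(data, list):
        for index, item in enumerate(data):
            flat.update(flatten(item, f"{parent_key}_{index}" if parent_key else str(index)))
    elif isinstance(data, dict):
        for key, value in data.items():
            flat.update(flatten(value, f"{parent_key}_{key}" if parent_key else key))
    else:
        flat[parent_key] = data
    return flat


# Hypothetical get_query_results response; real field order depends on the query
query_response = {
    "status": "Complete",
    "results": [[
        {"field": "@timestamp", "value": "2021-01-01 10:00:00.000"},
        {"field": "@ptr", "value": "example-pointer"},
    ]],
}
flat_query = flatten(query_response)
ptr_value = flat_query["results_0_1_value"]

# Hypothetical get_log_record response whose body is a Python-literal dict string
record_response = {"logRecord": {"@message": "{'msg': 'Hello team', 'channel': 'test'}"}}
flat_record = flatten(record_response)
message = ast.literal_eval(flat_record["logRecord_@message"])["msg"]
```

Because the lookup key encodes list positions, a change in the query's field order would change the key, so the configured key and the query need to be kept in sync.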

3. Validate the message retrieved from CloudWatch logs with the message from skype-listener-queue and source text file:

3a. Poll the skype-listener queue to fetch the message:

Polling the SQS queue is an asynchronous process, so we created an asyncio helper to poll it. The poll_message method polls the SQS queue and takes the message from CloudWatch logs as input. While polling, we perform two validations (sections 3a.1 and 3a.2 below) and exit when the test passes or fails.
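
One way to picture the polling loop is the minimal sketch below. It is not the helper used in this test: it returns True or False instead of exiting the process, stops after a timeout, and runs the blocking receive call in worker threads with asyncio.to_thread (Python 3.9 or later). The name poll_queues and the receive callable are hypothetical.

```python
import asyncio


async def poll_queues(queue_urls, receive, expected, timeout_seconds=30, interval=1.0):
    """
    Poll every queue concurrently until one returns `expected` or time runs out.
    `receive` is a blocking callable (queue_url -> message text or None).
    Returns True on a match and False on timeout.
    """
    async def poll_once(queue_url):
        # Run the blocking call in a worker thread so the queues are polled in parallel
        return await asyncio.to_thread(receive, queue_url)

    async def poll_forever():
        while True:
            results = await asyncio.gather(*(poll_once(url) for url in queue_urls))
            if expected in results:
                return True
            await asyncio.sleep(interval)

    try:
        return await asyncio.wait_for(poll_forever(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return False
```

A test could then assert on asyncio.run(poll_queues(...)) directly, which also keeps a missing message from turning into an endless loop.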

"""
Helper module for asyncio methods
"""
import sys
import asyncio
import logging
import tests.helpers.filter_message_helper
import tests.helpers.sqs_helper
import tests.conf.sqs_utilities_conf as queue_url_conf
 
# Declaring class Style
class Style():
    """
    Declaring Style class
    """
    BLACK = '\033[30m'
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    MAGENTA = '\033[35m'
    CYAN = '\033[36m'
    WHITE = '\033[37m'
    UNDERLINE = '\033[4m'
    RESET = '\033[0m'
 
async def validate_message_with_sqs(queue_url, message_cloudwatch):
    """
    Validates message from sqs queue with cloudwatch logs
    :param queue_url: URL of the SQS queue
    :message_cloudwatch: Message received from cloudwatch logs
    """
    _logger = logging.getLogger(__name__)
    _logger.setLevel(logging.DEBUG)
    message = tests.helpers.sqs_helper.get_message_from_queue(queue_url)
    result_flag = tests.helpers.filter_message_helper.publish_compare_result\
        (message,message_cloudwatch)
    if result_flag is True:
        sys.exit()
 
    return result_flag
 
async def poll_message(message_cloudwatch):
    """
    Schedule calls concurrently
    """
    while True:
        tasks = []
        for every_queue_url in queue_url_conf.QUEUE_URL_LIST:
            tasks.append(validate_message_with_sqs(every_queue_url,message_cloudwatch))
        result = await asyncio.gather(*tasks)

The SQS helper used for polling is shown below.

"""
Helper module for sqs messages
"""
import os
import sys
import boto3
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
def get_sqs_client():
    """
    Return sqs_client object
    :param none
    :return sqs_client
    """
    sqs_client = boto3.client('sqs')
 
    return sqs_client
 
def get_sqs_queue(queue_url):
    """
    Return queue object from queue_url
    :param queue_url
    :return queue
    """
    queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_url)
 
    return queue
 
def get_message_from_queue(queue_url):
    """
    Get message from queue_url
    """
    sqs_client = get_sqs_client()
    queue = get_sqs_queue(queue_url)
    message = sqs_client.receive_message(QueueUrl=queue.url)
 
    return message
 
def purge_sqs_queue(queue_url):
    """
    Purge all messages from the queue
    """
    queue = get_sqs_queue(queue_url)
    client = get_sqs_client()
    client.purge_queue(QueueUrl=queue.url)
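
The receive call above uses the defaults, which return at most one message per call. As a hedged sketch, the same call can use SQS long polling through the WaitTimeSeconds parameter and fetch several messages at once through MaxNumberOfMessages. The function name receive_bodies is hypothetical, and the client is passed in so that a stub can replace boto3.client('sqs') in a unit test.

```python
def receive_bodies(client, queue_url, wait_seconds=10, max_messages=10):
    """
    Receive up to `max_messages` with long polling and return their bodies.
    `client` is expected to behave like boto3.client('sqs').
    """
    response = client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS allows 1 to 10 per call
        WaitTimeSeconds=wait_seconds,      # long polling, 0 to 20 seconds
    )
    # The 'Messages' key is absent when the queue returned nothing
    return [message["Body"] for message in response.get("Messages", [])]
```

Long polling reduces the number of empty responses, which matters when the test loops until the message shows up.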

While polling for the message, we validate the following:

3a.1. Message from SQS queue matches with message from CloudWatch logs:

The validate_message_with_cloudwatch_logs method from the filter message helper is used for this validation.

def compare_message_cloudwatch_log(message_on_channel, message_cloudwatch):
    """
    compare message with cloudwatch log message
    """
    result_flag = False
    if message_on_channel == message_cloudwatch:
        result_flag = True
    else:
        result_flag = False
 
    return result_flag
 
def validate_message_with_cloudwatch_logs(message_on_channel,message_cloudwatch):
    """
    Asserting method on channels with cloudwatch logs
    """
    result_flag = False
    separator = Style.CYAN + '-' * 75
    if message_on_channel is not None:
        result_flag = compare_message_cloudwatch_log(message_on_channel,message_cloudwatch)
        if result_flag is True:
            print(separator)
            print(Style.CYAN + 'Step 3a. Validating message with Skype listener SQS Queue')
            print(Style.GREEN + 'Message on channel does match with the message from cloudwatch logs')
            print(separator)
            result_flag = validate_message_with_culture_file(message_on_channel)
        else:
            print(separator)
            print(Style.RED + 'Message on channel does not match with the message from cloudwatch logs')
            print(separator)
    else:
        print(separator)
        print("No message on channel")
        print(separator)
 
    return result_flag

Note that, to filter messages from the message queue, we use the sender user id and the channel chat id as filter criteria in the filter_message method of the filter message helper.

 
def get_dict(body_string):
    """
    Generates dict from message body
    :param string
    :return dict object
    """
    body_string = json.dumps(body_string)
    body_string = body_string.replace("'", "\"")
    body_string = json.loads(body_string)
    message_body_obj = json.loads(body_string)
 
    return message_body_obj
 
def get_message_body(message):
    """
    This method will return message body
    """
    msg_body = ""
    if 'Messages' in message:
        for message in message['Messages']:
            if 'Body' in message.keys():
                message_body_obj = get_dict(message['Body'])
                if 'Message' in message_body_obj.keys():
                    msg_body = get_dict(message_body_obj['Message'])
                else:
                    print("Message key is not present in the Message Body")
                    sys.exit()
            else:
                print("Message does not contain Body")
                sys.exit()
 
    else:
        print("No messages are retrieved")
        with pytest.raises(SystemExit):
            sys.exit()
 
    return msg_body
 
def filter_message(message,chat_id,user_id):
    """
    Filter method based on chat_id and user_id
    return: Boolean value
    """
    message_body = get_message_body(message)
    result_flag = False
    # get_message_body returns an empty string when nothing was retrieved
    if message_body:
        if "chat_id" in message_body and "user_id" in message_body:
            if message_body['chat_id']==chat_id and message_body['user_id']==user_id:
                print('Message is from test channel and sender is skype sender lambda')
                result_flag = True
            else:
                print('Message is not from test channel or sender is not skype sender lambda')
        else:
            print('Message does not contain required keys')
    else:
        print('Message body is empty')
 
    return result_flag
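
The helper above expects a message Body that decodes to a dictionary whose Message value is itself an encoded dictionary. Assuming both layers are valid JSON, the two decoding steps can be sketched with plain json.loads calls. The names extract_inner_message and is_from_test_channel are hypothetical and are not the helper's actual functions.

```python
import json


def extract_inner_message(sqs_response):
    """
    Return the decoded inner message of the first usable SQS message, or None.
    Assumes Body is a JSON object whose 'Message' value is a JSON string.
    """
    for message in sqs_response.get("Messages", []):
        body = json.loads(message["Body"])
        if "Message" in body:
            return json.loads(body["Message"])
    return None


def is_from_test_channel(inner, chat_id, user_id):
    """True only when both the chat id and the sender id match."""
    return bool(inner) and inner.get("chat_id") == chat_id and inner.get("user_id") == user_id
```

Returning None or False instead of calling sys.exit leaves the decision about failing the test to the caller.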

3a.2. After the messages from the CloudWatch log and the SQS queue match, compare the message with the source text file:

The validate_message_with_culture_file method compares the message with the source text file and returns the result.

 
def validate_message_with_culture_file(message_on_channel):
    """
    Asserting message on channel with culture file
    """
    result_flag = False
    separator = Style.CYAN + '-' * 75
    if message_on_channel is not None:
        result_flag = compare_message_with_file(message_on_channel,CULTURE_FILE)
        if result_flag is True:
            print(separator)
            print(Style.CYAN + 'Step 3b. Validating message with culture file')
            print(Style.GREEN + 'Message on channel does match with culture file')
            print(separator)
        else:
            # Mismatch: report the failure in red
            print(separator)
            print(Style.RED + 'Message on channel does not match with culture file')
            print(separator)
    else:
        print(Style.CYAN + 'There is no message on channel')
 
    return result_flag
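
The compare_message_with_file function called above is not shown in this post. A minimal sketch of what such a comparison could look like, assuming the source text file holds one message per line, is given below; the actual helper may differ.

```python
def compare_message_with_file(message, file_path):
    """Return True when `message` equals one of the non-empty lines in the file."""
    with open(file_path, encoding="utf-8") as source_file:
        lines = {line.strip() for line in source_file if line.strip()}
    return message.strip() in lines
```

Stripping whitespace on both sides avoids false mismatches caused by trailing newlines in the file.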

The entire filter message helper code is available here:


4. Putting it all together in the test:

"""
This End to end test employee skype message covers the following:
Setup- Purging SQS queue
Step 1: Trigger employee message lambda
Step 2: Print message from cloudwatch logs
Step 3: Verify message with skype-listener sqs queue and culture file
"""
import os
import sys
import time
import asyncio
import logging
import unittest
import pytest
from pythonjsonlogger import jsonlogger
import tests.helpers.asyncio_helper
import tests.helpers.cloudwatch_helper
import tests.helpers.filter_message_helper
import tests.helpers.lambda_helper
import tests.conf.cloudwatch_configuration_conf as cloudwatch_conf
import tests.conf.sqs_utilities_conf as queue_url_conf
import tests.conf.lambda_configuration_conf as lambda_conf
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
# logging
log_handler = logging.StreamHandler()
log_handler.setFormatter(jsonlogger.JsonFormatter())
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)
 
# Declaring class for test object
class Skypemessagetest():
    """
    Class for test object
    """
    logger = logging.getLogger(__name__)
    def __init__(self):
        """
        Initialise class
        """
    def get_request_id(self):
        """
        get the response from lambda
        """
        request_id = tests.helpers.lambda_helper.get_request_id_from_lambda_response()
 
        return request_id
 
    def get_message_from_cloudwatch_log_ptr(self):
        """
        Method to get message from cloudwatch log pointer
        """
        message = None
        for i in range(1, 6):
            ptr_value = tests.helpers.cloudwatch_helper.get_ptr_value\
                (cloudwatch_conf.log_group_bot_sender,cloudwatch_conf.query_skype_sender)
            if ptr_value:
                message = tests.helpers.cloudwatch_helper.get_message(ptr_value)
                break
            time.sleep(60)
 
        return message
 
    def clear_queues(self):
        """
        Method to clear queues
        """
        for every_queue_url in queue_url_conf.QUEUE_URL_LIST:
            tests.helpers.sqs_helper.purge_sqs_queue(every_queue_url)
            time.sleep(1)
 
class TestEndtoEndSkypeMessage(unittest.TestCase):
    """
    Test class
    """
    def test_end_to_end_skype_message(self):
        """
        Test case
        """
        Skypemessagetest_obj = Skypemessagetest()
        logger.info("Setup- Purge SQS queue")
        logger.info("---------------------------------------------------------------------------")
        Skypemessagetest_obj.clear_queues()
        logger.info("Step 1: Trigger employee message lambda--------------------------------")
        tests.helpers.lambda_helper.trigger_cron_lambda(lambda_conf.daily_message_lambda)
        logger.info("---------------------------------------------------------------------------")
        logger.info("Step 2: Print message from cloudwatch logs------------------------------")
        message = Skypemessagetest_obj.get_message_from_cloudwatch_log_ptr()
        logger.info("---------------------------------------------------------------------------")
        logger.info(message)
        logger.info("-------------------------------------------------------------------------- ")
        logger.info("Step 3: Verify message with skype-listener sqs queue and culture file----")
        with pytest.raises(SystemExit) as system_exception:
            asyncio.run(tests.helpers.asyncio_helper.poll_message(message))
        assert system_exception.type == SystemExit
        logger.info("-------------------------------------------------------------------------- ")

This is how the output looks on the bash console:

Note that the same message can be seen on the Skype channel. I verified this manually; I plan to enhance the test so that this validation becomes part of the test flow.
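
The test waits for the CloudWatch log entry by retrying up to five times with a 60 second pause in get_message_from_cloudwatch_log_ptr. That pattern can be pulled into a small reusable function, sketched below. The name retry_until and the injectable sleep parameter are assumptions for illustration; injecting sleep lets the retry logic be unit tested without real waiting.

```python
import time


def retry_until(fetch, attempts=5, delay_seconds=60, sleep=time.sleep):
    """
    Call `fetch` until it returns a truthy value or attempts run out.
    Returns the value, or None if every attempt came back empty.
    """
    for attempt in range(1, attempts + 1):
        value = fetch()
        if value:
            return value
        if attempt < attempts:
            sleep(delay_seconds)  # skip the wait after the final attempt
    return None
```

In the test, fetch would be a function that calls get_ptr_value with the configured log group and query.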

I hope this blog helps any engineer who wants to build an end-to-end test for a messaging architecture. The entire code is available here.
