{"id":15049,"date":"2021-05-05T11:52:21","date_gmt":"2021-05-05T15:52:21","guid":{"rendered":"https:\/\/qxf2.com\/blog\/?p=15049"},"modified":"2021-05-05T11:52:21","modified_gmt":"2021-05-05T15:52:21","slug":"end-to-end-test-for-microservices-based-architecture-pytest","status":"publish","type":"post","link":"https:\/\/qxf2.com\/blog\/end-to-end-test-for-microservices-based-architecture-pytest\/","title":{"rendered":"Writing end to end test for microservices based architecture used for posting skype messages"},"content":{"rendered":"<p><span style=\"font-weight: 400;\">Recently, I have attempted to test microservices based architecture where the AWS Lambda function consumes messages from a microservices endpoint and posts them to the skype channel. I have made an attempt to add most of the validations for the flow. I have included this end-to-end test in the below blog.<\/p>\n<hr>\n<h3>Background:<\/h3>\n<p><a href=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2021\/03\/messaging_architecture.png\" data-rel=\"lightbox-image-0\" data-rl_title=\"\" data-rl_caption=\"\" title=\"\"><img decoding=\"async\" class=\"aligncenter\" src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2021\/03\/messaging_architecture.png\" alt=\"\"><br \/>\n<\/a><\/p>\n<p>The above architecture explains, microservices based architecture which posts messages on the Skype channel. This comprises of following components:<\/p>\n<p>1. A microservice for messages<br \/>\n2. <a href=\"https:\/\/docs.aws.amazon.com\/AmazonCloudWatch\/latest\/monitoring\/WhatIsCloudWatch.html\">CloudWatch<\/a> event triggers<br \/>\n3. qxf2-employee-messages <a href=\"https:\/\/aws.amazon.com\/lambda\/\">Lambda<\/a><br \/>\n4. qxf2-bot-sender <a href=\"https:\/\/aws.amazon.com\/lambda\/\">Lambda<\/a> to call an endpoint to send Skype messages<br \/>\n5. A skype-sender <a href=\"https:\/\/aws.amazon.com\/sqs\/\">SQS<\/a><br \/>\n6. A microservice to send skype messages<\/p>\n<p>So the end-to-end flow goes this way, CloudWatch event triggers qxf2-employee-messages lambda which consumes messages microservice and then sends the message to the Skype-sender SQS queue. Then another lambda qxf2-bot-sender used to call microservice to send skype message on the channel.<\/p>\n<hr>\n<h3>Test Approach:<\/h3>\n<p>As there are not any testing frameworks available to test messaging architecture pipeline. We derived the following test strategy to cover end to end test. I know this approach can be modified to make it more robust, by adding more verifications such as logging into the skype channel and verify the message, tracing failures across the pipelines, etc. However, using the below approach, we have attempted to test the entire messaging pipeline.<\/p>\n<p>We have divided the entire end to end flow into 3 steps:<\/p>\n<p>1. Trigger, qxf2-employee-messages lambda: The lambda consumes messages microservice and then sends the message to the Skype-sender SQS queue.<\/p>\n<p>2. Get the message from CloudWatch logs: Log record pointer used to fetch this message. As soon as the message is triggered, the CloudWatch log captures the message. Hence, I am fetching the same message from CloudWatch logs.<\/p>\n<p>3. Validate the message retrieved from CloudWatch logs with the message from skype-listener-queue and source text file: From skype-listener SQS queue qxf2-bot-sender lambda picks up the message so this is the endpoint of the pipeline. Source text file, i.e. comments.txt file, is the starting point of the pipeline from which randomly messages picked up by daily-messages-microservices. The lambda, i.e. qxf2-daily-messages lambada consumes the same message when triggered. So eventually, we are validating the start and endpoint of the flow. (For more information, please refer above diagram)<\/p>\n<p>The test case is organized into helper functions, configuration files, and main test file. <a href=\"https:\/\/docs.pytest.org\/en\/stable\/index.html\">Pytest<\/a> framework is used for running test case. Helper functions are organized based on the AWS Services such as Lambda or module names.<\/p>\n<hr>\n<h5>1. Trigger, qxf2-employee-messages lambda: <\/h5>\n<p>Method <code>trigger_cron_lambda<\/code> will trigger qxf2-employee-message lambda. Lambada helper file contents have been shown as below.<\/p>\n<pre lang=\"python\">\r\n\"\"\"\r\nLambda helper\r\n\"\"\"\r\nimport os\r\nimport sys\r\nimport logging\r\nimport boto3\r\nimport tests.conf.lambda_configuration_conf as lambda_conf\r\nsys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\r\n\r\ndef trigger_cron_lambda(lambda_name: str):\r\n    \"\"\"\r\n    :return: The AWS response.\r\n    Except a response\r\n    \"\"\"\r\n    _logger = logging.getLogger(__name__)\r\n    _logger.setLevel(logging.DEBUG)\r\n    client = boto3.client('lambda')\r\n    Event = {}\r\n    response = client.invoke(FunctionName=lambda_name,InvocationType='Event',\\\r\n        LogType='None',Payload=b'{\"endpoint\": \"\/message\",\"channel\": \"test\"}')\r\n\r\n    return response\r\n<\/pre>\n<h5>2.Get the message from Cloudwatch logs:<\/h5>\n<p><strong><h7>2.a. Get log record pointer value:<\/h7><\/strong><\/p>\n<p>We have fetched log record pointer value, using method <code>get_log_ptr<\/code>. We have stored, log group and query details are stored in the <a href=\"https:\/\/github.com\/qxf2\/qxf2-lambdas\/blob\/master\/tests\/conf\/cloudwatch_configuration_conf.py\">configuration<\/a> file.<\/p>\n<p><strong><h7>2.b. Retrieve message using log record pointer:<\/h7><\/strong><\/p>\n<p>After fetching the log record pointer, we have used the <code>get_message<\/code> method to get the message from the CloudWatch log record.<\/p>\n<p>Both these methods and other methods used in these methods are located in the CloudWatch helper.<\/p>\n<pre lang=\"python\">\r\n\"\"\"\r\nHelper module for CloudWatch log\r\n\"\"\"\r\n\r\nimport os\r\nimport sys\r\nimport ast\r\nimport collections\r\nfrom datetime import datetime, timedelta\r\nimport time\r\nimport boto3\r\nimport conf.cloudwatch_configuration_conf as cloudwatch_conf\r\nsys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\r\n\r\ndef get_data_structure(data):\r\n    \"\"\"\r\n    Method used for converting nested dictionary\/list to data similar to tabular form\r\n    \"\"\"\r\n    obj = collections.OrderedDict()\r\n    def recurse(dataobject,parent_key=\"\"):\r\n        \"\"\"\r\n        Method will recurse through object\r\n        \"\"\"\r\n        if isinstance(dataobject,list):\r\n            # loop through list and call recurse()\r\n            for i in range(len(dataobject)):\r\n                recurse(dataobject[i],parent_key + \"_\" + str(i) if parent_key else str(i))\r\n        elif isinstance(dataobject,dict):\r\n            # loop through dictionary and call recurse()\r\n            for key,value in dataobject.items():\r\n                recurse(value,parent_key + \"_\" + key if parent_key else key)\r\n        else:\r\n            # use the parent_key and store the value to obj\r\n            obj[parent_key] = dataobject\r\n\r\n    recurse(data)\r\n\r\n    return obj\r\n\r\ndef get_response_log_daily_messages(request_id,log_group,query):\r\n    \"\"\"\r\n    getting response from daily message lambda\r\n    \"\"\"\r\n    client = boto3.client('logs')\r\n    start_query_response = client.start_query(logGroupName=log_group,\\\r\n        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\\\r\n            endTime=int(datetime.now().timestamp()),queryString=query)\r\n    query_id = start_query_response['queryId']\r\n    response = None\r\n    while response is None:\r\n        time.sleep(1)\r\n        response = client.get_query_results(queryId=query_id)\r\n\r\n    return response.get('results')\r\n\r\ndef get_response_log_skype_sender(log_group,query):\r\n    \"\"\"\r\n    getting log from skype_sender\r\n    \"\"\"\r\n    client = boto3.client('logs')\r\n    start_query_response = client.start_query(logGroupName=log_group,\\\r\n        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\\\r\n            endTime=int(datetime.now().timestamp()),queryString=query)\r\n    query_id = start_query_response['queryId']\r\n    response = None\r\n    while response is None:\r\n        time.sleep(1)\r\n        response = client.get_query_results(queryId=query_id)\r\n\r\n    return response\r\n\r\ndef get_ptr_value(log_group,query):\r\n    \"\"\"\r\n    getting ptr_value from response\r\n    \"\"\"\r\n    client = boto3.client('logs')\r\n    start_query_response = client.start_query(logGroupName=log_group,\\\r\n        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\\\r\n            endTime=int(datetime.now().timestamp()),queryString=query)\r\n    query_id = start_query_response['queryId']\r\n    response = None\r\n    ptr_value = None\r\n    while response is None:\r\n        time.sleep(1)\r\n        response = client.get_query_results(queryId=query_id)\r\n        response_dict = get_data_structure(response)\r\n        if cloudwatch_conf.ptr_value in response_dict.keys():\r\n            ptr_value = response_dict[cloudwatch_conf.ptr_value]\r\n        else:\r\n            print(f'log pointer key could not be fetched from response dictionary.')\r\n\r\n    return ptr_value\r\n\r\ndef get_message_id(ptr_value):\r\n    \"\"\"\r\n    To get message id\r\n    \"\"\"\r\n    client = boto3.client('logs')\r\n    response = client.get_log_record(logRecordPointer=ptr_value)\r\n    response_dict = get_data_structure(response)\r\n    request_id = response_dict[cloudwatch_conf.record_messageid]\r\n\r\n    return request_id\r\n\r\ndef get_full_message(ptr_value):\r\n    \"\"\"\r\n    To get full message\r\n    \"\"\"\r\n    client = boto3.client('logs')\r\n    response = client.get_log_record(logRecordPointer=ptr_value)\r\n    response_dict = get_data_structure(response)\r\n\r\n    return response_dict\r\n\r\ndef get_message(ptr_value):\r\n    \"\"\"\r\n    To get message\r\n    \"\"\"\r\n    client = boto3.client('logs')\r\n    response = client.get_log_record(logRecordPointer=ptr_value)\r\n    response_dict = get_data_structure(response)\r\n    message = response_dict[cloudwatch_conf.record_body]\r\n    message_dict = ast.literal_eval(message)\r\n\r\n    return message_dict['msg']\r\n\r\n<\/pre>\n<hr>\n<h5>3. Validate the message retrieved from CloudWatch logs with the message from skype-listener-queue and source text file:<\/h5>\n<p><strong><h7>3a. Poll the skype-listener queue to fetch the message:<h7><\/strong><\/p>\n<p>Polling the SQS queue is an asynchronous process, we have created asyncio helper to poll the SQS queue. Method <code>poll_message<\/code> used for polling the SQS queue. This method takes messages from CloudWatch logs as input. While polling we are achieving two validations(3a.1 and 3a.2 sections mentioned below) and exiting when the test passes\/fails<\/p>\n<pre lang=\"python\">\r\n\"\"\"\r\nHelper module for asyncio methods\r\n\"\"\"\r\nimport sys\r\nimport asyncio\r\nimport logging\r\nimport tests.helpers.filter_message_helper\r\nimport tests.helpers.sqs_helper\r\nimport tests.conf.sqs_utilities_conf as queue_url_conf\r\n\r\n# Declaring class Style\r\nclass Style():\r\n    \"\"\"\r\n    Declaring Style class\r\n    \"\"\"\r\n    BLACK = '\\033[30m'\r\n    RED = '\\033[31m'\r\n    GREEN = '\\033[32m'\r\n    YELLOW = '\\033[33m'\r\n    BLUE = '\\033[34m'\r\n    MAGENTA = '\\033[35m'\r\n    CYAN = '\\033[36m'\r\n    WHITE = '\\033[37m'\r\n    UNDERLINE = '\\033[4m'\r\n    RESET = '\\033[0m'\r\n\r\nasync def validate_message_with_sqs(queue_url, message_cloudwatch):\r\n    \"\"\"\r\n    Validates message from sqs queue with cloudwatch logs\r\n    :param queue_url: URL of the SQS queue\r\n    :message_cloudwatch: Message received from cloudwatch logs\r\n    \"\"\"\r\n    _logger = logging.getLogger(__name__)\r\n    _logger.setLevel(logging.DEBUG)\r\n    message = tests.helpers.sqs_helper.get_message_from_queue(queue_url)\r\n    result_flag = tests.helpers.filter_message_helper.publish_compare_result\\\r\n        (message,message_cloudwatch)\r\n    if result_flag is True:\r\n        sys.exit()\r\n\r\n    return result_flag\r\n\r\nasync def poll_message(message_cloudwatch):\r\n    \"\"\"\r\n    Schedule calls concurrently\r\n    \"\"\"\r\n    while True:\r\n        tasks = []\r\n        for every_queue_url in queue_url_conf.QUEUE_URL_LIST:\r\n            tasks.append(validate_message_with_sqs(every_queue_url,message_cloudwatch))\r\n        result = await asyncio.gather(*tasks)\r\n<\/pre>\n<pre lang=\"python\">\r\n\"\"\"\r\nHelper module for sqs messages\r\n\"\"\"\r\nimport os\r\nimport sys\r\nimport boto3\r\nsys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\r\n\r\ndef get_sqs_client():\r\n    \"\"\"\r\n    Return sqs_client object\r\n    :param none\r\n    :return sqs_client\r\n    \"\"\"\r\n    sqs_client = boto3.client('sqs')\r\n\r\n    return sqs_client\r\n\r\ndef get_sqs_queue(queue_url):\r\n    \"\"\"\r\n    Return queue object from queue_url\r\n    :param queue_url\r\n    :return queue\r\n    \"\"\"\r\n    queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_url)\r\n\r\n    return queue\r\n\r\ndef get_message_from_queue(queue_url):\r\n    \"\"\"\r\n    get messsage from queue_url\r\n    \"\"\"\r\n    sqs_client = get_sqs_client()\r\n    queue = get_sqs_queue(queue_url)\r\n    message = sqs_client.receive_message(QueueUrl=queue.url)\r\n\r\n    return message\r\n\r\ndef purge_sqs_queue(queue_url):\r\n    \"\"\"\r\n    Reteun status\r\n    \"\"\"\r\n    queue = get_sqs_queue(queue_url)\r\n    client = get_sqs_client()\r\n    client.purge_queue(QueueUrl=queue.url)\r\n\r\n<\/pre>\n<p>While polling message, we are validating following:<\/p>\n<p><strong><h11>3a.1. Message from SQS queue matches with message from CloudWatch logs:<h11><\/strong><\/p>\n<p>Method <code>validate_message_with_cloudwatch_logs<\/code> from filter message helper used for this validation.<\/p>\n<pre lang=\"python\">\r\ndef compare_message_cloudwatch_log(message_on_channel, message_cloudwatch):\r\n    \"\"\"\r\n    compare message with cloudwatch log message\r\n    \"\"\"\r\n    result_flag = False\r\n    if message_on_channel == message_cloudwatch:\r\n        result_flag = True\r\n    else:\r\n        result_flag = False\r\n\r\n    return result_flag\r\n\r\ndef validate_message_with_cloudwatch_logs(message_on_channel,message_cloudwatch):\r\n    \"\"\"\r\n    Asserting method on channels with cloudwatch logs\r\n    \"\"\"\r\n    result_flag = False\r\n    if message_on_channel is not None:\r\n        result_flag = compare_message_cloudwatch_log(message_on_channel,message_cloudwatch)\r\n        if result_flag is True:\r\n            print(Style.CYAN + '---------------\\\r\n                ------------------------------------------------------------')\r\n            print(Style.CYAN + 'Step 3a. Validating \\\r\n                message with Skype listener SQS Queue-------------------')\r\n            print(Style.GREEN + 'Message on channel \\\r\n                does match with the message from cloudwatch logs')\r\n            print(Style.CYAN + '----------------------\\\r\n                -----------------------------------------------------')\r\n            result_flag = validate_message_with_culture_file(message_on_channel)\r\n        else:\r\n            print(Style.CYAN + '-------------\\\r\n                --------------------------------------------------------------')\r\n            print(Style.RED + 'Message on channel does not match with \\\r\n                the message from cloudwatch logs')\r\n            print(Style.CYAN + '---------------------\\\r\n                ------------------------------------------------------')\r\n    else:\r\n            print(Style.CYAN + '---------------------------\\\r\n                ------------------------------------------------')\r\n            print(\"No message on channel\")\r\n            print(Style.CYAN + '------------------------------\\\r\n                ---------------------------------------------')\r\n\r\n    return result_flag\r\n\r\n<\/pre>\n<p>Note that, for filtering message from the message queue, we have used the sender user id and channel chat id as filter criteria&#8217;s in the <code>filter_message<\/code> from the message queue helper. <\/p>\n<pre lang=\"python\">\r\n\r\ndef get_dict(body_string):\r\n    \"\"\"\r\n    Generates dict from message body\r\n    :param string\r\n    :return dict object\r\n    \"\"\"\r\n    body_string = json.dumps(body_string)\r\n    body_string = body_string.replace(\"'\", \"\\\"\")\r\n    body_string = json.loads(body_string)\r\n    message_body_obj = json.loads(body_string)\r\n\r\n    return message_body_obj\r\n\r\ndef get_message_body(message):\r\n    \"\"\"\r\n    This method will return message body\r\n    \"\"\"\r\n    msg_body = \"\"\r\n    if 'Messages' in message:\r\n        for message in message['Messages']:\r\n            if 'Body' in message.keys():\r\n                message_body_obj = get_dict(message['Body'])\r\n                if 'Message' in message_body_obj.keys():\r\n                    msg_body = get_dict(message_body_obj['Message'])\r\n                else:\r\n                    print(\"Message key is not present in the Message Body\")\r\n                    sys.exit()\r\n            else:\r\n                print(\"Message does not contain Body\")\r\n                sys.exit()\r\n\r\n    else:\r\n        print(\"No messages are retrieved\")\r\n        with pytest.raises(SystemExit):\r\n            sys.exit()\r\n\r\n    return msg_body\r\n\r\ndef filter_message(message,chat_id,user_id):\r\n    \"\"\"\r\n    Filter method based on chat_id and user_id\r\n    return: Boolean value\r\n    \"\"\"\r\n    message_body = get_message_body(message)\r\n    if message_body is not None:\r\n        if \"chat_id\" in message_body and \"user_id\" in message_body:\r\n            if message_body['chat_id']==chat_id and message_body['user_id']==user_id:\r\n                print(f'message is from test channel and  sender is skype sender lambda')\r\n            else:\r\n                print(f'Neither message is from test channel nor sender is skype sender lambda')\r\n        else:\r\n            print(f'Message does not contain required keys')\r\n    else:\r\n        print(f'Message body is not none')\r\n\r\n    return True\r\n\r\n<\/pre>\n<p><strong><h11>3a.2. After message from CloudWatch log and SQS queue matches, compare that with source text file:<h11><\/strong><\/p>\n<p>Method <code>validate_message_with_culture_file<\/code> gives comparison results between source text file and CloudWatch log message.<\/p>\n<pre lang=\"python\">\r\n\r\ndef validate_message_with_culture_file(message_on_channel):\r\n    \"\"\"\r\n    Asserting message on channel with culture file\r\n    \"\"\"\r\n    result_flag = False\r\n    if message_on_channel is not None:\r\n        result_flag = compare_message_with_file(message_on_channel,CULTURE_FILE)\r\n        if result_flag is True:\r\n            print(Style.CYAN + '---------------------------\\\r\n                ------------------------------------------------')\r\n            print(Style.CYAN + 'Step 3b. Validating \\\r\n                message with culture file------------------------------')\r\n            print(Style.GREEN + 'Message \\\r\n                on channel does match with culture file')\r\n            print(Style.CYAN + '-----------\\\r\n                ----------------------------------------------------------------')\r\n        else:\r\n            print(Style.CYAN + '------------\\\r\n                ---------------------------------------------------------------')\r\n            print(Style.GREEN + 'Message \\\r\n                on channel does match with culture file')\r\n            print(Style.CYAN + '---------\\\r\n                ------------------------------------------------------------------')\r\n    else:\r\n        print(Style.CYAN + 'There is no message on channel')\r\n\r\n    return result_flag\r\n\r\n<\/pre>\n<p>The entire filter message helper code is available <a href=\"https:\/\/github.com\/qxf2\/qxf2-lambdas\/blob\/master\/tests\/helpers\/filter_message_helper.py\">here:<\/a><\/p>\n<hr>\n<h5>4. Putting it all together in test:<\/h5>\n<pre lang=\"python\">\r\n\"\"\"\r\nThis End to end test employee skype message covers the following:\r\nSetup- Purging SQS queue\r\nStep 1: Trigger employee message lambda\r\nStep 2: Print message from cloudwatch logs\r\nStep 3: Verify message with skype-listener sqs queue and culture file\r\n\"\"\"\r\nimport os\r\nimport sys\r\nimport time\r\nimport asyncio\r\nimport logging\r\nimport unittest\r\nimport pytest\r\nfrom pythonjsonlogger import jsonlogger\r\nimport tests.helpers.asyncio_helper\r\nimport tests.helpers.cloudwatch_helper\r\nimport tests.helpers.filter_message_helper\r\nimport tests.helpers.lambda_helper\r\nimport tests.conf.cloudwatch_configuration_conf as cloudwatch_conf\r\nimport tests.conf.sqs_utilities_conf as queue_url_conf\r\nimport tests.conf.lambda_configuration_conf as lambda_conf\r\nsys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\r\n\r\n# logging\r\nlog_handler = logging.StreamHandler()\r\nlog_handler.setFormatter(jsonlogger.JsonFormatter())\r\nlogger = logging.getLogger()\r\nlogger.setLevel(logging.INFO)\r\nlogger.addHandler(log_handler)\r\n\r\n# Declaring class for test object\r\nclass Skypemessagetest():\r\n    \"\"\"\r\n    Class for test object\r\n    \"\"\"\r\n    logger = logging.getLogger(__name__)\r\n    def __init__(self):\r\n        \"\"\"\r\n        Initilalise class\r\n        \"\"\"\r\n    def get_request_id(self):\r\n        \"\"\"\r\n        get the response from lambda\r\n        \"\"\"\r\n        request_id = tests.helpers.lambda_helper.get_request_id_from_lambda_response()\r\n\r\n        return request_id\r\n\r\n    def get_message_from_cloudwatch_log_ptr(self):\r\n        \"\"\"\r\n        Method to get message from cloudwatch log pointer\r\n        \"\"\"\r\n        message = None\r\n        for i in range(1, 6):\r\n            ptr_value = tests.helpers.cloudwatch_helper.get_ptr_value\\\r\n                (cloudwatch_conf.log_group_bot_sender,cloudwatch_conf.query_skype_sender)\r\n            if ptr_value:\r\n                message = tests.helpers.cloudwatch_helper.get_message(ptr_value)\r\n                break\r\n            time.sleep(60)\r\n\r\n        return message\r\n\r\n    def clear_queues(self):\r\n        \"\"\"\r\n        Method to clear queues\r\n        \"\"\"\r\n        for every_queue_url in queue_url_conf.QUEUE_URL_LIST:\r\n            tests.helpers.sqs_helper.purge_sqs_queue(every_queue_url)\r\n            time.sleep(1)\r\n\r\nclass TestEndtoEndSkypeMessage(unittest.TestCase):\r\n    \"\"\"\r\n    Test class\r\n    \"\"\"\r\n    def test_end_to_end_skype_message(self):\r\n        \"\"\"\r\n        Test case\r\n        \"\"\"\r\n        Skypemessagetest_obj = Skypemessagetest()\r\n        logger.info(\"Setup- Purge SQS queue\")\r\n        logger.info(\"---------------------------------------------------------------------------\")\r\n        Skypemessagetest_obj.clear_queues()\r\n        logger.info(\"Step 1: Trigger employee message lambda--------------------------------\")\r\n        tests.helpers.lambda_helper.trigger_cron_lambda(lambda_conf.daily_message_lambda)\r\n        logger.info(\"---------------------------------------------------------------------------\")\r\n        logger.info(\"Step 2: Print message from cloudwatch logs------------------------------\")\r\n        message = Skypemessagetest_obj.get_message_from_cloudwatch_log_ptr()\r\n        logger.info(\"---------------------------------------------------------------------------\")\r\n        logger.info(message)\r\n        logger.info(\"-------------------------------------------------------------------------- \")\r\n        logger.info(\"Step 3: Verify message with skype-listener sqs queue and culture file----\")\r\n        with pytest.raises(SystemExit) as system_exception:\r\n            asyncio.run(tests.helpers.asyncio_helper.poll_message(message))\r\n        assert system_exception.type == SystemExit\r\n        logger.info(\"-------------------------------------------------------------------------- \")\r\n\r\n<\/pre>\n<p>This is how output looks on bash console:<\/p>\n<p><a href=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2021\/03\/messaging_architecture_test_screen-1.png\" data-rel=\"lightbox-image-1\" data-rl_title=\"\" data-rl_caption=\"\" title=\"\"><img decoding=\"async\" class=\"aligncenter\" src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2021\/03\/messaging_architecture_test_screen-1.png\" alt=\"\"><\/a><\/p>\n<p>Note that the same message can be seen on the skype channel. This validation, I have done manually. I will enhance the test to include this validation in the test flow. <\/p>\n<p><a href=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2021\/03\/messaging_architecture_skype_channel_screen.png\" data-rel=\"lightbox-image-2\" data-rl_title=\"\" data-rl_caption=\"\" title=\"\"><img decoding=\"async\" class=\"aligncenter\" src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2021\/03\/messaging_architecture_skype_channel_screen.png\" alt=\"\"><\/a><\/p>\n<p>I hope this blog would help any engineer, who would want to build an end-to-end test based on messaging architecture. The entire code is available at <a href=\"https:\/\/github.com\/qxf2\/qxf2-lambdas\/tree\/master\/tests\">here<\/a>.<\/p>\n<hr>\n","protected":false},"excerpt":{"rendered":"<p>Recently, I have attempted to test microservices based architecture where the AWS Lambda function consumes messages from a microservices endpoint and posts them to the skype channel. I have made an attempt to add most of the validations for the flow. I have included this end-to-end test in the below blog. Background: The above architecture explains, microservices based architecture which [&hellip;]<\/p>\n","protected":false},"author":28,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[271,282,240,283,107],"tags":[],"class_list":["post-15049","post","type-post","status-publish","format-standard","hentry","category-aws-cloudwatch-logs","category-aws-lambda","category-aws-sqs","category-messaging-architecture-testing","category-pytest"],"_links":{"self":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/15049","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/users\/28"}],"replies":[{"embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/comments?post=15049"}],"version-history":[{"count":78,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/15049\/revisions"}],"predecessor-version":[{"id":15258,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/15049\/revisions\/15258"}],"wp:attachment":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/media?parent=15049"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/categories?post=15049"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/tags?post=15049"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}