This post shows a short example of how to use the CloudWatch log record pointer to extract a message from an AWS CloudWatch log record.
Background
Recently, I wrote an end-to-end test for a Skype sender Lambda. In that test, I compared the message from an AWS SQS queue with the AWS CloudWatch logs, and I used the log record pointer to retrieve the message from the CloudWatch logs. I thought documenting this step on the blog would help anyone who wants to extract data from CloudWatch logs.
Extracting log record using log record pointer
The following steps extract a record from the AWS CloudWatch logs:
1. Get the record pointer value from the query response.
2. Use the log record pointer to fetch the message.
1. Get record pointer value from response
This step is divided into two parts:
1a. Get the response from the CloudWatch Logs query.
1b. Get the record pointer value from the response dict.
1a. Get response from CloudWatch log query.
To get the query response from CloudWatch Logs, I used the get_query_results method of the boto3 logs client. get_query_results needs a query ID, so I first call the client's start_query method, which starts a CloudWatch Logs Insights query over the last five minutes and returns a queryId. I then pass that query_id to get_query_results. The record pointer value is retrieved from this response in step 1b.
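```python
import time
from datetime import datetime, timedelta

import boto3

# log_group and query are the log group name and the Logs Insights query string
response = None
ptr_value = None
client = boto3.client('logs')
# Start a query over the last five minutes; start and end times are epoch seconds
start_query_response = client.start_query(
    logGroupName=log_group,
    startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query,
)
query_id = start_query_response['queryId']
# Give the query a moment to run before asking for its results
time.sleep(1)
response = client.get_query_results(queryId=query_id)
```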
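The fixed time.sleep(1) assumes the query finishes within one second. The get_query_results response carries a status field (Scheduled, Running, Complete, Failed, Cancelled, Timeout or Unknown), so one option is to poll until the query is no longer pending. Below is a minimal sketch under that assumption; wait_for_query_results and its poll_interval/timeout parameters are hypothetical names, not part of boto3, and client is assumed to be a boto3 logs client.

```python
import time


def wait_for_query_results(client, query_id, poll_interval=1.0, timeout=60.0):
    """Poll get_query_results until the query is no longer Scheduled or Running.

    Returns the last response (callers should still check for 'Complete');
    raises TimeoutError if the query is still pending after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        response = client.get_query_results(queryId=query_id)
        # 'Scheduled' and 'Running' mean the result set may still be incomplete
        if response["status"] not in ("Scheduled", "Running"):
            return response
        if time.monotonic() >= deadline:
            raise TimeoutError(f"query {query_id} is still {response['status']}")
        time.sleep(poll_interval)
```

With this helper, `response = wait_for_query_results(client, query_id)` could stand in for the `time.sleep(1)` and `get_query_results` lines.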
1b. Get record pointer value from response dict.
The get_ptr_value method takes log_group and query as arguments. It uses the get_data_structure method to recurse through the nested response dictionary and flatten it, so the record pointer can be looked up with a single key (cloudwatch_conf.ptr_value). The code for get_data_structure is included in the "putting it all together" snippet for step 1. To maintain continuity, part of the snippet from step 1a is repeated here.
```python
def get_ptr_value(log_group, query):
    """
    getting ptr_value from response
    """
    response = None
    ptr_value = None
    client = boto3.client('logs')
    start_query_response = client.start_query(
        logGroupName=log_group,
        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),
        endTime=int(datetime.now().timestamp()),
        queryString=query,
    )
    query_id = start_query_response['queryId']
    time.sleep(1)
    response = client.get_query_results(queryId=query_id)
    # Flatten the nested response so the pointer can be read with a single key
    response_dict = get_data_structure(response)
    if cloudwatch_conf.ptr_value in response_dict:
        ptr_value = response_dict[cloudwatch_conf.ptr_value]
    else:
        print('log pointer key could not be fetched from response dictionary.')
    return ptr_value
```
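Looking the pointer up through a flattened key depends on where @ptr lands in the result rows. According to the GetLogRecord API documentation, the value to pass as logRecordPointer is the value of the @ptr field of a log event in the GetQueryResults response, where results is a list of rows and each row is a list of field/value pairs. A minimal sketch that finds the field by name instead; find_field_values is a hypothetical helper, not part of boto3.

```python
def find_field_values(response, field_name="@ptr"):
    """Return every value of `field_name` across the rows of a
    get_query_results response, in row order."""
    values = []
    # 'results' is a list of rows; each row is a list of {'field': ..., 'value': ...}
    for row in response.get("results", []):
        for cell in row:
            if cell.get("field") == field_name:
                values.append(cell.get("value"))
    return values
```

In get_ptr_value this could be used as `ptrs = find_field_values(response)` followed by `ptr_value = ptrs[0] if ptrs else None`, which does not depend on the column order of the query.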
Putting it all together, the code for step 1 looks as below. This snippet also includes the helper method and the configuration module it references.
```python
import collections
import time
from datetime import datetime, timedelta

import boto3

import conf.cloudwatch_configuration_conf as cloudwatch_conf


def get_data_structure(data):
    """
    Method used for converting nested dictionary/list to data similar to tabular form
    """
    # https://medium.com/better-programming/how-to-flatten-a-dictionary-with-nested-lists-and-dictionaries-in-python-524fd236365
    obj = collections.OrderedDict()

    def recurse(dataobject, parent_key=""):
        """
        Method will recurse through object
        """
        if isinstance(dataobject, list):
            # loop through list and call recurse()
            for i, item in enumerate(dataobject):
                recurse(item, parent_key + "_" + str(i) if parent_key else str(i))
        elif isinstance(dataobject, dict):
            # loop through dictionary and call recurse()
            for key, value in dataobject.items():
                recurse(value, parent_key + "_" + key if parent_key else key)
        else:
            # use the parent_key and store the value to obj
            obj[parent_key] = dataobject

    recurse(data)
    return obj


def get_ptr_value(log_group, query):
    """
    getting ptr_value from response
    """
    response = None
    ptr_value = None
    client = boto3.client('logs')
    start_query_response = client.start_query(
        logGroupName=log_group,
        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),
        endTime=int(datetime.now().timestamp()),
        queryString=query,
    )
    query_id = start_query_response['queryId']
    time.sleep(1)
    response = client.get_query_results(queryId=query_id)
    response_dict = get_data_structure(response)
    if cloudwatch_conf.ptr_value in response_dict:
        ptr_value = response_dict[cloudwatch_conf.ptr_value]
    else:
        print('log pointer key could not be fetched from response dictionary.')
    return ptr_value
```
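To see which key cloudwatch_conf.ptr_value has to hold, it helps to flatten a response-shaped dict and look at the keys. The sketch below is a compact rewrite that follows the same key scheme as get_data_structure (keys joined with "_", list items by index) but returns a plain dict, which keeps insertion order in Python 3.7+. The sample response is made up for illustration; the real key depends on which fields your query returns and in what order.

```python
def flatten(data, parent_key=""):
    """Flatten nested dicts/lists into {'a_0_b': value} style keys."""
    if isinstance(data, list):
        items = ((str(i), v) for i, v in enumerate(data))
    elif isinstance(data, dict):
        items = ((str(k), v) for k, v in data.items())
    else:
        # Leaf value: store it under the key built so far
        return {parent_key: data}
    flat = {}
    for key, value in items:
        child_key = f"{parent_key}_{key}" if parent_key else key
        flat.update(flatten(value, child_key))
    return flat


# Made-up response with one result row of two fields
sample_response = {
    "results": [[{"field": "@message", "value": "hi"},
                 {"field": "@ptr", "value": "abc"}]],
    "status": "Complete",
}
print(flatten(sample_response))
```

For this sample, the pointer ends up under `results_0_1_value`, so that string is what a config entry like ptr_value would need to contain for a query shaped this way.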
2. Use the log record pointer to fetch the message
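I used the get_log_record method of the boto3 logs client to fetch the full log record. It takes the log record pointer as its logRecordPointer argument. The message body is then read from the flattened response and converted back into a dictionary with ast.literal_eval.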
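```python
import ast

import boto3

import conf.cloudwatch_configuration_conf as cloudwatch_conf


def get_message(ptr_value):
    """
    To get message
    """
    client = boto3.client('logs')
    # Fetch the complete log event that the pointer refers to
    response = client.get_log_record(logRecordPointer=ptr_value)
    # get_data_structure is defined in step 1
    response_dict = get_data_structure(response)
    # The logged message is a dict literal; convert it back into a dict
    message = response_dict[cloudwatch_conf.record_body]
    message_dict = ast.literal_eval(message)
    return message_dict['msg']
```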
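ast.literal_eval works when the Lambda logs a Python dict repr such as {'msg': '...'}. If the Lambda logs JSON instead (for example via json.dumps), literal_eval fails on values like true, false or null, and json.loads is the more direct parser. A small sketch that tries JSON first and falls back to literal_eval; parse_logged_message is a hypothetical helper, and it assumes the message text (typically the @message entry of the logRecord dict returned by get_log_record) holds a single dict-shaped payload.

```python
import ast
import json


def parse_logged_message(raw):
    """Turn the text of a logged message into a dict.

    Tries JSON first, then falls back to ast.literal_eval for messages that
    were logged as a Python dict repr (single quotes, True/None).
    """
    raw = raw.strip()
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Not valid JSON: treat it as a Python literal
        parsed = ast.literal_eval(raw)
    if not isinstance(parsed, dict):
        raise ValueError(f"expected a dict-like message, got {type(parsed).__name__}")
    return parsed
```

In get_message, `message_dict = parse_logged_message(message)` could replace the `ast.literal_eval(message)` call.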
3. Putting it all together
Combining all the code gives the helper module below, which I call from my test; the screen output is attached as well. Note that the config file contains the CloudWatch queries and log groups.
```python
"""
Helper module for cloudwatch log
"""
import os
import sys
import ast
import collections
from datetime import datetime, timedelta
import time
import boto3
# The project root must be on sys.path before the conf package is imported
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import conf.cloudwatch_configuration_conf as cloudwatch_conf  # noqa: E402


def get_data_structure(data):
    """
    Method used for converting nested dictionary/list to data similar to tabular form
    """
    # https://medium.com/better-programming/how-to-flatten-a-dictionary-with-nested-lists-and-dictionaries-in-python-524fd236365
    obj = collections.OrderedDict()

    def recurse(dataobject, parent_key=""):
        """
        Method will recurse through object
        """
        if isinstance(dataobject, list):
            # loop through list and call recurse()
            for i, item in enumerate(dataobject):
                recurse(item, parent_key + "_" + str(i) if parent_key else str(i))
        elif isinstance(dataobject, dict):
            # loop through dictionary and call recurse()
            for key, value in dataobject.items():
                recurse(value, parent_key + "_" + key if parent_key else key)
        else:
            # use the parent_key and store the value to obj
            obj[parent_key] = dataobject

    recurse(data)
    return obj


def get_ptr_value(log_group, query):
    """
    getting ptr_value from response
    """
    response = None
    ptr_value = None
    client = boto3.client('logs')
    start_query_response = client.start_query(
        logGroupName=log_group,
        startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),
        endTime=int(datetime.now().timestamp()),
        queryString=query,
    )
    query_id = start_query_response['queryId']
    time.sleep(1)
    response = client.get_query_results(queryId=query_id)
    response_dict = get_data_structure(response)
    if cloudwatch_conf.ptr_value in response_dict:
        ptr_value = response_dict[cloudwatch_conf.ptr_value]
    else:
        print('log pointer key could not be fetched from response dictionary.')
    return ptr_value


def get_message(ptr_value):
    """
    To get message
    """
    client = boto3.client('logs')
    response = client.get_log_record(logRecordPointer=ptr_value)
    response_dict = get_data_structure(response)
    message = response_dict[cloudwatch_conf.record_body]
    message_dict = ast.literal_eval(message)
    return message_dict['msg']
```
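The helper imports conf.cloudwatch_configuration_conf, which the post does not show. The attribute names below are the ones the code reads; every value is a placeholder assumption. In particular, ptr_value assumes the query returns @timestamp, @message and then @ptr in a single row, and record_body assumes get_log_record returns a dict shaped like {'logRecord': {'@message': ...}}. Check both against your own flattened responses.

```python
"""
Hypothetical conf/cloudwatch_configuration_conf.py (placeholder values only)
"""

# Log group of the Lambda under test (placeholder name)
log_group_bot_sender = "/aws/lambda/<your-skype-sender-lambda>"

# Logs Insights query for the most recent matching event; the filter text is a placeholder
query_skype_sender = (
    "fields @timestamp, @message "
    "| filter @message like /<text-to-match>/ "
    "| sort @timestamp desc "
    "| limit 1"
)

# Flattened key (get_data_structure scheme) assumed to hold the @ptr value;
# the index depends on the fields the query returns
ptr_value = "results_0_2_value"

# Flattened key of the message text in a get_log_record response (assumed shape)
record_body = "logRecord_@message"
```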
My test file looks like this:
```python
"""
This End to end test employee skype message covers following:
Setup- Purging SQS queue
Step 1: Trigger employee message lambda
Step 2: Print message from cloudwatch logs
Step 3: Verify message with skype-listner sqs queue and culture file
"""
import os
import sys
import time
import asyncio
import logging
from pythonjsonlogger import jsonlogger
# Put the project root on sys.path before importing the helpers and conf packages
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import helpers.cloudwatch_helper  # noqa: E402
import helpers.lambda_helper  # noqa: E402
import helpers.sqs_helper  # noqa: E402
import helpers.asyncio_helper  # noqa: E402
import conf.cloudwatch_configuration_conf as cloudwatch_conf  # noqa: E402
import conf.sqs_utilities_conf as queue_url_conf  # noqa: E402

# logging
log_handler = logging.StreamHandler()
log_handler.setFormatter(jsonlogger.JsonFormatter())
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)


# Declaring class for test object
class Skypemessagetest():
    """
    Class for test object
    """
    logger = logging.getLogger(__name__)

    def __init__(self):
        """
        Initialise class
        """

    def get_request_id(self):
        """
        get the response from lambda
        """
        request_id = helpers.lambda_helper.get_request_id_from_lambda_response()
        return request_id

    def get_message_from_cloudwatch_log_ptr(self):
        """
        Method to get message from cloudwatch log pointer
        """
        message = None
        # Try up to five times; the log record may not be queryable yet
        for _ in range(5):
            ptr_value = helpers.cloudwatch_helper.get_ptr_value(
                cloudwatch_conf.log_group_bot_sender, cloudwatch_conf.query_skype_sender)
            print("---------Printing Record Pointer----------")
            print(ptr_value)
            if ptr_value:
                message = helpers.cloudwatch_helper.get_message(ptr_value)
                print("---------Printing Message----------")
                print(message)
                break
            time.sleep(1)
        return message

    def clear_queues(self):
        """
        Method to clear queues
        """
        for every_queue_url in queue_url_conf.QUEUE_URL_LIST:
            helpers.sqs_helper.purge_sqs_queue(every_queue_url)
            time.sleep(1)


if __name__ == '__main__':
    Skypemessagetest_obj = Skypemessagetest()
    logger.info("Setup- Purge SQS queue")
    logger.info("---------------------------------------------------------------------------")
    Skypemessagetest_obj.clear_queues()
    logger.info("Step 1: Trigger employee message lambda--------------------------------")
    request_id = Skypemessagetest_obj.get_request_id()
    logger.info("---------------------------------------------------------------------------")
    logger.info("Step 2: Print message from cloudwatch logs------------------------------")
    # Give the Lambda time to run and its logs time to reach CloudWatch
    time.sleep(240)
    message = Skypemessagetest_obj.get_message_from_cloudwatch_log_ptr()
    logger.info("---------------------------------------------------------------------------")
    logger.info(message)
    logger.info("-------------------------------------------------------------------------- ")
    logger.info("Step 3: Verify message with skype-listner sqs queue and culture file----")
    asyncio.run(helpers.asyncio_helper.poll_message(message))
    logger.info("-------------------------------------------------------------------------- ")
```
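The test waits a fixed 240 seconds and then retries five times with a one-second pause. A deadline-based poll can stop as soon as the pointer shows up and keep trying for longer when ingestion is slow. A minimal sketch; poll_until is a hypothetical helper, and suitable timeout and interval values would depend on how long the Lambda's logs take to become queryable.

```python
import time


def poll_until(fetch, timeout=300.0, interval=10.0):
    """Call fetch() until it returns a truthy value or `timeout` seconds pass.

    Returns the first truthy result, or None if the deadline is reached.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = fetch()
        if result:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline
        time.sleep(min(interval, remaining))
```

In the test, something like `ptr_value = poll_until(lambda: helpers.cloudwatch_helper.get_ptr_value(cloudwatch_conf.log_group_bot_sender, cloudwatch_conf.query_skype_sender), timeout=300, interval=15)` could replace both the `time.sleep(240)` and the retry loop. Since each call starts a new query over the last five minutes, the timeout should stay well inside that window.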
My test output screen looks like this:
If you are interested in the test where the above snippet is used, you can find it here.
If you are a tester working on microservices or on an AWS messaging architecture, the above example may help you extract records and messages from CloudWatch logs.
I have around 15 years of experience in software testing. I like tackling software testing problems and exploring various tools and solutions that can solve them. On a personal front, I enjoy reading books in my leisure time.