Extract message from AWS CloudWatch log record using log record pointer

This post shows a short example of how to use a CloudWatch log record pointer to extract a message from an AWS CloudWatch log record.


Background

I recently wrote an end-to-end test for a Skype sender Lambda. In that test, I compared a message from an AWS SQS queue with the AWS CloudWatch logs, and I used the log record pointer to retrieve the message from the CloudWatch logs. I thought documenting this step in a blog post would benefit anyone who wants to extract data from CloudWatch logs.


Extracting log record using log record pointer

The following steps extract a record from the AWS CloudWatch logs:

Steps:
1. Get the record pointer value from the response.
2. Use the log record pointer to fetch the message.


1. Get record pointer value from response

This step is divided into two sub-steps:
1a. Get the response from the CloudWatch log query.
1b. Get the record pointer value from the response dict.


1a. Get response from cloudwatch log query.

To get the response from CloudWatch Logs, I used the get_query_results method of the boto3 client. I first called another method of the boto3 client, start_query, to obtain a query_id, and passed that query_id as an argument to get_query_results. The query runs asynchronously, so the snippet waits one second before fetching the results. The record pointer value is later retrieved from the response dict.

client = boto3.client('logs')
# Query the last five minutes of the log group (times are epoch seconds)
start_query_response = client.start_query(
    logGroupName=log_group,
    startTime=int((datetime.now() - timedelta(minutes=5)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query)
query_id = start_query_response['queryId']
# The query runs asynchronously, so wait briefly before fetching results
time.sleep(1)
response = client.get_query_results(queryId=query_id)
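
A fixed one-second sleep may not be enough when a query takes longer to finish. As a hedged alternative, the sketch below polls get_query_results until the response's status field reaches a terminal value. The function name wait_for_query_results, its timeout parameters and the TERMINAL_STATUSES constant are my own illustrative choices and are not part of the original helper.

```python
import time

# Statuses after which the query result will not change any further
TERMINAL_STATUSES = {"Complete", "Failed", "Cancelled", "Timeout"}


def wait_for_query_results(client, query_id, timeout_seconds=30, poll_interval=1.0):
    """Poll get_query_results until the query reaches a terminal status.

    `client` is assumed to be a boto3 CloudWatch Logs client,
    i.e. the object returned by boto3.client('logs').
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.get_query_results(queryId=query_id)
        if response.get("status") in TERMINAL_STATUSES:
            return response
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"query {query_id} did not finish within {timeout_seconds}s")
        time.sleep(poll_interval)
```

With this in place, the time.sleep(1) and get_query_results lines could be replaced by response = wait_for_query_results(client, query_id). The caller should still check that the returned status is "Complete" before reading the results.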

1b. Get record pointer value from response dict.

I pass log_group and query as arguments to the get_ptr_value method, and use the get_data_structure method to recurse through the nested dictionary structure of the response. The code for get_data_structure is included in the complete snippet at the end of step 1. To maintain continuity, part of the snippet from step 1a is repeated here; these lines form the body of get_ptr_value.

ptr_value = None
client = boto3.client('logs')
start_query_response = client.start_query(
    logGroupName=log_group,
    startTime=int((datetime.now() - timedelta(minutes=5)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query)
query_id = start_query_response['queryId']
time.sleep(1)
response = client.get_query_results(queryId=query_id)
# Flatten the nested response so the pointer can be looked up by a single key
response_dict = get_data_structure(response)
if cloudwatch_conf.ptr_value in response_dict:
    ptr_value = response_dict[cloudwatch_conf.ptr_value]
else:
    print('log pointer key could not be fetched from response dictionary.')

return ptr_value
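
The lookup above depends on a flattened key stored in the config, which in turn depends on where the pointer sits in the result row. As a hedged alternative, the sketch below searches the result rows for the field named @ptr directly. The function name find_ptr and the sample response are made up for illustration; the sample only imitates the general shape of a get_query_results response.

```python
def find_ptr(response):
    """Return the @ptr value of the first result row that has one, else None."""
    for row in response.get("results", []):
        for cell in row:
            if cell.get("field") == "@ptr":
                return cell.get("value")
    return None


# Made-up response, shaped like a get_query_results result with one row
sample_response = {
    "status": "Complete",
    "results": [
        [
            {"field": "@timestamp", "value": "2020-01-01 00:00:00.000"},
            {"field": "@message", "value": "{'msg': 'hello'}"},
            {"field": "@ptr", "value": "example-pointer"},
        ]
    ],
}

print(find_ptr(sample_response))  # example-pointer
```

This variant does not care about the order of the fields in a row, at the cost of being specific to the query results structure rather than a general flattening helper.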

Putting it all together, the code snippet for step 1 looks as below. It also includes the helper method and the configuration variables that the snippet relies on.

def get_data_structure(data):
    """
    Method used for converting nested dictionary/list to data similar to tabular form
    """
    # https://medium.com/better-programming/how-to-flatten-a-dictionary-with-nested-lists-and-dictionaries-in-python-524fd236365
    obj = collections.OrderedDict()
    def recurse(dataobject,parent_key=""):
        """
        Method will recurse through object
        """
        if isinstance(dataobject,list):
            # loop through list and call recurse()
            for i in range(len(dataobject)):
                recurse(dataobject[i],parent_key + "_" + str(i) if parent_key else str(i))
        elif isinstance(dataobject,dict):
            # loop through dictionary and call recurse()
            for key,value in dataobject.items():
                recurse(value,parent_key + "_" + key if parent_key else key)
        else:
            # use the parent_key and store the value to obj
            obj[parent_key] = dataobject
 
    recurse(data)
 
    return obj
 
def get_ptr_value(log_group,query):
    """
    getting ptr_value from response
    """
    ptr_value = None
    client = boto3.client('logs')
    # Query the last five minutes of the log group (times are epoch seconds)
    start_query_response = client.start_query(
        logGroupName=log_group,
        startTime=int((datetime.now() - timedelta(minutes=5)).timestamp()),
        endTime=int(datetime.now().timestamp()),
        queryString=query)
    query_id = start_query_response['queryId']
    # The query runs asynchronously, so wait briefly before fetching results
    time.sleep(1)
    response = client.get_query_results(queryId=query_id)
    response_dict = get_data_structure(response)
    if cloudwatch_conf.ptr_value in response_dict:
        ptr_value = response_dict[cloudwatch_conf.ptr_value]
    else:
        print('log pointer key could not be fetched from response dictionary.')

    return ptr_value
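
To make the flattened key names concrete, here is a condensed restatement of the get_data_structure logic applied to a made-up response. The name flatten and the sample data are illustrative assumptions; a real response will contain more fields and statistics.

```python
import collections


def flatten(data):
    """Condensed equivalent of get_data_structure: join nested keys and indexes with '_'."""
    flat = collections.OrderedDict()

    def recurse(node, parent_key=""):
        if isinstance(node, list):
            for index, item in enumerate(node):
                recurse(item, f"{parent_key}_{index}" if parent_key else str(index))
        elif isinstance(node, dict):
            for key, value in node.items():
                recurse(value, f"{parent_key}_{key}" if parent_key else key)
        else:
            # Leaf value: store it under the accumulated key
            flat[parent_key] = node

    recurse(data)
    return flat


# Made-up response with a single row holding a message and a pointer
sample_response = {
    "status": "Complete",
    "results": [[
        {"field": "@message", "value": "{'msg': 'hello'}"},
        {"field": "@ptr", "value": "example-pointer"},
    ]],
}

for key, value in flatten(sample_response).items():
    print(key, "=", value)
```

In this made-up case the pointer ends up under the key results_0_1_value, so that is the string cloudwatch_conf.ptr_value would need to hold. The index in the key depends on which fields the query returns.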

2. Use the log record pointer to fetch the message

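I used the get_log_record method of the boto3 client to fetch the full log record; it takes the log record pointer as its argument. The record is flattened with get_data_structure, and the message body is parsed with ast.literal_eval to read its msg field.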

def get_message(ptr_value):
    """
    To get message
    """
    client = boto3.client('logs')
    response = client.get_log_record(logRecordPointer=ptr_value)
    response_dict = get_data_structure(response)
    message = response_dict[cloudwatch_conf.record_body]
    message_dict = ast.literal_eval(message)
 
    return message_dict['msg']
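
ast.literal_eval works when the message body is a Python dict literal, but it fails on JSON bodies that contain values such as true or null. The sketch below is one hedged way to accept both formats. The function name parse_message_body and the sample record are assumptions for illustration; the sample only imitates a get_log_record response with a logRecord mapping.

```python
import ast
import json


def parse_message_body(body):
    """Parse a log message body that is either JSON or a Python dict literal."""
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        # Fall back to the approach used in get_message for Python-literal bodies
        return ast.literal_eval(body)


# Made-up record, shaped like the 'logRecord' mapping of a get_log_record response
sample_record = {"logRecord": {"@message": "{'msg': 'hello from lambda'}"}}

body = sample_record["logRecord"]["@message"]
print(parse_message_body(body)["msg"])  # hello from lambda
```

If the Lambda always logs one format, calling only the matching parser keeps the helper simpler.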

3. Putting it all together

Combining all of the code and running it in the test looks like below; the screen output is attached as well. Note that the config file contains the CloudWatch queries and log groups.

"""
Helper module for cloudwatch log
"""
import os
import sys
import ast
import collections
from datetime import datetime, timedelta
import time
import boto3
# Add the project root to the path before importing the local conf package
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import conf.cloudwatch_configuration_conf as cloudwatch_conf
 
def get_data_structure(data):
    """
    Method used for converting nested dictionary/list to data similar to tabular form
    """
    # https://medium.com/better-programming/how-to-flatten-a-dictionary-with-nested-lists-and-dictionaries-in-python-524fd236365
    obj = collections.OrderedDict()
    def recurse(dataobject,parent_key=""):
        """
        Method will recurse through object
        """
        if isinstance(dataobject,list):
            # loop through list and call recurse()
            for i in range(len(dataobject)):
                recurse(dataobject[i],parent_key + "_" + str(i) if parent_key else str(i))
        elif isinstance(dataobject,dict):
            # loop through dictionary and call recurse()
            for key,value in dataobject.items():
                recurse(value,parent_key + "_" + key if parent_key else key)
        else:
            # use the parent_key and store the value to obj
            obj[parent_key] = dataobject
 
    recurse(data)
 
    return obj
 
def get_ptr_value(log_group,query):
    """
    getting ptr_value from response
    """
    ptr_value = None
    client = boto3.client('logs')
    # Query the last five minutes of the log group (times are epoch seconds)
    start_query_response = client.start_query(
        logGroupName=log_group,
        startTime=int((datetime.now() - timedelta(minutes=5)).timestamp()),
        endTime=int(datetime.now().timestamp()),
        queryString=query)
    query_id = start_query_response['queryId']
    # The query runs asynchronously, so wait briefly before fetching results
    time.sleep(1)
    response = client.get_query_results(queryId=query_id)
    response_dict = get_data_structure(response)
    if cloudwatch_conf.ptr_value in response_dict:
        ptr_value = response_dict[cloudwatch_conf.ptr_value]
    else:
        print('log pointer key could not be fetched from response dictionary.')

    return ptr_value
 
 
def get_message(ptr_value):
    """
    To get message
    """
    client = boto3.client('logs')
    response = client.get_log_record(logRecordPointer=ptr_value)
    response_dict = get_data_structure(response)
    message = response_dict[cloudwatch_conf.record_body]
    message_dict = ast.literal_eval(message)
 
    return message_dict['msg']
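
The helper module reads four names from the config file. The actual config is not shown in this post, so the following is only a guess at what such a file could look like. Every value is hypothetical: the log group name is a placeholder, the query is a generic Logs Insights query, and the two flattened keys assume that @ptr is the third field in the result row and that the record body is stored under @message.

```python
# conf/cloudwatch_configuration_conf.py (hypothetical contents, not the author's file)

# Log group written by the Lambda under test; the name is a placeholder
log_group_bot_sender = "/aws/lambda/example-skype-sender"

# Logs Insights query that returns the most recent event
query_skype_sender = "fields @timestamp, @message | sort @timestamp desc | limit 1"

# Flattened key of the pointer value in the get_query_results response
# (assumes @ptr is the third field of the first result row)
ptr_value = "results_0_2_value"

# Flattened key of the message body in the get_log_record response
record_body = "logRecord_@message"
```

If the query is changed to return a different set of fields, the index inside ptr_value has to be adjusted to match.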

My test file looks like this:

"""
This end-to-end test for the employee Skype message covers the following:
Setup- Purging SQS queue
Step 1: Trigger employee message lambda
Step 2: Print message from cloudwatch logs
Step 3: Verify message with skype-listner sqs queue and culture file
"""
 
import os
import sys
import time
import asyncio
import logging
from pythonjsonlogger import jsonlogger
# Add the project root to the path before importing the local helpers and conf packages
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import helpers.cloudwatch_helper
import helpers.lambda_helper
import helpers.sqs_helper
import helpers.asyncio_helper
import conf.cloudwatch_configuration_conf as cloudwatch_conf
import conf.sqs_utilities_conf as queue_url_conf
 
# logging
log_handler = logging.StreamHandler()
log_handler.setFormatter(jsonlogger.JsonFormatter())
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)
 
# Declaring class for test object
class Skypemessagetest():
    """
    Class for test object
    """
    logger = logging.getLogger(__name__)
    def __init__(self):
        """
        Initialise class
        """
    def get_request_id(self):
        """
        get the response from lambda
        """
        request_id = helpers.lambda_helper.get_request_id_from_lambda_response()
 
        return request_id
 
    def get_message_from_cloudwatch_log_ptr(self):
        """
        Method to get message from cloudwatch log pointer
        """
        message = None
        for i in range(1, 6):
            ptr_value = helpers.cloudwatch_helper.get_ptr_value\
                (cloudwatch_conf.log_group_bot_sender,cloudwatch_conf.query_skype_sender)
            print("---------Printing Record Pointer----------")
            print(ptr_value)
            if ptr_value:
                message = helpers.cloudwatch_helper.get_message(ptr_value)
                print("---------Printing Message----------")
                print(message)
                break
            time.sleep(1)
 
        return message
 
    def clear_queues(self):
        """
        Method to clear queues
        """
        for every_queue_url in queue_url_conf.QUEUE_URL_LIST:
            helpers.sqs_helper.purge_sqs_queue(every_queue_url)
            time.sleep(1)
 
if __name__ == '__main__':
    Skypemessagetest_obj = Skypemessagetest()
    logger.info("Setup- Purge SQS queue")
    logger.info("---------------------------------------------------------------------------")
    Skypemessagetest_obj.clear_queues()
    logger.info("Step 1: Trigger employee message lambda--------------------------------")
    request_id = Skypemessagetest_obj.get_request_id()
    logger.info("---------------------------------------------------------------------------")
    logger.info("Step 2: Print message from cloudwatch logs------------------------------")
    time.sleep(240)
    message = Skypemessagetest_obj.get_message_from_cloudwatch_log_ptr()
    logger.info("---------------------------------------------------------------------------")
    logger.info(message)
    logger.info("-------------------------------------------------------------------------- ")
    logger.info("Step 3: Verify message with skype-listner sqs queue and culture file----")
    asyncio.run(helpers.asyncio_helper.poll_message(message))
    logger.info("-------------------------------------------------------------------------- ")

[Screenshot of the test output]

If you would like to know more about the tests where the above snippet is used, you can find the test here.


If you are a tester working on microservices or on an AWS messaging architecture, the above example may help you retrieve records and messages from CloudWatch logs.

