{"id":14710,"date":"2021-01-20T01:25:47","date_gmt":"2021-01-20T06:25:47","guid":{"rendered":"https:\/\/qxf2.com\/blog\/?p=14710"},"modified":"2023-04-02T09:37:22","modified_gmt":"2023-04-02T13:37:22","slug":"extract-message-from-aws-cloudwatch-log-record-using-log-record-pointer","status":"publish","type":"post","link":"https:\/\/qxf2.com\/blog\/extract-message-from-aws-cloudwatch-log-record-using-log-record-pointer\/","title":{"rendered":"Extract message from AWS CloudWatch log record using log record pointer"},"content":{"rendered":"<p>This post shows a short example of how to use the CloudWatch log record pointer to extract a message from an AWS CloudWatch log record.<\/p>\n<hr>\n<h3>Background<\/h3>\n<p>I recently wrote an end-to-end test for the Skype sender lambda. In that test, I compared the message from an AWS SQS queue with the AWS CloudWatch logs, using the log record pointer to retrieve the message from the logs. I thought documenting this step would benefit anyone who wants to extract data from CloudWatch logs.<\/p>\n<hr>\n<h3>Extracting log record using log record pointer<\/h3>\n<p>The steps below extract a record from the AWS CloudWatch logs:<\/p>\n<p>Steps:<br \/>\n1. Get the record pointer value from the response.<br \/>\n2. Use the log record pointer to fetch the message.<\/p>\n<hr>\n<h5>1. Get record pointer value from response<\/h5>\n<p>This step is divided into the following sub-steps:<br \/>\n1a. Get the response from the CloudWatch log query.<br \/>\n1b. Get the record pointer value from the response dict.<\/p>\n<hr>\n<h5>1a. Get response from cloudwatch log query.<\/h5>\n<p>To get the response from the CloudWatch logs, I used the get_query_results method of the boto3 client. The start_query method of the same client returns a query ID, which I passed as the queryId argument to get_query_results. 
The record pointer value is then retrieved from the response dict.<\/p>\n<pre lang=\"python\">\r\nresponse = None\r\nptr_value = None\r\nclient = boto3.client('logs')\r\nstart_query_response = client.start_query(logGroupName=log_group,\\\r\n     startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\\\r\n         endTime=int(datetime.now().timestamp()),queryString=query)\r\nquery_id = start_query_response['queryId']\r\ntime.sleep(1)\r\nresponse = client.get_query_results(queryId=query_id)\r\n<\/pre>\n<hr>\n<h5>1b. Get record pointer value from response dict.<\/h5>\n<p>I passed log_group and query as arguments to the get_ptr_value method, and used the get_data_structure method to flatten the nested response dictionary by recursing through it. The code for get_data_structure is included in the complete step 1 snippet further down. To maintain continuity, the snippet from step 1a is repeated here as well. If the query has not completed after the one-second wait, the pointer key may be missing from the response, in which case ptr_value stays None.<\/p>\n<pre lang=\"python\">\r\nresponse = None\r\nptr_value = None\r\nclient = boto3.client('logs')\r\nstart_query_response = client.start_query(logGroupName=log_group,\\\r\n     startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\\\r\n         endTime=int(datetime.now().timestamp()),queryString=query)\r\nquery_id = start_query_response['queryId']\r\ntime.sleep(1)\r\nresponse = client.get_query_results(queryId=query_id)\r\nresponse_dict = get_data_structure(response)\r\nif cloudwatch_conf.ptr_value in response_dict.keys():\r\n      ptr_value = response_dict[cloudwatch_conf.ptr_value]\r\nelse:\r\n      print(f'log pointer key could not be fetched from response dictionary.')\r\n\r\nreturn ptr_value\r\n<\/pre>\n<hr>\n<p>Putting it all together, the code snippet for step 1 looks as below. 
This snippet also contains other reference methods and configuration variables used to demo this snippet.<\/p>\n<pre lang=\"python\">\r\ndef get_data_structure(data):\r\n    \"\"\"\r\n    Method used for converting nested dictionary\/list to data similar to tabular form\r\n    \"\"\"\r\n    # https:\/\/medium.com\/better-programming\/how-to-flatten-a-dictionary-with-nested-lists-and-dictionaries-in-python-524fd236365\r\n    obj = collections.OrderedDict()\r\n    def recurse(dataobject,parent_key=\"\"):\r\n        \"\"\"\r\n        Method will recurse through object\r\n        \"\"\"\r\n        if isinstance(dataobject,list):\r\n            # loop through list and call recurse()\r\n            for i in range(len(dataobject)):\r\n                recurse(dataobject[i],parent_key + \"_\" + str(i) if parent_key else str(i))\r\n        elif isinstance(dataobject,dict):\r\n            # loop through dictionary and call recurse()\r\n            for key,value in dataobject.items():\r\n                recurse(value,parent_key + \"_\" + key if parent_key else key)\r\n        else:\r\n            # use the parent_key and store the value to obj\r\n            obj[parent_key] = dataobject\r\n\r\n    recurse(data)\r\n\r\n    return obj\r\n\r\ndef get_ptr_value(log_group,query):\r\n    \"\"\"\r\n    getting ptr_value from response\r\n    \"\"\"\r\n    response = None\r\n    ptr_value = None\r\n    client = boto3.client('logs')\r\n    start_query_response = client.start_query(logGroupName=log_group,\\\r\n         startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\\\r\n             endTime=int(datetime.now().timestamp()),queryString=query)\r\n    query_id = start_query_response['queryId']\r\n    time.sleep(1)\r\n    response = client.get_query_results(queryId=query_id)\r\n    response_dict = get_data_structure(response)\r\n    if cloudwatch_conf.ptr_value in response_dict.keys():\r\n       ptr_value = response_dict[cloudwatch_conf.ptr_value]\r\n    else:\r\n       
print(f'log pointer key could not be fetched from response dictionary.')\r\n\r\n    return ptr_value\r\n<\/pre>\n<hr>\n<h5>2. Use the log record pointer to fetch the message<\/h5>\n<p>I used the get_log_record method of the boto3 client to fetch the log record; it takes the log record pointer as its logRecordPointer argument. The message field of the flattened response is then parsed with ast.literal_eval, and its msg entry is returned.<\/p>\n<pre lang=\"python\">\r\ndef get_message(ptr_value):\r\n    \"\"\"\r\n    To get message\r\n    \"\"\"\r\n    client = boto3.client('logs')\r\n    response = client.get_log_record(logRecordPointer=ptr_value)\r\n    response_dict = get_data_structure(response)\r\n    message = response_dict[cloudwatch_conf.record_body]\r\n    message_dict = ast.literal_eval(message)\r\n\r\n    return message_dict['msg']\r\n<\/pre>\n<hr>\n<h5>3. Putting it all together<\/h5>\n<p>Combining all the code gives the helper module below; the test that uses it and its screen output follow. Note that the config file contains the <a href=\"https:\/\/docs.aws.amazon.com\/AmazonCloudWatch\/latest\/logs\/CWL_QuerySyntax-examples.html\">cloudwatch queries<\/a> and <a href=\"https:\/\/docs.aws.amazon.com\/AmazonCloudWatch\/latest\/logs\/Working-with-log-groups-and-streams.html\">log groups<\/a>.<\/p>\n<pre lang=\"python\">\r\n\"\"\"\r\nHelper module for cloudwatch log\r\n\"\"\"\r\nimport os\r\nimport sys\r\nimport ast\r\nimport collections\r\nfrom datetime import datetime, timedelta\r\nimport time\r\nimport boto3\r\n# Add the parent directory to sys.path before importing the local conf package\r\nsys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\r\nimport conf.cloudwatch_configuration_conf as cloudwatch_conf\r\n\r\ndef get_data_structure(data):\r\n    \"\"\"\r\n    Method used for converting nested dictionary\/list to data similar to tabular form\r\n    \"\"\"\r\n    # https:\/\/medium.com\/better-programming\/how-to-flatten-a-dictionary-with-nested-lists-and-dictionaries-in-python-524fd236365\r\n    obj = collections.OrderedDict()\r\n    def recurse(dataobject,parent_key=\"\"):\r\n        
\"\"\"\r\n        Method will recurse through object\r\n        \"\"\"\r\n        if isinstance(dataobject,list):\r\n            # loop through list and call recurse()\r\n            for i in range(len(dataobject)):\r\n                recurse(dataobject[i],parent_key + \"_\" + str(i) if parent_key else str(i))\r\n        elif isinstance(dataobject,dict):\r\n            # loop through dictionary and call recurse()\r\n            for key,value in dataobject.items():\r\n                recurse(value,parent_key + \"_\" + key if parent_key else key)\r\n        else:\r\n            # use the parent_key and store the value to obj\r\n            obj[parent_key] = dataobject\r\n\r\n    recurse(data)\r\n\r\n    return obj\r\n\r\ndef get_ptr_value(log_group,query):\r\n    \"\"\"\r\n    getting ptr_value from response\r\n    \"\"\"\r\n    response = None\r\n    ptr_value = None\r\n    client = boto3.client('logs')\r\n    start_query_response = client.start_query(logGroupName=log_group,\\\r\n         startTime=int((datetime.today() - timedelta(minutes=5)).timestamp()),\\\r\n             endTime=int(datetime.now().timestamp()),queryString=query)\r\n    query_id = start_query_response['queryId']\r\n    time.sleep(1)\r\n    response = client.get_query_results(queryId=query_id)\r\n    response_dict = get_data_structure(response)\r\n    if cloudwatch_conf.ptr_value in response_dict.keys():\r\n       ptr_value = response_dict[cloudwatch_conf.ptr_value]\r\n    else:\r\n       print(f'log pointer key could not be fetched from response dictionary.')\r\n\r\n    return ptr_value\r\n\r\n\r\ndef get_message(ptr_value):\r\n    \"\"\"\r\n    To get message\r\n    \"\"\"\r\n    client = boto3.client('logs')\r\n    response = client.get_log_record(logRecordPointer=ptr_value)\r\n    response_dict = get_data_structure(response)\r\n    message = response_dict[cloudwatch_conf.record_body]\r\n    message_dict = ast.literal_eval(message)\r\n\r\n    return message_dict['msg']\r\n<\/pre>\n<p>My test 
file looks as follows:<\/p>\n<pre lang=\"python\">\r\n\"\"\"\r\nThis end-to-end test for the employee Skype message covers the following:\r\nSetup- Purging SQS queue\r\nStep 1: Trigger employee message lambda\r\nStep 2: Print message from cloudwatch logs\r\nStep 3: Verify message with skype-listner sqs queue and culture file\r\n\"\"\"\r\n\r\nimport os\r\nimport sys\r\nimport time\r\nimport asyncio\r\nimport logging\r\nfrom pythonjsonlogger import jsonlogger\r\n# Add the parent directory to sys.path before importing the local helpers and conf packages\r\nsys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\r\nimport helpers.cloudwatch_helper\r\nimport helpers.lambda_helper\r\nimport helpers.sqs_helper\r\nimport helpers.asyncio_helper\r\nimport conf.cloudwatch_configuration_conf as cloudwatch_conf\r\nimport conf.sqs_utilities_conf as queue_url_conf\r\n\r\n# logging\r\nlog_handler = logging.StreamHandler()\r\nlog_handler.setFormatter(jsonlogger.JsonFormatter())\r\nlogger = logging.getLogger()\r\nlogger.setLevel(logging.INFO)\r\nlogger.addHandler(log_handler)\r\n\r\n# Declaring class for test object\r\nclass Skypemessagetest():\r\n    \"\"\"\r\n    Class for test object\r\n    \"\"\"\r\n    logger = logging.getLogger(__name__)\r\n    def __init__(self):\r\n        \"\"\"\r\n        Initialise class\r\n        \"\"\"\r\n    def get_request_id(self):\r\n        \"\"\"\r\n        get the response from lambda\r\n        \"\"\"\r\n        request_id = helpers.lambda_helper.get_request_id_from_lambda_response()\r\n\r\n        return request_id\r\n\r\n    def get_message_from_cloudwatch_log_ptr(self):\r\n        \"\"\"\r\n        Method to get message from cloudwatch log pointer\r\n        \"\"\"\r\n        message = None\r\n        for i in range(1, 6):\r\n            ptr_value = helpers.cloudwatch_helper.get_ptr_value\\\r\n                (cloudwatch_conf.log_group_bot_sender,cloudwatch_conf.query_skype_sender)\r\n            print(\"---------Printing Record Pointer----------\")\r\n            print(ptr_value)\r\n            if ptr_value:\r\n          
      message = helpers.cloudwatch_helper.get_message(ptr_value)\r\n                print(\"---------Printing Message----------\")\r\n                print(message)\r\n                break\r\n            time.sleep(1)\r\n\r\n        return message\r\n\r\n    def clear_queues(self):\r\n        \"\"\"\r\n        Method to clear queues\r\n        \"\"\"\r\n        for every_queue_url in queue_url_conf.QUEUE_URL_LIST:\r\n            helpers.sqs_helper.purge_sqs_queue(every_queue_url)\r\n            time.sleep(1)\r\n\r\nif __name__ == '__main__':\r\n    Skypemessagetest_obj = Skypemessagetest()\r\n    logger.info(\"Setup- Purge SQS queue\")\r\n    logger.info(\"---------------------------------------------------------------------------\")\r\n    Skypemessagetest_obj.clear_queues()\r\n    logger.info(\"Step 1: Trigger employee message lambda--------------------------------\")\r\n    request_id = Skypemessagetest_obj.get_request_id()\r\n    logger.info(\"---------------------------------------------------------------------------\")\r\n    logger.info(\"Step 2: Print message from cloudwatch logs------------------------------\")\r\n    time.sleep(240)\r\n    message = Skypemessagetest_obj.get_message_from_cloudwatch_log_ptr()\r\n    logger.info(\"---------------------------------------------------------------------------\")\r\n    logger.info(message)\r\n    logger.info(\"-------------------------------------------------------------------------- \")\r\n    logger.info(\"Step 3: Verify message with skype-listner sqs queue and culture file----\")\r\n    asyncio.run(helpers.asyncio_helper.poll_message(message))\r\n    logger.info(\"-------------------------------------------------------------------------- \")\r\n\r\n<\/pre>\n<p>My test output screen looks like as below:<\/p>\n<p><a href=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/12\/putting_it_all_together.png\" data-rel=\"lightbox-image-0\" data-rl_title=\"\" data-rl_caption=\"\" title=\"\"><img 
decoding=\"async\" class=\"aligncenter\" src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2020\/12\/putting_it_all_together.png\" alt=\"\"><\/a><\/p>\n<p>If you would like to know more about the tests where the above snippet is used, you can find them <a href=\"https:\/\/github.com\/qxf2\/qxf2-lambdas\/tree\/master\/tests\">here<\/a>.<\/p>\n<hr>\n<p>If you are a tester working on microservices or on an AWS messaging architecture, the above example may help you retrieve records and messages from CloudWatch logs.<\/p>\n<hr>\n","protected":false},"excerpt":{"rendered":"<p>This post shows you a short example of how to use the CloudWatch log record pointer to extract message from the AWS CloudWatch log record. Background Recently I have written end to end test for skype sender lambda. In the end to end test, I have compared message from AWS SQS queue with AWS CloudWatch logs. I have used log [&hellip;]<\/p>\n","protected":false},"author":28,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[345,18],"tags":[272],"class_list":["post-14710","post","type-post","status-publish","format-standard","hentry","category-cloudwatch-aws","category-python","tag-aws-cloudwatch-logs"],"_links":{"self":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/14710","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/users\/28"}],"replies":[{"embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/comments?post=14710"}],"version-history":[{"count":26,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/14710\/revisions"}],"predecessor-version":[{"id":14808,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/14710\/revi
sions\/14808"}],"wp:attachment":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/media?parent=14710"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/categories?post=14710"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/tags?post=14710"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}