Triggering Chef InSpec tests with Airflow DAG

In this post, we will show you how to write Python code to create an Airflow “job” (aka Directed Acyclic Graph, or DAG) to trigger infrastructure tests. By creating this Airflow DAG, everyone on the team who has access to Airflow can run those tests. This post is intended for testers who want to practice writing DAGs.

1. Background:

Qxf2 engineers frequently encounter Apache Airflow at our client engagements. Airflow is used to orchestrate pipelines. As testers, we need to learn how to integrate our tests with existing pipelines and allow other job roles to trigger the tests easily. If you already know a bit about Airflow, then this post is for you.

Many times, testers get to use tools and learn on the job. But writing good code takes practice. This post will help you practice writing a simple DAG on your local Airflow Server.

Note: The test setup at our clients is much more involved and spans three separate entities – an Airflow server, a server (or environment) dedicated to running infrastructure tests and the actual application infrastructure under test. But this setup is not easy for testers to replicate just so they can practice. So for this post, let us assume that you have Airflow set up locally and that the environment for running the infrastructure tests is also local to your machine. Once you get the hang of writing simple scripts like these, adjusting them to a “real” environment will be easy.

2. Prerequisites:

This is an intermediate level blog post, so there are a few prerequisites. You should know a bit of Python and Apache Airflow. You also need Apache Airflow set up on your local system.

The DAG we are going to write will show you how to execute some bash commands using Airflow’s BashOperator. It is completely ok if you do not know anything about Chef InSpec tests. You can substitute that section with whatever you are trying to do at your workplace.

3. Code Snippet:

In your Airflow server, create a Python file called “inspec_tests_execution_using_airflow_dag.py” and place it in “~/airflow/dags/”. This file represents the DAG we want to create. We will develop the DAG step by step.

3.1 Importing Modules and Libraries:

This section begins by importing necessary Python modules and libraries.

import os
import sys
from datetime import datetime, timedelta
 
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
 
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 
import config
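The import of config above assumes a sibling config.py file in the dags folder. Here is a minimal sketch of what it might contain; every value is a placeholder that you would substitute with the details of your own environment:

```python
# A sketch of the config.py module the DAG imports. All values are
# placeholders -- substitute the details of your own environment.
INTERVIEW_SCHEDULER_INSTANCE_ID = 'i-0123456789abcdef0'  # instance id of the app under test
INTERVIEW_SCHEDULER_IP = '203.0.113.10'                  # IP address used for the SSH connection
INTERVIEW_GITHUB_REPO = 'https://github.com/your-org/your-inspec-profiles.git'
PRIVATE_KEY_FILE_PATH = '/tmp/interview_scheduler_key.pem'
TEMPORARY_FOLDER = '/tmp/inspec_profiles'
TEST_REPORT = '/tmp/inspec_report.html'
```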
3.2 Class ChefInspecDAG Initialization:

In this section, the ChefInspecDAG class is initialized with the dag_id and profile_name. Additionally, environment variables for the “instanceID” and “IPaddress” of the interview scheduler (Qxf2’s in-house application) are set. These values are retrieved from the config.py file.

class ChefInspecDAG:
    def __init__(self, dag_id, profile_name):
        self.dag_id = dag_id
        self.profile_name = profile_name
        os.environ['INTERVIEW_SCHEDULER_INSTANCE_ID'] = config.INTERVIEW_SCHEDULER_INSTANCE_ID
        os.environ['INTERVIEW_SCHEDULER_IP'] = config.INTERVIEW_SCHEDULER_IP
        self.interview_scheduler_instance_id = os.getenv('INTERVIEW_SCHEDULER_INSTANCE_ID')
        self.ip_address = os.getenv('INTERVIEW_SCHEDULER_IP')
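Why export the values via os.environ at all? Anything placed in os.environ is inherited by child processes, which is what lets commands spawned later in the DAG (such as the inspec command) read these values. The handoff can be exercised in isolation with placeholder values:

```python
import os
import subprocess
import sys

# Placeholder values standing in for the config.INTERVIEW_SCHEDULER_* entries.
os.environ['INTERVIEW_SCHEDULER_INSTANCE_ID'] = 'i-0123456789abcdef0'
os.environ['INTERVIEW_SCHEDULER_IP'] = '203.0.113.10'

# Child processes inherit the parent's environment by default, so a
# subprocess can read the value we just set.
result = subprocess.run(
    [sys.executable, '-c', "import os; print(os.environ['INTERVIEW_SCHEDULER_IP'])"],
    capture_output=True, text=True, check=True,
)
print(result.stdout.strip())  # 203.0.113.10
```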
3.3 create_dag_object() Method:

This method creates the DAG object. It defines default arguments for the DAG, including the owner, start date, and retry settings. The dag_id, which was provided during class initialization, is used to uniquely identify the DAG. The description is set, and the schedule is set to None (since this DAG will not be scheduled to run periodically).

def create_dag_object(self):
        """
        Creates the DAG object with specified configurations.
        Returns:
            DAG: The DAG object.
        """
        default_args = {
            'owner': 'Chef',
            'depends_on_past': False,
            'start_date': datetime(2023, 10, 17),
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
        }
 
        dag = DAG(
            self.dag_id,
            default_args=default_args,
            description='A DAG to run Chef InSpec tests',
            schedule=None,
        )
3.4 Defining Private Key and Task:

Sometimes, our tests need SSH access to the application servers. It might be tempting to store the private keys in plain text in your repo. But that is not good practice. Instead, store them in an Airflow Variable and then temporarily write them into a file that you can delete later in the test. Here, the private key for the SSH connection is retrieved from Airflow’s Variables using Variable.get(). The private key is then written to a temporary file specified by PRIVATE_KEY_FILE_PATH and the file permissions are restricted to the current user. A PythonOperator task, named based on the profile_name, is created. It calls the run_chef_inspec_test method and passes private_key_file_path as an argument. This task is added to the DAG.

        # Getting the secret key for the SSH connection
        interview_scheduler_private_key = Variable.get("INTERVIEW_SCHEDULER_PRIVATE_KEY")
        private_key_file_path = config.PRIVATE_KEY_FILE_PATH
        with open(private_key_file_path, 'w', encoding='UTF-8') as file:
            file.write(interview_scheduler_private_key)
        BashOperator(
            task_id=f'Grant_permission_{self.profile_name}',
            bash_command=f'chmod 700 {private_key_file_path}',
            dag=dag,
        ).execute(context={})
 
        PythonOperator(
            task_id=f'run_{self.profile_name}_test',
            python_callable=self.run_chef_inspec_test,
            op_args=[private_key_file_path],
            dag=dag,
        )
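The write-and-restrict pattern above can be exercised with the standard library alone. In this sketch, os.chmod stands in for the chmod BashOperator, and the key material is a dummy placeholder rather than a real Airflow Variable:

```python
import os
import stat
import tempfile

# Dummy key material standing in for Variable.get("INTERVIEW_SCHEDULER_PRIVATE_KEY").
private_key = "-----BEGIN RSA PRIVATE KEY-----\nplaceholder\n-----END RSA PRIVATE KEY-----\n"

# Write the key to a temporary file, then restrict access to the current
# user (the equivalent of the chmod 700 task in the DAG).
private_key_file_path = os.path.join(tempfile.gettempdir(), 'interview_scheduler_key.pem')
with open(private_key_file_path, 'w', encoding='UTF-8') as key_file:
    key_file.write(private_key)
os.chmod(private_key_file_path, 0o700)

mode = stat.S_IMODE(os.stat(private_key_file_path).st_mode)
print(oct(mode))  # 0o700
```

Deleting this file in a finally block, as the DAG does during cleanup, keeps the key from lingering on disk after the test run.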
3.5 Returning the DAG:

The created DAG object is returned at the end of the create_dag_object() method. Remember that Airflow only discovers DAG objects bound at module level, so instantiate the class in this same file and assign the returned DAG to a module-level variable.

return dag
3.6 run_chef_inspec_test() Method:

This method is responsible for executing the Chef InSpec tests. It follows these steps:
-> Creates a temporary folder (TEMPORARY_FOLDER) if it doesn’t already exist.
-> Performs a Git clone operation using BashOperator to clone a repository into the specified temporary folder.
-> Executes Chef InSpec tests (inspec exec) using BashOperator. It executes the InSpec tests and generates a report in both CLI and HTML format (as per the provided command).
-> Handles potential exceptions.
-> Finally, it performs cleanup by removing the temporary folder.

def run_chef_inspec_test(self, private_key_file_path, **kwargs):
        """
        Executes Chef InSpec tests within an Airflow DAG context.
 
        Args:
            private_key_file_path (str): Path to the private key file for authentication.
            **kwargs: Additional keyword arguments (used by Airflow).
 
        Raises:
            Exception: If any error occurs during task execution.
 
        The method performs:
        - Creation of a temporary folder for testing purposes.
        - Cloning a Git repository into the temporary folder.
        - Executing Chef InSpec tests using the provided private key.
        - Generates both CLI and HTML reports.
        - Cleans up temporary files after completion.
        """
        dag = kwargs['dag']
        try:
            BashOperator(
                task_id=f'create_temp_folder_{self.profile_name}',
                bash_command=f'mkdir -p {config.TEMPORARY_FOLDER}',
                dag=dag,
            ).execute(context={})
 
            BashOperator(
                task_id=f'git_clone_{self.profile_name}',
                bash_command=f"if [ -d {config.TEMPORARY_FOLDER} ]; then \
                                    rm -rf {config.TEMPORARY_FOLDER}; \
                                fi && \
                                git clone {config.INTERVIEW_GITHUB_REPO} {config.TEMPORARY_FOLDER}",
                dag=dag,
            ).execute(context={})
 
            BashOperator(
                task_id=f'inspec_exec_{self.profile_name}',
                bash_command=f"inspec exec {config.TEMPORARY_FOLDER}/{self.profile_name} \
                                -t ssh://ubuntu@{self.ip_address} \
                                -i {private_key_file_path} \
                                --no-distinct-exit \
                                --reporter=cli html:{config.TEST_REPORT} && \
                                  echo 'Inspec execution completed successfully'",
                dag=dag,
            ).execute(context={})
        except FileNotFoundError as file_err:
            print(f"FileNotFoundError: {file_err}")
        except PermissionError as perm_err:
            print(f"PermissionError: {perm_err}")
        except Exception as err:
            print(f"Unexpected Error: {err}")
        finally:
            BashOperator(
                task_id=f'cleanup_{self.profile_name}',
                bash_command=f"rm -rf {config.TEMPORARY_FOLDER} && \
                               rm -rf {private_key_file_path}",
                dag=dag,
            ).execute(context={})

4. Complete Code:

Available in the provided location.

5. Conclusion:

This detailed code description provides a step-by-step explanation of how to create an Airflow DAG using Python for running Chef InSpec tests. By understanding each section, you can customize and adapt the code to suit your specific use case.

6. Hire technical testers from Qxf2

Qxf2 is staffed with highly technical testers. Our expertise and tool belt extend well beyond the standard “automation” engineers you see in the market. We work well with small engineering teams and early stage products. If you are struggling to find technical testers that gel well with your development team, get in touch!

