Using Airflow to start and stop EC2 instances

In this post we will show you how to use Airflow to start and stop EC2 instances. Airflow is a popular open-source platform that engineering teams use to manage workflows. It uses a concept called Directed Acyclic Graphs (DAGs) which lets you chain multiple steps into a workflow. Airflow’s popularity is also partly due to an extensive library of operators. Operators allow you to use code others have written within your workflow.

1. Background

As the industry sees a rise in applications built as loosely coupled microservices, the amount of infrastructure a tester has to interact with has increased too. Testing is evolving alongside. It is not uncommon for Qxf2 testers to have to start and stop instances automatically as part of specific testing workflows. But most of our clients do not want to give us direct access to their cloud providers (AWS, Azure, etc.). Instead, what is more common is that one tester is given access and then sets up Airflow jobs for the remaining testers. That way, the testing team only needs access to Airflow.

Qxf2 has fully embraced Airflow as our preferred tool for automating a broad range of routine tasks.


2. Airflow Start & Stop EC2 Instances DAG:

In order to play along with this post, you need access to an Airflow instance and AWS credentials to access an EC2 instance. You will also need minimal knowledge of DAGs in Airflow so you can place the code in the right place. We will learn how to start an instance, stop it and also check the instance state in order to verify if the start/stop operation succeeded.

Note: For this post, we will pretend that we want to start and stop instances according to a schedule. We are doing this because we find auto-scheduled start/stop to be useful in many testing scenarios.

2.1) DAG to start EC2 instance:

To start the EC2 instance, we will use the EC2StartInstanceOperator. In your airflow/dags/ directory, create a Python file (e.g.: start_ec2_instance_dag.py) and perform the following steps.

2.1.1) Import all required packages:
import os
import datetime
from airflow.models import DAG, Variable
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.ec2 import EC2StartInstanceOperator
from airflow.providers.amazon.aws.sensors.ec2 import EC2InstanceStateSensor
2.1.2) Read, set, and retrieve the Instance ID value from the environment:

We save the Instance ID value as an environment variable, START_INSTANCE_ID, and store it in Airflow using Variable.set(). We then read it back with Variable.get(). The environment variable has to be exported on the system where Airflow is running.

instance_id = os.environ.get("START_INSTANCE_ID")
if instance_id is not None:
    Variable.set("START_INSTANCE_ID", instance_id)
else:
    print("START_INSTANCE_ID environment variable not found.")
# Retrieve the Instance ID from the Airflow Variable
INSTANCE_ID = Variable.get("START_INSTANCE_ID")
2.1.3) Defining Start EC2 Instance DAG:

The code below defines a DAG for starting an EC2 instance. It sets parameters such as the unique identifier (dag_id), the schedule for executing the DAG, the start date of the DAG, and tags for organizing the DAGs. Setting catchup=False ensures that missed runs are not backfilled when the DAG is unpaused.

In this code, the DAG context is established and the EC2StartInstanceOperator is used to start the instance. We configure the unique task ID, the instance_id, and aws_conn_id (set to the default AWS connection). The region is also specified, and check_interval sets how often, in seconds, the operator polls AWS to confirm the instance has started. Additionally, the code sets retries and a retry_delay period in case of failures.

# Start the EC2 instance every Monday morning at 7:30 AM. Change the schedule value as per requirement.
with DAG(
    dag_id='Start_Newsletter_Staging_ec2_instance',
    schedule="30 7 * * 1",
    start_date=datetime.datetime(2023, 7, 31),
    tags=['example'],
    catchup=False,
) as dag:
    # [START of the program- ec2_start_instance operator]
    start_instance = EC2StartInstanceOperator(
        task_id="ec2_start_instance",
        instance_id=INSTANCE_ID,
        aws_conn_id="aws_default",
        region_name="us-east-1",
        check_interval=15,
        retries=3,  # number of retries for the task
        retry_delay=datetime.timedelta(minutes=5)  # retry delay
    )
    # [END of the program- ec2_start_instance operator]
2.1.4) Monitoring EC2 Instance State:

The code below monitors the state of the EC2 instance using the EC2InstanceStateSensor. The parameters defined are the unique task_id, the instance_id, target_state (the state the sensor waits for the EC2 instance to reach), retries and retry_delay.

    instance_state = EC2InstanceStateSensor(
        task_id="ec2_instance_state",
        instance_id=INSTANCE_ID,
        target_state="running",
        retries=3,  # number of retries for the task
        retry_delay=datetime.timedelta(minutes=5),
    )
2.1.5) Sequence of Tasks:

Now chain the two tasks so that the DAG starts the instance and then senses its state, by adding the following line. (The bitshift syntax start_instance >> instance_state is equivalent.)

chain(start_instance, instance_state)

And that’s it. You now have a DAG to start an EC2 instance.
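Before enabling the schedule, you can sanity-check the setup from the shell. The snippet below is a minimal sketch assuming the shell session that launches Airflow; the instance ID shown is a hypothetical placeholder.

```shell
# Export the instance ID the DAG reads (hypothetical ID - use your own)
export START_INSTANCE_ID="i-0123456789abcdef0"

# Confirm the variable is visible before starting (or restarting) Airflow
echo "$START_INSTANCE_ID"
```

With the variable exported, you can exercise the DAG once without waiting for the schedule by running airflow dags test Start_Newsletter_Staging_ec2_instance.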

2.2) DAG to stop EC2 instance:

In this scenario, we need to utilize the EC2StopInstanceOperator to stop an EC2 instance that is in the “running” state. This step is similar to starting an EC2 instance. So we will not work through it step by step. Instead, the entire code is provided below. You can place the code in airflow/dags/stop_ec2_instance_dag.py

"""
Using Airflow operators to stop an EC2 instance
"""
import os
import datetime
from airflow.models import DAG, Variable
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.ec2 import EC2StopInstanceOperator
from airflow.providers.amazon.aws.sensors.ec2 import EC2InstanceStateSensor
 
# Set the Instance ID variable. Export STOP_INSTANCE_ID in the environment where Airflow runs
instance_id = os.environ.get("STOP_INSTANCE_ID")
if instance_id is not None:
    Variable.set("STOP_INSTANCE_ID", instance_id)
else:
    print("STOP_INSTANCE_ID environment variable not found.")
 
# Get INSTANCE_ID
INSTANCE_ID = Variable.get("STOP_INSTANCE_ID")
# Stop the EC2 instance every Friday at 12 noon. Change the schedule value as per requirement.
with DAG(
    dag_id='Stop_Newsletter_Staging_ec2_instance',
    schedule="00 12 * * 5",
    start_date=datetime.datetime(2023, 7, 31),
    tags=['example'],
    catchup=False,
) as dag:
 
    # [START of the program ec2_stop_instance operator]
    stop_instance = EC2StopInstanceOperator(
        task_id="ec2_stop_instance",
        instance_id=INSTANCE_ID,
        aws_conn_id="aws_default",
        region_name="us-east-1",
        check_interval=15,
        retries=3,  # number of retries for the task
        retry_delay=datetime.timedelta(minutes=5)  # retry delay
    )
    # [END of the program ec2_stop_instance operator]
 
    instance_state = EC2InstanceStateSensor(
        task_id="ec2_instance_state",
        instance_id=INSTANCE_ID,
        target_state="stopped",
    )
    # [END of program - ec2 instance state sensor]
    chain(stop_instance, instance_state)

3) Non-default value for AWS connection:

aws_conn_id is set to the default AWS connection (aws_default) in the start and stop EC2 instance operations above. To provide a non-default value for the AWS connection, use the code provided in this gist.
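One common approach (a sketch, not necessarily the gist's exact code) is to define the non-default connection as an environment variable. Airflow resolves any variable named AIRFLOW_CONN_&lt;CONN_ID&gt; to a connection with that ID; the connection name, credentials and region below are placeholders.

```shell
# Define a non-default AWS connection via an environment variable.
# Airflow maps AIRFLOW_CONN_MY_AWS_CONNECTION to the conn_id "my_aws_connection".
# The access key, secret and region are placeholders - substitute your own values.
export AIRFLOW_CONN_MY_AWS_CONNECTION='aws://AKIAEXAMPLEKEY:examplesecret@/?region_name=us-east-1'

# Confirm the variable is set for the Airflow process
echo "$AIRFLOW_CONN_MY_AWS_CONNECTION"
```

The operators can then be pointed at this connection by passing aws_conn_id="my_aws_connection" instead of the default.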

4) Conclusion:

We hope this post helps a few testers. Qxf2 is finding that working with orchestration tools and knowing how to configure jobs and workflows in them helps testers integrate better with teams. These tools have the additional benefit of enabling the tester to independently execute tests that previously required help from developers and DevOps members.


Hire Qxf2 for your technical testing needs

Do you need technical software testers? We provide senior QA engineers who embed themselves into your team for short periods of time. We implement advanced testing tools and techniques to enable your development team to move faster. Our testers will also spend time coaching the team on better testing practices and implementing examples. Once the team is self-reliant, we leave. Contact us to discuss your requirements and explore how we can support your testing needs.

