Building a Custom Operator in Airflow for external API calls

In this blog, I will show how to build a Custom Operator in Airflow to make calls to external APIs. Along the way, we will see how to use secrets in Airflow, make tasks communicate with each other and interpret the output of the SSHOperator. Although this blog posts messages to Skype, all it really does is hit an HTTP endpoint, i.e., it makes an external API call. A similar approach works for any other HTTP call as well. This blog assumes basic knowledge of working with Airflow DAGs.

Context

I helped set up Airflow at Qxf2. Along with my colleagues, I have written a few scripts that implement a Delta Lake architecture. These scripts perform some ETL and write refined data to Delta Lake tables. Airflow fits in well here to schedule and monitor these workflows. As I worked on building upon these DAGs, one of the capabilities I wanted to add was sending the output of tasks as notifications. At Qxf2, the main mode of communication is Skype. To make it easy to programmatically post messages to Skype, we had put up a ‘Skype Sender’ web service. So I set out to write a small Python snippet that makes a POST call to the HTTPS endpoint exposed by that web service. By turning my Skype sender code into a Custom Operator in Airflow, I can make it available for my colleagues to use in their DAGs.


Overview

I will first show how to put up the Custom Operator. And then we will look at how to use it in one of the DAGs I had set up. We will do the following:

1. Create Custom Operator
2. Set up Airflow Variables
3. A quick look at the DAG
4. Add task to use Custom Operator in DAG
5. Use XComs to communicate between tasks
6. Update tasks to use XComs
7. Update SkypeOperator to render the templated value
8. Decode XCom value returned from SSHOperator

This image shows an overview of the tasks of the DAG


1. Create Custom Operator

Writing a Custom Operator is fairly straightforward. All we have to do is extend the BaseOperator class and override the constructor (__init__) and the execute method in the derived class.

In the constructor, we will define the parameters needed for our SkypeOperator. In our case, we require two:
* message – the text that we want to send to Skype
* channel – the Skype channel or individual ID that we want to send the message to

from airflow.models.baseoperator import BaseOperator
 
class SkypeOperator(BaseOperator):
    def __init__(self, message: str, channel: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.message = message
        self.channel = channel

Next, in the execute method, we define the functionality we want to perform. Here, I will create a method called post_message_on_skype that makes a POST call to the Skype Sender API using the requests module. The Skype Sender requires an API_KEY; together with the message and channel, it makes up the payload.

Notice that I am storing configuration information like the API_KEY and skype_sender_url as Airflow Variables. In the next section, I will briefly explain how to create and use them.

def execute(self, context):
    try:
        self.post_message_on_skype(self.message, self.channel)
    except Exception as err:
        raise Exception(f"Unable to post message to Skype due to: {err}")

def post_message_on_skype(self, message, channel):
    """
    Posts a message on the set Skype channel
    """
    try:
        headers = {"Content-Type": "application/json"}
        payload = {
            "msg": message,
            "channel": channel,
            "API_KEY": Variable.get("skype_api_key_secret"),
        }
        response = requests.post(
            url=Variable.get("skype_sender_url"),
            json=payload,
            headers=headers,
        )
        response.raise_for_status()
        print(f"Successfully sent the Skype message - {message}")
    except Exception as err:
        raise Exception(f"Unable to post message to Skype channel, due to {err}")

2. Set up Airflow Variables

In Airflow, Variables are simple key/value pairs stored in its metastore DB; they are Airflow’s runtime configuration concept. They can be created using the UI, code or the CLI. Here, I used the UI. To create a Variable, navigate to Admin > Variables and use the plus icon to add a new Variable.

This image shows the way to create a new variable in Airflow using UI
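
Variables can also be created from code. Here is a minimal sketch of setting the two variables used later in this post programmatically; the values below are placeholders for illustration:

from airflow.models import Variable

# Variable.set writes a key/value pair into Airflow's metastore DB
# (the values here are placeholders, not real endpoints or keys)
Variable.set("skype_sender_url", "https://example.com/skype-sender")
Variable.set("skype_api_key_secret", "<your-api-key>")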

Airflow will mask the value of a Variable if its name contains any of the words (‘access_token’, ‘api_key’, ‘apikey’, ‘authorization’, ‘passphrase’, ‘passwd’, ‘password’, ‘private_key’, ‘secret’, ‘token’). By default, Variables are stored as plain text in the metadata database unless encryption is enabled. Airflow uses Fernet to encrypt the values of connections and variables. This involves creating a Fernet key and placing it in airflow.cfg. After this, Airflow will start encrypting all the variables stored.

    i) Install the cryptography package if not already available

    pip install cryptography

    ii) Generate Fernet key

    from cryptography.fernet import Fernet
     
    fernet_key = Fernet.generate_key()
    print(fernet_key.decode())

    iii) In airflow.cfg, place the key generated in the option fernet_key of section [core]

    iv) Restart webserver for changes to take effect

    v) For existing variables (defined before creating the Fernet key), open each variable in the Admin > Variables UI, re-enter its value and save the change so that it gets encrypted.

    vi) To access the variables in our code, we need to use them as:

    Variable.get("<variable_name>")

Note that although Fernet encryption ensures the contents of a variable are encrypted and cannot be read without the key, it does not stop the variable from being changed through the Airflow UI. To prevent such unauthorized changes, it is important to restrict access to the UI using Airflow’s Role-Based Access Control. Having said that, there might be other ways to do this too; I am sharing what I tried and what worked for me.

So, coming back to our Custom Operator, let us see how to use what we built in a DAG.


3. A quick look at the DAG

I have a DAG where one of the tasks reads data from a CSV file. Now, whenever I run this DAG, I would like that output to be sent as a Skype message. The CSV lives on an AWS EC2 instance (the one that houses the Delta Lake architecture) and gives a view of a Delta Lake table after it has been loaded. So, to read the CSV file, I will use Airflow’s SSHOperator to connect to that machine and run a cat command. Most of the time, the CSV data is not big, so this design works for me. The task looks like this:

fetch_gold_table_data = SSHOperator(
    task_id='fetch_gold_table_data',
    ssh_conn_id=None,
    command="cat /<path_of_table>/gold_table.csv",
    ssh_hook=sshHook,
    dag=dag
)

Note that I have just shared the particular task of the DAG here. At the end of the blog, I will provide the entire code of the DAG.


4. Add task to use Custom Operator in DAG

Next, I will add another task to my DAG that uses the Custom Operator I created earlier. It works like any other operator: first, make the necessary import, and then, while using the Operator in a task, pass the required arguments. In this case, they are message and channel. To begin with, I am going to try a simple message. For the channel, I will use a test channel that we had set up at Qxf2 for testing purposes.

from skype_operator import SkypeOperator
 
send_skype_message = SkypeOperator(
    task_id='send_skype_message',
    message='Airflow is very useful',
    channel=Variable.get("test_channel_secret"),
    dag=dag
)

After running the DAG, I was able to get the message to the Skype Channel. The send_skype_message task log indicates that it successfully sent the message:

This image shows the log of send_skype_message task which shows simple message sent to Skype.

Now that we know our Custom Operator is able to send messages, we will next try to send the output of the CSV file we spoke about earlier. However, that output is produced by a different task. For that, we will need XComs.


5. Use XComs to communicate between tasks

To pass the output of the fetch_gold_table_data task to the send_skype_message task, we will use Airflow’s XComs (cross-communications). XComs are how the tasks of a DAG communicate with each other, and each XCom is identified by a key.

XComs are explicitly pushed and pulled to/from their storage using the xcom_push and xcom_pull methods on Task Instances (a Task Instance represents the state of a task; each run of a task is instantiated into one). However, there is a default behaviour that does not require an explicit push: many operators auto-push their results into an XCom under the key return_value. This happens when the do_xcom_push argument is set to True, which is the default for most operators. So the SSHOperator task pushes its output for us; in the send_skype_message task we only need to pull it using xcom_pull. Similar to xcom_push, if we do not provide a key, it defaults to return_value. Hence, we can just provide the id of the task whose output we want, like so:

task_instance.xcom_pull(task_ids="fetch_gold_table_data")
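
Before wiring this into the DAG, here is a small hedged sketch (the task id and key are made up for illustration) of what an explicit push and pull look like inside PythonOperator callables:

def produce_row_count(**context):
    # Explicitly push a value to XCom under a custom key
    context['ti'].xcom_push(key='row_count', value=42)

def consume_row_count(**context):
    # Pull it back in a downstream task; omitting the key would fetch return_value
    row_count = context['ti'].xcom_pull(task_ids='produce_row_count_task', key='row_count')
    print(f"Upstream reported {row_count} rows")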

Now that we know how to use XComs, let’s use them in our task in the DAG.


6. Update tasks to use XComs

In Airflow, workflow parameterization is built in, leveraging the Jinja templating engine. This lets us pass dynamic data to our DAGs at runtime. To use the task instance’s xcom_pull in an operator argument, we will use the {{ }} placeholder to indicate where the template engine should render the value.

{{ task_instance.xcom_pull(task_ids="fetch_gold_table_data") }}

We will use this in our task in the message argument of our SkypeOperator.

fetch_gold_table_data = SSHOperator(
    task_id='fetch_gold_table_data',
    ssh_conn_id=None,
    command="cat /<path_of_table>/gold_table.csv",
    ssh_hook=sshHook,
    dag=dag
)
 
send_skype_message = SkypeOperator(
    task_id='send_skype_message',
    message='{{ task_instance.xcom_pull(task_ids="fetch_gold_table_data") }}',
    channel=Variable.get("test_channel_secret"),
    dag=dag
)
 
fetch_gold_table_data >> send_skype_message

When I run the DAG after making the change, I see that the message has come through to the provided Skype channel, but it is still the templated value. Here is the log of the send_skype_message task:

This image shows the log of send_skype_message task which shows templated XCom value

We would need to render the template to get the actual value. Let’s do that next.


7. Update SkypeOperator to render the templated value

We will update our Custom Operator to render the templated values that get sent via XComs. One way is to check explicitly whether the value is an XCom and render it if so. To do that, I will add a flag that can be set in the task to state that it is sending an XCom value.

First, I will update the __init__ method to take an argument called ‘is_xcom_push’ (our flag) and set it to False by default.

def __init__(self, message: str, channel: str, is_xcom_push:bool = False, **kwargs) -> None:
    super().__init__(**kwargs)
    self.message = message
    self.channel = channel
    self.is_xcom_push = is_xcom_push

Next, I will add a check in the execute method to see if the flag is set to True, in which case we need to render the templated value.

if self.is_xcom_push:
    self.message = self.render_template(self.message, context=context)

The final code will be:

import requests
from airflow.models import Variable
from airflow.models.baseoperator import BaseOperator

class SkypeOperator(BaseOperator):
    def __init__(self, message: str, channel: str, is_xcom_push: bool = False, **kwargs) -> None:
        super().__init__(**kwargs)
        self.message = message
        self.channel = channel
        self.is_xcom_push = is_xcom_push

    def execute(self, context):
        try:
            if self.is_xcom_push:
                self.message = self.render_template(self.message, context=context)
            self.post_message_on_skype(self.message, self.channel)
        except Exception as err:
            raise Exception(f"Unable to post message to Skype due to: {err}")

    def post_message_on_skype(self, message, channel):
        """
        Posts a message on the set Skype channel
        """
        try:
            headers = {"Content-Type": "application/json"}
            payload = {
                "msg": message,
                "channel": channel,
                "API_KEY": Variable.get("skype_api_key_secret"),
            }
            response = requests.post(
                url=Variable.get("skype_sender_url"),
                json=payload,
                headers=headers,
            )
            response.raise_for_status()
            print(f"Successfully sent the Skype message - {message}")
        except Exception as err:
            raise Exception(f"Unable to post message to Skype channel, due to {err}")

Now that our Custom Operator handles templated values, let’s test it again. After running the DAG, we can see the following output in the log of the send_skype_message task.

This image shows the log of send_skype_message task which shows the XCom value of SSHOperator as base64 encoded.

Although the templated value now gets rendered, the actual value is still not what we expected! It is base64 encoded: the SSHOperator encodes its return value with base64 before pushing it to XCom. So, we need to decode the value before we can use it.


8. Decode XCom value returned from SSH Operator

Using Python’s base64 module, we will decode the value we get from the SSHOperator. I will add a Python snippet to the DAG that performs the decoding.

import base64
 
def decode_csv_data(**context):
    "Decode the output of the SSHOperator"
    # Pull the XCom value
    csv_data_base64 = context['ti'].xcom_pull(task_ids='fetch_gold_table_data')
    # Decode the base64 encoded value
    csv_data = base64.b64decode(csv_data_base64).decode('utf-8')
    return csv_data
 
decode_csv_data = PythonOperator(
    task_id='decode_csv_data',
    python_callable=decode_csv_data,
    provide_context=True,
    dag=dag
)

Next, we will update the send_skype_message task to use the output of the decode_csv_data task we added above. Note that for both the PythonOperator and the SSHOperator, I did not have to explicitly add any argument to push the XCom value: do_xcom_push is set to True by default for these operators.
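
As an aside, if a task’s output should not land in XCom at all (say, it is large or sensitive), the push can be turned off per task with do_xcom_push=False. That is not what we want here, but the sketch below shows the knob:

fetch_gold_table_data = SSHOperator(
    task_id='fetch_gold_table_data',
    ssh_conn_id=None,
    command="cat /<path_of_table>/gold_table.csv",
    ssh_hook=sshHook,
    do_xcom_push=False,  # do not push the command output to XCom
    dag=dag
)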

The final code of the DAG will look like below:

import base64
from datetime import datetime
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.operators.python import PythonOperator
from skype_operator import SkypeOperator
from airflow.models import Variable

dag = DAG(
    dag_id='sravanti_test_dag',
    start_date=datetime(2023, 3, 21),
    schedule=None,
    description='Connect to EC2 instance and run script to fetch cards of gold table'
)
 
sshHook = SSHHook(
    remote_host='',
    username='',
    key_file='',
    cmd_timeout=300
)
 
fetch_gold_table_data = SSHOperator(
    task_id='fetch_gold_table_data',
    ssh_conn_id=None,
    command="cat /<path_of_table>/gold_table.csv",
    ssh_hook=sshHook,
    dag=dag
)
 
def decode_csv_data(**context):
    "Decode the output of the SSHOperator"
    # Pull the XCom value
    csv_data_base64 = context['ti'].xcom_pull(task_ids='fetch_gold_table_data')
    # Decode the base64 encoded value
    csv_data = base64.b64decode(csv_data_base64).decode('utf-8')
    return csv_data
 
decode_csv_data = PythonOperator(
    task_id='decode_csv_data',
    python_callable=decode_csv_data,
    provide_context=True,
    dag=dag
)
 
send_skype_message = SkypeOperator(
    task_id='send_skype_message',
    message='{{ task_instance.xcom_pull(task_ids="decode_csv_data") }}',
    channel=Variable.get("test_channel_secret"),
    is_xcom_push=True,
    dag=dag
)
 
fetch_gold_table_data >> decode_csv_data >> send_skype_message

Time to run the DAG once more. Here is the log of the send_skype_message task of the updated DAG:
This image shows the log of send_skype_message task which shows the csv output

We are finally able to get the contents of the CSV file as a Skype message!

By following the above steps, I was able to put up a Custom Operator which can send Skype notifications. Hopefully, the blog is useful to you as well. Thanks for reading!


Hire Qxf2 for testing your data pipelines

Is your data engineering team constantly firefighting? You can hire testers from Qxf2 to help. Our testing experts do more than just traditional test automation and quality checks. We can help your teams deliver better quality data and think better about testing. You can get in touch with us over here.

