In this blog, I will show how to build a Custom Operator in Airflow to make calls to external APIs. Along the way, we will see how to use secrets in Airflow, make tasks communicate with each other and interpret the output of an SSH Operator. Although this blog refers to posting messages to Skype, all it does is hit an HTTP endpoint, i.e., it makes an external API call. You can use a similar approach for any other HTTP calls as well. This blog assumes basic knowledge of using Airflow DAGs.
Context
I helped set up Airflow at Qxf2. Along with my colleagues, I have written a few scripts that implement a Delta Lake architecture. These scripts perform some ETL and write refined data to Delta Lake tables. Airflow fits in well here to schedule and monitor these workflows. As I worked on building upon these DAGs, one of the functionalities I wanted to add was sending the output of tasks as notifications. At Qxf2, the main mode of communication is Skype. To make it easy to programmatically post messages to Skype, we had put up a ‘Skype Sender’ web service. So I set out to write a small Python snippet that makes a POST call to the HTTPS endpoint exposed by the web service. By turning my Skype sender code into a Custom Operator in Airflow, I can make it available for my colleagues to use in their DAGs.
Overview
I will first show how to build the Custom Operator and then look at how to use it in one of the DAGs I had set up. We will do the following:
1. Create Custom Operator
2. Setup Airflow Variables
3. A quick look at DAG
4. Add task to use Custom Operator in DAG
5. Use XComs to communicate between tasks
6. Update tasks to use XComs
7. Update SkypeOperator to render the templated value
8. Decode XCom value returned from SSHOperator
1. Create Custom Operator
Writing a Custom Operator is fairly straightforward. All we have to do is extend the BaseOperator and override its constructor and execute methods in the derived class.
In the constructor, we will define the parameters needed for our SkypeOperator. In our case, we require two:
* message – that we want to send to Skype
* channel – the Skype channel or individual ID that we want to send the message to
from airflow.models.baseoperator import BaseOperator


class SkypeOperator(BaseOperator):
    def __init__(self, message: str, channel: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.message = message
        self.channel = channel
Next, in the execute method, we will define the functionality we want to perform. Here, I will create a method called post_message_on_skype that makes a POST call to the Skype Sender API using the requests module. The Skype Sender requires an API_KEY; along with message and channel, it constitutes the payload.
Notice that for configuration information like API_KEY and skype_sender_url, I am storing them as Airflow’s Variables. In the next section, I will explain briefly how to create and use them.
    # These methods live inside the SkypeOperator class. The module also needs
    # `import requests` and `from airflow.models import Variable` at the top.
    def execute(self, context):
        try:
            self.post_message_on_skype(self.message, self.channel)
        except Exception as err:
            raise Exception(f"Unable to post message to Skype due to: {err}")

    def post_message_on_skype(self, message, channel):
        """
        Posts a message on the set Skype channel
        """
        try:
            headers = {"Content-Type": "application/json"}
            payload = {
                "msg": message,
                "channel": channel,
                "API_KEY": Variable.get("skype_api_key_secret"),
            }
            response = requests.post(
                url=Variable.get("skype_sender_url"),
                json=payload,
                headers=headers,
            )
            response.raise_for_status()
            print(f"Successfully sent the Skype message - {message}")
        except Exception as err:
            raise Exception(f"Unable to post message to Skype channel, due to {err}")
2. Setup Airflow Variables
In Airflow, Variables are simple key/value pairs stored in its metastore DB. They are Airflow’s runtime configuration concept and can be created using the UI, code or the CLI. Here, I used the UI. To create a Variable, navigate to Admin > Variables and use the plus icon to add a new Variable.
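For reference, the same Variable can also be created from the command line. A quick sketch (the value here is a placeholder):

airflow variables set skype_api_key_secret '<your-api-key>'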
Airflow will mask the value of a Variable if its name contains any of the words (‘access_token’, ‘api_key’, ‘apikey’, ‘authorization’, ‘passphrase’, ‘passwd’, ‘password’, ‘private_key’, ‘secret’, ‘token’). By default, Variables are stored as plain text in the metadata database unless encryption is enabled. Airflow uses Fernet to encrypt the values of connections and variables. This involves creating a Fernet key and placing it in airflow.cfg. After this, Airflow will start encrypting all the variables it stores.
i) Install the cryptography package if it is not already available
pip install cryptography
ii) Generate Fernet key
from cryptography.fernet import Fernet

fernet_key = Fernet.generate_key()
print(fernet_key.decode())
iii) In airflow.cfg, place the generated key in the fernet_key option of the [core] section
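The relevant portion of airflow.cfg would look something like this (the key shown is a placeholder for the value generated above). Alternatively, you can set the AIRFLOW__CORE__FERNET_KEY environment variable instead of editing the file:

[core]
fernet_key = <generated_fernet_key>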
iv) Restart the webserver for the changes to take effect
v) For existing variables (defined before creating the Fernet key), open each variable in the Admin UI, re-enter its value and save the change
vi) To access a variable in our code, we use:
Variable.get("<variable_name>") |
Note that although Fernet encryption ensures the contents of a variable are encrypted and cannot be read without the key, it does not stop the variable from being changed through the Airflow UI. To prevent such unauthorized changes, it is important to restrict access to the UI using Airflow’s Role-Based Access Control system. Having said that, there might be other ways to do this too; I am sharing what I tried and what worked for me.
So, coming back to our Custom Operator, let us see how to use what we built in a DAG.
3. A quick look at DAG
I have a DAG where one of the tasks reads data from a CSV file. Whenever I run this DAG, I would like that output to be sent as a Skype message. The CSV file lives on an AWS EC2 instance (the one that houses the Delta Lake architecture) and holds the output of a Delta Lake table after it has been loaded. So, to read the CSV file, I will use Airflow’s SSH Operator to connect to the machine and run a cat command. Most of the time, the CSV data is small, so this design works for me. The task looks like this:
fetch_gold_table_data = SSHOperator(
    task_id='fetch_gold_table_data',
    ssh_conn_id=None,
    command="cat /<path_of_table>/gold_table.csv",
    ssh_hook=sshHook,
    dag=dag
)
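The sshHook referenced above is an SSHHook pointing at the EC2 instance. A minimal sketch (the host, username and key file path are placeholders you would fill in):

from airflow.providers.ssh.hooks.ssh import SSHHook

sshHook = SSHHook(
    remote_host='<ec2-host>',
    username='<user>',
    key_file='/path/to/private_key.pem',
    cmd_timeout=300
)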
Note that I have shared only the relevant portion of the DAG here. At the end of the blog, I will provide the entire code of the DAG.
4. Add task to use Custom Operator in DAG
Next, I will add another task to my DAG that uses the Custom Operator I created earlier. It is similar to using any other operator: first, we make the necessary import, and then, while using the operator in a task, we pass the required arguments, in this case message and channel. To begin with, I am going to try a simple message. For the channel, I will use a test channel that we had set up at Qxf2 for testing purposes.
from skype_operator import SkypeOperator

send_skype_message = SkypeOperator(
    task_id='send_skype_message',
    message='Airflow is very useful',
    channel=Variable.get("test_channel_secret"),
    dag=dag
)
After running the DAG, the message arrived in the Skype channel. The send_skype_message task log indicates that it was sent successfully:
Now that we know our Custom Operator is able to send messages, we will next try to send the output of the CSV file we spoke about earlier. However, that output is produced by a different task. For that, we will need XComs.
5. Use XComs to communicate between tasks
To pass the output of the fetch_gold_table_data task to the send_skype_message task, we will use Airflow’s XComs (cross-communications). XComs are how the tasks of a DAG communicate with each other. An XCom is identified by a key. Basically, XComs are explicitly pushed and pulled to/from their storage using the xcom_push and xcom_pull methods on Task Instances (a Task Instance represents the state of a task; each run of a task is instantiated into one). However, there is a default behaviour that does not require an explicit push: many operators automatically push their result into an XCom under the key return_value. This happens when the do_xcom_push argument is set to True, and it is set by default for most operators. So in our case, fetch_gold_table_data already pushes its output to XCom; the send_skype_message task just needs to pull it using xcom_pull. Similar to xcom_push, if we do not provide a key, xcom_pull defaults to return_value. Hence, we can just provide the id of the task whose output we want, like so:
task_instance.xcom_pull(task_ids="fetch_gold_table_data")
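For completeness, here is what an explicit push and pull would look like in a pair of PythonOperator callables. This is a sketch; the task id and key name are illustrative:

def producer(**context):
    # Explicitly push a value under a custom key
    context['ti'].xcom_push(key='row_count', value=42)

def consumer(**context):
    # Pull it back using the producing task's id and the same key
    row_count = context['ti'].xcom_pull(task_ids='producer_task', key='row_count')
    print(row_count)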
Now that we know how to use XComs, let’s use them in our task in the DAG.
6. Update tasks to use XComs
In Airflow, workflow parameterization is built in, leveraging the Jinja templating engine. This lets us pass dynamic data to our DAGs at runtime. To call the task instance’s xcom_pull, we will use the placeholder {{ }} to indicate where the templating engine should render the value.
{{ task_instance.xcom_pull(task_ids="fetch_gold_table_data") }}
We will use this in our task in the message argument of our SkypeOperator.
fetch_gold_table_data = SSHOperator(
    task_id='fetch_gold_table_data',
    ssh_conn_id=None,
    command="cat /<path_of_table>/gold_table.csv",
    ssh_hook=sshHook,
    dag=dag
)

send_skype_message = SkypeOperator(
    task_id='send_skype_message',
    message='{{ task_instance.xcom_pull(task_ids="fetch_gold_table_data") }}',
    channel=Variable.get("test_channel_secret"),
    dag=dag
)

fetch_gold_table_data >> send_skype_message
When I run the DAG after making the change, I see that a message has come to the provided Skype channel, but it is the raw templated value rather than the rendered output. Here is the log of the send_skype_message task:
We would need to render the template to get the actual value. This is because message is not declared as a templated field on our Custom Operator, so Airflow does not render it automatically. Let’s do that next.
7. Update SkypeOperator to render the templated value
We will update our Custom Operator to render the templated values that get sent via XComs. One way is to check explicitly whether the value is an XCom and then render it. To do that, I will add a flag which can be set in the task to indicate that it is being sent an XCom value.
First, I will update the __init__ method to take an argument called ‘is_xcom_push’ (our flag) and set it to False by default.
def __init__(self, message: str, channel: str, is_xcom_push: bool = False, **kwargs) -> None:
    super().__init__(**kwargs)
    self.message = message
    self.channel = channel
    self.is_xcom_push = is_xcom_push
Next, I will add a check to see if the flag is set to True, in which case we render the templated value.
if self.is_xcom_push:
    self.message = self.render_template(self.message, context=context)
The final code will be:
import requests

from airflow.models import Variable
from airflow.models.baseoperator import BaseOperator


class SkypeOperator(BaseOperator):
    def __init__(self, message: str, channel: str, is_xcom_push: bool = False, **kwargs) -> None:
        super().__init__(**kwargs)
        self.message = message
        self.channel = channel
        self.is_xcom_push = is_xcom_push

    def execute(self, context):
        try:
            if self.is_xcom_push:
                # Render the Jinja-templated message (e.g. an xcom_pull expression)
                self.message = self.render_template(self.message, context=context)
            self.post_message_on_skype(self.message, self.channel)
        except Exception as err:
            raise Exception(f"Unable to post message to Skype due to: {err}")

    def post_message_on_skype(self, message, channel):
        """
        Posts a message on the set Skype channel
        """
        try:
            headers = {"Content-Type": "application/json"}
            payload = {
                "msg": message,
                "channel": channel,
                "API_KEY": Variable.get("skype_api_key_secret"),
            }
            response = requests.post(
                url=Variable.get("skype_sender_url"),
                json=payload,
                headers=headers,
            )
            response.raise_for_status()
            print(f"Successfully sent the Skype message - {message}")
        except Exception as err:
            raise Exception(f"Unable to post message to Skype channel, due to {err}")
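As an aside, Airflow operators can also declare which of their attributes should be treated as templates through the template_fields class attribute; Airflow then renders those automatically before execute is called, with no flag or manual render_template needed. A minimal sketch of that alternative (not the approach used in this blog):

class SkypeOperator(BaseOperator):
    # Airflow renders Jinja templates in these attributes before execute() runs
    template_fields = ("message",)

    def __init__(self, message: str, channel: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.message = message
        self.channel = channel

In this blog, though, I stick with the explicit is_xcom_push flag described above.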
Now that our Custom Operator handles templated values, let’s test it again. After running the DAG, we can see the following output in the log of the send_skype_message task.
Although the templated value gets rendered, we see that the actual value is still not what we expected! The value is base64 encoded: the SSHOperator encodes its return value with base64 before pushing it to XCom. So, we need to decode the value to be able to use it.
8. Decode XCom value returned from SSH Operator
Using Python’s base64, we will decode the value we get from SSHOperator. I will add a Python snippet to the DAG which will perform the decoding.
import base64

def decode_csv_data(**context):
    "Decode the output of the SSHOperator"
    # Pull the XCom value
    csv_data_base64 = context['ti'].xcom_pull(task_ids='fetch_gold_table_data')
    # Decode the base64 encoded value
    csv_data = base64.b64decode(csv_data_base64).decode('utf-8')
    return csv_data

decode_csv_data = PythonOperator(
    task_id='decode_csv_data',
    python_callable=decode_csv_data,
    provide_context=True,
    dag=dag
)
Next, we will update the send_skype_message task to use the output of the decode_csv_data task defined above. Note that for both the PythonOperator and the SSHOperator, I did not have to explicitly add any argument to push the XCom value; do_xcom_push is set to True by default for these operators.
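Conversely, if you ever want a task to skip pushing its result (say, when the output is large), the flag can be turned off explicitly. A quick sketch using the task from this DAG:

fetch_gold_table_data = SSHOperator(
    task_id='fetch_gold_table_data',
    ssh_conn_id=None,
    command="cat /<path_of_table>/gold_table.csv",
    ssh_hook=sshHook,
    do_xcom_push=False,  # do not push the command output to XCom
    dag=dag
)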
The final code of the DAG will look like below:
import base64
from datetime import datetime

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.operators.python import PythonOperator
from airflow.models import Variable

from skype_operator import SkypeOperator

dag = DAG(
    dag_id='sravanti_test_dag',
    start_date=datetime(2023, 3, 21),
    schedule=None,
    description='Connect to EC2 instance and run script to fetch cards of gold table'
)

sshHook = SSHHook(
    remote_host='',
    username='',
    key_file='',
    cmd_timeout=300
)

fetch_gold_table_data = SSHOperator(
    task_id='fetch_gold_table_data',
    ssh_conn_id=None,
    command="cat /<path_of_table>/gold_table.csv",
    ssh_hook=sshHook,
    dag=dag
)

def decode_csv_data(**context):
    "Decode the output of the SSHOperator"
    # Pull the XCom value
    csv_data_base64 = context['ti'].xcom_pull(task_ids='fetch_gold_table_data')
    # Decode the base64 encoded value
    csv_data = base64.b64decode(csv_data_base64).decode('utf-8')
    return csv_data

decode_csv_data = PythonOperator(
    task_id='decode_csv_data',
    python_callable=decode_csv_data,
    provide_context=True,
    dag=dag
)

send_skype_message = SkypeOperator(
    task_id='send_skype_message',
    message='{{ task_instance.xcom_pull(task_ids="decode_csv_data") }}',
    channel=Variable.get("test_channel_secret"),
    is_xcom_push=True,
    dag=dag
)

fetch_gold_table_data >> decode_csv_data >> send_skype_message
Time to run the DAG once more. Here is the log of the send_skype_message task of the updated DAG:
We are finally able to get the contents of the CSV file as a Skype message!
By following the above steps, I was able to put up a Custom Operator which can send Skype notifications. Hopefully, the blog is useful to you as well. Thanks for reading!
Hire Qxf2 for testing your data pipelines
Is your data engineering team constantly firefighting? You can hire testers from Qxf2 to help. Our testing experts do more than just traditional test automation and quality checks. We can help your teams deliver better quality data and think better about testing. You can get in touch with us over here.
I have been in the IT industry for 9 years. I worked as a curriculum validation engineer at Oracle for the past 5 years, validating various courses on products developed by them. Before Oracle, I worked at TCS as a manual tester. I like testing: it is diverse, challenging, and satisfying in the sense that we can help improve the quality of software and provide a better user experience. I also wanted to try my hand at writing and got an opportunity at Qxf2 as a Content Writer before transitioning to a full-time QA Engineer role. I love doing DIY crafts, reading books and spending time with my daughter.