Airflow Dynamic Generation for Tasks

Newt Tan
5 min read · Jun 21, 2020


airflow usage

During a project at my company, I ran into two problems: how to dynamically generate the tasks in a DAG, and how to connect different DAGs. Split apart, they are:

  • 1. Solving the dependencies within one DAG
  • 2. Solving the dependencies between several DAGs

A third problem concerns the use of ExternalTaskSensor:

  • 3. Why does an ExternalTaskSensor sometimes get stuck and never succeed?

The fourth problem is execution time. If you look closely, you will find that Airflow uses the UTC timezone by default, and in my experience changing it in airflow.cfg does not help:

  • 4. How can we work around the default timezone?

I think Airflow developers often run into these problems in production work, so the aim of this article is to help them handle these tricky questions.

At first, I had many options. For the operator, I could choose PythonOperator, BaseOperator or just BashOperator. For the number of tasks, I could specify it with Variables or in some other way. For the dependencies, I could choose TriggerDagRunOperator, XCom or SubDag.

I tried almost all of them and found that there is always a simplest way to handle these problems. It may not be the best solution, but it is a good one.

Here are my choices:

BaseOperator + DummyOperator + Plugins + XCom + For loop + ExternalTaskSensor

1. DummyOperator Usage

DummyOperator can be used to group tasks in a DAG. To structure the different tasks into one clean workflow, I used DummyOperator tasks to connect them; they do no real work of their own. With these two tasks in place, there is a common start task and a common end task connecting all the parallel tasks in the middle.

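from airflow.operators.dummy_operator import DummyOperator

# common entry and exit points for all the parallel tasks
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)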

For the dynamic tasks, the basic structure looks like this:

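def dynamic_function(variable):
    # Function1 is the custom operator described in section 2
    task_1 = Function1(
        task_id='task_{}'.format(variable),
        dag=dag,
        # ... other operator arguments
    )
    return task_1

for variable in variables:
    task_1 = dynamic_function(variable)
    # place every generated task between the common start and end tasks
    start_task >> task_1 >> end_task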
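To see the shape this loop produces without running Airflow, here is a framework-free sketch in plain Python. It only models the wiring: each variable becomes a task named task_<variable>, placed between start_task and end_task, which is what wiring each task as start_task >> task >> end_task produces in Airflow. The function name and the edge-list representation are illustrative assumptions, not part of Airflow's API.

```python
def build_fan_out(variables):
    """Return the (upstream, downstream) edges of start -> task_<v> -> end.

    Plain-Python model of the Airflow wiring; no Airflow objects are created.
    """
    edges = []
    for variable in variables:
        task_id = "task_{}".format(variable)  # same naming as the DAG code
        edges.append(("start_task", task_id))  # start_task >> task
        edges.append((task_id, "end_task"))    # task >> end_task
    return edges


edges = build_fan_out(["a", "b", "c"])
for upstream, downstream in edges:
    print(upstream, ">>", downstream)
```

Every generated task has the same two neighbours, so adding a value to the list widens the middle layer without touching start_task or end_task.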

For the variables, you can read the values from an environment variable or simply set them as a list:

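# read a list from the 'variables' environment variable, assuming the values
# are comma-separated (the variable can be exported by the shell or loaded
# from a .env file before Airflow parses the DAG)
import os

variables = os.getenv('variables', '').split(',')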
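If the environment variable may be missing or contain stray spaces, a slightly more defensive reader helps. This is a sketch assuming comma-separated values; the helper name read_variables, its fallback argument and the example values are hypothetical.

```python
import os


def read_variables(name="variables", default=None):
    """Parse a comma-separated environment variable into a list of strings.

    Assumes values are separated by commas; surrounding spaces and empty
    entries (e.g. from a trailing comma) are dropped. Falls back to
    `default` when the variable is unset or empty.
    """
    raw = os.getenv(name, "")
    values = [item.strip() for item in raw.split(",") if item.strip()]
    return values if values else list(default or [])


os.environ["variables"] = "sales, stock,,returns"  # hypothetical values
print(read_variables())  # ['sales', 'stock', 'returns']
```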

This method is simple, but it is very useful when many tasks share the same processing logic and differ only in one variable. It also makes the project easy to scale.

2. Plugin Operator and BaseOperator

Function1 is a custom operator defined in plugins/operators. You can find detailed information on this link; the important parts are:

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class MyFirstOperator(BaseOperator):
    @apply_defaults
    def __init__(self, my_operator_param, *args, **kwargs):
        # keep the custom parameter; BaseOperator handles task_id, dag, etc.
        self.operator_param = my_operator_param
        super(MyFirstOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # the actual work of the task goes here
        ...


# registers the operator with Airflow's plugin manager
class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]

I use a plugin so that I do not need to put all my code in the DAG file. Otherwise, the DAG code would be extremely redundant and hard to manage.

I subclass BaseOperator instead of using PythonOperator for simplicity: PythonOperator is harder to control and requires more parameters that I do not need.

With the two techniques above, dynamic tasks can easily be built within one DAG. The following sections deal with the connection and concurrency problems I met during the project.

3. XCom & ExternalTaskSensor

How do you save a result for the next task? How do you get the result from the previous task, and how do you make sure that result belongs to the right time interval? Airflow provides solutions for these problems with XCom and ExternalTaskSensor.

To save the result of the current task, use XCom. It works a bit like git: one task pushes a value and a later task pulls it, using the xcom_push and xcom_pull functions. There is a size limit of 48 KB, though. Normally you do not need to worry about it, but keep to storing small intermediate values in XCom, not big files.
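Because of that limit, it can be worth checking a value's size before pushing it. The sketch below is a hypothetical helper, not part of Airflow: it assumes the value is JSON-serialised and uses the 48 KB figure quoted above as the limit; the real check depends on your Airflow version and metadata database.

```python
import json

# Limit quoted in the text (48 KB); treat it as an assumption and adjust it
# to your Airflow version / metadata database.
XCOM_LIMIT_BYTES = 48 * 1024


def fits_in_xcom(value, limit=XCOM_LIMIT_BYTES):
    """Return True if the JSON-encoded value is no larger than `limit` bytes."""
    size = len(json.dumps(value).encode("utf-8"))
    return size <= limit


small = {"rows": 120, "path": "/data/output/part-0001.csv"}  # hypothetical
large = {"rows": list(range(20000))}
print(fits_in_xcom(small))  # True
print(fits_in_xcom(large))  # False
```

In practice this means pushing something like a file path or a row count, and letting the task that pulls it read the large data from storage.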

If a task depends on the result of a specific task in a previous DAG, and, more importantly, each of these dependencies should be handled independently, use ExternalTaskSensor with the following settings:

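from airflow.sensors.external_task_sensor import ExternalTaskSensor

for variable in variables:
    ...
    # create a sensor that waits for the matching task in the upstream DAG;
    # the task_id includes the variable so every generated sensor is unique
    external_sensor = ExternalTaskSensor(
        task_id='ext_sensor_task_{}'.format(variable),
        external_dag_id='xxx',
        external_task_id='xxx_{}'.format(variable),
        timeout=300,
        dag=dag,
    )
    ...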

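I have to stress this: do not point the sensor at end_task in the previous DAG unless you want every task in that DAG to finish before the next DAG can proceed. Sensing the matching task for each variable lets each branch move on as soon as its own upstream task is done.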
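The difference can be modelled in a few lines of plain Python. This is an illustrative sketch, not Airflow code: it assumes the upstream DAG was built with the task_<variable> naming from section 1, and that end_task keeps the default all_success trigger rule, so it only succeeds once every upstream branch has succeeded. The function name is hypothetical.

```python
def branches_ready(upstream_done, variables, sense_end_task):
    """Return the downstream branches whose sensor condition is met.

    upstream_done maps upstream task_id -> True once that task succeeded.
    If every sensor watches end_task, no branch can start until all upstream
    branches are finished; if each sensor watches its matching task, each
    branch starts as soon as its own upstream task is done.
    """
    upstream_ids = ["task_{}".format(v) for v in variables]
    if sense_end_task:
        all_done = all(upstream_done.get(t, False) for t in upstream_ids)
        return list(variables) if all_done else []
    return [v for v, t in zip(variables, upstream_ids)
            if upstream_done.get(t, False)]


done = {"task_a": True, "task_b": False, "task_c": True}
print(branches_ready(done, ["a", "b", "c"], sense_end_task=True))   # []
print(branches_ready(done, ["a", "b", "c"], sense_end_task=False))  # ['a', 'c']
```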

For XCom usage, please refer to the official documentation.

4. Execution Time

Execution time is something of a drawback in Airflow 1.x (I have not tested 2.x). In version 1.x, changing the timezone in airflow.cfg did not help in my case. Instead, you can set the timezone explicitly in the DAG definition, and the pendulum library is a great option for this.

from datetime import timedelta

import pendulum
from airflow import DAG

# current hour in the local timezone, minus one hour, as a timezone-aware
# datetime (dag_id and default_args are defined elsewhere in the DAG file)
start_date = (
    pendulum.now('Europe/London')
    .replace(minute=0, second=0, microsecond=0)
    .subtract(hours=1)
)

dag = DAG(
    dag_id=dag_id,
    start_date=start_date,
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
)

With this setting, the start date lies one hour before the current time, so the scheduler can trigger a first, trial run straight away, and the start date is expressed in your local timezone.
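Why does a start date one hour in the past produce an immediate trial run? Airflow triggers the run for a given execution date only after that schedule interval has ended, that is, at execution_date + schedule_interval. The plain-Python sketch below models that rule for an hourly schedule; it uses UTC datetimes to avoid depending on a timezone database, and the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def hourly_start_date(now):
    """Truncate `now` to the hour and go back one hour."""
    return now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)


def due_execution_dates(start_date, interval, now):
    """Execution dates whose interval has already ended at `now`.

    Models Airflow's rule that the run for execution_date X is triggered
    once X + interval has passed.
    """
    dates = []
    execution_date = start_date
    while execution_date + interval <= now:
        dates.append(execution_date)
        execution_date += interval
    return dates


now = datetime(2020, 6, 21, 14, 37, tzinfo=timezone.utc)
start = hourly_start_date(now)
print(start)  # 2020-06-21 13:00:00+00:00
# exactly one run (execution date 13:00) is due right away
print(due_execution_dates(start, timedelta(hours=1), now))
```

Because the start date is only one interval back, there is exactly one past interval to run, so the trial run does not turn into a long backfill.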

5. ExternalTaskSensor Stuck Problem

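Make sure the two interacting DAGs have the same execution time or the same schedule_interval. Never manually trigger a DAG in the web UI if its result will be sent to the next DAG: a manual run gets the trigger time as its execution date, so the sensor finds no matching run and keeps waiting until it times out.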
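The reason lies in how ExternalTaskSensor chooses the upstream run to wait for: by default it looks for a run of the external DAG with the same execution date as its own, or that date minus execution_delta if you set one. The plain-Python sketch below models only that lookup (it checks that a matching run exists, whereas the real sensor also checks that the external task reached an allowed state, success by default); the helper names and dates are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def sensed_execution_date(own_execution_date, execution_delta=None):
    """Execution date of the upstream run the sensor waits for.

    Models ExternalTaskSensor's default lookup: the same execution date,
    shifted back by execution_delta when one is given.
    """
    if execution_delta is None:
        return own_execution_date
    return own_execution_date - execution_delta


def sensor_can_succeed(own_execution_date, upstream_runs, execution_delta=None):
    """True if the upstream DAG has a run at the sensed execution date."""
    return sensed_execution_date(own_execution_date, execution_delta) in upstream_runs


utc = timezone.utc
# Hypothetical hourly upstream DAG with runs at 12:00 and 13:00 UTC.
upstream_runs = {
    datetime(2020, 6, 21, 12, 0, tzinfo=utc),
    datetime(2020, 6, 21, 13, 0, tzinfo=utc),
}

scheduled = datetime(2020, 6, 21, 13, 0, tzinfo=utc)   # same schedule
manual = datetime(2020, 6, 21, 13, 7, 42, tzinfo=utc)  # clicked in the web UI

print(sensor_can_succeed(scheduled, upstream_runs))  # True
print(sensor_can_succeed(manual, upstream_runs))     # False
# A downstream DAG scheduled at half past needs a 30-minute delta.
print(sensor_can_succeed(datetime(2020, 6, 21, 13, 30, tzinfo=utc),
                         upstream_runs, timedelta(minutes=30)))  # True
```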

This is a very brief description of my solutions to these tricky problems. If you run into other problems, let me know and I will be happy to help.

Thanks for reading.
