TLDR: Es ist möglich, DAGs mit nur einem Skript dynamisch zu erstellen. Bei der Ausführung der Tasks wird jedoch das ursprüngliche DAG-Skript noch einmal geparset. Dies führt zu unnötigen Parsing-Iterationen von DAGs, die nicht zum aktuellen Task zugehörig sind.
Beobachtung:
Wir haben ein Skript, welches dynamisch DAGs und enthaltene Tasks erstellt:
dyn_dags = ['dag_load_hungsblog', 'dag_backup_hungsblog']
for dag_id in dyn_dags:
# retrieve specifc task from a database
task_list = get_tasks_for_specific_dag(dag_id)
# create DAG
dag = DAG(dag_id)
with dag:
for taks in task_list:
do_the_task(task)
# create task dependencies
# pushing the dag to the globals variable, which Airflow retrieves to get information on existing dags
globals()[dag_id] = dag
Um DAGs dynamisch zu generieren, liegen die zugehörigen Task-Beschreibungen in einer Datenbank bereit. In einem Skript wird dynamisch die Datenbank für jedes zu erstellende DAG abgefragt und die zugehörigen Tasks abgerufen. Wir haben festgestellt, dass mit dieser Methode der “Fill DagBag”-Prozess bei Ausführung jedes Tasks mehrere Sekunden dauert.
Erforderliche Airflow Hintergrundinformationen:
Anscheinend wird ein Airflow Skript immer zwei mal geparset:
- Der Scheduler analysiert das Skript in einer Endlosschleife (airflow doc). Und erstellt dadurch den entsprechenden Graphen in der Benutzeroberfläche.
- Zur Ausführungszeit wird das Python-Skript, welches die DAG-Definition enthält, noch einmal als Ganzes geparst (airflow doc)
“Dynamically generating dags can cause performance issues” (astronomer)
Auflösung:
Die erhöhte Verarbeitungszeit ist darauf zurückzuführen, dass jeder Task des Python-Skripts zur Taskausführungszeit noch einmal geparst wird. Bei genauer Betrachtung des oben aufgeführten Skriptes wird Folgendes klar. Angenommen, wir führen einen Task innerhalb des DAGs dag_load_hungsblog aus, dann würde dieser Task das gesamte Python-Skript noch einmal parsen. Dadurch, würde die Liste der auszuführenden Tasks für jeden DAG noch einmal von der Datenbank abgerufen werden, daher sowohl für dag_load_hungsblog als auch für dag_backup_hungsblog. Der letztere Abruf is allerdings unnötiger Overhead.
Um zu verhindern, dass Tasks Code ausführen, der ursprünglich zu einem “fremden übergeordneten” DAG gehört, wurde in Airflow 2.4 ein neues experimentelles Feature veröffentlicht. get_parsing_context()
gibt die aktuelle dagid und taskID zurück, die ausgeführt werden sollen.
However, task execution requires only a single DAG object to execute a task. Knowing this, we can skip the generation of unnecessary DAG objects when a task is executed, shortening the parsing time.
Airflow
dyn_dags = ['dag_load_hungsblog', 'dag_backup_hungsblog']
current_dag_id = get_parsing_context().dag_id
for dag_id in dyn_dags:
if current_dag_id is not None and current_dag_id != dag_id:
continue
# do the tasks as above
[..]
Zunächst wird die aktuelle dagID abgefragt und current_dag_id zugewiesen. Wenn wir uns in einem DAG-Lauf befinden, wird ein entsprechender Wert zurückgegeben, aber wenn nur der Scheduler dieses Skript parst, dann wird None
zurückgegeben (airflow doc). Dann wird für jedes dynamisch erstellte DAG geprüft, ob wir uns tatsächlich in einem DAG-Lauf befinden und ob der aktuelle DAG-Lauf derjenige ist in dem sich der auszuführende Task befindet. Falls,wir uns in einem fremden DAG befinden, können wir daher alle weiterführenden Codezeilen überspringen und führen somit keine unnötigen Datenbankaufrufe ab.
Mit diesem experimentellen Feature konnten wir das Parsen des Skripts auf die notwendigen Komponenten beschränken und so die Ausführungszeit der Tasks erheblich reduzieren.