🧩 1. What Is a “Dynamic DAG”?
A dynamic DAG (Directed Acyclic Graph) is one that is generated programmatically when your code is parsed or run, rather than written out by hand as a fixed, static definition.
In tools like Airflow, this means your dags/ folder contains Python files that create multiple DAGs based on:
Metadata (like configurations from a database or YAML file),
Environment variables (for deployment-specific customization),
Or even API calls.
✅ Example use cases:
A separate DAG per data source in a multi-tenant pipeline.
A separate DAG per environment (dev, staging, prod).
DAGs automatically updating as configurations change.
⚙️ 2. Common Data Sources for Dynamic DAG Generation
Source Type | Example | Purpose
Environment variables | os.getenv("DATA_SOURCE") | Control what DAGs load in a specific environment
Metadata DB / config table | SELECT * FROM pipelines | Store definitions for each workflow
YAML / JSON files | pipelines_config.yaml | Keep DAG definitions version-controlled
API endpoints | requests.get(".../configs") | Dynamically fetch workflow metadata
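For example, a YAML-backed configuration could be loaded and filtered per environment as in the sketch below. The file path and the per-entry environments key are assumptions made for illustration, and the snippet relies on PyYAML (bundled with most Airflow images) being available:

import os
import yaml  # PyYAML; shipped with most Airflow installations, but still an assumption here

def load_pipeline_configs(path="/opt/airflow/dags/pipelines_config.yaml"):
    # Read version-controlled pipeline definitions and keep only those enabled for this environment
    with open(path) as f:
        configs = yaml.safe_load(f) or []
    env = os.getenv("DEPLOYMENT_ENV", "dev")
    return [c for c in configs if env in c.get("environments", [env])]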
🧠 3. Airflow Example: Building DAGs from Metadata + Env Vars
Here’s a concrete example using Apache Airflow.
🔹 Suppose you have a metadata table or config:
# metadata.json
[
  {"dag_id": "process_sales_data", "schedule": "@daily", "dataset": "sales"},
  {"dag_id": "process_inventory_data", "schedule": "0 6 * * *", "dataset": "inventory"}
]
🔹 Airflow DAG factory pattern:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import json
import os

# Load metadata (could also come from a DB)
with open("/opt/airflow/dags/metadata.json") as f:
    pipeline_configs = json.load(f)

# Optional: environment-based filtering
ENV = os.getenv("DEPLOYMENT_ENV", "dev")

def process_dataset(dataset_name):
    print(f"Processing dataset: {dataset_name}")

# Generate a DAG for each metadata entry
for config in pipeline_configs:
    dag_id = f"{config['dag_id']}_{ENV}"
    default_args = {"start_date": datetime(2025, 1, 1)}

    with DAG(
        dag_id=dag_id,
        schedule=config["schedule"],
        default_args=default_args,
        catchup=False,
    ) as dag:
        task = PythonOperator(
            task_id=f"process_{config['dataset']}",
            python_callable=process_dataset,
            op_args=[config["dataset"]],
        )

    globals()[dag_id] = dag  # Register DAG dynamically so Airflow discovers it at module level
✅ What happens:
When Airflow parses the file, it creates multiple DAGs (one per config entry).
The DAG IDs include the environment (like _dev or _prod).
You can control which DAGs appear by setting environment variables.
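If the metadata lives in a database table instead of a JSON file (the SELECT * FROM pipelines case from the table above), the factory loop stays the same and only the loading step changes. A minimal sketch, assuming the Postgres provider is installed and an Airflow connection named metadata_db (a made-up connection id) points at the metadata database:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_configs_from_db():
    # Query the pipelines table and map rows onto the same dict shape as metadata.json
    hook = PostgresHook(postgres_conn_id="metadata_db")  # connection id is an assumption
    rows = hook.get_records("SELECT dag_id, schedule, dataset FROM pipelines")
    return [{"dag_id": r[0], "schedule": r[1], "dataset": r[2]} for r in rows]

# pipeline_configs = load_configs_from_db()  # drop-in replacement for the json.load() above

Keep in mind that this query runs every time the file is parsed, which is why the caching advice in section 6 matters.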
🧮 4. Using Environment Variables for Configuration
Environment variables can control:
Which DAGs to load
Connections or secrets
Branching behavior within DAGs
Example:
if os.getenv("LOAD_INVENTORY") == "false":
    pipeline_configs = [c for c in pipeline_configs if "inventory" not in c["dag_id"]]
This lets a single codebase behave differently across deployments (e.g., dev, test, prod).
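The same idea extends to branching behavior inside a DAG (the third bullet above): a BranchPythonOperator can pick a task path based on an environment variable. This is only a minimal sketch; the DAG id, task ids, and variable name are illustrative:

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

def choose_path():
    # Read the environment variable at runtime and return the task_id to follow
    return "full_load" if os.getenv("DEPLOYMENT_ENV", "dev") == "prod" else "sample_load"

with DAG(
    dag_id="branch_by_env_example",  # illustrative DAG id
    schedule=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    branch = BranchPythonOperator(task_id="choose_path", python_callable=choose_path)
    full_load = EmptyOperator(task_id="full_load")
    sample_load = EmptyOperator(task_id="sample_load")
    branch >> [full_load, sample_load]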
🧰 5. Dynamic DAGs in Other Frameworks
Prefect 2+
Prefect uses Flows instead of DAGs, but similar concepts apply.
You can dynamically create and register flows at runtime based on configuration:
from prefect import flow
import json
import os

@flow
def process_flow(name):
    print(f"Running flow for {name}")

# Create and run one named flow per configured pipeline
configs = json.loads(os.getenv("PIPELINE_CONFIGS", '["sales", "inventory"]'))
for dataset in configs:
    process_flow.with_options(name=f"{dataset}_flow")(dataset)
Dagster
Dagster supports dynamic job and asset generation: a Definitions object is built in ordinary Python, so jobs and assets can be constructed programmatically from metadata (helpers such as load_assets_from_modules cover the common module-scanning case).
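As a rough illustration (not taken from the Dagster docs), a small job factory can build one job per configured dataset and expose them all through a single Definitions object; the hard-coded dataset list here stands in for real metadata:

from dagster import Definitions, job, op

datasets = ["sales", "inventory"]  # stand-in for metadata loaded from a file or DB

def make_job(dataset: str):
    @op(name=f"process_{dataset}")
    def process_op():
        print(f"Processing dataset: {dataset}")

    @job(name=f"process_{dataset}_job")
    def process_job():
        process_op()

    return process_job

# All generated jobs are registered through one Definitions object
defs = Definitions(jobs=[make_job(d) for d in datasets])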
🧩 6. Best Practices
✅ 1. Keep generation logic simple.
Only generate DAG structure — don’t run complex logic at parse time.
✅ 2. Cache or memoize metadata reads.
Avoid hitting APIs or databases on every DAG parse; Airflow re-parses DAG files frequently, as often as every 30 seconds by default (see the caching sketch after this list).
✅ 3. Use consistent DAG naming conventions.
Dynamically generated names can make DAGs hard to track; include source and environment tags in the dag_id.
✅ 4. Version-control your metadata.
Use config files or versioned tables to ensure reproducibility.
✅ 5. Document the generation logic.
Explain how and why DAGs are created so other engineers can maintain the factory code.
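As a sketch of practice 2, the loader below caches remotely fetched metadata in a local file with a short TTL, so frequent DAG parses reuse the cached copy instead of calling the source each time. The cache path, TTL, and fetch_configs_from_source helper are illustrative placeholders:

import json
import os
import time

CACHE_PATH = "/tmp/pipeline_configs_cache.json"  # illustrative cache location
CACHE_TTL_SECONDS = 300  # refresh the metadata at most every 5 minutes

def fetch_configs_from_source():
    # Placeholder for the real API or database call
    return [{"dag_id": "process_sales_data", "schedule": "@daily", "dataset": "sales"}]

def load_configs():
    # Reuse a recent local copy instead of calling the source on every DAG parse
    if os.path.exists(CACHE_PATH) and time.time() - os.path.getmtime(CACHE_PATH) < CACHE_TTL_SECONDS:
        with open(CACHE_PATH) as f:
            return json.load(f)
    configs = fetch_configs_from_source()
    with open(CACHE_PATH, "w") as f:
        json.dump(configs, f)
    return configs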
🧭 Summary
Concept | Purpose
Metadata-driven DAGs | Auto-create DAGs from config data
Environment variables | Control deployment behavior dynamically
Factory pattern | Generate and register multiple DAGs in a single Python file
Outcome | Scalable, maintainable, environment-specific workflow orchestration