Tuesday, November 11, 2025


Building Dynamic DAGs Using Metadata and Environment Variables

 🧩 1. What Is a “Dynamic DAG”?


A dynamic DAG (Directed Acyclic Graph) is one that is generated programmatically, typically when the orchestrator parses your pipeline code, rather than written out by hand as a fixed, static definition.


In tools like Airflow, this means your dags/ folder contains Python files that create multiple DAGs based on:


Metadata (like configurations from a database or YAML file),


Environment variables (for deployment-specific customization),


Or even API calls.


✅ Example use cases:


A separate DAG per data source in a multi-tenant pipeline (see the sketch after this list).


A separate DAG per environment (dev, staging, prod).


DAGs automatically updating as configurations change.
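
For instance, a minimal sketch of the first use case (one DAG per data source) might look like this; the tenant list, DAG ids, and placeholder task are illustrative only:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Hypothetical list of data sources; in practice this would come from
# metadata or environment variables, as described in the sections below.
DATA_SOURCES = ["tenant_a", "tenant_b", "tenant_c"]

for source in DATA_SOURCES:
    with DAG(
        dag_id=f"ingest_{source}",
        schedule="@daily",
        start_date=datetime(2025, 1, 1),
        catchup=False,
    ) as dag:
        EmptyOperator(task_id="placeholder")

    # Expose each generated DAG at module level so Airflow's parser discovers it
    globals()[f"ingest_{source}"] = dag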


⚙️ 2. Common Data Sources for Dynamic DAG Generation

| Source Type | Example | Purpose |
| --- | --- | --- |
| Environment variables | os.getenv("DATA_SOURCE") | Control which DAGs load in a specific environment |
| Metadata DB / config table | SELECT * FROM pipelines | Store definitions for each workflow |
| YAML / JSON files | pipelines_config.yaml | Keep DAG definitions version-controlled |
| API endpoints | requests.get(".../configs") | Dynamically fetch workflow metadata |
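
For illustration, a small loader could combine a couple of these sources. This is only a sketch; the PIPELINE_CONFIG_PATH variable and the default file path are assumptions, and it requires PyYAML:

import json
import os

import yaml  # PyYAML

def load_pipeline_configs():
    """Load pipeline definitions from a YAML/JSON file chosen via an env var."""
    # Assumed env var: lets each deployment point at its own config file.
    path = os.getenv("PIPELINE_CONFIG_PATH", "/opt/airflow/dags/pipelines_config.yaml")
    with open(path) as f:
        if path.endswith((".yaml", ".yml")):
            return yaml.safe_load(f)
        return json.load(f)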

🧠 3. Airflow Example: Building DAGs from Metadata + Env Vars


Here’s a concrete example using Apache Airflow.


🔹 Suppose you have a metadata file (or an equivalent config table):

# metadata.json
[
  {"dag_id": "process_sales_data", "schedule": "@daily", "dataset": "sales"},
  {"dag_id": "process_inventory_data", "schedule": "0 6 * * *", "dataset": "inventory"}
]


🔹 Airflow DAG factory pattern:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import json, os

# Load metadata (could also come from a DB)
with open("/opt/airflow/dags/metadata.json") as f:
    pipeline_configs = json.load(f)

# Optional: environment-based filtering
ENV = os.getenv("DEPLOYMENT_ENV", "dev")

def process_dataset(dataset_name):
    print(f"Processing dataset: {dataset_name}")

# Generate a DAG for each metadata entry
for config in pipeline_configs:
    dag_id = f"{config['dag_id']}_{ENV}"
    default_args = {"start_date": datetime(2025, 1, 1)}

    with DAG(
        dag_id=dag_id,
        schedule=config["schedule"],
        default_args=default_args,
        catchup=False,
    ) as dag:
        task = PythonOperator(
            task_id=f"process_{config['dataset']}",
            python_callable=process_dataset,
            op_args=[config["dataset"]],
        )

    globals()[dag_id] = dag  # Register DAG dynamically



✅ What happens:


When Airflow parses the file, it creates multiple DAGs (one per config entry).


The DAG IDs include the environment (like _dev or _prod).


You can control which DAGs appear by setting environment variables.
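
The factory above reads metadata from a JSON file; if the definitions lived in a database table instead, the loading step might look roughly like this. The pipelines table and the metadata_db connection id are assumptions, and the parse-time caution in the best practices below applies:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_configs_from_db():
    # Hypothetical connection id and table; adjust to your environment.
    hook = PostgresHook(postgres_conn_id="metadata_db")
    rows = hook.get_records("SELECT dag_id, schedule, dataset FROM pipelines")
    return [
        {"dag_id": dag_id, "schedule": schedule, "dataset": dataset}
        for dag_id, schedule, dataset in rows
    ]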


🧮 4. Using Environment Variables for Configuration


Environment variables can control:


Which DAGs to load


Connections or secrets


Branching behavior within DAGs (sketched at the end of this section)


Example:


if os.getenv("LOAD_INVENTORY") == "false":
    pipeline_configs = [c for c in pipeline_configs if "inventory" not in c["dag_id"]]



This lets a single codebase behave differently across deployments (e.g., dev/test/prod).
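
For the branching case mentioned above, a sketch along these lines could pick a task path from an environment variable. The DAG id, task ids, and FULL_LOAD variable are made up for illustration:

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

def choose_path():
    # Hypothetical env var controlling which branch runs in this deployment
    return "full_load" if os.getenv("FULL_LOAD", "false") == "true" else "sample_load"

with DAG(
    dag_id="env_branching_example",
    schedule=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    branch = BranchPythonOperator(task_id="choose_path", python_callable=choose_path)
    branch >> [EmptyOperator(task_id="full_load"), EmptyOperator(task_id="sample_load")]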


🧰 5. Dynamic DAGs in Other Frameworks

Prefect 2+


Prefect uses Flows instead of DAGs, but similar concepts apply.

You can dynamically create and run flows at runtime based on configuration:


from prefect import flow
import json, os

@flow
def process_flow(name):
    print(f"Running flow for {name}")

# Pipeline names come from an environment variable (JSON-encoded list)
configs = json.loads(os.getenv("PIPELINE_CONFIGS", '["sales","inventory"]'))

# Run each configured pipeline as a separately named flow
for name in configs:
    process_flow.with_options(name=f"{name}_flow")(name)


Dagster


Dagster supports dynamic job generation using Definitions and load_assets_from_modules, which can read metadata dynamically.
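
A rough Dagster equivalent could build one job per config entry and register them all through a Definitions object; the dataset list and job names here are illustrative:

from dagster import Definitions, job, op

# Hypothetical config; in practice it could come from metadata or env vars
DATASETS = ["sales", "inventory"]

def make_job(dataset: str):
    @op(name=f"process_{dataset}")
    def process_op():
        print(f"Processing dataset: {dataset}")

    @job(name=f"process_{dataset}_job")
    def _job():
        process_op()

    return _job

defs = Definitions(jobs=[make_job(d) for d in DATASETS])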


🧩 6. Best Practices


✅ 1. Keep generation logic simple.

Only generate DAG structure — don’t run complex logic at parse time.


✅ 2. Cache or memoize metadata reads.

Avoid hitting APIs or databases on every DAG parse; Airflow may re-parse DAG files every few seconds (see the caching sketch after this list).


✅ 3. Use consistent DAG naming conventions.

Dynamically generated names can make DAGs hard to track; include source and environment tags in each dag_id.


✅ 4. Version-control your metadata.

Use config files or versioned tables to ensure reproducibility.


✅ 5. Document the generation logic.

So other engineers understand how and why DAGs are created.
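
For the caching recommendation (best practice 2), one possible approach is a time-bucketed cache around the metadata read; the TTL value and file path below are assumptions:

import json
import time
from functools import lru_cache

_CACHE_TTL_SECONDS = 300  # assumed refresh window

@lru_cache(maxsize=1)
def _load_metadata(cache_bucket: int):
    # cache_bucket exists only to expire the cache; it is not used directly.
    with open("/opt/airflow/dags/metadata.json") as f:
        return json.load(f)

def get_pipeline_configs():
    # The bucket changes every _CACHE_TTL_SECONDS, forcing a fresh file read.
    return _load_metadata(int(time.time() // _CACHE_TTL_SECONDS))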


🧭 Summary

| Concept | Purpose |
| --- | --- |
| Metadata-driven DAGs | Auto-create DAGs from config data |
| Environment variables | Control deployment behavior dynamically |
| Factory pattern | Generate and register multiple DAGs in a single Python file |
| Outcome | Scalable, maintainable, environment-specific workflow orchestration |
