1. Understand the Common Causes of DAG Failures
Failures in Cloud Composer DAGs typically come from:
Code errors: Python exceptions, syntax issues, or missing dependencies.
Task failures: operator errors at runtime (e.g., BigQuery job failures, API timeouts).
Resource limits: Out-of-memory errors, worker timeouts, or Airflow scheduler lag.
External dependencies: Network issues, unresponsive APIs, or invalid credentials.
Before implementing graceful handling, ensure logs are configured and accessible in Cloud Logging and Airflow UI → Logs.
2. Implement Task-Level Retry and Error Handling
✅ Configure Retries
Use Airflow’s built-in retry mechanism:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@yourdomain.com'],
}

def my_task():
    # your logic here
    raise Exception("Simulated failure")

with DAG(
    'example_dag',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    task = PythonOperator(
        task_id='resilient_task',
        python_callable=my_task,
    )
Tips:
Use retries together with retry_exponential_backoff=True to add an exponentially growing delay between retries (see the sketch below).
Keep retry count reasonable to prevent DAG backlog.
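A minimal sketch of that backoff tip, placed inside the same with DAG block as above (the task_id, retry count, and delay values are illustrative):

resilient_backoff_task = PythonOperator(
    task_id='resilient_task_with_backoff',   # illustrative task_id
    python_callable=my_task,
    retries=5,                                # illustrative retry count
    retry_delay=timedelta(minutes=1),         # initial delay between attempts
    retry_exponential_backoff=True,           # grow the delay after each failed attempt
    max_retry_delay=timedelta(minutes=30),    # cap the backoff
)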
⚙️ 3. Use On-Failure Callbacks
Define custom callbacks to take action (e.g., send Slack alerts, trigger cleanup, or log metrics):
from airflow.utils.email import send_email
def failure_callback(context):
    dag_run = context.get('dag_run')
    task_instance = context.get('task_instance')
    msg = f"DAG {dag_run.dag_id}, Task {task_instance.task_id} failed."
    send_email(to=['alerts@yourdomain.com'], subject='Airflow Task Failed', html_content=msg)
Attach the callback:
task = PythonOperator(
    task_id='resilient_task',
    python_callable=my_task,
    on_failure_callback=failure_callback,
)
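If you want the same callback on every task rather than attaching it one task at a time, it can also go into default_args (a sketch extending the default_args dict from section 2):

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': failure_callback,  # applied to every task created in this DAG
}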
4. Use Branching and Conditional Logic
You can use the BranchPythonOperator to route execution paths based on task outcomes:
from airflow.operators.python import BranchPythonOperator, PythonOperator
def check_previous_result(**context):
    if context['ti'].xcom_pull(task_ids='task_a') == 'success':
        return 'task_b'
    else:
        return 'handle_failure'

branch = BranchPythonOperator(
    task_id='branch_logic',
    python_callable=check_previous_result,
)
This allows you to gracefully skip downstream tasks or execute a compensating task when something fails.
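For context, a minimal wiring sketch for the branch above (task_a, task_b, and handle_failure are assumed to be defined elsewhere in the DAG):

# whichever task id the callable returns runs; the other path is skipped
task_a >> branch >> [task_b, handle_failure]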
5. Use the “Trigger Rule” to Manage Failures in Dependencies
Control downstream behavior with trigger_rule:
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
cleanup = DummyOperator(
    task_id='cleanup_task',
    trigger_rule=TriggerRule.ONE_FAILED,  # run if any upstream task fails
)
Common trigger rules:
all_success (default) – only runs if all upstream succeed.
one_failed – runs if any upstream task fails.
all_failed – runs if all upstream fail.
none_failed_min_one_success – runs if no upstream task failed and at least one succeeded.
This ensures you can always perform recovery or notification even if a task fails.
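As a sketch, the cleanup task is wired after the tasks it should watch so it runs whenever either of them fails (extract_task and transform_task are hypothetical upstream tasks):

# cleanup_task fires as soon as any upstream task fails
[extract_task, transform_task] >> cleanup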
6. Use Sensor Timeouts and Soft Failures
For sensors or long-running dependencies:
from airflow.sensors.python import PythonSensor

def check_condition():
    return False  # condition not met yet

PythonSensor(
    task_id='wait_for_condition',
    python_callable=check_condition,
    timeout=3600,     # give up after one hour
    soft_fail=True,   # mark the task as SKIPPED instead of FAILED on timeout
)
This prevents sensors from blocking the DAG indefinitely.
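For long waits you can also combine soft_fail with reschedule mode so the sensor releases its worker slot between checks (a sketch; the task_id and poke_interval are illustrative):

PythonSensor(
    task_id='wait_for_condition_reschedule',  # illustrative task_id
    python_callable=check_condition,
    poke_interval=300,        # re-check every 5 minutes
    timeout=3600,             # give up after one hour
    mode='reschedule',        # free the worker slot between pokes
    soft_fail=True,           # skip instead of fail on timeout
)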
7. Implement DAG-Level Failure Notifications
You can also define a DAG-level on_failure_callback:
with DAG(
    'example_dag',
    default_args=default_args,
    on_failure_callback=failure_callback,
) as dag:
    ...  # tasks defined here
This ensures even unexpected failures trigger alerts.
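If a run can also hang rather than fail outright, dagrun_timeout (a standard DAG argument) is worth combining with the callback so a stuck run is eventually marked failed instead of running indefinitely. A sketch, with the two-hour value purely illustrative:

with DAG(
    'example_dag',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    dagrun_timeout=timedelta(hours=2),   # fail the run if it exceeds 2 hours
    on_failure_callback=failure_callback,
) as dag:
    ...  # tasks defined here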
☁️ 8. Use Cloud Composer-Specific Features
a. Cloud Monitoring Alerts
Composer automatically exports Airflow metrics (DAG/task failures, queue size) to Cloud Monitoring.
Create alerting policies on metrics like:
airflow_task_failures
airflow_dag_failures
airflow_dagrun_failures
b. Logging Integration
Logs go to Cloud Logging under:
/composer-environment-name/airflow-worker
/composer-environment-name/airflow-scheduler
You can create log-based alerts directly in Cloud Logging.
9. Add Idempotency and Checkpointing in Tasks
For tasks that modify external systems (e.g., load to BigQuery, GCS, or APIs):
Write tasks to be idempotent (safe to retry).
Use checkpoints or metadata tables to track processed data.
Validate state before performing writes.
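As one illustration of the first two points above, a sketch of a python_callable that overwrites a single date partition in BigQuery, so a retry simply replaces the same data instead of appending duplicates (the bucket, project, dataset, and table names are hypothetical):

from google.cloud import bigquery

def load_partition_idempotently(ds, **kwargs):
    # Overwriting exactly one date partition makes retries safe: a re-run
    # replaces the partition rather than appending duplicate rows.
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # replace, don't append
    )
    uri = f"gs://my-bucket/exports/{ds}/*.parquet"                    # hypothetical GCS path
    table_id = f"my-project.my_dataset.events${ds.replace('-', '')}"  # partition decorator
    client.load_table_from_uri(uri, table_id, job_config=job_config).result()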
✅ 10. Test and Simulate Failures
Use the Airflow CLI:
airflow tasks test <dag_id> <task_id> <execution_date>
to simulate individual task runs and confirm that your retry logic, callbacks, and alerts behave correctly. (On Airflow 1.x environments the equivalent command is airflow test.)
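For example, using the DAG from section 2 (the date is arbitrary):

airflow tasks test example_dag resilient_task 2024-01-01
airflow dags test example_dag 2024-01-01   # run the whole DAG once from the command line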
Summary
Technique – Purpose
retries / retry_delay – Automatic retry of transient failures
on_failure_callback – Custom alerting or cleanup logic
trigger_rule – Control downstream behavior after failures
soft_fail=True – Graceful skip for sensors
Cloud Monitoring alerts – Detect and respond to DAG failures globally
Idempotent task design – Prevent duplicate work on retries