Tuesday, July 15, 2025


Triggering Cloud Run Jobs from Composer DAGs

To trigger Cloud Run jobs from Apache Airflow (Cloud Composer) DAGs, you can use Google's CloudRunExecuteJobOperator (available in the apache-airflow-providers-google provider) or call the Cloud Run Admin API yourself from a Python function. Below are some common approaches:


✅ Option 1: Using CloudRunExecuteJobOperator (Recommended if using Airflow 2+)

Requirements

Airflow version >= 2.x


apache-airflow-providers-google >= 10.11.0 (the release that introduced the Cloud Run job operators)


Composer environment's service account with the correct permissions (roles/run.invoker, roles/run.viewer, and roles/iam.serviceAccountUser if the job runs as a custom service account)


Example DAG Snippet

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

with DAG(
    dag_id='trigger_cloud_run_job',
    schedule_interval=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    trigger_job = CloudRunExecuteJobOperator(
        task_id='run_cloud_run_job',
        project_id='your-gcp-project-id',
        region='us-central1',
        job_name='your-cloud-run-job-name',
    )
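Before wiring the operator into a DAG, it can help to confirm the job exists and runs on its own with the gcloud CLI. A minimal sketch, reusing the placeholder job, project, and region names from the snippet above:

```shell
# Execute the Cloud Run job once and wait for it to finish
gcloud run jobs execute your-cloud-run-job-name \
  --project your-gcp-project-id \
  --region us-central1 \
  --wait
```

If this succeeds under your own credentials but the DAG task fails, the problem is almost certainly the Composer service account's IAM permissions rather than the job itself.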

✅ Option 2: Using Python + Cloud Run REST API (for older versions or more control)

Example using PythonOperator:

from datetime import datetime

import google.auth
import requests
from google.auth.transport.requests import Request

from airflow import DAG
from airflow.operators.python import PythonOperator


def trigger_cloud_run_job(**kwargs):
    service_url = (
        "https://us-central1-run.googleapis.com/apis/run.googleapis.com"
        "/v1/namespaces/your-project/jobs/your-job-name:run"
    )

    # The Cloud Run Admin API expects an OAuth2 access token
    # (an ID token only works for invoking Cloud Run *services*).
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(Request())

    headers = {
        "Authorization": f"Bearer {credentials.token}",
        "Content-Type": "application/json",
    }

    response = requests.post(service_url, headers=headers)
    response.raise_for_status()  # Fail the task if the request failed


with DAG(
    dag_id="trigger_cloud_run_rest",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    trigger = PythonOperator(
        task_id="trigger_cloud_run_job",
        python_callable=trigger_cloud_run_job,
    )
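The long :run URL above follows a fixed pattern, so a small helper (the function name job_run_url is hypothetical, not part of any Google library) makes it easier to reuse across jobs:

```python
def job_run_url(project: str, region: str, job: str) -> str:
    # v1 Cloud Run Admin API endpoint for executing a job
    return (
        f"https://{region}-run.googleapis.com/apis/run.googleapis.com"
        f"/v1/namespaces/{project}/jobs/{job}:run"
    )


print(job_run_url("your-project", "us-central1", "your-job-name"))
```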

✅ Option 3: Using HttpOperator (if using Cloud Run HTTP endpoints)

If you deployed a Cloud Run service (not a Cloud Run Job), you can invoke its HTTP endpoint directly. Note that SimpleHttpOperator does not attach Google credentials for you, so this works as-is only if the service allows unauthenticated invocations; otherwise you must add an ID token to the Authorization header yourself.

from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG(
    'trigger_cloud_run_service',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    call_service = SimpleHttpOperator(
        task_id='call_run_service',
        http_conn_id='cloud_run_conn',  # Airflow Connection whose host is the service URL
        endpoint='/',
        method='POST',
        headers={"Content-Type": "application/json"},
        data='{"key": "value"}',
    )

🔐 IAM Permissions Needed

Make sure the Composer environment's service account has:


roles/run.invoker on the Cloud Run Job


roles/iam.serviceAccountUser if the job is using a custom service account


roles/composer.worker (usually already set)
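The first two grants can be applied with gcloud. A sketch with placeholder names throughout (the composer-sa and job-runtime-sa service accounts below are hypothetical; substitute your own):

```shell
# Allow the Composer service account to execute the job
gcloud run jobs add-iam-policy-binding your-cloud-run-job-name \
  --project your-gcp-project-id \
  --region us-central1 \
  --member "serviceAccount:composer-sa@your-gcp-project-id.iam.gserviceaccount.com" \
  --role "roles/run.invoker"

# Needed only if the job runs as a custom service account
gcloud iam service-accounts add-iam-policy-binding \
  job-runtime-sa@your-gcp-project-id.iam.gserviceaccount.com \
  --member "serviceAccount:composer-sa@your-gcp-project-id.iam.gserviceaccount.com" \
  --role "roles/iam.serviceAccountUser"
```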
