Monday, November 3, 2025


Triggering Cloud Run Jobs from Composer DAGs

 ✅ Why you might want to do this


Combining Cloud Composer (Airflow) with Cloud Run Jobs gives you:


Orchestration power: you can schedule complex workflows in Airflow, and have tasks that launch containerized jobs in Cloud Run.


Scalability & separation of concerns: long‑running, containerized tasks decoupled from the scheduler.


Cloud‑native workflow: leverage GCP managed services (Composer + Run) with less infra to manage.


So the goal: from your DAG in Composer, trigger a Cloud Run Job (or Cloud Run service) as one of your tasks.


🛠 How to set it up


There are two main approaches. I’ll describe both and then give a concrete DAG snippet.


Approach 1: Use the dedicated CloudRun operators in Airflow


The provider package apache-airflow-providers-google includes Cloud Run operators.



Key operator for launching a job:


CloudRunExecuteJobOperator — executes an existing Cloud Run Job and waits (by default) for it to finish. 



There are also operators to create, update, delete, and list jobs.
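
For instance, a job can be created from the DAG itself before it is executed. This is a minimal sketch, assuming your provider version exposes CloudRunCreateJobOperator and accepts a plain dict for the job spec; the image and names below are placeholders:

from airflow.providers.google.cloud.operators.cloud_run import CloudRunCreateJobOperator

create_job = CloudRunCreateJobOperator(
    task_id='create_cloud_run_job',
    project_id='my-gcp-project',       # placeholder
    region='us-central1',
    job_name='my-cloud-run-job-name',  # placeholder
    job={
        "template": {
            "template": {
                "containers": [
                    # Placeholder image; point this at your own container.
                    {"image": "us-docker.pkg.dev/cloudrun/container/job:latest"}
                ]
            }
        }
    },
)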


DAG snippet example:


from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

with DAG(
    dag_id='trigger_cloud_run_job',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    schedule_interval=None,  # manual or trigger-based
) as dag:

    run_my_job = CloudRunExecuteJobOperator(
        task_id='run_cloud_run_job',
        project_id='my-gcp-project',
        region='us-central1',
        job_name='my-cloud-run-job-name',  # Cloud Run job names use lowercase letters, digits, and hyphens
        overrides={
            "container_overrides": [
                {
                    "name": "job",
                    "args": ["--some-param", "value"],
                    "env": [{"name": "MY_VAR", "value": "some_value"}],
                }
            ],
            "timeout": "300s",
            # you can specify other overrides (e.g., task_count)
        },
    )



This will launch the Cloud Run Job named my-cloud-run-job-name and wait for completion unless configured otherwise.
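
Depending on the provider version installed in your Composer environment, the operator may also support deferrable mode, which hands the wait over to the triggerer instead of blocking a worker slot. A hedged sketch (verify that your version accepts the deferrable flag):

from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

# Inside the same `with DAG(...)` block as above.
run_my_job_deferred = CloudRunExecuteJobOperator(
    task_id='run_cloud_run_job_deferred',
    project_id='my-gcp-project',
    region='us-central1',
    job_name='my-cloud-run-job-name',
    deferrable=True,  # assumption: available in recent provider versions
)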


Approach 2: Call the Cloud Run Job (or Service) via HTTP/REST from Airflow


If you already have a Cloud Run Service or Job and you prefer to use a more generic operator (e.g., HttpOperator or PythonOperator with the SDK), you can do that.


Steps for this approach:


Ensure the Composer environment’s service account (or the connection you use) has invoker permissions on the Cloud Run Job or Service (e.g., roles/run.invoker).


If using a Cloud Run Job, start an execution via the Cloud Run Admin API's jobs.run method or the client SDK. If it is a Cloud Run Service, send an authenticated HTTP request to its URL.


Then wire it into your DAG, e.g.:


import json

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG(...):
    trigger = SimpleHttpOperator(
        task_id='trigger_cr_service',
        method='POST',
        http_conn_id='my_cloud_run_conn',
        endpoint='/my-endpoint',
        data=json.dumps({"param": "value"}),
        headers={"Content-Type": "application/json"},
    )
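
Note that a non-public Cloud Run Service expects an OIDC identity token in the Authorization header, which the plain HTTP operator above does not mint for you. One option is a small Python task that fetches the token with google-auth; a minimal sketch, where the service URL and endpoint are placeholders and the worker's service account is assumed to hold roles/run.invoker on the service:

import requests
from airflow.operators.python import PythonOperator
from google.auth.transport.requests import Request
from google.oauth2 import id_token

SERVICE_URL = "https://my-service-abc123-uc.a.run.app"  # placeholder URL

def _call_cloud_run_service():
    # Mint an OIDC identity token for the service URL (the audience) using
    # the worker's default credentials.
    token = id_token.fetch_id_token(Request(), SERVICE_URL)
    response = requests.post(
        f"{SERVICE_URL}/my-endpoint",
        json={"param": "value"},
        headers={"Authorization": f"Bearer {token}"},
        timeout=300,
    )
    response.raise_for_status()
    return response.json()

call_service = PythonOperator(
    task_id='call_cloud_run_service',
    python_callable=_call_cloud_run_service,
)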



Or use PythonOperator with the GCP SDK.
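
A minimal sketch of the SDK route, assuming the google-cloud-run client library is available in your Composer environment (the project, region, and job names are placeholders):

from airflow.operators.python import PythonOperator
from google.cloud import run_v2

def _run_job_via_sdk():
    client = run_v2.JobsClient()
    # Fully qualified name: projects/<project>/locations/<region>/jobs/<job>
    name = "projects/my-gcp-project/locations/us-central1/jobs/my-cloud-run-job-name"
    operation = client.run_job(request=run_v2.RunJobRequest(name=name))
    execution = operation.result()  # blocks until the execution completes
    return execution.name

run_job_via_sdk = PythonOperator(
    task_id='run_job_via_sdk',
    python_callable=_run_job_via_sdk,
)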


🔍 Which to choose?


If you’re working with a Cloud Run Job resource (rather than a persistent service) and you want to monitor execution, use the dedicated operator (Approach 1).


If you have a Cloud Run Service (HTTP endpoint) or prefer custom behaviour, Approach 2 is fine.


📋 End-to-end example (Operator approach)


Here’s a somewhat complete DAG example using Approach 1:


from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
}

with DAG(
    dag_id='cloud_run_job_from_composer',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:

    launch_job = CloudRunExecuteJobOperator(
        task_id='launch_cloud_run_job',
        project_id='my-project',
        region='us-central1',
        job_name='my-batch-processing-job',
        overrides={
            "container_overrides": [
                {
                    "name": "job",
                    "args": ["--date", "{{ ds }}"],
                    "env": [
                        {"name": "ENV", "value": "prod"},
                        {"name": "RUN_DATE", "value": "{{ ds }}"},
                    ],
                }
            ],
            "timeout": "600s",
        },
        gcp_conn_id='google_cloud_default',
    )



When this DAG runs, it triggers the Cloud Run job, passes the Airflow logical date ({{ ds }}) into the job arguments, waits until completion, and then marks the task as succeeded or failed depending on the job outcome.


⚠️ Key considerations & pitfalls


Here are some important things to remember:


Permissions & IAM


The service account used by the Airflow task (Composer worker) must have permission to execute the Cloud Run Job and to read its status (e.g., roles/run.invoker to run it, plus roles/run.viewer to poll the result).


If the Cloud Run Job uses a service account, ensure that service account has rights to any GCP resources used.


If you’re running in a Private IP / VPC environment, make sure network connectivity from Composer to Cloud Run (or to the Cloud Run APIs) is allowed. The Google Cloud docs note that some networking configurations (Private IP and VPC Service Controls) may not permit connectivity in certain setups.



Version compatibility


Ensure your Composer environment uses an Airflow version that supports the operator you want. The installed apache-airflow-providers-google package must be recent enough to include the Cloud Run operators (check the provider version).


The operator's main parameters are project_id, region, job_name, and overrides; confirm the details against the provider documentation for the version you have installed.



Monitoring & Idempotency


Cloud Run Jobs are one-time (batch) runs; if you launch them repeatedly, decide whether you create uniquely named jobs per run or reuse the same job with different parameters (via overrides), and keep the work itself idempotent in case of retries.


Use DAG logic or Airflow XComs if you need to pass results/IDs from the Cloud Run job back into subsequent DAG tasks (see the sketch after this list).


Consider setting the timeout override (as in the examples above) and an Airflow execution_timeout on the task to avoid waiting indefinitely.
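
On recent provider versions, CloudRunExecuteJobOperator returns the finished job resource, which Airflow stores as the task's XCom return value (verify this behaviour for your installed version). A downstream task could read it like this; launch_cloud_run_job refers to the task in the end-to-end example above:

from airflow.operators.python import PythonOperator

def _inspect_job_result(ti):
    # Pull whatever the execute-job task returned (assumed here to be the job
    # resource as a dict; check your provider version's return value).
    job_dict = ti.xcom_pull(task_ids='launch_cloud_run_job')
    print("Cloud Run job result:", job_dict)

inspect_result = PythonOperator(
    task_id='inspect_job_result',
    python_callable=_inspect_job_result,
)

# launch_job >> inspect_result  # run it after the Cloud Run task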


Logging, error handling


Make sure your job emits logs that you can access (e.g., via Cloud Logging) so you can diagnose failures.


In Airflow, if the job fails or times out, the operator will fail the task unless you customize retries or failure handling (see the sketch after this list).


If parallelism or resource quotas are tight in Cloud Run, you might hit job queueing delays.
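
To bound the wait and absorb transient failures, you can set standard task-level arguments on the operator; a sketch of additional arguments for the example above (the values are illustrative):

from datetime import timedelta
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

launch_job = CloudRunExecuteJobOperator(
    task_id='launch_cloud_run_job',
    project_id='my-project',             # placeholder, as in the example above
    region='us-central1',
    job_name='my-batch-processing-job',  # placeholder, as in the example above
    execution_timeout=timedelta(minutes=20),  # fail the task if the wait exceeds this
    retries=2,                                # retry transient failures
    retry_delay=timedelta(minutes=5),
)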


Cost & quotas


Cloud Run Jobs incur cost for execution time, CPU, memory.


Ensure you’re aware of quotas (region, number of concurrent instances).


If triggering very frequently, you may want to batch or ensure your job is efficient.


Environment & dependencies


Your DAG needs the Google provider package available in the Composer environment; make sure apache-airflow-providers-google is recent enough to include the Cloud Run operators.


If your Cloud Run Job depends on certain environment variables or secrets, integrate them via overrides or Secret Manager.
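
For instance, Composer can resolve Airflow Variables from Secret Manager when the Secret Manager secrets backend is enabled, and because overrides is rendered with Jinja (the {{ ds }} example above relies on this) you can inject such a value as an environment variable at run time. A minimal sketch, where the variable name my_api_key and the job details are placeholder assumptions:

from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

launch_job_with_secret = CloudRunExecuteJobOperator(
    task_id='launch_job_with_secret',
    project_id='my-project',             # placeholder
    region='us-central1',
    job_name='my-batch-processing-job',  # placeholder
    overrides={
        "container_overrides": [
            {
                "name": "job",
                "env": [
                    # Rendered at run time; the secret value never sits in the DAG file.
                    {"name": "API_KEY", "value": "{{ var.value.my_api_key }}"},
                ],
            }
        ],
    },
)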


📌 Summary


Use Cloud Composer + Airflow DAGs to orchestrate tasks; one of those tasks can trigger a Cloud Run Job.


The recommended way: use the CloudRunExecuteJobOperator from the Google provider.


Ensure correct IAM, networking, versioning, monitoring.


Write your DAG with overrides if you need to parameterize the job launch (e.g., passing the date, environment).


Monitor execution, logs, and handle failures gracefully.
