✅ Why you might want to do this
Combining Cloud Composer (Airflow) with Cloud Run Jobs gives you:
Orchestration power: you can schedule complex workflows in Airflow, and have tasks that launch containerized jobs in Cloud Run.
Scalability & separation of concerns: long‑running, containerized tasks decoupled from the scheduler.
Cloud‑native workflow: leverage GCP managed services (Composer + Run) with less infra to manage.
So the goal: from your DAG in Composer, trigger a Cloud Run Job (or Cloud Run service) as one of your tasks.
🔧 How to set it up
There are two main approaches. I’ll describe both and then give a concrete DAG snippet.
Approach 1: Use the dedicated CloudRun operators in Airflow
The provider package apache-airflow-providers-google includes Cloud Run operators.
Key operator for launching a job:
CloudRunExecuteJobOperator — executes an existing Cloud Run Job and waits (by default) for it to finish.
There are also operators to create, update, delete, and list jobs; a hedged example of creating one is sketched below.
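For instance, if the job doesn't exist yet, it could be created from the DAG itself with CloudRunCreateJobOperator. The following is a minimal, hedged sketch: it assumes your provider version exposes this operator with these arguments, and the image path and names are placeholders.
from airflow.providers.google.cloud.operators.cloud_run import CloudRunCreateJobOperator

create_job = CloudRunCreateJobOperator(
    task_id="create_cloud_run_job",
    project_id="my-gcp-project",
    region="us-central1",
    job_name="my-cloud-run-job-name",
    job={
        # Cloud Run v2 Job spec: an execution template wrapping a task template.
        "template": {
            "template": {
                "containers": [
                    {"image": "us-central1-docker.pkg.dev/my-gcp-project/my-repo/my-image:latest"}
                ]
            }
        }
    },
)
You could then chain create_job >> run_my_job so the job exists before the execute task below runs.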
DAG snippet example for executing an existing job:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

with DAG(
    dag_id='trigger_cloud_run_job',
    start_date=days_ago(1),
    catchup=False,
    schedule_interval=None,  # manual or trigger-based
) as dag:
    run_my_job = CloudRunExecuteJobOperator(
        task_id='run_cloud_run_job',
        project_id='my-gcp-project',
        region='us-central1',
        job_name='my-cloud-run-job-name',  # Cloud Run job names use lowercase letters, digits, and hyphens
        overrides={
            "container_overrides": [
                {
                    "name": "job",
                    "args": ["--some-param", "value"],
                    "env": [{"name": "MY_VAR", "value": "some_value"}],
                }
            ],
            "timeout": "300s",
            # you can specify other overrides here
        },
    )
This will launch the Cloud Run Job named my-cloud-run-job-name and wait for its execution to complete unless configured otherwise.
Approach 2: Call the Cloud Run Job (or Service) via HTTP/REST from Airflow
If you already have a Cloud Run Service or Job and you prefer to use a more generic operator (e.g., HttpOperator or PythonOperator with the SDK), you can do that.
Steps for this approach:
Ensure the Composer environment’s service account (or the connection you use) has invoker permissions on the Cloud Run Job or Service (e.g., roles/run.invoker).
If using a Cloud Run Job, call the Cloud Run Admin API's jobs.run method (or the client SDK) to start an execution. If it is a Cloud Run Service, send it an HTTP request; note that Cloud Run requires an OIDC identity token in the Authorization header unless the service allows unauthenticated access.
In your DAG, use an HTTP operator, e.g.:
import json

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG(...):
    trigger = SimpleHttpOperator(
        task_id='trigger_cr_service',
        method='POST',
        http_conn_id='my_cloud_run_conn',  # connection whose host is the Cloud Run service URL
        endpoint='/my-endpoint',
        data=json.dumps({"param": "value"}),
        headers={"Content-Type": "application/json"},  # add an identity token here if the service requires auth
    )
Or use a PythonOperator with the Google Cloud SDK, as sketched below.
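For that last variant, here is a minimal, hedged sketch using the google-cloud-run client library. The project, region, and job names are placeholders; it assumes the library is available in your Composer environment and that the operator is declared inside your with DAG(...) block.
from airflow.operators.python import PythonOperator
from google.cloud import run_v2

def run_cloud_run_job():
    # Start an execution of an existing Cloud Run Job and wait for it to finish.
    client = run_v2.JobsClient()
    job_path = client.job_path("my-gcp-project", "us-central1", "my-cloud-run-job")
    operation = client.run_job(name=job_path)
    execution = operation.result()  # blocks until the execution completes
    print(f"Execution: {execution.name}")

trigger_job_via_sdk = PythonOperator(
    task_id="trigger_cr_job_via_sdk",
    python_callable=run_cloud_run_job,
)
A nice property of this variant is that authentication is handled by the environment's service account through Application Default Credentials, so there is no identity-token plumbing as in the HTTP approach.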
🤔 Which to choose?
If you’re working with a Cloud Run Job resource (rather than a persistent service) and you want to monitor execution, use the dedicated operator (Approach 1).
If you have a Cloud Run Service (HTTP endpoint) or prefer custom behaviour, Approach 2 is fine.
🚀 End-to-end example (Operator approach)
Here’s a somewhat complete DAG example using Approach 1:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
}

with DAG(
    dag_id='cloud_run_job_from_composer',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=days_ago(2),
    catchup=False,
) as dag:
    launch_job = CloudRunExecuteJobOperator(
        task_id='launch_cloud_run_job',
        project_id='my-project',
        region='us-central1',
        job_name='my-batch-processing-job',
        overrides={
            "container_overrides": [
                {
                    "name": "job",
                    "args": ["--date", "{{ ds }}"],
                    "env": [
                        {"name": "ENV", "value": "prod"},
                        {"name": "RUN_DATE", "value": "{{ ds }}"},
                    ],
                }
            ],
            "timeout": "600s",
        },
        gcp_conn_id='google_cloud_default',
    )
When this DAG runs, it triggers the Cloud Run job, passes the Airflow logical date ({{ ds }}) into the job's arguments and environment, waits until the execution completes, and marks the task as succeeded or failed based on the job's outcome.
⚠️ Key considerations & pitfalls
Here are some important things to remember:
Permissions & IAM
The service account used by the Airflow task (Composer worker) must have permissions to invoke the Cloud Run Job (e.g., roles/run.invoker, maybe roles/run.viewer).
If the Cloud Run Job uses a service account, ensure that service account has rights to any GCP resources used.
If you’re running in a VPC/Private IP environment, make sure network connectivity from Composer to Cloud Run (or the job endpoint) is allowed. The docs note that “some networking configurations (Private IP & VPC‑SC) may not permit connectivity” in certain patterns.
Version compatibility
Ensure your Composer environment uses an Airflow version, and an apache-airflow-providers-google version, that includes the Cloud Run operators (check the provider changelog against your Composer image).
The operator's main arguments are project_id, region, job_name, and overrides.
Monitoring & Idempotency
A Cloud Run Job is a reusable resource and each launch creates a new execution; if you trigger the same job repeatedly, think about idempotency and whether re-running with the same parameters (e.g., the same date) is safe.
Use DAG logic or Airflow XComs if you need to pass results or IDs from the Cloud Run job back into subsequent DAG tasks (see the sketch after this list).
Consider setting the timeout field in the overrides (as in the examples above) so a hung job does not leave the task waiting indefinitely.
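As a hedged sketch of the XCom hand-off (the task_id matches the end-to-end example above; the exact payload the operator pushes depends on your provider version, so verify it in your environment):
from airflow.operators.python import PythonOperator

def report_result(**context):
    # Pull whatever the launch task pushed to XCom; for CloudRunExecuteJobOperator this is
    # typically the job resource as a dict, but confirm the payload for your provider version.
    result = context["ti"].xcom_pull(task_ids="launch_cloud_run_job")
    print(f"Cloud Run job result: {result}")

report = PythonOperator(
    task_id="report_result",
    python_callable=report_result,
)
launch_job >> report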
Logging, error handling
Make sure your job emits logs that you can access (e.g., via Cloud Logging) so you can diagnose failures.
In Airflow, if the job fails or times out, the operator fails the task unless you add retries or failure handling; see the sketch after this list.
If parallelism or resource quotas are tight in Cloud Run, you might hit job queueing delays.
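Here is a minimal, hedged sketch of that kind of hardening on the launch task; the names are illustrative, and these are standard Airflow task arguments rather than Cloud Run settings:
from datetime import timedelta

from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

def notify_failure(context):
    # Called by Airflow when the task ultimately fails; plug your own alerting in here.
    print(f"Task {context['task_instance'].task_id} failed for run {context['run_id']}")

launch_job = CloudRunExecuteJobOperator(
    task_id="launch_cloud_run_job",
    project_id="my-project",
    region="us-central1",
    job_name="my-batch-processing-job",
    retries=2,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(minutes=30),  # Airflow-side cap, separate from the job's own timeout
    on_failure_callback=notify_failure,
)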
Cost & quotas
Cloud Run Jobs incur cost for execution time, CPU, memory.
Ensure you’re aware of quotas (region, number of concurrent instances).
If triggering very frequently, you may want to batch or ensure your job is efficient.
Environment & dependencies
Your DAG needs the provider package installed in the Composer environment; make sure apache-airflow-providers-google is recent enough to include the Cloud Run operators.
If your Cloud Run Job depends on certain environment variables or secrets, integrate them via overrides or Secret Manager.
📝 Summary
Use Cloud Composer + Airflow DAGs to orchestrate tasks; one of those tasks can trigger a Cloud Run Job.
The recommended way: use the CloudRunExecuteJobOperator from the Google provider.
Ensure correct IAM, networking, versioning, monitoring.
Write your DAG with overrides if you need to parameterize the job launch (e.g., passing the date, environment).
Monitor execution, logs, and handle failures gracefully.