To trigger Cloud Run jobs from Apache Airflow (Cloud Composer) DAGs, you can use Google Cloud's CloudRunExecuteJobOperator (available via the apache-airflow-providers-google provider) or call the Cloud Run Admin API yourself from a Python function or an HTTP operator. Below are some common approaches:
✅ Option 1: Using CloudRunExecuteJobOperator (Recommended if using Airflow 2+)
Requirements
Airflow version >= 2.5
apache-airflow-providers-google >= 10.11.0 (the release that introduced the Cloud Run job operators)
Composer environment with the correct permissions (roles/run.invoker, roles/run.viewer, roles/iam.serviceAccountUser)
Example DAG Snippet
```python
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='trigger_cloud_run_job',
    schedule_interval=None,
    start_date=days_ago(1),
    catchup=False,
) as dag:
    trigger_job = CloudRunExecuteJobOperator(
        task_id='run_cloud_run_job',
        project_id='your-gcp-project-id',
        region='us-central1',
        job_name='your-cloud-run-job-name',
    )
```
✅ Option 2: Using Python + Cloud Run REST API (for older versions or more control)
Example using PythonOperator:
```python
from datetime import datetime

import google.auth
from google.auth.transport.requests import AuthorizedSession

from airflow import DAG
from airflow.operators.python import PythonOperator


def trigger_cloud_run_job(**kwargs):
    # The Cloud Run Admin API (jobs:run) expects an OAuth2 access token,
    # not an ID token, so use Application Default Credentials.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    session = AuthorizedSession(credentials)
    url = (
        "https://us-central1-run.googleapis.com/apis/run.googleapis.com/"
        "v1/namespaces/your-project/jobs/your-job-name:run"
    )
    response = session.post(url)
    response.raise_for_status()  # Raise an error if the request failed


with DAG(
    dag_id="trigger_cloud_run_rest",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    trigger = PythonOperator(
        task_id="trigger_cloud_run_job",
        python_callable=trigger_cloud_run_job,
    )
```
✅ Option 3: Using HttpOperator (if using Cloud Run HTTP endpoints)
If you deployed a Cloud Run service (not a Cloud Run Job), you can invoke it like this:
```python
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago

with DAG(
    'trigger_cloud_run_service',
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
) as dag:
    call_service = SimpleHttpOperator(
        task_id='call_run_service',
        http_conn_id='cloud_run_conn',  # Set this up in Airflow Connections
        endpoint='/',
        method='POST',
        headers={"Content-Type": "application/json"},
        data='{"key": "value"}',
    )
```
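Note that unless the service allows unauthenticated access, Cloud Run will reject this call with a 401/403: the request also needs an ID token whose audience is the service URL (obtainable at runtime with google.oauth2.id_token.fetch_id_token). A minimal sketch of the header construction (the helper name is hypothetical):

```python
def id_token_headers(token: str) -> dict:
    # Cloud Run validates the Bearer token in the Authorization header;
    # for a service, it must be an ID token minted for the service URL.
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
```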
🔐 IAM Permissions Needed
Make sure the Composer service account has:
roles/run.invoker on the Cloud Run Job
roles/iam.serviceAccountUser if the job is using a custom service account
roles/composer.worker (usually already set)
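Assuming you know the Composer environment's service account, the first two bindings can be granted with gcloud along these lines (the project, region, job, and service-account names below are placeholders, not values from this guide):

```shell
# Placeholder values -- substitute your own project, job, and service accounts.
PROJECT=your-gcp-project-id
SA=composer-sa@${PROJECT}.iam.gserviceaccount.com

# Allow the Composer service account to execute the Cloud Run Job
gcloud run jobs add-iam-policy-binding your-cloud-run-job-name \
  --project="$PROJECT" --region=us-central1 \
  --member="serviceAccount:${SA}" --role=roles/run.invoker

# Needed only if the job runs as a custom service account
gcloud iam service-accounts add-iam-policy-binding \
  job-runtime-sa@${PROJECT}.iam.gserviceaccount.com \
  --member="serviceAccount:${SA}" --role=roles/iam.serviceAccountUser
```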