🧩 1. What Airflow Plugins Are
In Airflow, a plugin is a Python module that adds new functionality — such as:
Custom Operators / Sensors / Hooks
UI Views / Macros / Flask Blueprints
In Cloud Composer 2, plugins live in your environment bucket's plugins/ folder (alongside dags/):
gs://your-composer-bucket/plugins/
They are mounted on the Airflow components at /home/airflow/gcs/plugins and loaded automatically into your environment.
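As a quick sketch of those extension points, here is a minimal, hypothetical plugin that registers a custom macro and a Flask blueprint. The names MySimplePlugin, uppercase_env, and my_blueprint are placeholders, not part of the example built later in this post:

from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint

# Hypothetical macro: plugin macros are exposed in templates under macros.<plugin name>,
# e.g. {{ macros.my_simple_plugin.uppercase_env(params.env) }}.
def uppercase_env(value: str) -> str:
    return value.upper()

# Hypothetical blueprint for serving extra views or static assets from the webserver.
my_blueprint = Blueprint("my_simple_plugin", __name__)

class MySimplePlugin(AirflowPlugin):
    name = "my_simple_plugin"
    macros = [uppercase_env]
    flask_blueprints = [my_blueprint]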
⚙️ 2. Plugin File Structure
You can structure your Composer bucket like this:
gs://your-composer-bucket/
│
├── dags/
│   └── my_custom_dag.py
│
└── plugins/
    ├── __init__.py
    ├── my_gcp_plugin.py
    └── operators/
        ├── __init__.py
        └── my_custom_gcp_operator.py
When you upload files to the plugins/ folder, Composer automatically syncs them into the Airflow workers.
3. Writing a Custom GCP Operator
Let’s build a simple example:
A custom operator that starts a Dataflow job and waits until it’s done, but with extra logging and dynamic parameters.
🧰 File: plugins/operators/my_custom_gcp_operator.py
import time

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowHook

class CustomDataflowStartOperator(BaseOperator):
    """
    Custom operator to start a Dataflow template job and log its status periodically.
    """

    # Allow Jinja templating of these fields (e.g. job_name='daily-job-{{ ds_nodash }}').
    template_fields = ("job_name", "template_path", "parameters")

    def __init__(self, job_name, template_path, parameters=None,
                 gcp_conn_id='google_cloud_default', **kwargs):
        super().__init__(**kwargs)
        self.job_name = job_name
        self.template_path = template_path
        self.parameters = parameters or {}
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        hook = DataflowHook(gcp_conn_id=self.gcp_conn_id)
        self.log.info(f"Launching Dataflow job: {self.job_name}")

        job = hook.start_template_dataflow(
            job_name=self.job_name,
            variables={},                 # runtime environment options (machine type, zone, ...)
            parameters=self.parameters,   # template parameters
            dataflow_template=self.template_path,
        )
        job_id = job.get('id')
        self.log.info(f"Started job: {job_id}")

        # Custom wait loop: poll the job state once a minute until it reaches a terminal state.
        while True:
            job_state = hook.get_job(job_id=job_id).get('currentState')
            self.log.info(f"Job {job_id} state: {job_state}")
            if job_state in ['JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED']:
                break
            time.sleep(60)

        if job_state != 'JOB_STATE_DONE':
            raise AirflowException(f"Dataflow job finished with state {job_state}")
        self.log.info("Dataflow job completed successfully.")
🧩 4. Declaring the Plugin
Now register your operator in a plugin file. In Airflow 2 this registration is optional for operators (DAGs can import them straight from the plugins folder), but it makes the plugin visible under Admin → Plugins and is where UI components such as macros and blueprints are declared.
File: plugins/my_gcp_plugin.py
from airflow.plugins_manager import AirflowPlugin

from operators.my_custom_gcp_operator import CustomDataflowStartOperator

class MyGCPPlugin(AirflowPlugin):
    name = "my_gcp_plugin"
    operators = [CustomDataflowStartOperator]
5. Using the Custom Operator in a DAG
File: dags/dataflow_pipeline_dag.py
from datetime import datetime

from airflow import DAG

# The plugins/ folder is on sys.path in Composer, so the plugin module is importable directly.
# (Importing from operators.my_custom_gcp_operator also works.)
from my_gcp_plugin import CustomDataflowStartOperator

with DAG(
    dag_id='custom_dataflow_dag',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    run_dataflow = CustomDataflowStartOperator(
        task_id='start_dataflow_job',
        job_name='daily-job-{{ ds_nodash }}',
        template_path='gs://my-bucket/templates/my_dataflow_template',
        parameters={'input': 'gs://data/input.csv', 'output': 'gs://data/output/'},
    )
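The custom operator chains like any other task. As a minimal, hypothetical extension, you could add a downstream check that waits for the job's output using the provider's GCSObjectExistenceSensor. The bucket and object names below are placeholders; the import goes at the top of the DAG file and the task and dependency line go inside the with DAG block:

from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

# Hypothetical downstream check: wait until the Dataflow job has written its success marker.
wait_for_output = GCSObjectExistenceSensor(
    task_id='wait_for_output',
    bucket='data',                     # placeholder bucket
    object='output/_SUCCESS',          # placeholder marker object
    poke_interval=60,
)

run_dataflow >> wait_for_output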
After uploading:
Composer syncs the files and auto-loads the plugin.
The plugin appears under Admin → Plugins, and the operator can be imported in your DAG code.
6. Common Custom GCP Operator Use Cases
Enhanced BigQuery operators: add data validation after query completion.
Cloud Run / Cloud Functions: deploy or trigger Cloud Run jobs directly from Airflow.
GCS operations: batch file movements with custom naming or metadata logic (see the sketch after this list).
Pub/Sub sensors: custom message filters or acknowledgment logic.
Vertex AI integrations: launch training jobs or batch predictions with custom waiting logic.
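As an example of the GCS row above, here is a minimal sketch of a custom operator that batch-copies every object under a prefix and applies a date-stamped naming convention. It reuses the provider's GCSHook rather than the raw storage client; the class name and all bucket/prefix values are placeholders.

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook

class GCSBatchRenameCopyOperator(BaseOperator):
    """Copy every object under a prefix to a destination bucket with a date-stamped name."""

    template_fields = ("source_prefix", "destination_prefix")

    def __init__(self, source_bucket, source_prefix, destination_bucket,
                 destination_prefix, gcp_conn_id="google_cloud_default", **kwargs):
        super().__init__(**kwargs)
        self.source_bucket = source_bucket
        self.source_prefix = source_prefix
        self.destination_bucket = destination_bucket
        self.destination_prefix = destination_prefix
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
        objects = hook.list(self.source_bucket, prefix=self.source_prefix)
        self.log.info("Copying %d objects from gs://%s/%s", len(objects),
                      self.source_bucket, self.source_prefix)
        for name in objects:
            # Custom naming logic: prefix each copied object with the logical date.
            new_name = f"{self.destination_prefix}/{context['ds']}_{name.split('/')[-1]}"
            hook.copy(self.source_bucket, name, self.destination_bucket, new_name)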
🧰 7. Best Practices for Composer Plugins
✅ Use Airflow Providers
Always import from airflow.providers.google.cloud — never reimplement core hooks.
Your plugin should extend, not replace, built-in behavior.
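In practice, "extend, not replace" usually means subclassing an existing provider operator and layering your logic on top of super().execute(). A minimal sketch (the class name is hypothetical):

from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

class AuditedGCSToGCSOperator(GCSToGCSOperator):
    """GCS-to-GCS copy with an extra audit log line; all copy behavior comes from the provider."""

    def execute(self, context):
        self.log.info("Audited copy from %s to %s", self.source_bucket, self.destination_bucket)
        super().execute(context)  # the built-in operator performs the actual copy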
✅ Separate Logic
Keep custom logic in /plugins/operators or /plugins/hooks, not directly in the DAG.
✅ Version Control Plugins
Store plugins in Git with your DAGs for reproducibility.
✅ Avoid Large Dependencies
If your plugin requires external libraries, add them to Composer’s PyPI dependencies via Composer Environment Configuration.
✅ Test Locally First
Use a local Airflow that matches your Composer environment's Airflow and Python versions for plugin testing, for example:
docker run -p 8080:8080 \
  -v "$(pwd)/dags:/opt/airflow/dags" \
  -v "$(pwd)/plugins:/opt/airflow/plugins" \
  apache/airflow:2.10.1-python3.9 standalone
✅ Use Cloud Logging
Composer sends plugin logs automatically to Cloud Logging, so you can monitor operator execution.
🧾 8. Example: Plugin Loading Verification
Once uploaded, you can verify it in the Airflow UI:
Navigate to Admin → Plugins
Look for my_gcp_plugin
Your operator (CustomDataflowStartOperator) should appear in the list.
You can also confirm loading with the Airflow CLI (airflow plugins) or check the scheduler/worker logs for a line similar to:
INFO - Loading plugin: my_gcp_plugin
🧱 9. Plugin Folder Summary
plugins/__init__.py: marks the folder as a Python package.
plugins/my_gcp_plugin.py: registers the plugin metadata.
plugins/operators/__init__.py: marks the operators subfolder as a package.
plugins/operators/my_custom_gcp_operator.py: defines your custom operator logic.
dags/dataflow_pipeline_dag.py: uses the operator.
✅ Summary
1️⃣ Create the custom operator logic (a BaseOperator subclass).
2️⃣ Register the operator in an AirflowPlugin class.
3️⃣ Upload both under plugins/ in your Cloud Composer bucket.
4️⃣ Use the operator in your DAGs.
5️⃣ Verify the plugin loaded via the Airflow UI or logs.