Friday, November 7, 2025

Airflow Plugins for Custom GCP Operators in Composer

🧩 1. What Airflow Plugins Are


In Airflow, a plugin is a Python module that adds new functionality — such as:


Custom Operators / Sensors / Hooks


UI Views / Macros / Flask Blueprints


In Cloud Composer 2, plugins live in the plugins/ folder of your environment's bucket:


gs://your-composer-bucket/plugins/



They are automatically synced into your environment (on the Airflow workers the folder is mounted at /home/airflow/gcs/plugins).
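At its simplest, a plugin module just subclasses AirflowPlugin and declares what it contributes. A minimal sketch (the class name here is a placeholder; a real version is built out in sections 3 to 5):


from airflow.plugins_manager import AirflowPlugin

class MyMinimalPlugin(AirflowPlugin):
    # The name shown under Admin -> Plugins in the Airflow UI.
    name = "my_minimal_plugin"
    # Optional lists of components this plugin contributes.
    hooks = []
    macros = []
    flask_blueprints = []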


⚙️ 2. Plugin File Structure


You can structure your Composer bucket like this:


gs://your-composer-bucket/

├── dags/

│   ├── my_custom_dag.py

└── plugins/

    ├── __init__.py

    ├── my_gcp_plugin.py

    └── operators/

        └── my_custom_gcp_operator.py



When you upload files to the plugins/ folder, Composer automatically syncs them into the Airflow workers.
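You can drag the files into the bucket in the Cloud Console, or upload them with the gcloud CLI. One possible command (environment name and location are placeholders):


gcloud composer environments storage plugins import \
    --environment my-composer-env \
    --location us-central1 \
    --source ./plugins/my_gcp_plugin.py


Pointing --source at the local operators/ directory uploads that folder and its contents in one go.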


๐Ÿ—️ 3. Writing a Custom GCP Operator


Let’s build a simple example:

A custom operator that starts a Dataflow job and waits until it’s done, but with extra logging and dynamic parameters.


🧰 File: plugins/operators/my_custom_gcp_operator.py

import time

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowHook


class CustomDataflowStartOperator(BaseOperator):
    """
    Custom operator that starts a Dataflow template job and logs the job
    status periodically until it reaches a terminal state.

    Note: the built-in DataflowTemplatedJobStartOperator covers the basic
    launch; this custom operator adds its own polling and logging.
    """

    # Allow Jinja templating on these arguments (e.g. '{{ ds_nodash }}' in job_name).
    template_fields = ('job_name', 'template_path', 'parameters')

    def __init__(self, job_name, template_path, parameters=None,
                 gcp_conn_id='google_cloud_default', **kwargs):
        # apply_defaults is not needed (and has been removed) in Airflow 2.
        super().__init__(**kwargs)
        self.job_name = job_name
        self.template_path = template_path
        self.parameters = parameters or {}
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        hook = DataflowHook(gcp_conn_id=self.gcp_conn_id)
        self.log.info(f"Launching Dataflow job: {self.job_name}")
        job = hook.start_template_dataflow(
            job_name=self.job_name,
            variables={},                 # runtime environment options (zone, tempLocation, ...), if any
            parameters=self.parameters,   # template parameters
            dataflow_template=self.template_path,
        )

        job_id = job.get('id')
        self.log.info(f"Started job: {job_id}")

        # Custom wait loop: poll the job state every 60 seconds until it is terminal.
        while True:
            job_state = hook.get_job(job_id=job_id).get('currentState')
            self.log.info(f"Job {job_id} state: {job_state}")
            if job_state in ('JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'):
                break
            time.sleep(60)

        if job_state != 'JOB_STATE_DONE':
            raise AirflowException(f"Dataflow job ended in state {job_state}")
        self.log.info("Dataflow job completed successfully.")


🧩 4. Declaring the Plugin


Now register your operator in a plugin file so Airflow can discover it.


📄 File: plugins/my_gcp_plugin.py

from airflow.plugins_manager import AirflowPlugin

# The plugins/ folder is on the Python path, so this import resolves to
# plugins/operators/my_custom_gcp_operator.py.
from operators.my_custom_gcp_operator import CustomDataflowStartOperator


class MyGCPPlugin(AirflowPlugin):
    name = "my_gcp_plugin"
    # In Airflow 2 this listing is mostly documentation: DAGs import the
    # operator directly from the plugins folder (see section 5).
    operators = [CustomDataflowStartOperator]
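Because Composer puts the plugins/ folder on the Python path, the DAG in the next section could equally import the class without going through the plugin module at all:


from operators.my_custom_gcp_operator import CustomDataflowStartOperator


Registering it in an AirflowPlugin is still worthwhile so the plugin shows up under Admin → Plugins (see section 8).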


🚀 5. Using the Custom Operator in a DAG

๐Ÿ“„ File: dags/dataflow_pipeline_dag.py

from datetime import datetime

from airflow import DAG

# The plugins/ folder is on the Python path in Composer, so this import works.
from my_gcp_plugin import CustomDataflowStartOperator


with DAG(
    dag_id='custom_dataflow_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    run_dataflow = CustomDataflowStartOperator(
        task_id='start_dataflow_job',
        # job_name is a template field on the operator, so {{ ds_nodash }} is rendered per run.
        job_name='daily-job-{{ ds_nodash }}',
        template_path='gs://my-bucket/templates/my_dataflow_template',
        parameters={'input': 'gs://data/input.csv', 'output': 'gs://data/output/'},
    )



After uploading:


The plugin auto-loads in Composer.


The operator becomes available in the Airflow UI and DAG code.


🧠 6. Common Custom GCP Operator Use Cases

Enhanced BigQuery operators: add data validation after query completion.

Cloud Run / Cloud Functions: deploy or trigger Cloud Run jobs directly from Airflow.

GCS operations: batch file movements, apply custom naming or metadata logic (a short sketch follows this list).

Pub/Sub sensors: custom message filters or acknowledgment logic.

Vertex AI integrations: launch training jobs or batch predictions with custom waiting logic.
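As an illustration of the GCS case, here is a minimal sketch of a hypothetical GCSPrefixMoveOperator (the class and argument names are made up for this example) built on the provider's GCSHook, following the same plugins/operators/ layout as section 2:


from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSPrefixMoveOperator(BaseOperator):
    """Moves every object under a prefix to a new prefix in the same bucket."""

    template_fields = ('bucket', 'source_prefix', 'dest_prefix')

    def __init__(self, bucket, source_prefix, dest_prefix,
                 gcp_conn_id='google_cloud_default', **kwargs):
        super().__init__(**kwargs)
        self.bucket = bucket
        self.source_prefix = source_prefix
        self.dest_prefix = dest_prefix
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
        # List everything under the source prefix, then copy + delete each object.
        for obj in hook.list(self.bucket, prefix=self.source_prefix):
            new_name = self.dest_prefix + obj[len(self.source_prefix):]
            self.log.info(f"Moving gs://{self.bucket}/{obj} -> {new_name}")
            hook.copy(self.bucket, obj, self.bucket, new_name)
            hook.delete(self.bucket, obj)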

🧰 7. Best Practices for Composer Plugins


✅ Use Airflow Providers

Always import from airflow.providers.google.cloud — never reimplement core hooks.

Your plugin should extend, not replace, built-in behavior.


✅ Separate Logic

Keep custom logic in /plugins/operators or /plugins/hooks, not directly in the DAG.


✅ Version Control Plugins

Store plugins in Git with your DAGs for reproducibility.


✅ Avoid Large Dependencies

If your plugin requires external libraries, add them to Composer’s PyPI dependencies via Composer Environment Configuration.
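For example, with the gcloud CLI (environment name, location, and the package pin are placeholders):


gcloud composer environments update my-composer-env \
    --location us-central1 \
    --update-pypi-package "some-package==1.2.3"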


✅ Test Locally First

Use a local Airflow (same version as your Composer environment) for plugin testing, for example by running the official image in standalone mode with your plugins folder mounted in:


docker run -it -p 8080:8080 \
    -v "$(pwd)/plugins:/opt/airflow/plugins" \
    apache/airflow:2.10.1-python3.9 standalone


Match the image tag to your environment's Airflow and Python versions.



✅ Use Cloud Logging

Composer sends plugin logs automatically to Cloud Logging, so you can monitor operator execution.
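For example, a filter along these lines in the Logs Explorer (the environment name is a placeholder) narrows things down to the worker logs where your operator's self.log output lands:


resource.type="cloud_composer_environment"
resource.labels.environment_name="my-composer-env"
log_id("airflow-worker")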


🧾 8. Example: Plugin Loading Verification


Once uploaded, you can verify it in the Airflow UI:


Navigate to Admin → Plugins


Look for my_gcp_plugin


If my_gcp_plugin appears there, the module (and therefore CustomDataflowStartOperator) was imported without errors.


You can also check the scheduler and worker logs (in the Airflow UI or Cloud Logging) for import messages that reference my_gcp_plugin and confirm there are no plugin import errors.
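Another option is Airflow's own plugins CLI command, which you can run against the environment with gcloud (environment name and location are placeholders):


gcloud composer environments run my-composer-env \
    --location us-central1 \
    plugins


The output should list my_gcp_plugin.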


🧱 9. Plugin Folder Summary

plugins/__init__.py: marks the folder as a Python package

plugins/my_gcp_plugin.py: registers the plugin metadata

plugins/operators/my_custom_gcp_operator.py: defines your custom operator logic

dags/my_dag.py: uses the operator

✅ Summary

1️⃣ Create the custom operator logic (a BaseOperator subclass)

2️⃣ Register the operator in an AirflowPlugin class

3️⃣ Upload it under the plugins/ folder of your Cloud Composer bucket

4️⃣ Use it in your DAGs

5️⃣ Verify the plugin loaded via the Airflow UI / logs
