Friday, November 7, 2025


Scheduling Notebooks and Reports with Cloud Composer

🧭 Overview


Cloud Composer (managed Apache Airflow) acts as a scheduler/orchestrator for:


Jupyter / Vertex AI Workbench notebooks


BigQuery reports


Looker Studio dashboards (by refreshing the BigQuery tables behind them)


Custom reports (e.g., PDF, email, or CSV)


A common setup:


[Extract data] → [Transform in BigQuery] → [Run Notebook] → [Generate Report] → [Send via Email or Slack]
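As a rough skeleton of that flow (the DAG and task names here are placeholders; the sections below fill in the real operators):

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # DummyOperator on Airflow < 2.3
from datetime import datetime

# Placeholder pipeline: each EmptyOperator stands in for a real task implemented
# in the sections below (BigQuery SQL, Papermill notebook run, email/Slack delivery).
with DAG('reporting_pipeline_skeleton', start_date=datetime(2024, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    extract = EmptyOperator(task_id='extract_data')
    transform = EmptyOperator(task_id='transform_in_bigquery')
    run_nb = EmptyOperator(task_id='run_notebook')
    report = EmptyOperator(task_id='generate_report')
    deliver = EmptyOperator(task_id='send_report')

    extract >> transform >> run_nb >> report >> deliver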


☁️ 1. Scheduling Vertex AI or Jupyter Notebooks

✅ Option A: Run Vertex AI Workbench Notebook via Cloud Composer


If you use Vertex AI Workbench managed notebooks, you can trigger them from Composer, for example by calling the Notebooks API (notebooks.googleapis.com) from a PythonOperator, or by running the notebook as a Vertex AI custom job.


Example DAG (PythonOperator + Notebooks API)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from googleapiclient import discovery  # google-api-python-client; add it as a PyPI package if missing

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def run_vertex_notebook():
    # Start the managed notebook instance through the Notebooks API
    # (notebooks.googleapis.com). Replace PROJECT_ID and NOTEBOOK_NAME with your own values.
    service = discovery.build('notebooks', 'v1')
    notebook_instance = "projects/PROJECT_ID/locations/us-central1/instances/NOTEBOOK_NAME"
    service.projects().locations().instances().start(name=notebook_instance, body={}).execute()
    # Optionally run a notebook cell or script remotely here


with DAG(
    'schedule_vertex_notebook',
    default_args=default_args,
    schedule_interval='0 6 * * *',  # 6 AM daily
    catchup=False,
) as dag:

    run_notebook = PythonOperator(
        task_id='run_notebook',
        python_callable=run_vertex_notebook,
    )



You can also use a Vertex AI custom job to execute a .ipynb or .py notebook on demand.
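A minimal sketch of that route, submitting a custom job from a PythonOperator with the google-cloud-aiplatform SDK; the container image, machine type, and bucket paths below are placeholder assumptions, and the image is assumed to have papermill installed:

from airflow.operators.python import PythonOperator
from google.cloud import aiplatform


def submit_notebook_custom_job():
    # Placeholder project, region, and staging bucket.
    aiplatform.init(project='PROJECT_ID', location='us-central1',
                    staging_bucket='gs://my-bucket/staging')
    job = aiplatform.CustomJob(
        display_name='notebook-execution',
        worker_pool_specs=[{
            'machine_spec': {'machine_type': 'n1-standard-4'},
            'replica_count': 1,
            'container_spec': {
                # Any image with papermill installed; this one is a placeholder.
                'image_uri': 'gcr.io/PROJECT_ID/papermill-runner:latest',
                'command': ['papermill'],
                'args': ['gs://my-bucket/notebooks/report.ipynb',
                         'gs://my-bucket/outputs/report_out.ipynb'],
            },
        }],
    )
    job.run(sync=True)  # blocks until the Vertex AI job finishes


run_custom_job = PythonOperator(
    task_id='run_notebook_custom_job',
    python_callable=submit_notebook_custom_job,
)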


✅ Option B: Run a Notebook from Cloud Storage via Papermill


If you keep your notebook in Cloud Storage, use Papermill to parameterize and execute it inside a Composer worker (install papermill and gcsfs as PyPI packages in your environment so it can read gs:// paths).


from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import papermill as pm


def run_notebook():
    # Execute the notebook from GCS, injecting run_date as a parameter,
    # and write the executed copy to the worker's local disk.
    pm.execute_notebook(
        'gs://my-bucket/notebooks/report.ipynb',
        '/tmp/output.ipynb',
        parameters={'run_date': str(datetime.now().date())}
    )


with DAG(
    'notebook_scheduler',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 7 * * *',  # daily at 7 AM
    catchup=False,
) as dag:

    execute_notebook = PythonOperator(
        task_id='run_papermill_notebook',
        python_callable=run_notebook,
    )



🔹 Papermill is great for injecting parameters (dates, table names, etc.) into notebooks at runtime.

🔹 Save the executed notebook back to GCS or convert it to HTML/PDF for reporting (see the sketch below).
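A rough sketch of that second point, assuming nbconvert and the google-cloud-storage client are available in the Composer environment and reusing the /tmp/output.ipynb path from the DAG above:

import subprocess
from google.cloud import storage


def publish_notebook():
    # Render the executed notebook to HTML with nbconvert (/tmp/output.html).
    subprocess.run(['jupyter', 'nbconvert', '--to', 'html', '/tmp/output.ipynb'], check=True)
    # Upload the HTML report back to GCS for sharing.
    bucket = storage.Client().bucket('my-bucket')
    bucket.blob('reports/output.html').upload_from_filename('/tmp/output.html')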


📊 2. Automating Report Generation

🧾 Option A: Generate BigQuery Reports


Use Airflow’s BigQuery operators to run queries and export results.


from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

run_report_query = BigQueryInsertJobOperator(
    task_id='bq_generate_report',
    configuration={
        "query": {
            "query": "SELECT region, SUM(sales) AS total_sales FROM dataset.sales GROUP BY region",
            "useLegacySql": False,  # the query above is standard SQL
            "destinationTable": {
                "projectId": "my_project",
                "datasetId": "reports",
                "tableId": "daily_sales_report",
            },
            "writeDisposition": "WRITE_TRUNCATE",
        }
    },
)



You can chain this with an export operator:


from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

export_report = BigQueryToGCSOperator(
    task_id='export_to_gcs',
    source_project_dataset_table='my_project.reports.daily_sales_report',
    destination_cloud_storage_uris=['gs://my-bucket/reports/daily_sales_{{ ds }}.csv'],
    export_format='CSV',
)


📬 Option B: Email or Slack the Report


After exporting your report or rendering a notebook, you can distribute it automatically.


Send via Email:

from airflow.operators.email import EmailOperator

email_report = EmailOperator(
    task_id='email_report',
    to='analytics@yourcompany.com',
    subject='Daily Sales Report',
    html_content='Attached is the latest daily sales report.',
    files=['/tmp/daily_sales.csv'],  # attachments must exist on the worker's local filesystem
)
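EmailOperator attaches files from the worker's local filesystem, while the export above writes to GCS, so in practice you would pull the file down first. A minimal sketch reusing the bucket and object names from the export task:

from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

# Download the exported CSV from GCS to the worker so EmailOperator can attach it.
download_report = GCSToLocalFilesystemOperator(
    task_id='download_report',
    bucket='my-bucket',
    object_name='reports/daily_sales_{{ ds }}.csv',
    filename='/tmp/daily_sales.csv',
)

download_report >> email_report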


Send to Slack:

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

notify_slack = SlackWebhookOperator(
    task_id='notify_slack',
    http_conn_id='slack_connection',  # recent Slack provider versions name this slack_webhook_conn_id
    message='✅ Daily Sales Report generated successfully!',
)


🗓️ 3. Scheduling and Dependency Management


Use Airflow’s schedule_interval (or @daily, @weekly, etc.) for timing.


Chain tasks:


run_report_query >> export_report >> email_report >> notify_slack



Use TriggerRule.ALL_DONE to always send notifications even if upstream tasks fail.
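For example, applied to the Slack notification defined earlier:

from airflow.utils.trigger_rule import TriggerRule
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# Runs whether the upstream report tasks succeeded or failed.
notify_slack = SlackWebhookOperator(
    task_id='notify_slack',
    http_conn_id='slack_connection',
    message='Daily Sales Report pipeline finished - check task states for details.',
    trigger_rule=TriggerRule.ALL_DONE,
)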


🧠 4. Best Practices

Environment: Use a Composer 2 environment with sufficient worker memory for notebooks.
Storage: Store notebooks and outputs in GCS.
Parameterization: Pass dates/filters as Papermill or Jupyter parameters.
Error Handling: Use Airflow retries and on_failure_callback for alerting (see the sketch after this list).
Monitoring: Use Cloud Monitoring metrics for DAG/task failures.
Security: Use service account permissions scoped to only what's needed (BigQuery, GCS, Vertex AI).
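A minimal sketch of the retries and on_failure_callback pattern from the list above; the callback body is a placeholder to replace with your own alerting:

from datetime import timedelta


def alert_on_failure(context):
    # Airflow passes a context dict with the task instance, DAG id, and exception.
    ti = context['task_instance']
    print(f"Task {ti.task_id} in DAG {ti.dag_id} failed: {context.get('exception')}")
    # Replace the print with a real alert (email, Slack webhook, PagerDuty, ...).


default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': alert_on_failure,
}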

📘 Example: End-to-End Notebook + Report DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.operators.email import EmailOperator
from datetime import datetime
import papermill as pm


def run_notebook(ds=None, **kwargs):
    # Airflow passes the logical date (ds) into the callable; Jinja templates such as
    # '{{ ds }}' are not rendered inside plain Python functions.
    pm.execute_notebook(
        'gs://my-bucket/notebooks/daily_analysis.ipynb',
        '/tmp/daily_analysis_out.ipynb',
        parameters={'date': ds}
    )


with DAG(
    'daily_notebook_report',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    run_notebook_task = PythonOperator(
        task_id='run_notebook',
        python_callable=run_notebook,
    )

    bq_task = BigQueryInsertJobOperator(
        task_id='bq_summary',
        configuration={
            "query": {
                "query": "SELECT COUNT(*) AS total_orders FROM dataset.orders WHERE order_date = '{{ ds }}'",
                "useLegacySql": False,
                "destinationTable": {"projectId": "my_project", "datasetId": "reports", "tableId": "daily_summary"},
                "writeDisposition": "WRITE_TRUNCATE",
            }
        },
    )

    export_task = BigQueryToGCSOperator(
        task_id='export_to_gcs',
        source_project_dataset_table='my_project.reports.daily_summary',
        destination_cloud_storage_uris=['gs://my-bucket/reports/summary_{{ ds }}.csv'],
        export_format='CSV',
    )

    email_task = EmailOperator(
        task_id='send_email',
        to='reports@company.com',
        subject='Daily Report {{ ds }}',
        html_content='The daily analysis and report are ready.',
        # Note: /tmp is local to the worker that ran run_notebook; on multi-worker
        # environments, write the output to GCS and download it before attaching.
        files=['/tmp/daily_analysis_out.ipynb'],
    )

    run_notebook_task >> bq_task >> export_task >> email_task


✅ Summary

Run Notebook: Notebooks API (Vertex AI Workbench) or Papermill
Generate Report: BigQueryInsertJobOperator
Export Report: BigQueryToGCSOperator
Deliver Report: EmailOperator / SlackWebhookOperator
Schedule: Cloud Composer (Airflow DAG)
