🧭 Overview
Cloud Composer (managed Apache Airflow) acts as a scheduler/orchestrator for:
Jupyter / Vertex AI Workbench notebooks
BigQuery reports
Looker Studio dashboards
Custom reports (e.g., PDF, email, or CSV)
A common setup:
[Extract data] → [Transform in BigQuery] → [Run Notebook] → [Generate Report] → [Send via Email or Slack]
☁️ 1. Scheduling Vertex AI or Jupyter Notebooks
✅ Option A: Run Vertex AI Workbench Notebook via Cloud Composer
If you use Vertex AI Workbench managed notebooks, you can trigger them from Composer using the Google provider's Vertex AI operators and hooks, or with a direct call to the Notebooks API.
Example DAG (PythonOperator + Vertex AI Notebooks API)
from airflow import DAG
from airflow.providers.google.cloud.hooks.vertex_ai import VertexAIHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def run_vertex_notebook():
    # NOTE: the exact hook class and import path depend on your
    # apache-airflow-providers-google version; this assumes the hook
    # exposes the discovery-based Notebooks API client.
    hook = VertexAIHook()
    notebook_instance = "projects/PROJECT_ID/locations/us-central1/instances/NOTEBOOK_NAME"
    hook.get_conn().projects().locations().instances().start(name=notebook_instance).execute()
    # Optionally run a notebook cell or script remotely here

with DAG(
    'schedule_vertex_notebook',
    default_args=default_args,
    schedule_interval='0 6 * * *',  # 6 AM daily
    catchup=False,
) as dag:

    run_notebook = PythonOperator(
        task_id='run_notebook',
        python_callable=run_vertex_notebook,
    )
You can also use a Vertex AI custom job to execute a .ipynb or .py notebook on demand.
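A minimal sketch of that approach, using the google-cloud-aiplatform SDK inside a PythonOperator; the project, bucket, wrapper script, and container image below are placeholders, not values from this post:

from airflow.operators.python import PythonOperator
from google.cloud import aiplatform

def run_notebook_as_custom_job():
    # Placeholders: swap in your own project, region, and staging bucket.
    aiplatform.init(
        project="PROJECT_ID",
        location="us-central1",
        staging_bucket="gs://my-bucket/staging",
    )
    job = aiplatform.CustomJob.from_local_script(
        display_name="daily-notebook-run",
        script_path="run_notebook.py",  # hypothetical wrapper that runs the .ipynb (e.g. via papermill)
        container_uri="gcr.io/deeplearning-platform-release/base-cpu",  # any image with your dependencies
        requirements=["papermill", "gcsfs"],
    )
    job.run(sync=True)  # block until the custom job finishes

run_custom_job = PythonOperator(
    task_id='run_notebook_custom_job',
    python_callable=run_notebook_as_custom_job,
)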
✅ Option B: Run a Notebook from Cloud Storage via Papermill
If you keep your notebook in Cloud Storage, use Papermill to parameterize and execute it inside a Composer worker.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import papermill as pm

def run_notebook():
    # Reading the input directly from gs:// requires papermill's GCS support (gcsfs installed).
    pm.execute_notebook(
        'gs://my-bucket/notebooks/report.ipynb',
        '/tmp/output.ipynb',
        parameters={'run_date': str(datetime.now().date())}
    )

with DAG(
    'notebook_scheduler',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 7 * * *',  # daily at 7 AM
    catchup=False,
) as dag:

    execute_notebook = PythonOperator(
        task_id='run_papermill_notebook',
        python_callable=run_notebook,
    )
🔹 Papermill is great for injecting parameters (dates, table names, etc.) into notebooks at runtime.
🔹 Save the executed notebook back to GCS or convert to HTML/PDF for reporting.
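For example, a small helper (bucket name and paths are placeholders) that converts the executed notebook to HTML with nbconvert and uploads both files back to GCS:

import nbformat
from nbconvert import HTMLExporter
from google.cloud import storage

def publish_notebook(local_ipynb='/tmp/output.ipynb', bucket_name='my-bucket'):
    # Convert the executed notebook to a standalone HTML report.
    nb = nbformat.read(local_ipynb, as_version=4)
    html_body, _ = HTMLExporter().from_notebook_node(nb)
    local_html = local_ipynb.replace('.ipynb', '.html')
    with open(local_html, 'w') as f:
        f.write(html_body)

    # Upload both the executed .ipynb and the .html report to GCS.
    bucket = storage.Client().bucket(bucket_name)
    for path in (local_ipynb, local_html):
        bucket.blob(f"notebook-runs/{path.split('/')[-1]}").upload_from_filename(path)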
📊 2. Automating Report Generation
🧾 Option A: Generate BigQuery Reports
Use Airflow’s BigQuery operators to run queries and export results.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

run_report_query = BigQueryInsertJobOperator(
    task_id='bq_generate_report',
    configuration={
        "query": {
            "query": "SELECT region, SUM(sales) AS total_sales FROM dataset.sales GROUP BY region",
            "destinationTable": {
                "projectId": "my_project",
                "datasetId": "reports",
                "tableId": "daily_sales_report",
            },
            "writeDisposition": "WRITE_TRUNCATE",
            "useLegacySql": False,  # the BigQuery jobs API defaults to legacy SQL
        }
    },
)
You can chain this with an export operator:
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

export_report = BigQueryToGCSOperator(
    task_id='export_to_gcs',
    source_project_dataset_table='my_project.reports.daily_sales_report',
    destination_cloud_storage_uris=['gs://my-bucket/reports/daily_sales_{{ ds }}.csv'],
    export_format='CSV',
)
📬 Option B: Email or Slack the Report
After exporting your report or rendering a notebook, you can distribute it automatically.
Send via Email:
from airflow.operators.email import EmailOperator

email_report = EmailOperator(
    task_id='email_report',
    to='analytics@yourcompany.com',
    subject='Daily Sales Report',
    html_content='Attached is the latest daily sales report.',
    files=['/tmp/daily_sales.csv'],  # must be a local path on the worker; download it from GCS first
)
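If the CSV lives in GCS (as in the export step above), stage it on the worker before the email task runs. A sketch, assuming the bucket and object names from the earlier export:

from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

download_report = GCSToLocalFilesystemOperator(
    task_id='download_report',
    bucket='my-bucket',
    object_name='reports/daily_sales_{{ ds }}.csv',
    filename='/tmp/daily_sales.csv',  # matches the path attached by email_report
)

download_report >> email_report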
Send to Slack:
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

notify_slack = SlackWebhookOperator(
    task_id='notify_slack',
    http_conn_id='slack_connection',  # newer Slack provider versions use slack_webhook_conn_id instead
    message='✅ Daily Sales Report generated successfully!',
)
🗓️ 3. Scheduling and Dependency Management
Use Airflow’s schedule_interval (or @daily, @weekly, etc.) for timing.
Chain tasks:
run_report_query >> export_report >> email_report >> notify_slack
Use TriggerRule.ALL_DONE to always send notifications even if upstream tasks fail.
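A minimal sketch of that trigger rule applied to the Slack task from earlier:

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.trigger_rule import TriggerRule

notify_slack = SlackWebhookOperator(
    task_id='notify_slack',
    http_conn_id='slack_connection',
    message='Daily Sales Report pipeline finished (check task statuses).',
    trigger_rule=TriggerRule.ALL_DONE,  # run even when an upstream task has failed
)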
🧠 4. Best Practices
Environment: Use a Composer 2 environment with sufficient worker memory for notebooks.
Storage: Store notebooks and outputs in GCS.
Parameterization: Pass dates/filters as Papermill or Jupyter parameters.
Error Handling: Use Airflow retries and on_failure_callback for alerting (see the sketch after this list).
Monitoring: Use Cloud Monitoring metrics for DAG/task failures.
Security: Use Service Account permissions scoped to only what’s needed (BigQuery, GCS, Vertex AI).
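A minimal on_failure_callback sketch that posts the failing task to Slack; it assumes a recent apache-airflow-providers-slack release and the 'slack_connection' connection used above:

from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

def alert_on_failure(context):
    # Airflow passes the task instance context to the callback when a task fails.
    ti = context['task_instance']
    SlackWebhookHook(slack_webhook_conn_id='slack_connection').send(
        text=f"❌ Task {ti.task_id} in DAG {ti.dag_id} failed (run {context['run_id']})."
    )

default_args = {
    'on_failure_callback': alert_on_failure,
    'retries': 1,
}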
🚀 Example: End-to-End Notebook + Report DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.operators.email import EmailOperator
from datetime import datetime
import papermill as pm

def run_notebook(ds=None, **kwargs):
    # Jinja templates are not rendered inside python_callable code, so take the
    # logical date from the Airflow context instead of a literal '{{ ds }}' string.
    pm.execute_notebook(
        'gs://my-bucket/notebooks/daily_analysis.ipynb',
        '/tmp/daily_analysis_out.ipynb',
        parameters={'date': ds}
    )

with DAG(
    'daily_notebook_report',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    run_notebook_task = PythonOperator(
        task_id='run_notebook',
        python_callable=run_notebook,
    )

    bq_task = BigQueryInsertJobOperator(
        task_id='bq_summary',
        configuration={
            "query": {
                "query": "SELECT COUNT(*) AS total_orders FROM dataset.orders WHERE order_date = '{{ ds }}'",
                "destinationTable": {"projectId": "my_project", "datasetId": "reports", "tableId": "daily_summary"},
                "writeDisposition": "WRITE_TRUNCATE",
                "useLegacySql": False,
            }
        },
    )

    export_task = BigQueryToGCSOperator(
        task_id='export_to_gcs',
        source_project_dataset_table='my_project.reports.daily_summary',
        destination_cloud_storage_uris=['gs://my-bucket/reports/summary_{{ ds }}.csv'],
        export_format='CSV',
    )

    email_task = EmailOperator(
        task_id='send_email',
        to='reports@company.com',
        subject='Daily Report {{ ds }}',
        html_content='The daily analysis and report are ready.',
        files=['/tmp/daily_analysis_out.ipynb'],  # executed notebook on the worker's local disk
    )

    run_notebook_task >> bq_task >> export_task >> email_task
✅ Summary
Run Notebook: Vertex AI Notebooks API or Papermill
Generate Report: BigQueryInsertJobOperator
Export Report: BigQueryToGCSOperator
Deliver Report: EmailOperator / SlackWebhookOperator
Schedule: Cloud Composer (Airflow DAG)