Tuesday, November 11, 2025

Integrating Apache Hudi with Dataproc for Incremental Data Processing

🧩 1. What Is Apache Hudi?


Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open-source framework that brings database-like capabilities to data lakes.


It allows you to:


Perform upserts (insert + update) and deletes in data lakes.


Do incremental pulls (query only new/changed data).


Maintain ACID transactions on top of files (like Parquet).


Integrate with Spark, Flink, Presto, Hive, and BigQuery.
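
For example, a hard delete is issued as an ordinary Spark write with the operation set to "delete". A minimal sketch (the table path, key field, and values below are illustrative placeholders, matching the transactions example used later in this post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HudiDeleteSketch").getOrCreate()

# DataFrame holding the keys (and a precombine timestamp) of the records to remove
keys_to_delete = spark.createDataFrame(
    [("txn-001", "2025-11-10"), ("txn-002", "2025-11-10")],
    ["transaction_id", "ingestion_ts"])

# Writing with operation=delete removes matching records from the Hudi table
(keys_to_delete.write.format("hudi")
    .option("hoodie.table.name", "transactions_hudi")
    .option("hoodie.datasource.write.recordkey.field", "transaction_id")
    .option("hoodie.datasource.write.precombine.field", "ingestion_ts")
    .option("hoodie.datasource.write.operation", "delete")
    .mode("append")
    .save("gs://my-data-lake/hudi/transactions"))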


☁️ 2. Why Combine Hudi with Dataproc?

Managed Spark runtime: Dataproc handles cluster provisioning, scaling, and monitoring.

Native GCS support: Hudi tables can be stored in Cloud Storage.

Incremental ETL: Process only new or updated data efficiently.

Integration: Works seamlessly with BigQuery, Dataplex, and Vertex AI.

Cost optimization: Run ephemeral clusters or serverless jobs to minimize cost.


So instead of rewriting full tables daily, Hudi enables change-data-capture (CDC) style updates and efficient incremental loads.


⚙️ 3. Architecture Overview

Incremental ETL Pattern

                 ┌────────────────────┐
   Source Data → │  Ingest (Pub/Sub,  │
                 │   Kafka, Files)    │
                 └─────────┬──────────┘
                           │
                           ▼
              ┌─────────────────────────┐
              │ Dataproc (Spark + Hudi) │
              │   Upsert → GCS Table    │
              └────────────┬────────────┘
                           │
                           ▼
              ┌─────────────────────────┐
              │ BigQuery (via External  │
              │   Table or Dataplex)    │
              └─────────────────────────┘


🧱 4. Setting Up Hudi on Dataproc

Option 1: Use a Hudi-enabled Dataproc image


Dataproc images ≥ 2.0 include optional support for Hudi.


gcloud dataproc clusters create hudi-cluster \
    --region=us-central1 \
    --image-version=2.2-debian12 \
    --optional-components=HUDI \
    --enable-component-gateway \
    --single-node


Option 2: Install via initialization action


If your image version does not include the Hudi optional component (or you are using a custom image), install Hudi with the public initialization action:


export REGION=us-central1

gcloud dataproc clusters create hudi-cluster \
    --region=${REGION} \
    --image-version=2.1-debian12 \
    --initialization-actions=gs://goog-dataproc-initialization-actions-${REGION}/hudi/hudi.sh
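
After the cluster is up, a quick smoke test from a PySpark session (for example, via the component gateway or gcloud dataproc jobs submit pyspark) confirms the Hudi bundle is available. The bucket path below is an assumption; point it at a bucket you can write to:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HudiSmokeTest").getOrCreate()

test_path = "gs://my-data-lake/hudi/_smoke_test"  # any writable GCS prefix

# Write a one-row Hudi table and read it back
df = spark.createDataFrame([("1", "hello", "2025-11-10")], ["id", "msg", "ts"])

(df.write.format("hudi")
    .option("hoodie.table.name", "hudi_smoke_test")
    .option("hoodie.datasource.write.recordkey.field", "id")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .mode("overwrite")
    .save(test_path))

spark.read.format("hudi").load(test_path).show()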


🧮 5. Example: Incremental Data Processing with Hudi + Spark


Let’s say you receive daily updates of customer transactions — new, updated, and deleted records — and you want to maintain a single unified dataset in GCS.


Step 1: Define your Hudi table location

export HUDI_TABLE_PATH=gs://my-data-lake/hudi/transactions


Step 2: PySpark job (hudi_upsert.py)

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder \
    .appName("HudiIncrementalExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()

# Hudi table location. The shell variable exported in Step 1 is not passed to the
# Spark driver by `gcloud dataproc jobs submit`, so fall back to the literal path.
HUDI_TABLE_PATH = os.environ.get("HUDI_TABLE_PATH", "gs://my-data-lake/hudi/transactions")

# Load new or changed data (for example, a daily CSV drop in GCS)
incoming_df = spark.read.option("header", True).csv("gs://my-raw-data/transactions_2025-11-10.csv")

# Add an ingestion timestamp; Hudi uses it as the precombine field to keep the
# latest version when the same transaction_id arrives more than once
incoming_df = incoming_df.withColumn("ingestion_ts", current_timestamp())

# Define Hudi write options
hudi_options = {
    "hoodie.table.name": "transactions_hudi",
    "hoodie.datasource.write.recordkey.field": "transaction_id",
    "hoodie.datasource.write.precombine.field": "ingestion_ts",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": "transactions_hudi",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
}

# Upsert the batch into the Hudi table: append mode adds a new commit;
# existing keys are updated, new keys are inserted
(incoming_df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(HUDI_TABLE_PATH))



Submit the job:


gcloud dataproc jobs submit pyspark gs://my-scripts/hudi_upsert.py \
  --cluster=hudi-cluster \
  --region=us-central1
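
Once the job completes, you can sanity-check the upsert from a PySpark session on the cluster by reading the table back and inspecting the metadata columns Hudi adds to every record (a quick sketch reusing HUDI_TABLE_PATH from the script above):

# Read the latest snapshot of the Hudi table and inspect commit metadata
snapshot_df = spark.read.format("hudi").load(HUDI_TABLE_PATH)

snapshot_df.select("_hoodie_commit_time", "transaction_id", "ingestion_ts") \
    .orderBy("_hoodie_commit_time", ascending=False) \
    .show(10, truncate=False)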


🔄 6. Incremental Querying (Read Only New Data)


After writing data with Hudi, you can query only new or changed records since the last commit.


# Commit timestamp after which to read changes (hard-coded here for illustration;
# see below for one way to derive it programmatically)
commit_instant = "20251110120000"  # example commit timestamp

# Read incrementally from the Hudi table
incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", commit_instant) \
    .load(HUDI_TABLE_PATH)

incremental_df.show()



This is powerful for incremental downstream ETL or CDC pipelines.
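
In practice, you would persist the last commit time you processed and pass it as the begin instant on the next run. One way to grab the most recent commit time directly from the table, using the _hoodie_commit_time metadata column (a small sketch reusing the session above):

from pyspark.sql import functions as F

# Latest commit time currently in the table; store this after each run and
# use it as hoodie.datasource.read.begin.instanttime on the next incremental read
latest_commit = (spark.read.format("hudi").load(HUDI_TABLE_PATH)
                 .agg(F.max("_hoodie_commit_time").alias("ts"))
                 .collect()[0]["ts"])

print(latest_commit)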


📊 7. Integrating Hudi Tables with BigQuery


You can query Hudi data from BigQuery using an external table over the Parquet base files, or via Dataplex. Keep in mind that an external table only reads base files, so it suits COPY_ON_WRITE tables best: for MERGE_ON_READ tables, recent changes sitting in log files are not visible until compaction, and older file versions may appear until Hudi's cleaner removes them.


Option 1: External Table

CREATE EXTERNAL TABLE `myproject.analytics.transactions_hudi`
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://my-data-lake/hudi/transactions/*/*.parquet']
);


Option 2: Dataplex Integration


Register Hudi tables as managed assets in Dataplex for governance and metadata tracking.
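
A third route, if you would rather land the data in a native BigQuery table, is to have the Dataproc job push the latest Hudi snapshot through the spark-bigquery connector that ships with Dataproc images. A hedged sketch (the dataset, table, and staging bucket names are assumptions):

# Read the latest snapshot of the Hudi table and drop Hudi's metadata columns
snapshot_df = (spark.read.format("hudi").load(HUDI_TABLE_PATH)
    .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
          "_hoodie_partition_path", "_hoodie_file_name"))

# Overwrite a native BigQuery table with the current snapshot
(snapshot_df.write.format("bigquery")
    .option("table", "myproject.analytics.transactions_snapshot")
    .option("temporaryGcsBucket", "my-staging-bucket")  # staging bucket for the load
    .mode("overwrite")
    .save())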


⚡ 8. Recommended Configuration for Incremental Workloads

MERGE_ON_READ table type: Efficient upserts; reads the latest view from log + base files.

COPY_ON_WRITE: Simpler, but less efficient for frequent updates.

Checkpointing: Store commit timelines in GCS.

Compaction: Schedule periodic compaction jobs to merge delta logs (see the sketch after this table).

Cluster autoscaling: Optimize cost for periodic upsert jobs.

Serverless Dataproc (Batches API): Run short-lived incremental jobs without cluster management.
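
For MERGE_ON_READ tables, compaction can also be triggered inline from the upsert job itself by extending the write options from Section 5. A hedged sketch (the threshold of five delta commits is an illustrative choice, not a recommendation from this post):

# Additional options for the MERGE_ON_READ upsert job in Section 5
compaction_options = {
    # Run compaction as part of the write once the threshold is reached
    "hoodie.compact.inline": "true",
    # Compact after this many delta commits have accumulated
    "hoodie.compact.inline.max.delta.commits": "5",
}

hudi_options.update(compaction_options)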

🧠 9. Common Incremental Patterns

Daily upsert: Merge new/changed rows from the source into the Hudi dataset each day.

CDC replication: Apply Kafka / Pub/Sub change streams to Hudi tables (see the sketch after this list).

Data lake → warehouse sync: Stream updated Hudi data into BigQuery for analytics.

Feature store: Maintain point-in-time accurate feature sets for ML models.
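
The CDC replication pattern can be implemented with Spark Structured Streaming writing into the same Hudi table as the batch job in Section 5. A minimal sketch, assuming the Kafka connector for Spark is available on the cluster; the topic name, JSON payload schema, and checkpoint path below are placeholders:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Expected shape of the CDC payload (illustrative)
cdc_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("amount", StringType()),
    StructField("ingestion_ts", StringType()),
])

# Read the change stream from Kafka and unpack the JSON value
cdc_stream = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions_cdc")
    .load()
    .select(F.from_json(F.col("value").cast("string"), cdc_schema).alias("r"))
    .select("r.*"))

# Continuously upsert the stream into the Hudi table using the same write options
query = (cdc_stream.writeStream.format("hudi")
    .options(**hudi_options)
    .option("checkpointLocation", "gs://my-data-lake/checkpoints/transactions_cdc")
    .outputMode("append")
    .start(HUDI_TABLE_PATH))

query.awaitTermination()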

🧰 10. Best Practices


✅ Use partitioning (e.g., by event_date) to optimize query performance (see the sketch after this list).

✅ Schedule compaction jobs to manage log file growth.

✅ Store tables in Cloud Storage with object versioning disabled (Hudi handles versions).

✅ Use MERGE_ON_READ for frequent updates; COPY_ON_WRITE for simpler ETL.

✅ Use Dataproc Serverless for ad-hoc incremental jobs.

✅ Monitor Hudi commit metadata with Cloud Logging or Dataplex UI.
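
For the first point, partitioning is configured entirely through write options. A hedged sketch that partitions the Section 5 table by an event_date column (assuming the incoming data actually carries that column):

# Partition the Hudi table by event_date and keep Hive sync aware of the partitions
partition_options = {
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.hive_sync.partition_fields": "event_date",
    "hoodie.datasource.hive_sync.partition_extractor_class":
        "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}

hudi_options.update(partition_options)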


🧭 11. Summary

Dataproc (Spark): Executes upsert and incremental jobs.

Apache Hudi: Adds transactional and incremental capabilities to the data lake.

GCS: Durable, scalable storage for Hudi tables.

BigQuery / Dataplex: Downstream analytics and governance.

Pub/Sub / Kafka: Real-time ingestion sources.
