Tuesday, November 11, 2025


Using Structured Streaming with Spark on Dataproc

 🧩 1. What Is Structured Streaming?


Structured Streaming is Spark’s high-level API for continuous (real-time) data processing.

It lets you write streaming queries as if they were batch queries using DataFrame and Dataset APIs.


Key features:


Exactly-once processing semantics (when used with supported sinks)


Fault tolerance via checkpoints


Micro-batch or continuous execution modes


Integration with major sinks (BigQuery, Kafka, files, etc.)
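
As a minimal, GCP-agnostic sketch of the "streaming looks like batch" idea, the built-in rate source and console sink show how a streaming query reads like an ordinary DataFrame transformation (names here are illustrative only):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("HelloStructuredStreaming").getOrCreate()

# The built-in "rate" source continuously generates (timestamp, value) rows.
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
evens = stream.filter(col("value") % 2 == 0)   # same API you would use on a batch DataFrame

query = (evens.writeStream
    .format("console")        # print each micro-batch to stdout
    .outputMode("append")
    .start())
query.awaitTermination()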


☁️ 2. Why Use It on Google Cloud Dataproc?


Dataproc gives you:


A fully managed Spark runtime — no cluster setup headaches.


Native connectors for Pub/Sub, BigQuery, GCS, and Spanner.


Scalability — clusters can auto-scale up and down.


Integration with Composer (Airflow) for orchestration, plus Cloud Logging and Cloud Monitoring for observability.


⚙️ 3. Typical Architecture on GCP

                ┌──────────────────────┐
   IoT / App →  │ Pub/Sub Topic        │
                └────────┬─────────────┘
                         │
                         ▼
                ┌──────────────────────┐
                │ Dataproc Cluster     │
                │ (Spark Structured    │
                │  Streaming Job)      │
                └────────┬─────────────┘
                         │
              ┌──────────┴───────────┐
              ▼                      ▼
       Cloud Storage (Raw)      BigQuery (Analytics)


🧠 4. Example: Spark Structured Streaming + Pub/Sub → BigQuery


Here’s an end-to-end Python example you can run on Dataproc. Note that the streaming source in Step 2 uses the Pub/Sub Lite Spark connector (standard Pub/Sub does not provide an official Structured Streaming connector), so the subscription the job reads from must be a Pub/Sub Lite subscription.


Step 1: Create your resources

# Create a Pub/Sub topic and subscription
gcloud pubsub topics create sensor-events
gcloud pubsub subscriptions create sensor-sub --topic=sensor-events

# Create a BigQuery dataset
bq mk streaming_demo
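
To generate test traffic matching the schema used in Step 2, here is a small publisher sketch. It assumes the google-cloud-pubsub client library is installed, that "myproject" is your project ID, and that it publishes to the standard Pub/Sub topic created above:

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("myproject", "sensor-events")

event = {"device_id": "device-001", "temperature": 22.5,
         "timestamp": "2025-11-11T10:00:00Z"}

# Pub/Sub payloads are bytes; the streaming job casts them back to a JSON string.
future = publisher.publish(topic_path, json.dumps(event).encode("utf-8"))
print("published message id:", future.result())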


Step 2: Example Streaming Job (stream_to_bq.py)

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("PubSubToBigQuery").getOrCreate()


# 1️⃣ Read from Pub/Sub Lite
input_stream = (
    spark.readStream
         .format("pubsublite")  # Pub/Sub Lite Spark connector (jar is supplied in Step 3)
         # Full Pub/Sub Lite subscription paths include the project number and location:
         .option("pubsublite.subscription",
                 "projects/my-project-number/locations/us-central1-a/subscriptions/sensor-sub")
         .load()
)


# 2️⃣ Parse the data
schema = StructType([
    StructField("device_id", StringType()),
    StructField("temperature", DoubleType()),
    StructField("timestamp", TimestampType())
])

json_df = input_stream.selectExpr("CAST(data AS STRING) AS json_str")
parsed_df = json_df.select(from_json(col("json_str"), schema).alias("data")).select("data.*")


# 3️⃣ Write to BigQuery
(parsed_df.writeStream
    .format("bigquery")
    .option("table", "myproject.streaming_demo.sensor_readings")
    .option("checkpointLocation", "gs://my-bucket/checkpoints/sensor_stream")
    # With the connector's default (indirect) write method you also need a staging
    # bucket, e.g. .option("temporaryGcsBucket", "my-bucket")
    .outputMode("append")
    .start()
    .awaitTermination()
)


Step 3: Submit the job to Dataproc

gcloud dataproc jobs submit pyspark \
  gs://my-bucket/scripts/stream_to_bq.py \
  --cluster=my-streaming-cluster \
  --region=us-central1 \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.1.jar,gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming_2.12-1.0.0.jar


🧮 5. Key Concepts to Understand

Checkpointing


Structured Streaming uses checkpoints to recover from failures:


.option("checkpointLocation", "gs://my-bucket/checkpoints/sensor_stream")



Store checkpoints in Cloud Storage, not locally on cluster VMs.


Output Modes

Mode     | Description               | Example Use
append   | New rows only             | Logs, sensor data
update   | Updates changed rows      | Stateful aggregations
complete | Full table every trigger  | Aggregated reports
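
As a quick, GCP-agnostic illustration of the difference (streaming aggregations generally need update or complete mode; append only works on aggregations that have an event-time watermark), here is a sketch using the built-in rate source and console sink:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("OutputModes").getOrCreate()

rate = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
counts = rate.groupBy((rate["value"] % 3).alias("bucket")).count()

(counts.writeStream
    .format("console")
    .outputMode("complete")   # re-emit the whole aggregate table every trigger
    .start()
    .awaitTermination())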

Trigger Intervals


You can control how frequently Spark processes data:


.trigger(processingTime="30 seconds")
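
For context, here is where the trigger fits in the write path from Step 2 (same placeholder table, bucket, and connector assumptions as above); on Spark 3.3+ you can also use .trigger(availableNow=True) to drain the available data and then stop:

(parsed_df.writeStream
    .format("bigquery")
    .option("table", "myproject.streaming_demo.sensor_readings")
    .option("checkpointLocation", "gs://my-bucket/checkpoints/sensor_stream")
    .trigger(processingTime="30 seconds")   # start a micro-batch roughly every 30 seconds
    .outputMode("append")
    .start()
    .awaitTermination())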


Scaling and Reliability


Use long-running clusters with auto-scaling.


Use graceful shutdowns for rolling updates (see the sketch after this list).


Monitor via Cloud Logging → Dataproc logs.


Use Dataproc initialization actions to install connectors.
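
A minimal sketch of a cooperative shutdown-and-monitoring loop, assuming query is the StreamingQuery returned by .start() in Step 2 and should_stop() is a hypothetical helper you implement yourself (for example, checking for a marker object in GCS):

import time

def run_with_monitoring(query, poll_seconds=30):
    while query.isActive:
        print(query.lastProgress)   # per-batch metrics; also visible in the driver logs
        if should_stop():           # hypothetical helper: your own stop signal
            query.stop()            # stop the query; the checkpoint lets a restarted job resume
            break
        time.sleep(poll_seconds)
    query.awaitTermination()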


🔄 6. Example Real-Time Aggregation


If you want to compute live metrics (e.g., average temperature per device every minute):


from pyspark.sql.functions import window, avg

# Average temperature per device over 1-minute windows; the 2-minute watermark
# bounds how long Spark keeps state around for late-arriving events.
agg_df = (
    parsed_df
      .withWatermark("timestamp", "2 minutes")
      .groupBy(window(col("timestamp"), "1 minute"), col("device_id"))
      .agg(avg("temperature").alias("avg_temp"))
)

(agg_df.writeStream
    .format("bigquery")
    .option("table", "myproject.streaming_demo.device_aggregates")
    .option("checkpointLocation", "gs://my-bucket/checkpoints/aggregates")
    .outputMode("update")
    .start()
    .awaitTermination()  # keep the driver alive while the query runs
)


🧰 7. Best Practices


✅ Use Dataproc autoscaling policies for fluctuating loads.

✅ Store checkpoints and intermediate data in GCS.

✅ Use Pub/Sub for ingestion — decouples producers and consumers.

✅ Write to BigQuery for analytics and GCS for archiving (see the foreachBatch sketch after this list).

✅ Consider Dataproc Serverless for smaller streaming jobs.

✅ Add monitoring hooks (e.g., Cloud Monitoring, formerly Stackdriver).
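
One way to serve both targets from a single stream is foreachBatch; here is a sketch assuming parsed_df from Step 2 and the same placeholder bucket and table names:

# Fan out each micro-batch to GCS (archive) and BigQuery (analytics).
def write_batch(batch_df, batch_id):
    # Archive the raw rows to Cloud Storage as Parquet
    batch_df.write.mode("append").parquet("gs://my-bucket/raw/sensor_readings")
    # Load the same rows into BigQuery
    (batch_df.write.format("bigquery")
        .option("table", "myproject.streaming_demo.sensor_readings")
        .option("temporaryGcsBucket", "my-bucket")
        .mode("append")
        .save())

(parsed_df.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "gs://my-bucket/checkpoints/fanout")
    .start()
    .awaitTermination())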


🧭 8. Real-World Example Use Cases

Use Case             | Description
IoT Analytics        | Process sensor data from Pub/Sub and stream to BigQuery for dashboards
Clickstream Analysis | Track user behavior in real time
Fraud Detection      | Analyze financial transactions in near real time
Log Processing       | Parse, enrich, and aggregate application logs
