🧩 1. What Is Structured Streaming?
Structured Streaming is Spark’s high-level API for continuous (real-time) data processing.
It lets you write streaming queries as if they were batch queries, using the same DataFrame and Dataset APIs.
Key features:
Exactly-once processing semantics (when used with supported sinks)
Fault tolerance via checkpoints
Micro-batch or continuous execution modes
Integration with common sources and sinks (Kafka, files, BigQuery, etc.)
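To see the batch-like API in action, here is a minimal, GCP-independent sketch that streams rows from Spark's built-in rate source to the console in micro-batch mode:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MinimalStructuredStream").getOrCreate()

# The built-in "rate" source generates (timestamp, value) rows for testing
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Transformations use the same DataFrame API as a batch query
doubled = stream_df.selectExpr("timestamp", "value * 2 AS doubled_value")

# Micro-batch execution: print each batch to the console every 10 seconds
(
    doubled.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .start()
    .awaitTermination()
)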
☁️ 2. Why Use It on Google Cloud Dataproc?
Dataproc gives you:
A fully managed Spark runtime — no cluster setup headaches.
Connectors for BigQuery, Cloud Storage (GCS), Pub/Sub Lite, and Spanner.
Scalability — clusters can auto-scale up and down.
Integration with Composer (Airflow) for orchestration and Logging/Monitoring.
⚙️ 3. Typical Architecture on GCP
                ┌──────────────────────┐
IoT / App  ───► │  Pub/Sub Lite Topic  │
                └──────────┬───────────┘
                           │
                           ▼
                ┌──────────────────────┐
                │   Dataproc Cluster   │
                │  (Spark Structured   │
                │   Streaming Job)     │
                └──────────┬───────────┘
                           │
               ┌───────────┴───────────┐
               ▼                       ▼
      Cloud Storage (Raw)    BigQuery (Analytics)
🧠 4. Example: Spark Structured Streaming + Pub/Sub Lite → BigQuery
Here’s a working Python example you can run on Dataproc.
Step 1: Create your resources
# Create a Pub/Sub Lite topic and subscription
# (the Spark connector used below reads from Pub/Sub Lite)
gcloud pubsub lite-topics create sensor-events \
    --location=us-central1-a --partitions=1 --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create sensor-sub \
    --location=us-central1-a --topic=sensor-events
# Create a BigQuery dataset
bq mk streaming_demo
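To test the pipeline end to end, something has to publish JSON events to the topic. A minimal publisher sketch, assuming the google-cloud-pubsublite client library; the project number, zone, and sample event are placeholders:
import json
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, TopicPath

# Placeholder project number and zone; replace with your own
project_number = 123456789012
location = CloudZone(CloudRegion("us-central1"), "a")
topic_path = TopicPath(project_number, location, "sensor-events")

event = {"device_id": "sensor-42", "temperature": 21.7, "timestamp": "2024-01-01T12:00:00Z"}

with PublisherClient() as publisher_client:
    # Messages are published as bytes; the streaming job parses them as JSON
    future = publisher_client.publish(topic_path, json.dumps(event).encode("utf-8"))
    print("Published message:", future.result())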
Step 2: Example Streaming Job (stream_to_bq.py)
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
spark = SparkSession.builder.appName("PubSubToBigQuery").getOrCreate()
# 1️⃣ Read from Pub/Sub Lite (Pub/Sub Lite Spark connector)
input_stream = (
    spark.readStream
    .format("pubsublite")
    # Pub/Sub Lite subscription paths include the project number and location;
    # replace PROJECT_NUMBER with your own
    .option("pubsublite.subscription",
            "projects/PROJECT_NUMBER/locations/us-central1-a/subscriptions/sensor-sub")
    .load()
)
# 2️⃣ Parse the data
schema = StructType([
    StructField("device_id", StringType()),
    StructField("temperature", DoubleType()),
    StructField("timestamp", TimestampType())
])
# The connector delivers the message payload in a binary `data` column
json_df = input_stream.selectExpr("CAST(data AS STRING) AS json_str")
parsed_df = json_df.select(from_json(col("json_str"), schema).alias("data")).select("data.*")
# 3️⃣ Write to BigQuery
(
    parsed_df.writeStream
    .format("bigquery")
    .option("table", "myproject.streaming_demo.sensor_readings")
    .option("temporaryGcsBucket", "my-bucket")  # staging bucket for the connector's indirect writes
    .option("checkpointLocation", "gs://my-bucket/checkpoints/sensor_stream")
    .outputMode("append")
    .start()
    .awaitTermination()
)
Step 3: Submit the job to Dataproc
gcloud dataproc jobs submit pyspark \
    gs://my-bucket/scripts/stream_to_bq.py \
    --cluster=my-streaming-cluster \
    --region=us-central1 \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.1.jar,gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar
🧮 5. Key Concepts to Understand
Checkpointing
Structured Streaming uses checkpoints to recover from failures:
.option("checkpointLocation", "gs://my-bucket/checkpoints/sensor_stream")
Store checkpoints in Cloud Storage, not locally on cluster VMs.
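For example, if you later add a second query that archives the raw events to GCS, give it its own checkpoint directory (a sketch; the paths are placeholders):
# Restarting a job with the same checkpointLocation resumes from the last
# committed offsets instead of reprocessing the stream. Each query needs
# its own directory:
raw_archive = (
    parsed_df.writeStream
    .format("parquet")
    .option("path", "gs://my-bucket/raw/sensor_events/")
    .option("checkpointLocation", "gs://my-bucket/checkpoints/raw_archive")  # distinct from the BigQuery query
    .start()
)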
Output Modes
Mode       Description                 Example Use
append     New rows only               Logs, sensor data
update     Updates changed rows        Stateful aggregations
complete   Full table every trigger    Aggregated reports
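A quick console-sink sketch of the difference, reusing parsed_df from the job above; the raw stream works in append mode, while an unwatermarked running count needs complete (or update) mode:
# Append: each new sensor reading is emitted exactly once
parsed_df.writeStream.format("console").outputMode("append").start()

# Complete: the whole aggregate table is re-emitted on every trigger
counts = parsed_df.groupBy("device_id").count()
counts.writeStream.format("console").outputMode("complete").start()

spark.streams.awaitAnyTermination()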
Trigger Intervals
You can control how frequently Spark processes data:
.trigger(processingTime="30 seconds")
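For example (availableNow and continuous are available only in newer Spark releases and for certain sources and sinks):
# Micro-batch every 30 seconds
(
    parsed_df.writeStream
    .format("console")
    .trigger(processingTime="30 seconds")
    .start()
)

# Other trigger variants:
#   .trigger(availableNow=True)      # drain the current backlog, then stop (Spark 3.3+)
#   .trigger(continuous="1 second")  # experimental continuous processing mode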
Scaling and Reliability
Use long-running clusters with auto-scaling.
Use graceful shutdowns for rolling updates (see the sketch after this list).
Monitor via Cloud Logging → Dataproc logs.
Use Dataproc initialization actions to install connectors.
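One way to implement a graceful shutdown, sketched under the assumption that the job keeps a handle to the query (query = ... .start() instead of chaining .awaitTermination()) and that the google-cloud-storage client library is installed; the stop-marker object is a made-up convention:
import time
from google.cloud import storage

# Create gs://my-bucket/flags/stop-sensor-stream when you want the job to drain and exit
gcs = storage.Client()
stop_flag = gcs.bucket("my-bucket").blob("flags/stop-sensor-stream")

while query.isActive:
    if stop_flag.exists():
        query.stop()   # checkpoints let the next deployment resume where this one left off
        break
    time.sleep(30)
query.awaitTermination()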
🔄 6. Example Real-Time Aggregation
If you want to compute live metrics (e.g., the average temperature per device every minute), add a watermark so Spark knows how long to wait for late events before finalizing each window:
from pyspark.sql.functions import window, avg

agg_df = (
    parsed_df
    .withWatermark("timestamp", "2 minutes")
    .groupBy(window(col("timestamp"), "1 minute"), col("device_id"))
    .agg(avg("temperature").alias("avg_temp"))
)
(
    agg_df.writeStream
    .format("bigquery")
    .option("table", "myproject.streaming_demo.device_aggregates")
    .option("temporaryGcsBucket", "my-bucket")
    .option("checkpointLocation", "gs://my-bucket/checkpoints/aggregates")
    .outputMode("append")  # with the watermark, each window is written once finalized (BigQuery is append-only)
    .start()
    .awaitTermination()
)
🧰 7. Best Practices
✅ Use Dataproc autoscaling policies for fluctuating loads.
✅ Store checkpoints and intermediate data in GCS.
✅ Use Pub/Sub for ingestion — decouples producers and consumers.
✅ Write to BigQuery for analytics and to GCS for archiving (see the foreachBatch sketch after this list).
✅ Consider Dataproc Serverless for smaller streaming jobs.
✅ Add monitoring hooks via Cloud Monitoring (formerly Stackdriver) and Cloud Logging.
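A sketch of the BigQuery-plus-GCS pattern using foreachBatch, which lets a single streaming query write each micro-batch to both sinks; the table, bucket, and paths are placeholders:
def write_to_bq_and_gcs(batch_df, batch_id):
    # Analytics copy in BigQuery (indirect write via a staging bucket)
    (
        batch_df.write
        .format("bigquery")
        .option("temporaryGcsBucket", "my-bucket")
        .mode("append")
        .save("myproject.streaming_demo.sensor_readings")
    )
    # Raw archive in GCS as Parquet
    batch_df.write.mode("append").parquet("gs://my-bucket/raw/sensor_events/")

(
    parsed_df.writeStream
    .foreachBatch(write_to_bq_and_gcs)
    .option("checkpointLocation", "gs://my-bucket/checkpoints/dual_sink")
    .start()
    .awaitTermination()
)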
🧭 8. Real-World Example Use Cases
Use Case               Description
IoT Analytics          Process sensor data from Pub/Sub and stream it to BigQuery for dashboards
Clickstream Analysis   Track user behavior in real time
Fraud Detection        Analyze financial transactions in near real time
Log Processing         Parse, enrich, and aggregate application logs