1. What You’re Building
You’ll build a pipeline that:
Ingests transaction data (e.g., credit card transactions) into Spark on Dataproc
Cleans and engineers features (amount, time, location, device, etc.)
Trains a fraud classifier using Spark MLlib (e.g., RandomForestClassifier)
Evaluates performance (precision, recall, AUC)
Saves the model to Cloud Storage
(Optionally) Serves batch or near-real-time predictions
2. Typical Architecture on GCP
Main components:
Cloud Storage (GCS): raw + processed data and model storage
Cloud Dataproc: managed Spark/Hadoop cluster to run MLlib jobs
BigQuery (optional): for downstream analytics / dashboards
Cloud Composer / Cloud Functions (optional): for orchestration / automation
Basic flow:
Data lands in GCS (CSV/Parquet/JSON).
Dataproc cluster runs a PySpark job for training.
Model is saved to GCS.
Another job uses the trained model to score new transactions.
3. Step 1 – Prepare Your Data
Your transaction dataset might look like:
transaction_id
user_id
amount
timestamp
merchant_id
device_type
country
ip_address
label (0 = legitimate, 1 = fraud)
Upload it to GCS, e.g.:
gs://my-bucket/fraud/raw/transactions.csv
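For example, with gsutil (a one-liner, assuming a local transactions.csv and an existing bucket):
gsutil cp transactions.csv gs://my-bucket/fraud/raw/transactions.csv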
4. Step 2 – Create a Dataproc Cluster
From the GCP Console or gcloud:
gcloud dataproc clusters create fraud-cluster \
    --region=us-central1 \
    --single-node \
    --image-version=2.1-debian11 \
    --properties spark:spark.serializer=org.apache.spark.serializer.KryoSerializer
(A single-node cluster is fine for experiments; use multi-node in production.)
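For a production-sized run, a multi-node variant might look like the following sketch; the worker count and machine types are assumptions to size against your data volume:
gcloud dataproc clusters create fraud-cluster \
    --region=us-central1 \
    --num-workers=2 \
    --master-machine-type=n2-standard-4 \
    --worker-machine-type=n2-standard-4 \
    --image-version=2.1-debian11 \
    --properties spark:spark.serializer=org.apache.spark.serializer.KryoSerializer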
5. Step 3 – Set Up the Spark ML Pipeline (PySpark)
You’ll use:
StringIndexer, OneHotEncoder for categorical features
VectorAssembler to combine features
RandomForestClassifier (or GBTClassifier, LogisticRegression)
BinaryClassificationEvaluator for AUC
5.1. Basic Training Script (PySpark)
Save the following as fraud_train.py; you can submit it from your local machine or stage it in GCS.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, hour, dayofweek
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
def main():
    spark = SparkSession.builder.appName("FraudDetectionTraining").getOrCreate()

    # -----------------------------
    # 1. Load data
    # -----------------------------
    input_path = "gs://my-bucket/fraud/raw/transactions.csv"
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_path)

    # -----------------------------
    # 2. Basic feature engineering
    # -----------------------------
    # Example: extract time features from the timestamp
    df = df.withColumn("ts_unix", unix_timestamp(col("timestamp")))
    df = df.withColumn("hour_of_day", hour(col("timestamp")))
    df = df.withColumn("day_of_week", dayofweek(col("timestamp")))

    # Target column
    label_col = "label"

    # Categorical and numerical columns
    categorical_cols = ["device_type", "country", "merchant_id"]
    numeric_cols = ["amount", "hour_of_day", "day_of_week"]

    # Index + one-hot encode categoricals
    indexers = [
        StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
        for c in categorical_cols
    ]
    encoders = [
        OneHotEncoder(inputCols=[f"{c}_idx"], outputCols=[f"{c}_ohe"])
        for c in categorical_cols
    ]

    # All feature columns
    feature_cols = [f"{c}_ohe" for c in categorical_cols] + numeric_cols
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_assembled")
    scaler = StandardScaler(inputCol="features_assembled", outputCol="features", withStd=True, withMean=False)

    # -----------------------------
    # 3. Model
    # -----------------------------
    rf = RandomForestClassifier(
        labelCol=label_col,
        featuresCol="features",
        numTrees=100,
        maxDepth=10,
        probabilityCol="probability",
        seed=42,
    )

    # -----------------------------
    # 4. Pipeline
    # -----------------------------
    pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, rf])

    # -----------------------------
    # 5. Train/test split
    # -----------------------------
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
    model = pipeline.fit(train_df)

    # -----------------------------
    # 6. Evaluation
    # -----------------------------
    preds = model.transform(test_df)
    evaluator = BinaryClassificationEvaluator(
        labelCol=label_col,
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC",
    )
    auc = evaluator.evaluate(preds)
    print(f"Test AUC = {auc}")

    # Optional: tune the probability threshold for a better precision/recall
    # trade-off by inspecting the probability column (see Section 8 for a sketch).

    # -----------------------------
    # 7. Save model
    # -----------------------------
    output_model_path = "gs://my-bucket/fraud/models/rf_pipeline"
    model.write().overwrite().save(output_model_path)

    spark.stop()

if __name__ == "__main__":
    main()
6. Step 4 – Submit the Training Job to Dataproc
From your machine:
gcloud dataproc jobs submit pyspark fraud_train.py \
    --cluster=fraud-cluster \
    --region=us-central1
Dataproc will:
Spin up a Spark job
Read data from GCS
Train the pipeline
Print AUC in logs
Save the trained model to GCS
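If you'd rather not hardcode GCS paths in the script, gcloud passes anything after a -- separator through to the job as arguments; a sketch, assuming fraud_train.py is adapted to read the input and model paths from sys.argv:
gcloud dataproc jobs submit pyspark fraud_train.py \
    --cluster=fraud-cluster \
    --region=us-central1 \
    -- gs://my-bucket/fraud/raw/transactions.csv gs://my-bucket/fraud/models/rf_pipeline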
7. Step 5 – Building a Scoring / Inference Job
You’ll now load the saved pipeline model and apply it to new transactions. Save the following as fraud_score.py.
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import PipelineModel
def main():
    spark = SparkSession.builder.appName("FraudScoring").getOrCreate()

    model_path = "gs://my-bucket/fraud/models/rf_pipeline"
    model = PipelineModel.load(model_path)

    # New (unlabeled) data
    input_path = "gs://my-bucket/fraud/new/new_transactions.csv"
    df_new = spark.read.option("header", "true").option("inferSchema", "true").csv(input_path)

    scored = model.transform(df_new)

    # Keep only relevant columns
    output = scored.select(
        "transaction_id",
        "user_id",
        "amount",
        "probability",
        "prediction",
    )

    # Save scored data
    output_path = "gs://my-bucket/fraud/scored/new_transactions_scored.parquet"
    output.write.mode("overwrite").parquet(output_path)

    spark.stop()

if __name__ == "__main__":
    main()
Submit it similarly:
gcloud dataproc jobs submit pyspark fraud_score.py \
    --cluster=fraud-cluster \
    --region=us-central1
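One caveat on the output: probability is a Spark ML vector column, which is awkward to query in downstream tools. On Spark 3.0+ you can flatten it to a plain double before writing; a sketch (p_fraud is a name introduced here, not part of the original script):
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# probability[1] is P(label = 1), i.e., the fraud class
scored = scored.withColumn("p_fraud", vector_to_array(col("probability"))[1])
output = scored.select("transaction_id", "user_id", "amount", "p_fraud", "prediction")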
8. Handling Class Imbalance (Very Important in Fraud)
Fraud cases are usually rare (often under 1% of transactions). Train naively and the model can predict “not fraud” for everything and still achieve high accuracy.
Common strategies:
Class weights (e.g., weightCol in classifiers)
Oversample fraud cases or undersample non-fraud
Use metrics like AUC, precision, recall, F1, not raw accuracy
Adjust the probability threshold (e.g., flag as fraud if p_fraud > 0.7 instead of 0.5; see the sketch after the weights example below)
Example with weights:
from pyspark.sql.functions import when

# Assume label 1 = fraud (rare), 0 = non-fraud
fraud_frac = df.filter("label = 1").count() / df.count()
nonfraud_frac = 1 - fraud_frac

# Weight each class inversely to its frequency:
# rare fraud rows get the large weight, common non-fraud rows the small one
df = df.withColumn(
    "class_weight",
    when(df.label == 1, nonfraud_frac).otherwise(fraud_frac),
)

rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    weightCol="class_weight",
)
(Then build the pipeline with this weighted classifier; the class_weight column only needs to exist on the training DataFrame, so no extra pipeline stage is required.)
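And a sketch of the threshold adjustment from the list above, applied to the test-set predictions (preds) from the training script. vector_to_array needs Spark 3.0+, and the 0.7 cutoff is an assumption to tune against your own precision/recall targets:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# Extract P(fraud) and apply a stricter cutoff than the default 0.5
preds = preds.withColumn("p_fraud", vector_to_array(col("probability"))[1])
flagged = preds.withColumn("flag_fraud", (col("p_fraud") > 0.7).cast("int"))

# Confusion-matrix counts at the chosen threshold
tp = flagged.filter("label = 1 AND flag_fraud = 1").count()
fp = flagged.filter("label = 0 AND flag_fraud = 1").count()
fn = flagged.filter("label = 1 AND flag_fraud = 0").count()

precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
print(f"precision={precision:.3f}, recall={recall:.3f} at threshold 0.7")
Raising the threshold trades recall for precision; where you set it depends on the relative cost of missed fraud versus false alarms.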
9. Operational Considerations
Automate retraining (e.g., daily/weekly) with Composer / Cloud Scheduler (a DAG sketch follows this list)
Log metrics (AUC, precision/recall) for each run
Monitor data drift (transaction distributions changing over time)
Store model version + config with each saved model in GCS
Consider exporting scored results to BigQuery for BI and dashboards
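For the retraining automation mentioned above, a minimal Cloud Composer (Airflow) DAG might look like the following sketch. It assumes the Google provider package is installed and that fraud_train.py has been staged at gs://my-bucket/fraud/code/fraud_train.py (a hypothetical path); the project ID, cluster name, and schedule are placeholders to adapt:
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

PROJECT_ID = "my-project"   # placeholder
REGION = "us-central1"
CLUSTER = "fraud-cluster"

# Dataproc job spec: a PySpark job pointing at the training script staged in GCS
TRAIN_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER},
    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/fraud/code/fraud_train.py"},
}

with DAG(
    dag_id="fraud_model_retraining",
    schedule_interval="@weekly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    retrain = DataprocSubmitJobOperator(
        task_id="retrain_fraud_model",
        job=TRAIN_JOB,
        region=REGION,
        project_id=PROJECT_ID,
    )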
10. Quick Checklist
Data:
Transactions in GCS
Label column (0/1) for historical data
Dataproc:
Cluster created
PySpark scripts accessible
Model:
Pipeline with feature engineering
Algorithm (RF/GBT/Logistic Regression)
Evaluation with AUC / precision / recall
Model saved to GCS
Scoring:
New data loaded
Model applied
Predictions + probabilities stored