Friday, November 21, 2025


Building a Fraud Detection Pipeline with Spark MLlib on Dataproc

1. What You’re Building


You’ll build a pipeline that:


Ingests transaction data (e.g., credit card transactions) into Spark on Dataproc


Cleans and engineers features (amount, time, location, device, etc.)


Trains a fraud classifier using Spark MLlib (e.g., RandomForestClassifier)


Evaluates performance (precision, recall, AUC)


Saves the model to Cloud Storage


(Optionally) Serves batch or near-real-time predictions


2. Typical Architecture on GCP


Main components:


Cloud Storage (GCS): raw + processed data and model storage


Cloud Dataproc: managed Spark/Hadoop cluster to run MLlib jobs


BigQuery (optional): for downstream analytics / dashboards


Cloud Composer / Cloud Functions (optional): for orchestration / automation


Basic flow:


Data lands in GCS (CSV/Parquet/JSON).


Dataproc cluster runs a PySpark job for training.


Model is saved to GCS.


Another job uses the trained model to score new transactions.


3. Step 1 – Prepare Your Data


Your transaction dataset might look like:


transaction_id


user_id


amount


timestamp


merchant_id


device_type


country


ip_address


label (0 = legitimate, 1 = fraud)


Upload it to GCS, e.g.:


gs://my-bucket/fraud/raw/transactions.csv
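
The training script below uses inferSchema for simplicity. If you prefer an explicit schema for the columns above, a minimal PySpark sketch might look like this (the field types are assumptions; adjust them to your data):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, IntegerType

transaction_schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("merchant_id", StringType(), True),
    StructField("device_type", StringType(), True),
    StructField("country", StringType(), True),
    StructField("ip_address", StringType(), True),
    StructField("label", IntegerType(), True),
])

# Usage: spark.read.schema(transaction_schema).option("header", "true").csv(input_path)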


4. Step 2 – Create a Dataproc Cluster


From the GCP Console or gcloud:


gcloud dataproc clusters create fraud-cluster \

  --region=us-central1 \

  --single-node \

  --image-version=2.2-debian12 \

  --properties spark:spark.serializer=org.apache.spark.serializer.KryoSerializer



(A single-node cluster is fine for experiments; use multi-node in production.)


5. Step 3 – Set Up the Spark ML Pipeline (PySpark)


You’ll use:


StringIndexer, OneHotEncoder for categorical features


VectorAssembler to combine features


RandomForestClassifier (or GBTClassifier, LogisticRegression)


BinaryClassificationEvaluator for AUC


5.1. Basic Training Script (PySpark)


Save it as fraud_train.py and either upload it to GCS or submit it from your local machine.


from pyspark.sql import SparkSession

from pyspark.sql.functions import col, unix_timestamp, hour, dayofweek


from pyspark.ml import Pipeline

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator


def main():

    spark = SparkSession.builder.appName("FraudDetectionTraining").getOrCreate()


    # -----------------------------

    # 1. Load data

    # -----------------------------

    input_path = "gs://my-bucket/fraud/raw/transactions.csv"

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_path)


    # -----------------------------

    # 2. Basic feature engineering

    # -----------------------------

    # Example: extract time features from timestamp

    df = df.withColumn("ts_unix", unix_timestamp(col("timestamp")))

    df = df.withColumn("hour_of_day", hour(col("timestamp")))

    df = df.withColumn("day_of_week", dayofweek(col("timestamp")))


    # Target column

    label_col = "label"


    # Categorical and numerical columns

    categorical_cols = ["device_type", "country", "merchant_id"]

    numeric_cols = ["amount", "hour_of_day", "day_of_week"]


    # Index + one-hot encode categoricals

    indexers = [

        StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")

        for c in categorical_cols

    ]


    encoders = [

        OneHotEncoder(inputCols=[f"{c}_idx"], outputCols=[f"{c}_ohe"])

        for c in categorical_cols

    ]


    # All feature columns

    feature_cols = [f"{c}_ohe" for c in categorical_cols] + numeric_cols


    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_assembled")


    scaler = StandardScaler(inputCol="features_assembled", outputCol="features", withStd=True, withMean=False)


    # -----------------------------

    # 3. Model

    # -----------------------------

    rf = RandomForestClassifier(

        labelCol=label_col,

        featuresCol="features",

        numTrees=100,

        maxDepth=10,

        probabilityCol="probability",

        seed=42

    )


    # -----------------------------

    # 4. Pipeline

    # -----------------------------

    pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, rf])


    # -----------------------------

    # 5. Train/test split

    # -----------------------------

    train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)


    model = pipeline.fit(train_df)


    # -----------------------------

    # 6. Evaluation

    # -----------------------------

    preds = model.transform(test_df)


    evaluator = BinaryClassificationEvaluator(

        labelCol=label_col,

        rawPredictionCol="rawPrediction",

        metricName="areaUnderROC"

    )


    auc = evaluator.evaluate(preds)

    print(f"Test AUC = {auc}")


    # Optional: look at precision/recall trade-off using probability threshold

    # (in Spark you'd typically handle this yourself by inspecting the probability column.)
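    # A minimal sketch of that idea (requires Spark >= 3.0 for
    # pyspark.ml.functions.vector_to_array; the 0.7 threshold is illustrative):
    from pyspark.ml.functions import vector_to_array

    p_fraud = vector_to_array(col("probability"))[1]
    threshold = 0.7
    flagged = preds.withColumn("flag_fraud", (p_fraud > threshold).cast("int"))

    tp = flagged.filter((col("flag_fraud") == 1) & (col(label_col) == 1)).count()
    fp = flagged.filter((col("flag_fraud") == 1) & (col(label_col) == 0)).count()
    fn = flagged.filter((col("flag_fraud") == 0) & (col(label_col) == 1)).count()
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    print(f"precision@{threshold} = {precision:.3f}, recall@{threshold} = {recall:.3f}")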


    # -----------------------------

    # 7. Save model

    # -----------------------------

    output_model_path = "gs://my-bucket/fraud/models/rf_pipeline"

    model.write().overwrite().save(output_model_path)


    spark.stop()


if __name__ == "__main__":

    main()


6. Step 4 – Submit the Training Job to Dataproc


From your machine:


gcloud dataproc jobs submit pyspark fraud_train.py \

  --cluster=fraud-cluster \

  --region=us-central1



Dataproc will:


Spin up a Spark job


Read data from GCS


Train the pipeline


Print AUC in logs


Save the trained model to GCS


7. Step 5 – Building a Scoring / Inference Job


You’ll now load the saved pipeline model and apply it to new transactions.


from pyspark.sql import SparkSession

from pyspark.ml import PipelineModel


def main():

    spark = SparkSession.builder.appName("FraudScoring").getOrCreate()


    model_path = "gs://my-bucket/fraud/models/rf_pipeline"

    model = PipelineModel.load(model_path)


    # New (unlabeled) data

    input_path = "gs://my-bucket/fraud/new/new_transactions.csv"

    df_new = spark.read.option("header", "true").option("inferSchema", "true").csv(input_path)


    scored = model.transform(df_new)


    # Keep only relevant columns

    output = scored.select(

        "transaction_id",

        "user_id",

        "amount",

        "probability",

        "prediction"

    )
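
    # Optional (illustrative, requires Spark >= 3.0): extract the fraud
    # probability as a plain double, which is friendlier for downstream
    # consumers than the raw ML vector column.
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql.functions import col

    output = output.withColumn("p_fraud", vector_to_array(col("probability"))[1])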


    # Save scored data

    output_path = "gs://my-bucket/fraud/scored/new_transactions_scored.parquet"

    output.write.mode("overwrite").parquet(output_path)


    spark.stop()


if __name__ == "__main__":

    main()



Submit it similarly:


gcloud dataproc jobs submit pyspark fraud_score.py \

  --cluster=fraud-cluster \

  --region=us-central1
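
If you also want the scored results in BigQuery for dashboards (see the operational notes below), one option is the Spark BigQuery connector. A rough sketch, assuming the connector is available on the cluster (bundled on recent Dataproc images or added via --jars); the dataset/table name and temporary bucket are placeholders:

# Inside the scoring job, after building `output`:
output.write.format("bigquery") \
    .option("table", "my_dataset.fraud_scored") \
    .option("temporaryGcsBucket", "my-bucket") \
    .mode("append") \
    .save()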


8. Handling Class Imbalance (Very Important in Fraud)


Fraud cases are usually rare (e.g., <1%). If you just train normally, the model might predict “not fraud” for everything and still get high accuracy.


Common strategies:


Class weights (e.g., weightCol in classifiers)


Oversample fraud cases or undersample non-fraud (a sketch follows the weights example below)


Use metrics like AUC, precision, recall, F1, not raw accuracy


Adjust probability threshold (e.g., flag as fraud if p_fraud > 0.7 instead of 0.5)


Example with weights:


from pyspark.sql.functions import when


# Assume label 1 = fraud (rare), 0 = non-fraud

fraud_frac = df.filter("label = 1").count() / df.count()

nonfraud_frac = 1 - fraud_frac


df = df.withColumn(

    "class_weight",

    when(df.label == 1, nonfraud_frac).otherwise(fraud_frac)

)


rf = RandomForestClassifier(

    labelCol="label",

    featuresCol="features",

    weightCol="class_weight"

)



(Compute the weight column on the DataFrame before splitting and fitting, so the RandomForestClassifier stage in the pipeline can pick it up via weightCol.)
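
As an alternative to weights, the oversampling strategy listed above can be approximated by re-sampling the fraud rows with replacement. A rough sketch (the balancing ratio and seed are illustrative):

fraud_df = df.filter("label = 1")
nonfraud_df = df.filter("label = 0")

# Roughly balance the classes by replicating fraud rows.
ratio = nonfraud_df.count() / fraud_df.count()
balanced_df = nonfraud_df.unionByName(
    fraud_df.sample(withReplacement=True, fraction=ratio, seed=42)
)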


9. Operational Considerations


Automate retraining (e.g., daily/weekly) with Composer / Cloud Scheduler (see the DAG sketch after this list)


Log metrics (AUC, precision/recall) for each run


Monitor data drift (transaction distributions changing over time)


Store model version + config with each saved model in GCS


Consider exporting scored results to BigQuery for BI and dashboards
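
For the retraining automation mentioned in the first bullet, a Cloud Composer (Airflow) DAG can submit the training job on a schedule. A minimal sketch, assuming Airflow 2.4+ with the Google provider installed; the project ID and the GCS path to fraud_train.py are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

PROJECT_ID = "my-project"  # placeholder
REGION = "us-central1"

TRAIN_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": "fraud-cluster"},
    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/fraud/code/fraud_train.py"},
}

with DAG(
    dag_id="fraud_model_retraining",
    schedule="@weekly",  # Airflow 2.4+; use schedule_interval on older versions
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    DataprocSubmitJobOperator(
        task_id="retrain_fraud_model",
        project_id=PROJECT_ID,
        region=REGION,
        job=TRAIN_JOB,
    )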


10. Quick Checklist


Data:


 Transactions in GCS


 Label column (0/1) for historical data


Dataproc:


 Cluster created


 PySpark scripts accessible


Model:


 Pipeline with feature engineering


 Algorithm (RF/GBT/Logistic Regression)


 Evaluation with AUC / precision / recall


 Model saved to GCS


Scoring:


 New data loaded


 Model applied


 Predictions + probabilities stored
