SparkML and XGBoost Spark with W&B
Adding W&B tracking to SparkML Pipelines and CrossValidators
Overview
What is Weights & Biases? Simply put, W&B is a system of record for machine learning workflows.

With W&B, you can:
- Track and compare experiments in as little as five lines of code (see the sketch after this list).
- Capture the full lineage of your machine learning / data science workflow.
- Add a lightweight integration to your existing code and quickly get live metrics, terminal logs, and system stats streamed to a centralized dashboard.
- Explain how your model works, show graphs of how model versions improved, discuss bugs, and demonstrate progress towards milestones.
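As a rough illustration of the first bullet, a minimal tracking script might look like this (the project name, hyperparameter, and metric values are placeholders):
import wandb

run = wandb.init(project="my-first-project")  # placeholder project name
run.config.learning_rate = 0.01               # record a hyperparameter
run.log({"accuracy": 0.9, "loss": 0.3})       # stream metrics to the W&B dashboard
run.finish()                                  # mark the run as complete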
Spark has come up recently with a few customers asking specifically how W&B can integrate into Spark workflows. In this report we'll highlight how W&B can add value to your SparkML workflows by:
- Using W&B to track training of SparkML Estimators, capturing configurations and metrics
- Using W&B to surface the results of hyperparameter search, making model comparison easy
- Logging models to the W&B Model Registry
- Using W&B Launch (Beta) to trigger Spark jobs that score datasets
Spark
Apache Spark is the most widely used engine for scalable computing and has been adopted by thousands of companies, including 80% of the Fortune 500.

In this report we'll talk through adding W&B experiment tracking to SparkML Pipelines and CrossValidators.
Spark ML
The spark.ml package aims to provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.
Spark Pipelines
Spark ML standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Spark ML API, where the pipeline concept is mostly inspired by the scikit-learn project.
- DataFrame: Spark ML uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
- Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms DataFrame with features into a DataFrame with predictions.
- Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
- Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
- Parameter: All Transformers and Estimators share a common API for specifying parameters.
A simple Pipeline might look like:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from xgboost.spark import SparkXGBClassifier

indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
xgb_classifier = SparkXGBClassifier(label_col="indexedLabel", missing=0.0, n_estimators=20)
pipeline = Pipeline(stages=[indexer, xgb_classifier])
model = pipeline.fit(training)
SparkML Evaluation with W&B Tracking
Spark Evaluators are used to assess the performance of ML Estimators (and Pipelines). The general usage pattern is to instantiate an evaluator matching the estimator type (e.g., binary, multiclass, multilabel, regression), set the primary metric for evaluation, and pass it a scored DataFrame.
Consider this simple example:
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

scoreAndLabels = map(lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1], x[2]),
                     [(0.1, 0.0, 0.0), (0.1, 1.0, 0.0),
                      (0.4, 0.0, 0.0), (0.6, 0.0, 0.1),
                      (0.6, 1.0, 1.0), (0.6, 1.0, 0.0),
                      (0.8, 1.0, 1.0)])
dataset = spark.createDataFrame(scoreAndLabels, ["probability", "label", "prediction"])

evaluator = MulticlassClassificationEvaluator()
evaluator.setPredictionCol("prediction")

print(evaluator.evaluate(dataset, {evaluator.metricName: "logLoss"}))
print(evaluator.evaluate(dataset, {evaluator.metricName: "accuracy"}))
This evaluator prints the model's log loss and accuracy to STDOUT. There is nothing wrong with logging to STDOUT, but it certainly isn't persistent.
Plugging in W&B Experiment Tracking
It is certainly possible to add wandb.log statements to our code to log to W&B instead of STDOUT, but there is a better way. With any of these SparkML estimators (or non-native estimators that implement the API), we can streamline the logging considerably by writing our own Evaluator! In what follows, a lot of code was eliminated or simplified, so you should not expect it to run as is. The goal here is to highlight what is possible. See here for the source code of the WandbEvaluator used below.
Introducing W&B tracking, the simple example from above becomes:
import wandb
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

scoreAndLabels = map(lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1], x[2]),
                     [(0.1, 0.0, 0.0), (0.1, 1.0, 0.0),
                      (0.4, 0.0, 0.0), (0.6, 0.0, 0.1),
                      (0.6, 1.0, 1.0), (0.6, 1.0, 0.0),
                      (0.8, 1.0, 1.0)])
dataset = spark.createDataFrame(scoreAndLabels, ["probability", "label", "prediction"])

evaluator = MulticlassClassificationEvaluator()
evaluator.setPredictionCol("prediction")

run = wandb.init(project="sparkml-evaluator-ex", job_type="multiclass-classification-eval")

wandb_evaluator = WandbEvaluator(sparkMlEvaluator=evaluator)
wandb_evaluator.setWandbRun(run)
wandb_evaluator.setMetricPrefix("test/")
wandb_evaluator.evaluate(dataset)

run.finish()
The approach is simply to create an Evaluator that wraps an existing SparkML evaluator. When evaluate is called on a dataset, WandbEvaluator computes every metric available for the wrapped SparkML Evaluator and logs each one to W&B with the prefix test/.
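To give a flavor of the wrapping idea, here is a simplified sketch (not the linked WandbEvaluator source): it delegates to the wrapped evaluator for a hardcoded list of metric names, which here assumes a MulticlassClassificationEvaluator, and logs each value to the supplied run.
from pyspark.ml.evaluation import Evaluator

class SimpleWandbEvaluator(Evaluator):
    """Simplified sketch of the wrapper idea, not the actual WandbEvaluator."""

    def __init__(self, sparkMlEvaluator, run, metricPrefix="test/",
                 metrics=("f1", "accuracy", "weightedPrecision", "weightedRecall", "logLoss")):
        super().__init__()
        self.sparkMlEvaluator = sparkMlEvaluator
        self.run = run
        self.metricPrefix = metricPrefix
        self.metrics = metrics  # metric names supported by the wrapped evaluator

    def _evaluate(self, dataset):
        value = None
        for metric in self.metrics:
            # ask the wrapped evaluator for each metric and log it to W&B
            value = self.sparkMlEvaluator.evaluate(
                dataset, {self.sparkMlEvaluator.metricName: metric})
            self.run.log({f"{self.metricPrefix}{metric}": value})
        return value

    def isLargerBetter(self):
        return self.sparkMlEvaluator.isLargerBetter()
Because it is still a SparkML Evaluator, it can be dropped in wherever an Evaluator is expected.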
Evaluation of SparkML Estimators during training itself is quite limited. The native SparkML estimators do not offer callbacks that would let us hook in as training progresses, so we are stuck running evaluations after training has completed or, in some cases, looking at the training summary available from the model itself (a minimal sketch of that route follows).
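For illustration, here is a minimal sketch of the training-summary route using LogisticRegression, whose fitted model exposes a summary; the project name and column names are placeholders.
import wandb
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="indexedLabel", maxIter=10)
lr_model = lr.fit(training)

run = wandb.init(project="sparkml-wandb", job_type="train")
summary = lr_model.summary  # LogisticRegressionTrainingSummary
for step, loss in enumerate(summary.objectiveHistory):
    run.log({"train/objective": loss}, step=step)  # one value per optimizer iteration
run.log({"train/accuracy": summary.accuracy,
         "train/weightedPrecision": summary.weightedPrecision})
run.finish()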
SparkXGB models do offer callbacks, and we'll look at an example of that later.
SparkML Tuning with W&B Tracking
Spark offers hyperparameter search via pyspark.ml.tuning.CrossValidator. Hyperparameter selection (and, by extension, model selection) happens by way of a grid search over a sequence of parameter maps provided by the user. Consider XGBoost on Spark: we may wish to test various learning rates and maximum tree depths for the boosted ensemble. In this case, we give Spark a list of candidates to try and use the pyspark.ml.tuning.CrossValidator class as follows:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
xgb_classifier = SparkXGBClassifier(label_col="indexedLabel", missing=0.0, n_estimators=20)
pipeline = Pipeline(stages=[indexer, xgb_classifier])

evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                              metricName="weightedPrecision")

grid = ParamGridBuilder() \
    .addGrid(xgb_classifier.learning_rate, [0.0, 0.01, 0.1]) \
    .addGrid(xgb_classifier.max_depth, [2, 3, 5]) \
    .build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=grid,
                    evaluator=evaluator)
cvModel = cv.fit(dataset)
The CrossValidator class performs the grid search defined by the ParamGridBuilder passed to it and chooses the best configuration according to the metric set on the Evaluator.
With a few changes to the CrossValidator class, we can pass in the WandbEvaluator so that metrics and parameters are logged to W&B during the search. We end up with as many W&B runs as there are configurations created by the ParamGridBuilder.
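To make the one-run-per-configuration idea concrete, here is a hand-rolled sketch that achieves a similar effect with the stock CrossValidator (this is not the WandbCrossValidator implementation); it reuses pipeline, evaluator, grid, and training from the snippets in this report.
import wandb
from pyspark.ml.tuning import CrossValidator

for param_map in grid:
    run = wandb.init(project="sparkml-wandb", job_type="cv", reinit=True)
    # record this candidate configuration as the run's config
    run.config.update({param.name: value for param, value in param_map.items()})

    cv = CrossValidator(estimator=pipeline,
                        estimatorParamMaps=[param_map],  # a single candidate per run
                        evaluator=evaluator,
                        numFolds=3)
    cv_model = cv.fit(training)

    # avgMetrics holds the cross-validated metric for each candidate (only one here)
    run.log({"cv/" + evaluator.getMetricName(): cv_model.avgMetrics[0]})
    run.finish()
WandbCrossValidator handles this inside a single fit call, as shown below.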
At the time of this writing, everything is based on XGBoost 1.7.0, in which XGBoost on Spark is considered experimental and not intended for production use.
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
xgb_classifier = SparkXGBClassifier(label_col="indexedLabel", missing=0.0, n_estimators=20)
pipeline = Pipeline(stages=[indexer, xgb_classifier])

# Create an evaluator. In this case, use "weightedPrecision".
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                              metricName="weightedPrecision")
wandb_evaluator = WandbEvaluator(sparkMlEvaluator=evaluator, metricPrefix="cv/")

grid = ParamGridBuilder() \
    .addGrid(xgb_classifier.learning_rate, [0.0, 0.01, 0.1]) \
    .addGrid(xgb_classifier.max_depth, [2, 3, 5]) \
    .build()

cv = WandbCrossValidator(estimator=pipeline,
                         evaluator=wandb_evaluator,
                         estimatorParamMaps=grid,
                         numFolds=3)
cv.fit(training)
At the end of cross-validation, the best hyperparameter configuration is used to train the model on the full dataset, the associated (training-data) metrics are logged, and the run is tagged as best. Once this finishes, we call the evaluators on a test dataset and log the model.
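As an illustration of that last step, a sketch of evaluating the winning pipeline on a held-out DataFrame (called test here) and logging it as a W&B artifact could look like the following; the artifact name and save path are placeholders.
import wandb

cv_model = cv.fit(training)
best_model = cv_model.bestModel  # fitted PipelineModel for the best configuration

run = wandb.init(project="sparkml-wandb", job_type="log-model")
run.log({"test/weightedPrecision": evaluator.evaluate(best_model.transform(test))})

# Spark writes models out as a directory; adjust the path for your filesystem
best_model.write().overwrite().save("sparkml_pipeline_model")
artifact = wandb.Artifact("sparkml-pipeline", type="model")
artifact.add_dir("sparkml_pipeline_model")
run.log_artifact(artifact)
run.finish()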
[W&B panel: direct lineage view of the sparkml-pipeline model artifact]
Spark XGBoost with Callbacks and Tracking
XGBoost on Spark does support callbacks. Because XGBoost on Spark can take advantage of the available nodes for training, multiple processes will be started, and therefore multiple wandb runs will be used for tracking.
A very simple example of using callbacks is shown below. It only tracks metrics per process, and we group those metrics via the Spark estimator's uid, but it can easily be extended to cover other pieces.
Start by writing your own callback that inherits from xgboost.callback.TrainingCallback, and you'll be off to the races. You can see my callback here, or the wandb.xgboost.WandbCallback here. Your callback should start AND finish the W&B run. Moreover, since XGBoost on Spark is distributed training and you are tracking each training process, it is important to group the runs that get created. This example is far from perfect and still needs a bit of work.
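For reference, here is a minimal sketch of what such a callback could look like (it is not the linked source, just the same idea under the same class name so the snippet below reads through): each training process starts its own run, logs the evaluation metrics every boosting round, and finishes the run, with grouping handled via the keyword arguments passed to wandb.init.
import wandb
from xgboost.callback import TrainingCallback

class WandbSparkXGBCallback(TrainingCallback):
    """Minimal sketch, not the linked implementation."""

    def __init__(self, **wandb_kwargs):
        self.wandb_kwargs = wandb_kwargs  # e.g. project and group, as set below
        self.run = None

    def before_training(self, model):
        # each Spark training process starts (and later finishes) its own run
        self.run = wandb.init(**self.wandb_kwargs)
        return model

    def after_iteration(self, model, epoch, evals_log):
        # evals_log looks like {"validation_0": {"logloss": [...]}, ...}
        for data_name, metrics in evals_log.items():
            for metric_name, values in metrics.items():
                self.run.log({f"{data_name}/{metric_name}": values[-1]}, step=epoch)
        return False  # returning True would stop training early

    def after_training(self, model):
        self.run.finish()
        return model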
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
indexer.setHandleInvalid("keep")

xgb_classifier = SparkXGBClassifier(label_col="indexedLabel",
                                    validation_indicator_col="validationIndicatorCol")

wandb_kwargs = {"project": "sparkml-wandb", "group": xgb_classifier.uid}
cb = xgb_classifier.getParam("callbacks")
xgb_classifier.set(cb, [WandbSparkXGBCallback(**wandb_kwargs)])

pipeline = Pipeline(stages=[indexer, xgb_classifier])
pipeline.fit(training)
In this report we focused on adding W&B experiment tracking and artifact usage to a simple SparkML workflow. To see more detail on how to use W&B in Databricks, check out the report linked below.