W&B를 사용한 SparkML 및 XGBoost Spark
SparkML 파이프라인 및 CrossValidator에 W&B 트래킹 추가하기
Created on December 22|Last edited on December 22
Comment
개요
Weights & Biases란 무엇일까요? 간단히 말해, W&B는 머신 러닝 워크플로 기록 시스템입니다.

W&B를 사용하면 다음을 수행할 수 있습니다.
- 단 5줄의 코드로 실험을 추적하고 비교합니다.
- 머신 러닝/데이터 사이언스 워크플로의 전체 계보를 제공합니다.
- 기존 코드에 W&B의 라이트웨이트 통합 기능을 추가하여 실시간 메트릭, 터미널 로그 및 시스템 통계를 중앙 집중식 대시보드로 빠르게 스트리밍합니다.
- 모델 작동 방식을 설명하고, 모델 버전이 어떻게 개선되었는지 그래프로 표시하고, 버그에 대해 논의하고, 프로젝트 마일스톤에 대한 진행 상황을 보여줍니다.
최근 몇 분의 고객이 W&B를 Spark 워크플로에 통합하는 방법을 구체적으로 문의해 왔습니다. 이 보고서에서는 W&B를 사용하여 SparkML Estimator 학습을 추적하고 구성 및 메트릭을 캡처함으로써
- W&B가 SparkML 워크플로에 어떻게 기여할 수 있는지를 중점적으로 다루겠습니다.
- W&B를 사용하여 하이퍼파라미터 검색 결과를 표시해 손쉽게 모델을 비교
- 모델을 W&B 모델 레지스트리에 기록
- W&B 론칭(베타)을 활용하여 데이터세트 점수를 매기는 Spark 작업을 트리거
Spark
Apache Spark는 확장 가능한 컴퓨팅을 위해 가장 널리 사용되는 엔진으로, 포천 500대 기업의 80%를 포함해 수천 개의 기업에서 채택하고 있습니다.

이 보고서에서는 SparkML Pipeline 및 CrossValidator에 W&B 실험 추적을 추가하는 방법에 대해 설명하겠습니다.
Spark ML
spark.ml 패키지는 균일한 상위 수준의 API 세트를 제공하는 것을 목표로 하며, 이는 사용자가 실질적인 머신 러닝 파이프라인을 생성하고 튜닝하는 데 도움이 되는 DataFrames 위에 구축됩니다.
Spark Pipelines
Spark ML은 머신 러닝 알고리즘을 위한 API를 표준화하여 여러 알고리즘을 단일 파이프라인 또는 워크플로에 더 쉽게 결합할 수 있도록 합니다. 이 섹션에서는 Spark ML API에서 도입한 주요 개념을 다룹니다. 여기서 파이프라인 개념은 대부분 scikit-learn 프로젝트에서 영감을 받았습니다.
- DataFrame: Spark ML은 ML 데이터세트로 Spark SQL의 DataFrame을 사용하는데, 이 데이터세트는 다양한 데이터 유형을 담을 수 있습니다. 예를 들어 DataFrame에는 텍스트, 특징 벡터, 실제 레이블 및 예측값을 저장하는 다양한 열이 있을 수 있습니다.
- Transformer: Transformer는 하나의 DataFrame을 다른 DataFrame으로 변환할 수 있는 알고리즘입니다. 예를 들어 ML 모델은 특징이 있는 DataFrame을 예측값이 있는 DataFrame으로 변환하는 Transformer입니다.
- Estimator: Estimator는 Transformer를 생성하기 위해 DataFrame에 맞출 수 있는 있는 알고리즘입니다 예를 들어 학습 알고리즘은 DataFrame을 학습하고 모델을 생성하는 Estimator입니다.
- Pipeline: Pipeline은 여러 Transformer와 Estimator를 서로 연결하여 ML 워크플로를 지정합니다.
- Parameter: 이제 모든 Transformer와 Estimator는 파라미터 지정을 위한 공통 API를 공유합니다.
간단한 Pipeline은 다음과 같습니다
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")xgb_classifier = SparkXGBClassifier(label_col="indexedLabel",missing=0.0,n_estimators = 20)pipeline = Pipeline(stages=[indexer, xgb_classifier])model = pipeline.fit(training)
W&B 트래킹을 통한 SparkML 평가
Spark Evaluator를 사용하여 ML Estimator(및 Pipeline)의 성능을 평가합니다. 평가기의 일반적인 사용 패턴은 Estimator 유형(예: 바이너리, 멀티클래스, 멀티레이블, 회귀)에 따라 평가기의 특정 인스턴스를 인스턴스화하는 것입니다. 그런 다음 평가를 위한 기본 메트릭을 설정하고 점수화된 데이터프레임을 거칩니다.
다음의 간단한 예제를 살펴보시기 바랍니다.
from pyspark.ml.linalg import Vectorsfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorscoreAndLabels = map(lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1], x[2]),[(0.1, 0.0, 0.0), (0.1, 1.0, 0.0),(0.4, 0.0, 0.0), (0.6, 0.0, 0.1),(0.6, 1.0, 1.0), (0.6, 1.0, 0.0),(0.8, 1.0, 1.0)])dataset = spark.createDataFrame(scoreAndLabels, ["probability", "label", "prediction"])evaluator = MulticlassClassificationEvaluator()evaluator.setPredictionCol("prediction")print(evaluator.evaluate(dataset, metric="logloss"))print(evaluator.evaluate(dataset, metric="accuracy"))
이 평가기는 모델의 로그 손실 및 정확도를 STDOUT으로 출력합니다. STDOUT에 로깅하는 것이 잘못된 일은 아니지만 확실히 지속적이지는 않습니다.
W&B 실험 추적 연결하기
코드에 wandb.log 문을 추가하여 STDOUT 대신 W&B에 로깅하는 것도 물론 가능하지만, 더 좋은 방법이 있습니다. 이러한 SparkML 추정기(또는 API를 구현하는 기본 추정기가 아닌 추정기)를 사용하면 자체 Evaluator를 작성하여 로깅을 상당히 간소화할 수 있습니다! 아래 예제에서는 많은 코드가 제거되거나 단순화되었으며 그대로 실행될 것으로 예상됩니다. 이 예제의 목표는 무엇이 가능한지를 보여주는 것입니다. 아래에 사용된 WandbEvaluator 소스 코드는 여기 를 참조하세요.
W&B 트래킹을 도입하면 위의 단순한 예제는 다음과 같이 됩니다.
from pyspark.ml.linalg import Vectorsfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorscoreAndLabels = map(lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1], x[2]),[(0.1, 0.0, 0.0), (0.1, 1.0, 0.0),(0.4, 0.0, 0.0), (0.6, 0.0, 0.1),(0.6, 1.0, 1.0), (0.6, 1.0, 0.0),(0.8, 1.0, 1.0)])dataset = spark.createDataFrame(scoreAndLabels, ["probability", "label", "prediction"])evaluator = MulticlassClassificationEvaluator()evaluator.setPredictionCol("prediction")run = wandb.init(project = "sparkml-evaluator-ex",job_type = "multiclass-classification-eval")wandb_evaluator = WandbEvaluator(sparkMlEvaluator = evaluator)wandb_evaluator.setWandbRun(run)wandb_evaluator.setMetricPrefix("test/")wandb_evaluator.evaluate(dataset)run.finish()
단순히 기존 SparkML 평가기를 감싸는 평가기를 생성하는 접근 방식입니다. WandbEvaluator.evaluate 메서드는 데이터세트에서 evaluate 메서드를 호출할 때 제공된 SparkML Evaluato 대해 사용 가능한 모든 메트릭을 호출하고 접두사 test/를 붙여 W&B에 기록합니다.
실제 학습 중 SparkML Estimator를 평가하는 것은 매우 제한적입니다. 기본 SparkML 추정기는 즉시 연결할 수 있는 콜백을 제공하지 않기 때문에 학습이 완료된 후에 평가를 수행하거나, 경우에 따라서는 모델 자체에서 제공하는 학습 요약을 살펴봐야 합니다.
SparkXGB 모델은 콜백을 제공하며, 이에 대한 예제는 나중에 살펴보겠습니다.
W&B 트래킹을 통한 SparkML 튜닝
Spark는 spark.tuning.CrossValidator를 통해 하이퍼파라미터 검색을 제공합니다. 하이퍼파라미터 선택(및 확장 모델 선택)은 사용자가 제공한 일련의 파라미터 맵을 기반으로 그리드 검색을 통해 이루어집니다. Spark의 XGBoost 예제를 살펴보면, 부스티드 트리의 각 추정기에 대해 다양한 학습 속도와 최대 깊이를 테스트하고 싶어집니다. 이 경우, 시험해 볼 후보 목록을 Spark에 제공하고 다음과 같이 pyspark.tuning.CrossValidator 클래스를 사용합니다.
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")xgb_classifier = SparkXGBClassifier(label_col="indexedLabel",missing=0.0,n_estimators = 20)pipeline = Pipeline(stages=[indexer, xgb_classifier])evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",metricName="weightedPrecision")grid = ParamGridBuilder() \.addGrid(xgb_classifier.learning_rate, [0.0, 0.01, 0.1]) \.addGrid(xgb_classifier.max_depth, [2, 3, 5]) \.build()cv = CrossValidator(estimator=pipeline,estimatorParamMaps=grid,evaluator=evaluator)cvModel = cv.fit(dataset)
CrossValidator 클래스는 전달된 ParamGridBuilder를 기반으로 그리드 검색을 수행하고 Evaluator에 대해 설정된 메트릭에 따라 최적의 구성을 선택합니다.
CrossValidator 클래스를 약간 변경하면 WandbEvaluator를 전달하여 검색 중에 메트릭과 파라미터가 W&B에 기록됩니다. ParamGridBuilder에 의해 생성된 구성의 수만큼 W&B에서 실행이 이루어지게 됩니다.
이 글을 ���는 시점에서 모든 것은 XGBoost 1.7.0을 기반으로 하며, 이 릴리스에서 Spark의 XGBoost는 프로덕션 용도가 아닌 실험적인 것으로 간주됩니다.
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")xgb_classifier = SparkXGBClassifier(label_col="indexedLabel",missing=0.0,n_estimators = 20)pipeline = Pipeline(stages=[indexer, xgb_classifier])# Create an evaluator. In this case, use "weightedPrecision".evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",metricName="weightedPrecision")wandb_evaluator = WandbEvaluator(sparkMlEvaluator = evaluator, metricPrefix = "cv/")grid = ParamGridBuilder() \.addGrid(xgb_classifier.learning_rate, [0.0, 0.01, 0.1]) \.addGrid(xgb_classifier.max_depth, [2, 3, 5]) \.build()cv = WandbCrossValidator(estimator=pipeline,evaluator=wandb_evaluator,estimatorParamMaps=grid,numFolds=3)cv.fit(training)
교차 검증이 끝나면 전체 데이터세트의 모델은 최상의 하이퍼파라미터 구성을 사용하여 학습하고, (학습 데이터에 대한) 관련 메트릭이 기록되며 해당 실행에 best로 태그가 지정됩니다. 작업이 완료되면 테스트 데이터세트에 대해 평가기를 호출하고 모델을 기록합니다.
sparkml-pipeline
Direct lineage view
콜백 및 트래킹을 지원하는 Spark XGBoost
Spark의 XGBoost는 콜백 사용을 지원합니다. Spark의 XGBoost는 학습에 사용 가능한 노드를 활용할 수 있으므로 여러 개의 프로세스가 시작되고, 따라서 여러 건의 wandb 실행을 통해 추적이 가능합니다.
아래에 콜백을 사용하는 아주 단순한 예제가 있습니다. 이 예제는 프로세스별 메트릭만 추적하고 Spark Estimator UID를 통해 이러한 메트릭을 그룹화하고 있지만, 다른 부분에 대해서도 쉽게 확장할 수 있습니다.
xgboost.callbacks.TrainingCallback을 상속하는 고유한 콜백을 작성하기 시작하면 바로 실행할 수 있습니다. 제가 작성한 콜백은 여기 에서 wandb.xgboost.WandbCallback은 여기 에서 확인하실 수 있습니다.콜백은 W&B 실행을 시작할 뿐만 아니라 끝내야 합니다. 또한 생성된 실행을 그룹화하는 것이 중요합니다. Spark의 XGBoost는 분산 학습이므로 각각의 학습 프로세스를 추적합니다 이 예제는 완벽과는 거리가 멀고 추가 작업이 필요합니다.
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")indexer.setHandleInvalid("keep")xgb_classifier = SparkXGBClassifier(label_col="indexedLabel",validation_indicator_col="validationIndicatorCol")wandb_kwargs = {"project": "sparkml-wandb", "group": xgb_classifier.uid}cb = xgb_classifier.getParam("callbacks")xgb_classifier.set(cb, [WandbSparkXGBCallback(**wandb_kwargs)])pipeline = Pipeline(stages = [indexer, xgb_classifier])pipeline.fit(training)
이 보고서에서는 간��한 SparkML 워크플로에 W&B 실험 추적 및 아티팩트 사용을 추가하는 데 중점을 두었습니다. Databricks에서 W&B를 사용하는 방법에 대한 자세한 내용을 보려면 아래 링크된 보고서를 확인하세요.
Add a comment
Tags: Articles, W&B Features
Iterate on AI agents and models faster. Try Weights & Biases today.