Skip to main content

HuggingFace의 Accelerate 라이브러리 소개

이 글에서는 HuggingFace의 Accelerate 라이브러리 내부 동작을 살펴보며 “정말 Accelerate가 이렇게 쉽다고?”라는 질문에 답해 봅니다. 이 글은 AI 번역본입니다. 오번역이 의심되는 부분이 있다면 댓글로 알려주세요.
Created on September 15|Last edited on September 15
처음으로 Distributed Data Parallel (DDP)을 구현하는 데 하루가량을 보냈던 사람으로서 파이토치 그리고 같은 작업을 하는 데 약 5분을 더 보냈습니다 using Hugging Face의 새로운 Accelerate 라이브러리를 접하고, 그 패키지의 단순함에 흥미를 느끼고 놀랐습니다.
"""정말 이렇게 쉬운 걸까?나는 스스로에게 그 질문을 던졌고, 그 결과를 이 글에서 함께 살펴봅니다.
Accelerate: GitHub | 문서 | 이 글을 작성할 당시의 커밋 ID
이 글의 일부로서, 우리는 다음의 소스 코드를 살펴볼 것입니다 Hugging Face Accelerate하지만 때로는 단순화를 위해 코드의 일부를 생략하겠습니다. 또한 TPU 관련 코드는 다루지 않고, 오로지 어떻게 하는지에만 집중하겠습니다. Hugging Face Accelerate 단일 및 다중 GPU 환경에서 동작합니다.
이 글은 Distributed Data Parallel(DDP)에 대한 입문서가 아닙니다. DDP에 대한 소개는 아래의 훌륭한 자료들을 참고하세요:
  1. 파이토치의 문서 그리고 튜토리얼
  2. PyTorch 분산 학습 레이 마오 작성
  3. PyTorch에서 분산 데이터 병렬 학습 케빈 카이촹 양 작성
이 글에서 다룰 내용을 간략히 소개하면 다음과 같습니다:

목차



그럼 시작해 봅시다!

PyTorch의 분산 데이터 병렬 처리

저처럼 PyTorch를 사용하는 분이라면, 예전에 구현을 시도해 본 적이 있다면 DDP 과거에 PyTorch에서 여러 GPU로 모델을 학습해 본 적이 있다면, 특히 처음 시도할 때 얼마나 고통스러운지 잘 아실 겁니다.
보시다시피, DDP를 올바르게 구현하려면 몇 가지 작업을 수행해야 합니다:
  1. 다음으로 프로세스 그룹을 초기화합니다 torch.distributed 패키지: dist.init_process_group(backend="nccl")
  2. 다음과 같은 변수들을 처리하세요 local_world_size 그리고 local_rank 프로세스 인덱스를 기준으로 올바른 디바이스 배치를 처리하기 위해서입니다.
  3. 다음 유형의 샘플러를 추가하세요 torch.utils.data.distributed.DistributedSampler DataLoader에 추가하여 배치가 적절히 분할되고, 다음을 기준으로 그 일부만 GPU에 전달되도록 합니다 local_rank 프로세스의
  4. 모델을 다음으로 감싸세요 DistributedDataParallel 클래스를 전달하여 device_ids 각 GPU마다 모델의 복제본을 생성할 수 있도록 합니다.
그리고 더 많은 작업이 있습니다! 특히 처음 시도한다면 이 과정은 오류가 발생하기 쉽고 시간이 많이 듭니다.
허깅 페이스 등장 Accelerate 구원하러 등장!

Hugging Face Accelerate 소개

이것은 공식 예제가 아닙니다. 공식 예제는 다음을 참조하세요: 문서는 여기에서 확인하세요.
Hugging Face 덕분에 보일러플레이트 코드 약 50줄을 방금 줄였습니다 Accelerate 그리고 실뱅 구게르그리고 이제 같은 스크립트가 단일 GPU, 다중 GPU, 그리고 TPU에서도 동작합니다!
저는 개인적으로 TPU를 사용해 본 적은 없지만, 단일 GPU와 다중 GPU에서 같은 스크립트를 그대로 쓸 수 있다는 것만으로도 충분합니다. 😉
기본적으로 이제 다중 GPU에서 학습을 구현하기 위해 필요한 것은 다음과 같습니다:
from accelerate import Accelerator

accelerator = Accelerator()

# dummy code to get dataloaders, model & optimizer
train_dataloader, eval_dataloader, model, optimizer = get_everything()

# prepare for DDP using accelerator
train_dataloader, eval_dataloader, model, optimizer = accelerator.prepare(
train_dataloader, eval_dataloader, model, optimizer
)
거의 다 했습니다! 우리 객체의 데이터로더, 모델, 그리고 옵티마이저를 넘겨주기만 하면 됩니다 accelerator.prepare 준비해 줄 수 있습니다. 그런데 어떻게 이 두 줄의 코드만으로 모든 것을 처리할 수 있을까요?
저도 똑같은 의문을 가졌습니다. 아래에서 Hugging Face의 내부 메커니즘을 조금 들여다보겠습니다. Accelerate를 살펴보고 이 질문에 답해 보겠습니다.

Hugging Face Accelerate 내부 들여다보기

소스 코드를 파고들기 전에, 다음 사항을 염두에 둡시다. 두 가지 핵심 단계 Hugging Face를 사용하는 것으로 Accelerate:
  1. Accelerator 초기화: accelerator = Accelerator()
  2. DataLoader, 옵티마이저, 모델 등의 객체 준비하기: train_dataloader, model, optimizer = accelerator.prepare(train_dataloader, model, optimizer)
이제 다음 두 가지 핵심 단계에서 각각 어떤 일이 일어나는지 살펴보겠습니다.
지금부터는 소스 코드를 단계별로 살펴보면서 이 글의 코드 비중이 크게 늘어난다는 점을 유의하세요.

1단계: Accelerator 초기화

Accelerator를 초기화할 때마다 accelerator = Accelerator()먼저 발생하는 일은 Accelerator의 상태는 인스턴스로 설정되어 AcceleratorState 클래스입니다. 소스 코드에서:"
class Accelerator:
def __init__(
self,
device_placement: bool = True,
split_batches: bool = False,
fp16: bool = None,
cpu: bool = False,
rng_types: Optional[List[Union[str, RNGType]]] = None,
kwargs_handlers: Optional[List[KwargsHandler]] = None,
):
# initialize state
self.state = AcceleratorState(fp16=fp16, cpu=cpu, _from_accelerator=True)
우리는 여러 변수를 함께 전달합니다. fp16, cpu & from_accelerator 그리고 끝입니다. 그렇게 해서 self.state 각각에 대해 설정됩니다 Accelerator.

이게 뭐지 AcceleratorState 클래스인가요?

이 클래스의 소스 코드는 다음에서 확인할 수 있습니다 여기하지만 본질적으로 하는 일은 다음과 같습니다 사용 가능한 하드웨어 유형에 따라 다음 변수들의 적절한 값을 설정합니다:
  • distributed_type
  • num_processes
  • process_index
  • local_process_index
  • use_fp16
더 짧은 버전은 __init__ 메서드AcceleratorState 다음과 비슷합니다:
class AcceleratorState:
_shared_state = {}

def __init__(self, fp16: bool = None, cpu: bool = False, _from_accelerator: bool = False):
self.__dict__ = self._shared_state
if not getattr(self, "initialized", False):
if is_tpu_available() and not cpu:
# setup all TPU related variables
elif int(os.environ.get("LOCAL_RANK", -1)) != -1 and not cpu:
# setup all MULTI-GPU related variables
else:
# setup single GPU or CPU related variables depending on whether CUDA is available
self.initialized = True
이제 내가 무슨 뜻으로 말했는지 알겠나요? AcceleratorState 하드웨어에 따라 변수에 특정 값을 설정하나요?"

어떻게 AcceleratorState 어떤 하드웨어가 사용 가능한지 알다?

음, 그건 그냥 if-else 조건문 묶음일 뿐이에요 __init__ 위에서 살펴본 메서드:
if is_tpu_available() and not cpu:
# setup variables for TPU
elif int(os.environ.get("LOCAL_RANK", -1)) != -1 and not cpu:
# setup variables for MULTI_GPU
else:
# setup variables for SINGLE_GPU or CPU depending on whether CUDA is available

어떻게 AcceleratorState 하드웨어에 따라 변수를 설정하나요?

단일 노드에 여러 개의 GPU가 있는 경우를 예로 들면, 그때는 __init__ …에 대한 메서드 AcceleratorState 다음과 같습니다:
if is_tpu_available() and not cpu:
# setup variables for TPU
elif int(os.environ.get("LOCAL_RANK", -1)) != -1 and not cpu:
self.distributed_type = DistributedType.MULTI_GPU
if not torch.distributed.is_initialized():
torch.distributed.init_process_group(backend="nccl")
self.num_processes = torch.distributed.get_world_size()
self.process_index = torch.distributed.get_rank()
self.local_process_index = int(os.environ.get("LOCAL_RANK", -1))
self.device = torch.device("cuda", self.local_process_index)
torch.cuda.set_device(self.device)
self.use_fp16 = parse_flag_from_env("USE_FP16", False) if fp16 is None else fp16
else:
# setup variables for SINGLE_GPU or CPU depending on whether CUDA is available
위의 소스 코드를 살펴보면, 우리는 다음을 확인할 수 있습니다 self.distributed_type 로 설정됩니다 DistributedType.MULTI_GPU 문자열 값만을 갖는 타입일 뿐인MULTI_GPU'.
요컨대, self.distributed_type 문자열 값으로 설정됩니다 ‘MULTI_GPU
다음으로, 앞서와 동일한 방식으로 분산 프로세스를 초기화합니다. 우리 PyTorch DDP 스크립트 ‘nccl’ 백엔드를 사용합니다. 이는 꽤 표준적이며, 분산 학습을 시작하기 전에 프로세스 그룹을 초기화해야 하기 때문입니다.
다음으로, 코드에서 프로세스 그룹 자체로부터 프로세스 수를 가져오고 또한 process_index 각 개별 프로세스마다. 참고로, process_index 각 프로세스마다 달라지게 됩니다. 마찬가지로, local_process_index.
오직 ~할 때만 local_process_index 그리고 process_index 서로 달라지는 경우는 멀티 노드를 사용할 때입니다. 즉, 여러 대의 머신에서 여러 GPU로 학습할 때를 말합니다. 이에 대해서는 PyTorch 포럼에서도 추가로 설명되어 있습니다. 여기.
마지막으로, 각 프로세스는 다음을 기준으로 자신만의 CUDA 디바이스를 사용합니다 local_process_index 그리고 …에 대한 값 use_fp16 또한 설정됩니다.
따라서 MULTI_GPU 환경에서 이러한 변수들이 어떻게 설정되는지 확인했습니다. 만약 단일 GPU나 CPU 전용 인스턴스에서 학습했다면 이 변수들의 값은 다르게 설정되었을 것입니다.

2단계: Accelerator로 DDP 준비를 위해 객체 구성하기

허깅페이스 Accelerate - prepare_model

내가 공유한 네 단계에서 PyTorch의 DDP 섹션에서는 모델을 거의 그대로 래핑하기만 하면 됩니다 DistributedDataParallel PyTorch의 클래스에 device ID를 넘겨서 사용하는 거죠, 맞나요?
def prepare_model(self, model):
if self.device_placement:
model = model.to(self.device)
if self.distributed_type == DistributedType.MULTI_GPU:
kwargs = self.ddp_handler.to_kwargs() if self.ddp_handler is not None else {}
model = torch.nn.parallel.DistributedDataParallel(
model,
device_ids=[self.local_process_index],
output_device=self.local_process_index,
**kwargs,
)
if self.native_amp:
model.forward = torch.cuda.amp.autocast()(model.forward)
return model
바로 그거죠! 우리가 내부에서 하고 있는 일이 정확히 그것입니다 prepare_model 메서드. 따라서 device_placement 기본값으로 True로 설정되어 있으므로, 먼저 모델을 이동시키고 self.device.
각 프로세스는 고유한 디바이스를 가진다는 점에 유의하세요.
다음으로, 모델을 다음 내부에 래핑합니다 DistributedDataParallel 클래스에 디바이스 ID 목록을 전달하는데, 이는 local_process_index 그것도 다시 각 프로세스마다 따로입니다! 그래서 우리는 우리의 …와 동일한 결과를 얻을 수 있었습니다 순수 PyTorch 스크립트, 동시에 사용자로부터 모든 복잡성을 감추어 줍니다. 정말 멋지죠!
사실 이 라이브러리의 소스 코드를 읽어보면서 정말 많은 것을 배웠습니다. 감사합니다. 실뱅 이렇게 아름다운 코드를 작성해 주셔서 감사합니다.
이제 DataLoader로 넘어가 봅시다. 대부분의 작업이 필요한 부분입니다.

Hugging Face Accelerate -> DataLoader 준비

prepare_dataloaders 메서드는 조금 더 복잡하며, 데이터셋을 기준에 따라 하위 집합들로 나누는 역할을 합니다. process_index 프로세스 그룹 내부에서 각 GPU가 항상 데이터의 일부 하위 집합만 받도록 보장합니다.
Hugging Face Accelerate 이는 주어진 DataLoader 내부의 데이터 샘플러를 업데이트하고, 샘플러를 해당 타입의 인스턴스로 교체함으로써 이루어집니다. BatchSamplerShard또한 DataLoader 자체도 내부에서 래핑되어 DataLoaderShard.
본질적으로, 기존 DataLoader에서 다음과 같은 핵심 정보를 가져옵니다:
new_dataset = dataloader.dataset
new_batch_sampler = dataloader.batch_sampler
generator = getattr(dataloader, "generator", None)

new_batch_sampler = BatchSamplerShard(
dataloader.batch_sampler,
num_processes=num_processes,
process_index=process_index,
split_batches=split_batches,
)

먼저 주어진 PyTorch DataLoader에서 데이터셋, 기존 배치 샘플러, 그리고 존재한다면 생성기를 가져옵니다. 다음으로 새로운 배치 샘플러를 생성하는데, 이는 `BatchSamplerShard 클래스입니다. 다음으로 이 클래스를 자세히 살펴보겠습니다.

BatchSamplerShard

BatchSamplerShard 클래스는 데이터셋을 기준에 따라 하위 집합으로 분할하는 역할을 담당합니다 process_index 그리고 적절한 데이터 하위 집합이 올바른 디바이스로 전달되도록 보장합니다. 이제 이 클래스의 상위 수준 소스 코드를 살펴보겠습니다.
class BatchSamplerShard(BatchSampler):
def __init__(
self,
batch_sampler: BatchSampler,
num_processes: int = 1,
process_index: int = 0,
split_batches: bool = False,
):
if split_batches and batch_sampler.batch_size % num_processes != 0:
raise ValueError(
f"To use `BatchSamplerShard` in `split_batches` mode, the batch size ({batch_sampler.batch_size}) "
f"needs to be a round multiple of the number of processes ({num_processes})."
)
self.batch_sampler = batch_sampler
self.num_processes = num_processes
self.process_index = process_index
self.split_batches = split_batches
self.batch_size = batch_sampler.batch_size
self.drop_last = batch_sampler.drop_last

def __len__(self):
if len(self.batch_sampler) % self.num_processes == 0:
return len(self.batch_sampler) // self.num_processes
length = len(self.batch_sampler) // self.num_processes
return length if self.drop_last else length + 1

def __iter__(self):
return self._iter_with_split() if self.split_batches else self._iter_with_no_split()
좋습니다. 훌륭해요. 그러면 이 클래스는 다음과 같은 변수들을 인자로 받습니다: batch_sampler, num_processes, process_index, split_batches, batch_size & drop_last. 참고로/주의할 점은 process_index 그리고 num_processes 값은 accelerator의 …에서 옵니다 prepare 메서드 자체.
그렇다면 핵심은 무엇일까요?
음, 우리는 데이터가 다음을 기준으로 하위 집합으로 나뉘길 원합니다 process_index 그리고 하위 집합들이 서로 겹치지 않도록 서로 다르게 나뉘게 하세요.

이것은 내부에서 어떻게 구현될까요? 배치 샘플러 샤드 클래스?

BatchSamplerShard 클래스에는 …이 있습니다 __iter__ 에 따라 올바른 배치를 산출하는 메서드 process_index.
이렇게 가정해 봅시다 split_batches = False이 경우에는 self._iter_with_no_split 메서드가 호출됩니다. 그러니 이 메서드의 소스 코드를 살펴봅시다.
이 메서드의 첫 부분부터 살펴보겠습니다:
def _iter_with_no_split(self):
initial_data = []
batch_to_yield = []
for idx, batch in enumerate(self.batch_sampler):
# We gather the initial indices in case we need to circle back at the end.
if not self.drop_last and idx < self.num_processes:
initial_data += batch
# We identify the batch to yield but wait until we ar sure every process gets a full batch before actually
# yielding it.
if idx % self.num_processes == self.process_index:
batch_to_yield = batch
if idx % self.num_processes == self.num_processes - 1 and len(batch) == self.batch_size:
yield batch_to_yield
batch_to_yield = []

그런데, DataLoader의 `batch_sampler 예전에는, PyTorch에서는 대략 다음과 같은 값을 반환했습니다: [0, 1, 2, 3], [4, 5, 6, 7], [8, 9] , Dataset에 10개의 요소가 있고 배치 크기가 4라고 가정해 봅시다.
위의 소스 코드를 보며 이 메서드의 핵심 아이디어를 파악해 봅시다. 우리는 이미 알고 있습니다 that self.batch_sampler 기존 DataLoader의 …로 설정되었습니다 batch_sampler, 그래서 이를 순회하면 우리의 batch.
하지만! 우리는 데이터를 기준에 따라 여러 부분으로 나눠야 하므로 process_index, 즉 이 전체 데이터를 그대로 보내면 모든 GPU가 전체 데이터셋을 받게 되므로 그렇게 할 수 없습니다.
그렇게 하려면, 먼저 데이터 인덱스를 몇 개 추가해서 initial_data 목록입니다. 나중에 다시 돌아와 데이터를 더 추가해야 할 경우를 대비한 것입니다.
다음으로, …를 기반으로 process_index, 코드에서처럼 우리가 내보내고자 하는 배치를 식별할 수 있습니다:
if idx % self.num_processes == self.process_index:
batch_to_yield = batch
가정해 보겠습니다, 우리가 2개를 가지고 있다고 num_processes, 그런 다음 …에 대해 process_index==0, 우리의 예시에 따르면, 이것은 단지 다음을 반환합니다 [0, 1, 2, 3], 그리고 …에 대해 process_index==1, 이것은 다음을 반환합니다 [4, 5, 6, 7].
자, 여기까지입니다! 우리는 배치를 …를 기준으로 하위 집합들로 성공적으로 샤딩했습니다. process_index. 멋지지 않나요?
다음으로, 내부에는 일부 로직이 있습니다 BatchSamplerShard 대해 drop_last 간단히 말해 생략하지만, 본질적으로는 혹시 모를 경우를 대비해 충분한 데이터를 확보하도록 해 줍니다 drop_last=False.
독자 여러분이 소스 코드를 살펴보고 실제로 코드를 실행해 보면서, 언제 어떤 일이 일어나는지 충분히 이해해 보시기 바랍니다 drop_last=True 그리고 언제drop_last=False.

DataLoaderShard

지금까지 우리는 데이터를 다음 기준에 따라 하위 집합으로만 분할했습니다 process_index 하지만 아직 데이터를 디바이스에 올려 놓지는 않았습니다. DataLoaderShard 클래스는 Hugging Face 내부에서 그 작업을 수행합니다 액셀러레이트!
소스 코드를 살펴보겠습니다:
class DataLoaderShard(DataLoader):
def __init__(self, dataset, device=None, rng_types=None, generator=None, **kwargs):
super().__init__(dataset, **kwargs)
self.device = device
self.rng_types = rng_types
self.generator = generator

def __iter__(self):
if self.rng_types is not None:
synchronize_rng_states(self.rng_types, self.generator)
state = AcceleratorState()
for batch in super().__iter__():
if state.distributed_type == DistributedType.TPU:
xm.mark_step()
yield batch if self.device is None else send_to_device(batch, self.device)
우리가 집중해서 살펴봐야 할 핵심 코드 줄은 다음과 같습니다:
for batch in super().__iter__():
# Ignoring TPU operations
yield batch if self.device is None else send_to_device(batch, self.device)
그래서 만약 the device None 이 아니라면 다음을 사용해 데이터를 디바이스로 보냅니다 send_to_device 함수입니다. 이에 대한 소스 코드는 send_to_device 함수는 다음과 같습니다:
def send_to_device(tensor, device):
if isinstance(tensor, (list, tuple)):
return type(tensor)(send_to_device(t, device) for t in tensor)
elif isinstance(tensor, dict):
return type(tensor)({k: send_to_device(v, device) for k, v in tensor.items()})
elif not hasattr(tensor, "to"):
return tensor
return tensor.to(device)
이 함수는 독자가 직접 이해해 보도록 과제로 남겨 두겠습니다만, 정말 간단하고 깔끔합니다!

결론

대부분의 내용은 이것으로 충분합니다! 이 글은 HuggingFace 전체를 모두 설명하지는 않습니다. Accelerate 자체에 대한 모든 내용을 다룬 것은 아니지만, 어떻게 동작하고 어떤 구조로 되어 있는지에 대해 어느 정도는 이해를 돕는 설명이 되었기를 바랍니다.
저는 이미 내부 워크플로에 이 패키지를 사용하기 시작했으며, 사용하기가 정말 쉽습니다! 또한 이 글은 제가 이해한 범위에서 패키지를 바탕으로 작성되었고, Hugging Face의 공식 문서 패키지에 대해 더 배우기에는 더 적합한 곳일 것입니다.

이 글은 AI로 번역되었습니다. 오역이 있을 수 있으니 댓글로 알려 주세요. 원문 링크는 다음과 같습니다: 원문 보고서 보기
Shahriar Shayesteh
Shahriar Shayesteh •  
Hi, thanks for the clarification. I try to use accelerate but when I pass in my dataloader inside the accelerate.prepare() and get an accelerate.data_loader.DataLoaderShard, I cannot iterate over the data batch (it goes over one example each time)during the training. I'm a little confused about how the toy example in the documentation has done that. is there anything that I did not consider?
Reply