Inside Hugging Face's Accelerate!

"Could Accelerate really be this easy?" I asked myself, and the result is this dive into the internal workings of the package. Made by Aman Arora using W&B

Introduction

As someone who first spent around a day implementing Distributed Data Parallel (DDP) in PyTorch, and then spent around 5 mins doing the same thing using Hugging Face's new 🤗 Accelerate, I was intrigued and amazed by the simplicity of the package!
"Could it really be this easy?" I asked myself, and the result is this post.
🤗 Accelerate: GitHub | Docs | Commit ID at the time of writing this post
As part of this post, we will be looking at the source code of 🤗 Accelerate, but at times, I will skip some parts of the code for simplicity. Also, we will not be looking at TPU-related code and only focus on how 🤗 Accelerate works for single and multi-GPU environments.
Please note that this post is not an introduction to Distributed Data Parallel (DDP). For an introduction to DDP, please refer to the following wonderful resources:
  1. PyTorch's docs & tutorials
  2. PyTorch Distributed Training by Lei Mao
  3. Distributed data parallel training in PyTorch by Kevin Kaichuang Yang
So, let's get started!

Distributed Data Parallel in PyTorch

If you're a PyTorch user like I am and have tried to implement DDP in PyTorch to train your models on multiple GPUs, then you know how painful it can be, especially if you're doing it for the first time.
🍀: Here's what a typical training script using DDP in PyTorch looks like without 🤗 Accelerate.
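For readers who haven't written one before, here is a minimal sketch of what such a script typically looks like. The dataset, model and hyperparameters are toy placeholders of my own, and it assumes the script is launched with a launcher (e.g. torchrun) that sets the LOCAL_RANK environment variable:

import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler


def main():
    # 1. Initialize the process group (one process per GPU)
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])  # set by the launcher
    device = torch.device("cuda", local_rank)
    torch.cuda.set_device(device)

    # 2. Toy dataset & model - replace with your own
    dataset = TensorDataset(torch.randn(1024, 10), torch.randint(0, 2, (1024,)))
    model = torch.nn.Linear(10, 2).to(device)

    # 3. DistributedSampler so each process only sees its own shard of the data
    sampler = DistributedSampler(dataset)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

    # 4. Wrap the model in DistributedDataParallel
    model = DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    loss_fn = torch.nn.CrossEntropyLoss()

    for epoch in range(3):
        sampler.set_epoch(epoch)  # so each epoch gets a different shuffle
        for xb, yb in dataloader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(xb), yb)
            loss.backward()
            optimizer.step()

    dist.destroy_process_group()


if __name__ == "__main__":
    main()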
As you can see, there are a few things that need to be done in order to implement DDP correctly:
  1. Initialize a process group using torch.distributed package: dist.init_process_group(backend="nccl")
  2. Take care of variables such as local_world_size and local_rank to handle correct device placement based on the process index.
  3. Add a sampler of type torch.utils.data.distributed.DistributedSampler to the DataLoader such that the batch gets split appropriately and only a subset of it is passed to each GPU based on the local_rank of the process.
  4. Wrap the model inside DistributedDataParallel class passing in the device_ids such that a replica of the model can be created on each of the GPUs.
And more! This process is error-prone and time-consuming, especially if you're doing it for the first time.
Enter 🤗 Accelerate to the rescue! :)

Introduction to 🤗 Accelerate

🍀: Here's what the same script looks like with 🤗 Accelerate.
💭: Please note that this is not an official example. For that, please refer to the docs here.
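For reference, here is a minimal sketch of the same toy script rewritten with 🤗 Accelerate (same placeholder dataset and model as in the DDP sketch above; the official examples linked in the note are the authoritative version):

import torch
from torch.utils.data import DataLoader, TensorDataset

from accelerate import Accelerator


def main():
    accelerator = Accelerator()

    # Same toy dataset & model as in the DDP sketch above
    dataset = TensorDataset(torch.randn(1024, 10), torch.randint(0, 2, (1024,)))
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
    model = torch.nn.Linear(10, 2)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    loss_fn = torch.nn.CrossEntropyLoss()

    # One call handles device placement, DDP wrapping and data sharding
    dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)

    for epoch in range(3):
        for xb, yb in dataloader:
            optimizer.zero_grad()
            loss = loss_fn(model(xb), yb)
            accelerator.backward(loss)  # replaces loss.backward()
            optimizer.step()


if __name__ == "__main__":
    main()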
We just shed around 50 lines of boilerplate code thanks to 🤗 Accelerate and Sylvain Gugger. And the same script now works on a single GPU, multiple GPUs, and TPUs!
💭: I personally haven't tried working with TPUs but just being able to use the same script when training on single & multi GPUs is enough for me. ;)
Basically, all we need to do now to implement training on multiple GPUs is the following:
from accelerate import Accelerator

accelerator = Accelerator()

# dummy code to get dataloaders, model & optimizer
train_dataloader, eval_dataloader, model, optimizer = get_everything()

# prepare for DDP using accelerator
train_dataloader, eval_dataloader, model, optimizer = accelerator.prepare(
    train_dataloader, eval_dataloader, model, optimizer
)
And that's pretty much it! Just passing our objects (dataloaders, model & optimizer) through accelerator.prepare gets them ready for us. But how? How can these two lines of code be enough to take care of everything?
That's the exact same question I asked myself. Below, let's shed some light on the internal mechanisms of 🤗 Accelerate and try to answer this question.

Inside 🤗 Accelerate

Before we start digging into the source code, let's keep in mind that there are two key steps to using 🤗 Accelerate:
  1. Initialize Accelerator: accelerator = Accelerator()
  2. Prepare the objects such as dataloader, optimizer & model: train_dataloader, model, optimizer = accelerator.prepare(train_dataloader, model, optimizer)
We are going to understand what goes on in each of these two key steps next.
💭: Please note that from this point on, this article becomes code heavy as we start looking into the source code step-by-step. :)

Step 1: Initializing the Accelerator

Every time we initialize an Accelerator, accelerator = Accelerator(), the first thing that happens is that the Accelerator's state is set to an instance of the AcceleratorState class. From the source code:
class Accelerator:
    def __init__(
        self,
        device_placement: bool = True,
        split_batches: bool = False,
        fp16: bool = None,
        cpu: bool = False,
        rng_types: Optional[List[Union[str, RNGType]]] = None,
        kwargs_handlers: Optional[List[KwargsHandler]] = None,
    ):
        # initialize state
        self.state = AcceleratorState(fp16=fp16, cpu=cpu, _from_accelerator=True)
We pass in a handful of arguments - fp16, cpu & _from_accelerator - and that's it. That's how self.state is set for each Accelerator.
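As a quick, illustrative aside (the fp16=True flag here simply mirrors the fp16 argument in the signature above, and the prints are just my own way of poking at the object):

from accelerate import Accelerator

# Turn on mixed precision; everything else keeps its defaults
accelerator = Accelerator(fp16=True)

print(accelerator.device)  # the device this particular process should use
print(accelerator.state)   # the AcceleratorState instance we dig into next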

❓: What is this AcceleratorState class?

You can find this class's source code here, but essentially all it does is set the appropriate values for variables such as distributed_type, num_processes, process_index, local_process_index, device, and use_fp16, depending on which type of hardware is available.
The shorter version of the __init__ method of the AcceleratorState looks something like:
class AcceleratorState:
    _shared_state = {}

    def __init__(self, fp16: bool = None, cpu: bool = False, _from_accelerator: bool = False):
        self.__dict__ = self._shared_state
        if not getattr(self, "initialized", False):
            if is_tpu_available() and not cpu:
                # setup all TPU related variables
            elif int(os.environ.get("LOCAL_RANK", -1)) != -1 and not cpu:
                # setup all MULTI-GPU related variables
            else:
                # setup single GPU or CPU related variables depending on whether CUDA is available
            self.initialized = True
💭: Can you see now what I meant by saying that AcceleratorState sets a particular value for the variables based on the hardware?
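One detail worth pausing on: the line self.__dict__ = self._shared_state means that every AcceleratorState instance shares the same underlying dictionary (the classic "Borg" pattern), which is why creating a fresh instance elsewhere (as DataLoaderShard does later) still sees the already-initialized state. Here is a minimal, standalone sketch of that pattern, with a made-up class name:

class SharedState:
    _shared_state = {}

    def __init__(self):
        # Every instance reads and writes the very same dict
        self.__dict__ = self._shared_state


a = SharedState()
a.device = "cuda:0"
b = SharedState()
print(b.device)  # "cuda:0" - state set on one instance is visible on all others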

❓: How does AcceleratorState know which hardware is available?

Well, that's just a bunch of if-else conditions in the __init__ method that we saw above:
if is_tpu_available() and not cpu:
    # setup variables for TPU
elif int(os.environ.get("LOCAL_RANK", -1)) != -1 and not cpu:
    # setup variables for MULTI_GPU
else:
    # setup variables for SINGLE_GPU or CPU depending on whether CUDA is available

❓: How does AcceleratorState set the variables depending on hardware?

Taking a single node with multiple GPUs as an example, the relevant branch of the __init__ method of AcceleratorState looks like:
if is_tpu_available() and not cpu:
    # setup variables for TPU
elif int(os.environ.get("LOCAL_RANK", -1)) != -1 and not cpu:
    self.distributed_type = DistributedType.MULTI_GPU
    if not torch.distributed.is_initialized():
        torch.distributed.init_process_group(backend="nccl")
    self.num_processes = torch.distributed.get_world_size()
    self.process_index = torch.distributed.get_rank()
    self.local_process_index = int(os.environ.get("LOCAL_RANK", -1))
    self.device = torch.device("cuda", self.local_process_index)
    torch.cuda.set_device(self.device)
    self.use_fp16 = parse_flag_from_env("USE_FP16", False) if fp16 is None else fp16
else:
    # setup variables for SINGLE_GPU or CPU depending on whether CUDA is available
Having a look at the source code above, we can see that self.distributed_type gets set to DistributedType.MULTI_GPU, which is simply an enum member with the string value 'MULTI_GPU'.
So, in essence, self.distributed_type is set to the string value 'MULTI_GPU'.
Next, we initialize the distributed process group with the 'nccl' backend, just as we did in our PyTorch DDP script. This is pretty standard, as we do need to initialize a process group before starting distributed training.
Next, we get the number of processes from the process group itself, along with the process_index for each of our individual processes. Note that the process_index is going to be different for each process, and the same goes for the local_process_index.
💭: The only time local_process_index and process_index are going to be different is when we are training on multiple nodes, that is, on multiple machines with multiple GPUs each. For example, with 2 nodes of 4 GPUs each, process_index runs from 0 to 7 across the nodes, while local_process_index runs from 0 to 3 on each node. This has also been explained further on the PyTorch forums here.
Finally, each process uses its own CUDA device based on the local_process_index and a value for use_fp16 is also set.
💭: Thus, we have seen how these variables get set in a MULTI_GPU environment. The values of these variables would have been set up differently had we been training on a single GPU or CPU-only instance.

Step 2: Getting objects ready for DDP using the Accelerator

🤗 Accelerate - prepare_model

Of the four steps I shared in the Distributed Data Parallel in PyTorch section, all we need to do for the model is pretty much wrap it in PyTorch's DistributedDataParallel class, passing in the device IDs - right?
def prepare_model(self, model):
    if self.device_placement:
        model = model.to(self.device)
    if self.distributed_type == DistributedType.MULTI_GPU:
        kwargs = self.ddp_handler.to_kwargs() if self.ddp_handler is not None else {}
        model = torch.nn.parallel.DistributedDataParallel(
            model,
            device_ids=[self.local_process_index],
            output_device=self.local_process_index,
            **kwargs,
        )
    if self.native_amp:
        model.forward = torch.cuda.amp.autocast()(model.forward)
    return model
And there it is! That's exactly what we are doing inside the prepare_model method. Since device_placement is set to True by default, first we move the model to self.device.
💭: Note that each process has its own device.
Next, we wrap the model inside the DistributedDataParallel class, passing in a list of device IDs containing the local_process_index, which again is different for each process! So, we have been able to achieve the same result as our pure PyTorch script, while at the same time abstracting away all the complexity from the user. This is neat!
💭: I actually learnt a lot just by going through the source code of this library. Thanks Sylvain for writing such beautiful code. :)
Now, moving on to the DataLoaders - this is where most of the work needs to be done.

🤗 Accelerate - Prepare DataLoaders

The prepare_dataloaders method is a bit more complex and it is responsible for breaking the dataset into subsets based on the process_index inside the process group, making sure that each GPU only ever gets a subset of the data.
🤗 Accelerate achieves this by replacing the batch sampler inside the given DataLoader with an instance of BatchSamplerShard. The DataLoader itself also gets wrapped inside DataLoaderShard.
Essentially, we grab some key information from the existing DataLoader:
new_dataset = dataloader.dataset
new_batch_sampler = dataloader.batch_sampler
generator = getattr(dataloader, "generator", None)
new_batch_sampler = BatchSamplerShard(
    dataloader.batch_sampler,
    num_processes=num_processes,
    process_index=process_index,
    split_batches=split_batches,
)
First, we grab the dataset, the existing batch sampler, and the generator (if it exists) from our given PyTorch DataLoader. Next, we create a new batch sampler which is an instance of the BatchSamplerShard class. Let's look at this class in detail next.

BatchSamplerShard

The BatchSamplerShard class is responsible for breaking the batches of indices into subsets based on the process_index, making sure that each process only ever gets its own subset of the data. Let's look at the high-level source code of this class.
class BatchSamplerShard(BatchSampler):
    def __init__(
        self,
        batch_sampler: BatchSampler,
        num_processes: int = 1,
        process_index: int = 0,
        split_batches: bool = False,
    ):
        if split_batches and batch_sampler.batch_size % num_processes != 0:
            raise ValueError(
                f"To use `BatchSamplerShard` in `split_batches` mode, the batch size ({batch_sampler.batch_size}) "
                f"needs to be a round multiple of the number of processes ({num_processes})."
            )
        self.batch_sampler = batch_sampler
        self.num_processes = num_processes
        self.process_index = process_index
        self.split_batches = split_batches
        self.batch_size = batch_sampler.batch_size
        self.drop_last = batch_sampler.drop_last

    def __len__(self):
        if len(self.batch_sampler) % self.num_processes == 0:
            return len(self.batch_sampler) // self.num_processes
        length = len(self.batch_sampler) // self.num_processes
        return length if self.drop_last else length + 1

    def __iter__(self):
        return self._iter_with_split() if self.split_batches else self._iter_with_no_split()
Okay, great, so this class accepts batch_sampler, num_processes, process_index & split_batches, and derives batch_size & drop_last from the wrapped batch sampler. Note that the num_processes and process_index values come from the accelerator's prepare step itself.
So then, what's the main idea?
💭: Well, we want the data to be split into subsets based on the process_index and make sure that the subsets are different and don't overlap.

❓: How is this achieved inside the BatchSamplerShard class?

The BatchSamplerShard class has an __iter__ method that yields the correct batches based on the process_index.
Let's assume we are going with split_batches = False. In this case, the self._iter_with_no_split method gets called. So, let's look at the source code of this method.
Starting with the first part of this method:
def _iter_with_no_split(self):
    initial_data = []
    batch_to_yield = []
    for idx, batch in enumerate(self.batch_sampler):
        # We gather the initial indices in case we need to circle back at the end.
        if not self.drop_last and idx < self.num_processes:
            initial_data += batch
        # We identify the batch to yield but wait until we are sure every process gets a full batch
        # before actually yielding it.
        if idx % self.num_processes == self.process_index:
            batch_to_yield = batch
        if idx % self.num_processes == self.num_processes - 1 and len(batch) == self.batch_size:
            yield batch_to_yield
            batch_to_yield = []
💭: BTW, if you haven't seen a DataLoader's batch_sampler before: in PyTorch it yields batches of indices like [0, 1, 2, 3], [4, 5, 6, 7], [8, 9], given we have 10 elements in our Dataset and a batch size of 4.
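If you want to see this for yourself, here is a tiny standalone snippet (the numbers are chosen to match the example above):

from torch.utils.data import BatchSampler, SequentialSampler

# 10 elements, batch size 4, keep the last incomplete batch
batch_sampler = BatchSampler(SequentialSampler(range(10)), batch_size=4, drop_last=False)
print(list(batch_sampler))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]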
Let's try and grasp the main idea of this method by looking at the source code above. We already know that self.batch_sampler has been set to the existing DataLoader's batch_sampler, so by iterating over it, we get our batches of indices.
BUT! Since we need to split the data into parts based on the process_index, we can't just yield every batch; otherwise, every GPU would end up seeing the complete dataset.
To handle this, we first store the first few batches of indices in the initial_data list, in case we need to circle back and add more data later on.
Next, based on the process_index, we can identify the batch we want to yield as in the code:
if idx % self.num_processes == self.process_index:
    batch_to_yield = batch
Assuming num_processes is 2, then going by our example, process_index == 0 picks up [0, 1, 2, 3] and process_index == 1 picks up [4, 5, 6, 7].
So, that's it! We have successfully sharded our batch into subsets based on the process_index. Isn't this cool?
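To make this concrete, here is a small, simplified simulation of just that modulo-based selection (it deliberately ignores the drop_last / circle-back handling discussed next, and uses 16 elements so that every batch is full):

from torch.utils.data import BatchSampler, SequentialSampler


def simple_shard(batch_sampler, num_processes, process_index):
    # Simplified version of the selection logic in _iter_with_no_split
    for idx, batch in enumerate(batch_sampler):
        if idx % num_processes == process_index:
            yield batch


batch_sampler = BatchSampler(SequentialSampler(range(16)), batch_size=4, drop_last=False)
print(list(simple_shard(batch_sampler, num_processes=2, process_index=0)))  # [[0, 1, 2, 3], [8, 9, 10, 11]]
print(list(simple_shard(batch_sampler, num_processes=2, process_index=1)))  # [[4, 5, 6, 7], [12, 13, 14, 15]]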
Next, there's some logic inside BatchSamplerShard around drop_last which I'll skip for simplicity. Essentially, it makes sure that every process still gets a full batch when drop_last=False, by circling back to those initial indices.
💭: I leave it as an exercise to the reader to look at the source code and try out the code to get a good understanding of what happens when drop_last=True and when drop_last=False.

DataLoaderShard

So far, we have only divided our data into subsets based on the process_index, but we haven't placed the data on the device yet. The DataLoaderShard class does that inside 🤗 Accelerate!
Let's look at the source code:
class DataLoaderShard(DataLoader):
    def __init__(self, dataset, device=None, rng_types=None, generator=None, **kwargs):
        super().__init__(dataset, **kwargs)
        self.device = device
        self.rng_types = rng_types
        self.generator = generator

    def __iter__(self):
        if self.rng_types is not None:
            synchronize_rng_states(self.rng_types, self.generator)
        state = AcceleratorState()
        for batch in super().__iter__():
            if state.distributed_type == DistributedType.TPU:
                xm.mark_step()
            yield batch if self.device is None else send_to_device(batch, self.device)
The main lines of code that we should focus on are:
for batch in super().__iter__():
    # Ignoring TPU operations
    yield batch if self.device is None else send_to_device(batch, self.device)
So, if the device is not None, we send the batch to that device using the send_to_device function. The source code of send_to_device looks like:
def send_to_device(tensor, device):
    if isinstance(tensor, (list, tuple)):
        return type(tensor)(send_to_device(t, device) for t in tensor)
    elif isinstance(tensor, dict):
        return type(tensor)({k: send_to_device(v, device) for k, v in tensor.items()})
    elif not hasattr(tensor, "to"):
        return tensor
    return tensor.to(device)
💭: I leave it as an exercise to the reader to understand this function but it's really straightforward and clean!
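As a quick illustration of what that buys us (assuming send_to_device is imported from accelerate.utils and a CUDA device is available):

import torch
from accelerate.utils import send_to_device

# A nested batch: a dict containing a tensor and a list of tensors
batch = {"input_ids": torch.ones(2, 3), "labels": [torch.zeros(2), torch.ones(2)]}
batch = send_to_device(batch, torch.device("cuda", 0))
print(batch["input_ids"].device)  # cuda:0 - every tensor in the nested structure was moved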

Conclusion

That's really most of it! This article does not explain the entirety of 🤗 Accelerate but I hope I have been able to shed some light on how it works and how it's structured.
I have already started using the package for my internal workflows and it's been really easy to use! Also, please note that this article is based on my understanding of the package; the official documentation by Hugging Face is the best place to learn more about it.