Pytorch Lightning与“权阈(Weights & Biases)”的结合使用
PyTorch Lightning 是一种轻量级包装器,用于组织PyTorch代码,并能轻松添加高级功能,例如分布式训练和16位精度。
结合Weights & Biases integration集成,您可以快速训练和追踪模型,以实现完全可追溯性和可重复性,这仅需额外两行代码:
from pytorch_lightning.loggers import WandbLogger
wandb_logger = WandbLogger()
在此示例中,我们将优化基于MNIST数据集的一些简单模型。
试试Pytorch Lightning →
🚀 安装
Pytorch-lightning和W&B可以通过pip轻松安装。
pip install pytorch-lightning wandb
我们只需要导入一些Pytorch-Lightning模块以及WandbLogger,就可以定义模型了。
🏗️ 使用LightningModule定义我们的模型
研究通常涉及使用新的实验变体来编辑样板代码。 由于这种修补过程,大多数错误都会被引入代码库。 Pytorch lighting通过提供用于定义和训练模型的确定代码结构,大大减少了样板代码。
要在Pytorch中创建神经网络类(class),我们必须从torch.nn.module
导入或扩展。 同样,当使用Pytorch-Lightning时,我们将导入类pl.LightningModule
。
让我们创建一个用于训练MNIST数据集分类模型的类。 为了比较结果,我们将使用与官方文档中相同的示例。
class LitMNIST(LightningModule):
def __init__(self):
super().__init__()
# mnist images are (1, 28, 28) (channels, width, height)
self.layer_1 = torch.nn.Linear(28 * 28, 128)
self.layer_2 = torch.nn.Linear(128, 256)
self.layer_3 = torch.nn.Linear(256, 10)
此外,我们还可以添加指标并保存我们的超参数。
完整代码在这里 →
如上所示,除了导入的基类,代码中的其他所有内容与Pytorch几乎相同。
然后,我们需要定义更多方法:
forward
用于推论,就像在Pytorch中一样training_step
与forward
类似,但还需要从单个batch中返回损失(loss),这将作为训练循环自动进行迭代
def forward(self, x):
'''method used for inference input -> output'''
batch_size, channels, width, height = x.size()
# (b, 1, 28, 28) -> (b, 1*28*28)
x = x.view(batch_size, -1)
x = self.layer_1(x)
x = F.relu(x)
x = self.layer_2(x)
x = F.relu(x)
x = self.layer_3(x)
x = F.log_softmax(x, dim=1)
return x
def training_step(self, batch, batch_idx):
'''needs to return a loss from a single batch'''
x, y = batch
logits = self(x)
loss = F.nll_loss(logits, y)
# Log training loss
self.log('train_loss', loss)
# Log metrics
self.log('train_acc', self.accuracy(logits, y))
return loss
优化器的定义与Pytorch中的定义相同,但需要通过configure_optimizes
完成。
def configure_optimizers(self):
'''defines model optimizer'''
return Adam(self.parameters(), lr=self.lr)
最后,我们可以添加一个validation_step
和一个test_step
来记录损失和指标。
📊 数据载入
可以使用以下方法创建数据管道:
-
🍦 普通Pytorch的
DataLoaders
-
⚡ Pytorch Lightning的
DataModules
DataModules
具有更结构化的定义,它允许进行其他优化,例如在CPU和GPU之间自动分配工作负载。 建议尽可能使用DataModules
!DataModule
也由接口定义: -
prepare_data
(可选):只需在1个GPU上调用一次——通常在类似下面的数据下载步骤调用 -
setup
, 在每个GPU上分别调用,如果我们处于fit或test步骤,可以接受stage来定义 -
train_dataloader
,val_dataloader
和test_dataloader
:加载数据集
这是在代码中执行此操作的方法。
class MNISTDataModule(LightningDataModule):
def __init__(self, data_dir='./', batch_size=256):
super().__init__()
self.data_dir = data_dir
self.batch_size = batch_size
self.transform = transforms.ToTensor()
def prepare_data(self):
'''called only once and on 1 GPU'''
# download data
MNIST(self.data_dir, train=True, download=True)
MNIST(self.data_dir, train=False, download=True)
def setup(self, stage=None):
'''called one ecah GPU separately - stage defines if we are at fit or test step'''
# we set up only relevant datasets when stage is specified (automatically set by Pytorch-Lightning)
if stage == 'fit' or stage is None:
mnist_train = MNIST(self.data_dir, train=True, transform=self.transform)
self.mnist_train, self.mnist_val = random_split(mnist_train, [55000, 5000])
if stage == 'test' or stage is None:
self.mnist_test = MNIST(self.data_dir, train=False, transform=self.transform)
def train_dataloader(self):
'''returns training dataloader'''
mnist_train = DataLoader(self.mnist_train, batch_size=self.batch_size)
return mnist_train
def val_dataloader(self):
'''returns validation dataloader'''
mnist_val = DataLoader(self.mnist_val, batch_size=self.batch_size)
return mnist_val
def test_dataloader(self):
'''returns test dataloader'''
mnist_test = DataLoader(self.mnist_test, batch_size=self.batch_size)
return mnist_test
👟 训练模型
一个传统的Pytorch训练流水线是这样的——执行epoch的循环,迭代微型批次(mini-batches),为每个微型批次执行前馈传递,计算损失,为每个批次执行反向传播,最后更新 梯度(gradients)。
为了在Pytorch Lightning中执行相同的操作,我们把Pytorch Lightning模块中训练逻辑和数据加载的主要元素拿出来。
使用这些函数,Pytorch Lightning将使这个过程的训练部分自动化。 我们将介绍这一点,但在这之前,让我们先了解pytorch lightning如何简单地与Weights&Biases集成在一起,以跟踪实验并创建可在任何地方进行观察的可视化效果。
使用WandB跟踪Pytorch Lightning模型的性能
让我们看看wandbLogger如何与Lightning集成。
from pytorch_lightning.loggers import WandbLogger
wandb_logger = WandbLogger(name='Adam-32-0.001',project='pytorchlightning')
在这里,我们创建了一个wandbLogger对象,其中包含被记录下来的项目和运行的详细信息。
训练循环
现在,让我们进入任何模型训练最重要的部分,即训练循环。 当我们使用Pytorch Lightning时,大多数逻辑已经在幕后设定好了。 我们只需要指定一些超参数,训练过程将使用Trainer
自动完成。 除此之外,每次迭代您还将有一个进度条。
完整代码在这里→
# setup data
mnist = MNISTDataModule()
# setup model - choose different hyperparameters per experiment
model = LitMNIST(n_layer_1=128, n_layer_2=256, lr=1e-3)
# define a Trainer
trainer = Trainer(
logger=wandb_logger, # W&B integration
gpus=-1, # use all GPU's
max_epochs=3 # number of epochs
)
可视化代码中重要的部分是——WandbLogger对象作为记录器(logger)传递到Pytorch Lightning的Trainer对象中。 这将自动使用记录器记录结果。
def train():
trainer.fit(model)
这就是使用Pytorch Lightning训练Pytorch模型所需要做的全部工作。 这一行代码将轻松替代您繁琐且效率低下的普通Pytorch代码。
通过“权阈”可视化模型性能
让我们看一下这个运行生成的可视化效果。
训练模型时,特定运行的训练损失和验证损失会自动实时记录在仪表板上。
我们可以使用不同的超参数重复相同的训练步骤,以比较不同的运行。我们将更改记录器的名称,以标识每次运行。
完整代码在这里→
wandb_logger = WandbLogger(name='Adam-32-0.001',project='pytorchlightning')
wandb_logger = WandbLogger(name='Adam-64-0.01',project='pytorchlightning')
wandb_logger = WandbLogger(name='sgd-64-0.01',project='pytorchlightning')
在这里,我使用惯例来命名运行。第一部分是优化器,第二部分是mini-batch大小,第三部分是学习率(learning rate)。例如,名称“ Adam-32-0.001”表示正在使用的优化程序是Adam,批处理大小为32,学习率为0.001。
您可以在上面的图中查看每种模型的性能。
这些可视化文件将永久存储在您的项目中,这使得比较具有不同超参数的模型的性能,恢复最佳性能的模型以及与团队共享结果变得更加容易。
Pytorch Lightning提供了两种方法来进行提前停止。 使用方法如下:
完整代码在这里→
# A) Set early_stop_callback to True. Will look for 'val_loss'
# in validation_end() return dict. If it is not found an error is raised.
trainer = Trainer(early_stop_callback=True)
# B) Or configure your own callback
early_stop_callback = EarlyStopping(
monitor='val_loss',
min_delta=0.00,
patience=3,
verbose=False,
mode='min'
)
trainer = Trainer(early_stop_callback=early_stop_callback)
定义验证函数后,我们可以直接设置early_stop_callback = true
:
trainer = pl.Trainer(max_epochs = 5,logger= wandb_logger, gpus=1, distributed_backend='dp',early_stop_callback=True)
根据项目的要求,您可能需要增加或减少模型权重的精度。 降低精度可使您将更大的模型放入GPU中。 让我们看看如何在pytorch lightning中加入16位精度。
首先,我们需要安装NVIDIA apex。 为此,我们将在colab中创建一个shell脚本并执行它。
完整代码在这里→
%%writefile setup.sh
git clone https://github.com/NVIDIA/apex
pip install -v --no-cache-dir ./apex
!sh setup.sh
安装apex后,您需要重新启动运行时。
现在,我们可以直接在训练的precision参数中传递所需的值。
trainer = pl.Trainer(max_epochs = 100,logger= wandb_logger, gpus=1, distributed_backend='dp',early_stop_callback=True, amp_level='O1',precision=16)
Lightning提供了用于执行数据并行性和多GPU训练的简单API。 您无需在采样器中使用torch的数据并行类。 您只需要指定并行模式和希望使用的GPU数量即可。
有多种训练方式:
-
数据并行(distributed_backend ='dp')(多GPU,一台机器)
-
分布式数据并行(distributed_backend =“ ddp”)(跨多台计算机的多GPU)
-
分布式数据并行2(distributed_backend ='ddp2')(一台计算机中是dp,多台计算机间是ddp)。
-
TPUs(num_tpu_cores = 8 | x)(tpu或TPU pod)
在这篇文章中,我们将使用数据并行后端。 以下是我们将其合并到现有代码中的方法。
trainer = pl.Trainer(gpus=1, distributed_backend='dp',max_epochs = 5,logger= wandb_logger)
因为我在Google colab上工作,这里我仅使用1个GPU。
随着使用更多GPU,在wandb中您将能够观察不同配置之间的内存使用差异,如左图所示。
通常,在研究过程中,您需要间歇性训练模型。这时我们就需要停止训练、保存状态、稍后加载保存的状态、然后在我们停止的地方继续训练。
能够保存和还原模型还可以使您与团队更有效地协作,并返回几周前的实验。
要使用wandb保存pytorch lightning模型,我们可以使用:
trainer.save_checkpoint('EarlyStoppingADam-32-0.001.pth')
wandb.save('EarlyStoppingADam-32-0.001.pth')
这将在本地运行时中创建一个检查点文件,并将其上传到wandb。现在,当我们决定(甚至在不同的系统上)恢复训练时,我们可以简单地从wandb加载检查点文件并将其加载到我们的程序中,如下所示:
wandb.restore('EarlyStoppingADam-32-0.001.pth')
model.load_from_checkpoint('EarlyStoppingADam-32-0.001.pth')
现在,检查点已加载到模型中,模型可以使用所需的训练模块来继续训练。既然我们已经看到了lightning提供的简单框架,让我们简单比较一下它和pytorch。在lightning中,我们可以通过创建trainer和调用train()方法,使用自动化回调函数和进度条来训练模型。让我们来看看使用普通Pytorch(Vanilla Pytorch)如何实现相同的效果。
完整代码在这里→
#Pytorch
pytorch_model = MNISTClassifier()
optimizer = torch.optim.Adam(pytorch_model.parameters(), lr=1e-3)
# ----------------
# LOSS
# ----------------
def cross_entropy_loss(logits, labels):
return F.nll_loss(logits, labels)
# ----------------
# TRAINING LOOP
# ----------------
num_epochs = 1
for epoch in range(num_epochs):
# TRAINING LOOP
for train_batch in mnist_train:
x, y = train_batch
logits = pytorch_model(x)
loss = cross_entropy_loss(logits, y)
print('train loss: ', loss.item())
loss.backward()
optimizer.step()
optimizer.zero_grad()
# VALIDATION LOOP
with torch.no_grad():
val_loss = []
for val_batch in mnist_val:
x, y = val_batch
logits = pytorch_model(x)
val_loss.append(cross_entropy_loss(logits, y).item())
val_loss = torch.mean(torch.tensor(val_loss))
print('val_loss: ', val_loss.item())
您可以看到训练代码变得多么复杂,我们甚至还没有包含多GPU训练,早期停止或结合wandb跟踪性能。
为了在Pytorch中添加分布式训练,我们需要使用DistributedSampler对数据集进行采样。
def train_dataloader(self):
dataset = MNIST(...)
sampler = None
if self.on_tpu:
sampler = DistributedSampler(dataset)
return DataLoader(dataset, sampler=sampler)
您还需要编写一个自定义函数以包含提前停止功能。 但是当使用lightning时,所有这些都可以通过一行代码来完成。
#Pytorch Lightning
trainer = pl.Trainer(max_epochs = 5,logger= wandb_logger, gpus=1, distributed_backend='dp',early_stop_callback=True)
trainer.fit(model)
这就是这篇文章的全部内容。
试试Pytorch Lightning→.
如果您对将lightning与权阈集成在一起有任何疑问,我们很乐意在我们Slack社区中回答它们。