ML

[GluonCV, OpenSource 분석하기] GluonCV의 multiprocessing 이해하기 [2]

ML_MJSHIN 2021. 10. 27. 09:57

Introduction


  지난 포스팅에 이어서 GluonCV의 DDP 활용법에 대해서 살펴보도록 하겠습니다.

 

 

Train 


  train_ddp_pytorch.py 스크립트에서 모델을 생성하고 데이터셋을 로드하는 부분까지는 지난 포스팅에서 살펴보았습니다. 이제 train 하는 과정에서의 DDP로 인한 변화된 부분을 살펴볼  차례입니다.

 

  우선, optimizer를 선언하는 것에 있어서는 기존에 익숙한 방식대로 똑같이 선언해주면 됩니다. 하지만 차이점은 criterion을 설정할 때는 .cuda() 함수를 통해서 GPU에 올려주는 것이 필요합니다.

 

  실제로 차이가 나는 부분은 train_sampler.set_epoch() 함수가 처음 등장하게 됩니다. 

# create dataset and dataloader
    train_loader, val_loader, train_sampler, val_sampler, mg_sampler = build_dataloader(cfg)

    # create optimizer
    optimizer = torch.optim.SGD(model.parameters(),
                                lr=cfg.CONFIG.TRAIN.LR,
                                momentum=cfg.CONFIG.TRAIN.MOMENTUM,
                                weight_decay=cfg.CONFIG.TRAIN.W_DECAY)

    # load a pre-trained checkpoint for finetuning or evaluation
    if cfg.CONFIG.MODEL.LOAD:
        model, _ = load_model(model, optimizer, cfg, load_fc=True)

    # create lr scheduler
    if cfg.CONFIG.TRAIN.LR_POLICY == 'Step':
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                         milestones=cfg.CONFIG.TRAIN.LR_MILESTONE,
                                                         gamma=cfg.CONFIG.TRAIN.STEP)
    elif cfg.CONFIG.TRAIN.LR_POLICY == 'Cosine':
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer,
                                                               T_max=cfg.CONFIG.TRAIN.EPOCH_NUM - cfg.CONFIG.TRAIN.WARMUP_EPOCHS,
                                                               eta_min=0,    # minimum learning rate
                                                               last_epoch=cfg.CONFIG.TRAIN.RESUME_EPOCH)
    else:
        print('Learning rate schedule %s is not supported yet. Please use Step or Cosine.')

    if cfg.CONFIG.TRAIN.USE_WARMUP:
        scheduler_warmup = GradualWarmupScheduler(optimizer,
                                                  multiplier=1.0,    # warmup_lr is required to start from 0
                                                  total_epoch=cfg.CONFIG.TRAIN.WARMUP_EPOCHS,
                                                  after_scheduler=scheduler)
    # create criterion
    criterion = nn.CrossEntropyLoss().cuda()

    # train loop
    base_iter = 0
    for epoch in range(cfg.CONFIG.TRAIN.EPOCH_NUM):
        if cfg.DDP_CONFIG.DISTRIBUTED:
            train_sampler.set_epoch(epoch)

        # train one epoch
        base_iter = train_classification(base_iter, model, train_loader, epoch, criterion, optimizer, cfg, writer=writer)
        if cfg.CONFIG.TRAIN.USE_WARMUP:
            scheduler_warmup.step()
        else:
            scheduler.step()

        # evaluation
        if epoch % cfg.CONFIG.VAL.FREQ == 0 or epoch == cfg.CONFIG.TRAIN.EPOCH_NUM - 1:
            validation_classification(model, val_loader, epoch, criterion, cfg, writer)

        # save model
        if epoch % cfg.CONFIG.LOG.SAVE_FREQ == 0 or epoch == cfg.CONFIG.TRAIN.EPOCH_NUM - 1:
            if cfg.DDP_CONFIG.GPU_WORLD_RANK == 0 or cfg.DDP_CONFIG.DISTRIBUTED == False:
                save_model(model, optimizer, epoch, cfg)

    if writer is not None:
        writer.close()

 

  multi_process로 동작하는 과정에서 같인 epoch에 대해서 데이터셋을 나누고 loss를 수집하여 모델을 업데이트 해야 하므로 epoch 마다 train_sampler의 set_epoch을 통해서 epoch의 정보를 전달해줍니다. 

 

  그리고 이 함수의 목적은 매우 중요합니다. 그러기 위해서 저희는 DistributedSampler 를 살펴보아야 합니다. [1]

class DistributedSampler(Sampler):
    def __init__(self, dataset, num_replicas=None, rank=None):
        num_replicas = dist.get_world_size()
        rank = dist.get_rank()
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))
        self.total_size = self.num_samples * self.num_replicas
        
    def __iter__(self):
        g = torch.Generator()
        g.manual_seed(self.epoch)
        indices = torch.randperm(len(self.dataset), generator=g).tolist()
        indices = indices[self.rank:self.total_size:self.num_replicas]
        return iter(indices)

  각 Sampler는 전체 데이터를 GPU의 개수로 나눈 부분 데이터에서만 데이터를 샘플링합니다. 이때 각 GPU에서 할당하여 학습할 부분 데이터의 수는 위에서 self.num_samples에 계산됩니다. 그리고 iteration 과정에서는 데이터셋의 length를 구하여 (len(self.dataset)) randperm 으로 index 의 무작위 리스트를 만들어 내고 각 프로세스가 자신이 샘플링 해야 할 구간을 선택해서 GPU Sampler에 할당합니다. 

 

  이때 보면 index 들이 모든 프로세스에서 같은 random indeces 리스트가 생성되어야 하는 것을 알 수 있습니다. 그렇기 때문에 set_epoch(epoch) 함수를 통해서 이 random seed를 매번 맞춰준다고 생각하시면 됩니다. 

 

  이제 끝난 것 같지만 ... 저희에게는 아직 validation_classification 함수가 남아있습니다. train_classification 함수는 실제로 보면 일반 train 함수 부분과 동일하기에 살펴볼 것이 없지만 validation_classification 함수는 조금 있습니다 ... 

  

 위 두 함수의 경로는 gluon-cv/gluoncv/torch/utils/task_utils/classification.py 입니다. 

 

 해당 함수에서 한번 살펴보면 좋을 부분은 eval_path 의 directory를 만들어서 활용하는 부분입니다. 

def validation_classification(model, val_dataloader, epoch, criterion, cfg,
                              writer):
    """Task of validating video classification"""
    batch_time = AverageMeter()
    data_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()
    model.eval()

    end = time.time()
    with torch.no_grad():
        for step, data in enumerate(val_dataloader):
            data_time.update(time.time() - end)
            val_batch = data[0].cuda()
            val_label = data[1].cuda()
            outputs = model(val_batch)

            loss = criterion(outputs, val_label)
            if cfg.CONFIG.DATA.NUM_CLASSES < 5:
                prec1a, prec5a = accuracy(outputs.data, val_label, topk=(1, 1))
                # Tricky solution for datasets with less than 5 classes, top5 acc is always set to 100%
                prec5a = 100
            else:
                prec1a, prec5a = accuracy(outputs.data, val_label, topk=(1, 5))

            losses.update(loss.item(), val_batch.size(0))
            top1.update(prec1a.item(), val_batch.size(0))
            top5.update(prec5a.item(), val_batch.size(0))
            batch_time.update(time.time() - end)
            end = time.time()

            if step % cfg.CONFIG.LOG.DISPLAY_FREQ == 0 and cfg.DDP_CONFIG.GPU_WORLD_RANK == 0:
                logger.info('----validation----')
                print_string = 'Epoch: [{0}][{1}/{2}]'.format(
                    epoch, step + 1, len(val_dataloader))
                logger.info(print_string)
                print_string = 'data_time: {data_time:.3f}, batch time: {batch_time:.3f}'.format(
                    data_time=data_time.val, batch_time=batch_time.val)
                logger.info(print_string)
                print_string = 'loss: {loss:.5f}'.format(loss=losses.avg)
                logger.info(print_string)
                print_string = 'Top-1 accuracy: {top1_acc:.2f}%, Top-5 accuracy: {top5_acc:.2f}%'.format(
                    top1_acc=top1.avg, top5_acc=top5.avg)
                logger.info(print_string)

        eval_path = cfg.CONFIG.LOG.EVAL_DIR
        if not os.path.exists(eval_path):
            os.makedirs(eval_path)

        with open(
                os.path.join(eval_path,
                             "{}.txt".format(cfg.DDP_CONFIG.GPU_WORLD_RANK)),
                'w') as f:
            f.write("{} {} {}\n".format(losses.avg, top1.avg, top5.avg))
        torch.distributed.barrier()

        loss_lst, top1_lst, top5_lst = [], [], []
        if cfg.DDP_CONFIG.GPU_WORLD_RANK == 0 and writer is not None:
            print("Collecting validation numbers")
            for x in range(cfg.DDP_CONFIG.GPU_WORLD_SIZE):
                data = open(os.path.join(
                    eval_path,
                    "{}.txt".format(x))).readline().strip().split(" ")
                data = [float(x) for x in data]
                loss_lst.append(data[0])
                top1_lst.append(data[1])
                top5_lst.append(data[2])
            print("Global result:")
            print_string = 'loss: {loss:.5f}'.format(loss=np.mean(loss_lst))
            print(print_string)
            print_string = 'Top-1 accuracy: {top1_acc:.2f}%, Top-5 accuracy: {top5_acc:.2f}%'.format(
                top1_acc=np.mean(top1_lst), top5_acc=np.mean(top5_lst))
            print(print_string)
            writer.add_scalar('val_loss_epoch', np.mean(loss_lst), epoch)
            writer.add_scalar('val_top1_acc_epoch', np.mean(top1_lst), epoch)
            writer.add_scalar('val_top5_acc_epoch', np.mean(top5_lst), epoch)

    eval_path에다가 각각의 프로세스는 자신의 loss 값과 성능에 대한 지표 값을 저장합니다. 그리고, GPU_WORLD_RANK 가 0인 프로세스에서는 이 파일들을 모두 열어서 평균치를 계산하여 tensorboard와 화면에 출력을 수행합니다.

 

  사실 큰 부분은 아니고 이런식으로 값들을 전달해줄 수 있다는 것을 한번 보고 넘어가면 좋다고 생각해서 살펴보았습니다. 실제로 queue를 통해서 공유되는 메모리를 사용할 수도 있지만 저의 경험에서는 이렇게 파일로 저장을 하는 것으로 나중에 각 프로세스의 학습 과정이 어떻게 흘러갔는지도 알 수 있을 뿐더라 MLOps의 artifact들은 자세하게 남아 있을 수록 추후에 복기하거나 문제점을 찾기 좋기 때문에 한번쯤 사용해봐도 좋다고 생각합니다. 

  

 

 

 

References


[1] https://medium.com/daangn/pytorch-multi-gpu-%ED%95%99%EC%8A%B5-%EC%A0%9C%EB%8C%80%EB%A1%9C-%ED%95%98%EA%B8%B0-27270617936b

 

🔥PyTorch Multi-GPU 학습 제대로 하기

PyTorch를 사용해서 Multi-GPU 학습을 하는 과정을 정리했습니다. 이 포스트는 다음과 같이 진행합니다.

medium.com