본문 바로가기
ML

[GluonCV, OpenSource 분석하기] GluonCV의 multiprocessing 이해하기 [1]

by ML_MJSHIN 2021. 10. 27.

Introduciton


  이번 포스팅에서는 Mxnet과 PytorchTS등 다양한 ML OpenSource 의 기반이 되고 있는 GluonCV의 multiprocessing 이 어떻게 동작하는지에 대해서 한번 살펴보려고 합니다. 

 

  pytorch에서는 data parallel의 기법으로 nn.DataParallel을 지원하고 있지만 이 함수를 통한 데이터 병렬화 기법은 비효율적인 측면이 많이 있습니다. 대표적으로 GPU의 불균형적인 사용과 multi-thread 기반의 동작으로 python의 GIL (Global Interpreter Lock) 등의 문제가 존재하며, multiple nodes의 multi-gpu에서는 사용이 불가능한 형태입니다. 

  

 pytorch는 multiple nodes 에서 multi-gpu를 활용하고자 하는 경우를 위해 nn.DistributedDataParallel 방법을 제공하고 있습니다. 하지만, nn.DataParallel처럼 쉽게 모델을 nn.DataParallel(net)과 같이 변환하는 방법으로 사용할 수 있는 형태가 아니기 때문에 공부가 필요합니다.

 

 물론, 많은 분들이 pytorch를 사용하시면서 nn.DistributedDataParallel (DDP) 에 대해서도 사용법을 예제 코드와 함께 공유해주셔서 블로그를 통해서도 기본적인 사용법에 대해서 익히시는 것에는 어려움이 없을 것이라고 생각합니다.

 

 제가 이 포스팅을 하게 된 이유는 pytorch 공식홈페이지에서 제공하는 DDP의 예제[1]와 같이 회사에서 단순하게 train 함수를 선언하고 main_worker 함수를 선언해서 더해주는 방법으로 코드를 작성하는 형태로 구현을 할 수 없을 뿐더러 보통 스타트업이나 기업들에서는 AI Engine이라는 구조화된 형태를 띄는 코드의 구성이 필요하기 때문에, 이러한 구조적 특징을 이해하기 위해서 GluonCV에서는 어떤 구조로 해당 기능을 구현하고 있는지 알아보기 위해서 입니다. 

 

 말이 길었으니 바로 살펴보도록 하겠습니다.

 

GluonCV


  우선 GluonCV의 공식홈페이지에서도 DDP에 대해서 사용법을 따로 언급하고 있습니다 [2]. 위 홈페이지의 설명 이외에 실제 Gluon AI Engine의 코드는 https://github.com/dmlc/gluon-cv/blob/master/scripts/action-recognition/train_ddp_pytorch.py 에서 살펴보실 수 있으며 해당 코드에 대해서 분석하는 포스팅입니다. 

  

  위의 코드를 실행하기 위해서 우선 config 파일을 하나 정의해야 합니다. 이 파일을 살펴보는 것으로 GluonCV에서 DDP를 위해 어떤 파라미터들을 보는지 알 수 있습니다. 우선 GluonCV에서는 yaml 형태로 이 DDP를 위한 파라미터 값들을 정의하는 파일을 관리합니다. 그리고 파라미터들은 다음과 같은 의미를 가지고 있습니다. 

  1. AUTO_RANK_MATCH : 한 node 내의 gpu 에 대해서 rank를 수동으로 입력하지 않아도 됩니다. (False 시에는 GPU 에 값을 일일이 할당해줘야 합니다.)
  2. WORLD_SIZE : DDP 에서 가장 중요한 값으로 node의 수(machine의 수)를 의미합니다.
  3. WORLD_RANK : node의 rank 를 의미합니다. 여기서 rank는 식별자(?) 혹은 이름표라고 생각하시면 됩니다. 주로 0번 rank의 machine이 main machine이 되어 DDP를 시작하고 (process를 시작) 통신을 관리합니다.
  4. DIST_URL : main machine이 될 node (machine)의 접속 가능 주소
  5. WORLD_URL : machine들의 접속 가능 주소 
  6. GPU_WORLD_SIZE : 전체 node에서 가지고 있는 GPU의 수를 의미합니다. 
  7. GPU_WORLD_RANK : 전체 node의 GPU들 중 현재 process가 나타낼 rank를 의미합니다. 이 부분은 AUTO_RANK_MATCH 를 True로 설정하는 것으로 신경쓰지 않아도 됩니다. 
  8. DIST_BACKEND : nccl, gloo 등 여러가지 백엔드 중 선택하면 됩니다. 하지만 pytorch를 쓰시면 nccl이 제일 무난합니다. 
  9. GPU : node 내에서 현재 process가 가질 gpu 의 rank를 의미합니다. 현재 프로세스가 GPU 2개짜리 서버 내에 있다면 0과 1 만 (< num_gpu) 만 가능합니다. 이 부분은 AUTO_RANK_MATCH 를 True로 설정하는 것으로 신경쓰지 않아도 됩니다. 
  10. DISTRIBUTED : True 값을 주어야지만 위의 설정을 기반으로 DDP가 실행됩니다. False인 경우 single node에서 학습을 진행하게 됩니다. 
# yaml
DDP_CONFIG:
  AUTO_RANK_MATCH: True
  WORLD_SIZE: 2 # Total Number of machines
  WORLD_RANK: 0 # Rank of this machine
  DIST_URL: 'tcp://172.31.72.195:23456'
  WORLD_URLS: ['172.31.72.195', '172.31.72.196']

  GPU_WORLD_SIZE: 8 # Total Number of GPUs, will be assigned automatically
  GPU_WORLD_RANK: 0 # Rank of GPUs, will be assigned automatically
  DIST_BACKEND: 'nccl'
  GPU: 0 # Rank of GPUs in the machine, will be assigned automatically
  DISTRIBUTED: True

 

  위와 같이 yaml 파일을 만들고 나면, 다음과 같은 명령어를 통해서 GluonCV의 DDP 예제를 실행할 수 있습니다.

python train_ddp_pytorch.py --config-file XXX.yaml

  

  이 포스팅에서는 코드를 뜯어서 하나하나 보는게 목적이기 때문에 실행 결과 보다는 하나하나 타고 들어가서 살펴보도록 하겠습니다. 

 

 

train_ddp_pytorch.py 


  train_ddp_pytorch.py는 GluonCV의 DDP 예제코드 실행 스크립트입니다. main 함수 부분은 다음과 같이 구성되어 있습니다. 이중에서 configuration과 argparse 부분을 제외하면 spawn_worker함수만 존재합니다.

 

  실제로 이 함수가 DDP를 실행하는 메인 함수입니다. 여기서 main_worker는 multiprocess가 작동했을 때 각각의 프로세스에서 실행하게 될 함수 입니다. 

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Train video action recognition models.')
    parser.add_argument('--config-file', type=str, help='path to config file.')
    args = parser.parse_args()

    cfg = get_cfg_defaults(name='action_recognition')
    cfg.merge_from_file(args.config_file)
    spawn_workers(main_worker, cfg)

 

  바로 spawn_workers가 어떻게 구성되어 있나 살펴보도록 하겠습니다. spawn_workers는 gluon-cv/gluoncv/torch/engine/launch.py 에 있는 함수입니다. 그리고 해당 파일을 찾아가보면 위의 main_worker 함수도 같이 있는 것을 발견할 수 있습니다.

 

  spawn_workers에서는 위에서 저희가 살펴보았던 config.yaml 파일의 내용을 가지고 assert 함수를 통해 DDP 가 실행될 수 없는 조건들에 대해서 검사를 수행하는 것으로 시작합니다.

 이렇게 코드를 살펴보는 것이 좋은 이유가 있습니다. 위에서 살펴본 DISTR_URL이 WORLD_URLS 의 첫 번째 값으로 존재해야 한다는 것을 알 수 있습니다. 만약 코드를 살펴보지 않았다면 다르게 정의했을때 "왜 안되지..?"라고 고민했을 텐데 Open Source 를 자주 보는게 도움이 되는 이유라고 생각합니다. 

  그리고, WORLD_RANK는 실행 이전에 get_local_ip_and_match 함수를 통해서 socket 검사를 수행한 다음 해당 서버 혹은 노드에 연결이 가능한지를 확인을 우선적으로 수행합니다. 

  만약 노드 접속이 불가능하다면 미리 확인이 가능하게 구성되어 있네요. 

"""Multiprocessing distributed data parallel support"""
import torch
import torch.distributed as dist
import torch.backends.cudnn as cudnn
import torch.multiprocessing as mp


def get_local_ip_and_match(ip_list):
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    this_ip = s.getsockname()[0]
    s.close()
    for _i, ip in enumerate(ip_list):
        if ip == this_ip:
            return _i
    return -1


def spawn_workers(main, cfg):
    """Use torch.multiprocessing.spawn to launch distributed processes"""
    if cfg.DDP_CONFIG.AUTO_RANK_MATCH:
        assert len(cfg.DDP_CONFIG.WOLRD_URLS) > 0
        assert cfg.DDP_CONFIG.WOLRD_URLS[0] in cfg.DDP_CONFIG.DIST_URL
        assert len(cfg.DDP_CONFIG.WOLRD_URLS) == cfg.DDP_CONFIG.WORLD_SIZE
        cfg.DDP_CONFIG.WORLD_RANK = get_local_ip_and_match(cfg.DDP_CONFIG.WOLRD_URLS)
        assert cfg.DDP_CONFIG.WORLD_RANK != -1

    ngpus_per_node = torch.cuda.device_count()
    if cfg.DDP_CONFIG.DISTRIBUTED:
        cfg.DDP_CONFIG.GPU_WORLD_SIZE = ngpus_per_node * cfg.DDP_CONFIG.WORLD_SIZE
        mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, main, cfg))
    else:
        main_worker(cfg.DDP_CONFIG.GPU, ngpus_per_node, main, cfg)
        

def main_worker(gpu, ngpus_per_node, main, cfg):
    """The main_worker process function (on individual GPU)"""
    cudnn.benchmark = True

    cfg.DDP_CONFIG.GPU = gpu
    print("Use GPU: {}".format(gpu))

    if cfg.DDP_CONFIG.DISTRIBUTED:
        cfg.DDP_CONFIG.GPU_WORLD_RANK = cfg.DDP_CONFIG.WORLD_RANK * ngpus_per_node + gpu
        dist.init_process_group(backend=cfg.DDP_CONFIG.DIST_BACKEND,
                                init_method=cfg.DDP_CONFIG.DIST_URL,
                                world_size=cfg.DDP_CONFIG.GPU_WORLD_SIZE,
                                rank=cfg.DDP_CONFIG.GPU_WORLD_RANK)
    main(cfg)

 

  혹시 socket 함수에 대해서 익숙치 않은 분들이 계실 수도 있으니 get_local_ip_and_match 함수에 대해서도 살펴보고 가도록 하겠습니다. 

 

  •  socket.socket(socket.AF_INET, socket.SOCK_DGRAM) : 소켓 객체를 생성합니다. AF_INET은 IP4v ip주소를 사용한다는 것을 의미하며, SOCK_DGRAM은 데이터그램의 소켓 타입을 의미합니다.
  • connect(("8.8.8.8", 80)) : 소켓을 통해서 연결하고자 하는 서버의 ip 주소와 해당 ip 주소로 통신할 port 값을 입력합니다. 
  • getsockname : 현재 소켓에 bind 된 ip와 포트 정보를 tuple로 return 하기 때문에 this_ip를 사용해서 서버에 연결된 순서를 _i 라는 값으로 return해주는 것으로 WORLD_RANK (전체 노드에서 몇번째 노트인지) 를 지정해주게 됩니다. 

 위의 과정에서 ip_list의 첫번째 노드 (machine)은 root 노드로 지정되어 pytorch를 사용한 multi-processing을 시작하는 node가 됩니다. 

def get_local_ip_and_match(ip_list):
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    this_ip = s.getsockname()[0]
    s.close()
    for _i, ip in enumerate(ip_list):
        if ip == this_ip:
            return _i
    return -1

  위의 과정으로 config.yaml에서의 변수들에 대한 default값을 가져온 후 본격적인 multi-process 작업을 수행합니다. 먼저 torch.cuda.device_count() 함수를 통해서 현재 프로세스가 실행중인 node에서 사용 가능한 cuda device (GPU)의 수를 반환받습니다. 

 

  그리고 config.yaml에서의 DISTRIBUTED 가 True 인경우와 아닌 경우 작동이 달라지게 됩니다. 간단히 말하면, True인 경우는 multi-node 를 가정하여 server machine이 여러대인 경우를, False인 경우는 single-node를 가정하는 것이라고 보면 됩니다. 

 

 이때, GPU_WORLD_SIZE 는 사용할려고 하는 모든 node 에 존재하는 GPU들의 총 합을 의미합니다. 그러므로, ngpus_per_node * WORLD_SIZE 를 통해서 해당 값을 구할 수 있습니다.

 

 *이 부분에 대해서는 node 별로 다른 숫자의 GPU를 사용하고자 한다면 수정하여 사용할 수 있는 포인트가 될 것으로 생각됩니다.*

 

  이 포스팅에서는 DISTRIBUTED 가 True 인 상황을 따라가도록 하겠습니다. 위의 GPU_WORLD_SIZE를 계산한 뒤, torch.distributed.multiprocessing 의 spawn 함수를 통해서 main_worker 함수를 호출합니다. 이때, nprocs 에 넘겨준 값 (int) 만큼 멀티프로세스가 생성되어 하나의 프로세스가 main_worker 함수를 한개씩 가지고 있습니다.  

 

  여기서 주의할 것이 하나 있습니다. 이 코드에서 main 은 train_ddp_pytorh.py 에 있는 main_worker 함수이며, 이 코드의 main_worker 함수는 gluoncv/torch/engine/launch.py 의 main_worker 함수입니다. 이 부분에서 이해하실 때 헷갈리지 않으셔야 합니다!

def spawn_workers(main, cfg):
    """Use torch.multiprocessing.spawn to launch distributed processes"""
    if cfg.DDP_CONFIG.AUTO_RANK_MATCH:
        assert len(cfg.DDP_CONFIG.WOLRD_URLS) > 0
        assert cfg.DDP_CONFIG.WOLRD_URLS[0] in cfg.DDP_CONFIG.DIST_URL
        assert len(cfg.DDP_CONFIG.WOLRD_URLS) == cfg.DDP_CONFIG.WORLD_SIZE
        cfg.DDP_CONFIG.WORLD_RANK = get_local_ip_and_match(cfg.DDP_CONFIG.WOLRD_URLS)
        assert cfg.DDP_CONFIG.WORLD_RANK != -1

    ngpus_per_node = torch.cuda.device_count()
    if cfg.DDP_CONFIG.DISTRIBUTED:
        cfg.DDP_CONFIG.GPU_WORLD_SIZE = ngpus_per_node * cfg.DDP_CONFIG.WORLD_SIZE
        mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, main, cfg))
    else:
        main_worker(cfg.DDP_CONFIG.GPU, ngpus_per_node, main, cfg)

def main_worker(gpu, ngpus_per_node, main, cfg):
    """The main_worker process function (on individual GPU)"""
    cudnn.benchmark = True

    cfg.DDP_CONFIG.GPU = gpu
    print("Use GPU: {}".format(gpu))

    if cfg.DDP_CONFIG.DISTRIBUTED:
        cfg.DDP_CONFIG.GPU_WORLD_RANK = cfg.DDP_CONFIG.WORLD_RANK * ngpus_per_node + gpu
        dist.init_process_group(backend=cfg.DDP_CONFIG.DIST_BACKEND,
                                init_method=cfg.DDP_CONFIG.DIST_URL,
                                world_size=cfg.DDP_CONFIG.GPU_WORLD_SIZE,
                                rank=cfg.DDP_CONFIG.GPU_WORLD_RANK)
    main(cfg)

  

 다시 spawn_workers 함수를 호출한 부분으로 돌아가도보도록 하겠습니다. 이때, 위 spawn_workers 함수의 main은 train_ddp_pytorch.py의 main_worker 입니다. 이 함수는 launch.py의 main_worker 내부에서 main(cfg) 부분에서 실제로 실행되는 것을 알 수 있습니다. 

 

 이 launch.py의 main_worker 함수에서 중요한 부분은 DISTRIBUTED 가 True 인 경우의 torch.distributed.init_process_group 함수 입니다. 이 함수는 DDP의 communications 를 위해 c10d ProcessGroup (gloo, NCCL, MPI)의 ProcessGroup 에 대한 instance를 만들어 주는 함수입니다. 

 

  사실 이 부분이 nn.DataParallel과 nn.DistributedDataParallel의 차이를 만드는 multi-process 병렬화를 통한 DP가 겪었던 복제된 모델들 사이에서 일어나던 GIL 문제를 안생기게 하는 부분입니다. 

 

*ML model이 DP 처럼 forward pass에 broadcast되는 것이 아니라 DDP는 모델 생성시기에 프로세스들에게 broadcast 되기 때문에 training을 DP에 비해 가속하는데 도움이 된다고 합니다. [3]*

 

  다시 spawn_workers의 함수로 돌아가보도록 하겠습니다. 

    spawn_workers(main_worker, cfg)

  이 spawn_workers에서 넘겨주는 main_worker가 실제로 데이터를 생성하고 학습하는 부분이므로 해당 부분을 살펴보는 것이 제일 중요하다고 할 수 있습니다.

 

  우선 처음 tensorboard 와 log 기록 부분에 대해서 보면, multi-processing 과정에서 main_worker 함수가 작동하는 프로세스 수 만큼 호출되기 때문에 tensorboard가 우후죽순 생겨나는 것을 방지하기 위한 부분입니다. 위에서 말씀드렸던 것처럼 주로 '0' 의 rank 를 가진 프로세스를 main process로 가정하기 때문에 GPU_WORLD_RANK == 0 인 경우에만 텐서보드를 생성하는 것을 알 수 있습니다.

 

 중요한 부분은 deploy_model 함수입니다. get_model은 그냥 nn.Module 클래스의 network를 리턴해오는 일반적인 ML 모델을 가져오는 함수이므로 생략하고 deploy_model 함수를 살펴보겠습니다. 

def main_worker(cfg):
    # create tensorboard and logs
    if cfg.DDP_CONFIG.GPU_WORLD_RANK == 0:
        tb_logdir = build_log_dir(cfg)
        writer = SummaryWriter(log_dir=tb_logdir)
    else:
        writer = None
    cfg.freeze()

    # create model
    model = get_model(cfg)
    model = deploy_model(model, cfg)

    # create dataset and dataloader
    train_loader, val_loader, train_sampler, val_sampler, mg_sampler = build_dataloader(cfg)

    # create optimizer
    optimizer = torch.optim.SGD(model.parameters(),
                                lr=cfg.CONFIG.TRAIN.LR,
                                momentum=cfg.CONFIG.TRAIN.MOMENTUM,
                                weight_decay=cfg.CONFIG.TRAIN.W_DECAY)

    # load a pre-trained checkpoint for finetuning or evaluation
    if cfg.CONFIG.MODEL.LOAD:
        model, _ = load_model(model, optimizer, cfg, load_fc=True)

    # create lr scheduler
    if cfg.CONFIG.TRAIN.LR_POLICY == 'Step':
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                         milestones=cfg.CONFIG.TRAIN.LR_MILESTONE,
                                                         gamma=cfg.CONFIG.TRAIN.STEP)
    elif cfg.CONFIG.TRAIN.LR_POLICY == 'Cosine':
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer,
                                                               T_max=cfg.CONFIG.TRAIN.EPOCH_NUM - cfg.CONFIG.TRAIN.WARMUP_EPOCHS,
                                                               eta_min=0,    # minimum learning rate
                                                               last_epoch=cfg.CONFIG.TRAIN.RESUME_EPOCH)
    else:
        print('Learning rate schedule %s is not supported yet. Please use Step or Cosine.')

    if cfg.CONFIG.TRAIN.USE_WARMUP:
        scheduler_warmup = GradualWarmupScheduler(optimizer,
                                                  multiplier=1.0,    # warmup_lr is required to start from 0
                                                  total_epoch=cfg.CONFIG.TRAIN.WARMUP_EPOCHS,
                                                  after_scheduler=scheduler)
    # create criterion
    criterion = nn.CrossEntropyLoss().cuda()

    # train loop
    base_iter = 0
    for epoch in range(cfg.CONFIG.TRAIN.EPOCH_NUM):
        if cfg.DDP_CONFIG.DISTRIBUTED:
            train_sampler.set_epoch(epoch)

        # train one epoch
        base_iter = train_classification(base_iter, model, train_loader, epoch, criterion, optimizer, cfg, writer=writer)
        if cfg.CONFIG.TRAIN.USE_WARMUP:
            scheduler_warmup.step()
        else:
            scheduler.step()

        # evaluation
        if epoch % cfg.CONFIG.VAL.FREQ == 0 or epoch == cfg.CONFIG.TRAIN.EPOCH_NUM - 1:
            validation_classification(model, val_loader, epoch, criterion, cfg, writer)

        # save model
        if epoch % cfg.CONFIG.LOG.SAVE_FREQ == 0 or epoch == cfg.CONFIG.TRAIN.EPOCH_NUM - 1:
            if cfg.DDP_CONFIG.GPU_WORLD_RANK == 0 or cfg.DDP_CONFIG.DISTRIBUTED == False:
                save_model(model, optimizer, epoch, cfg)

    if writer is not None:
        writer.close()

  deploy_model 함수는 아래와 같습니다. 주석만 보아도 DDP training의 중요한 부분이 담겨있는 것을 알 수 있습니다. 함수 내부에서 DDP_CONFIG.GPUNone인 경우와 아닌 경우의 차이는 단순히 DDP_CONFIG.GPU 값대로 model을 할당할 것인지 아니면 현재 프로세스가 보고있는 GPU에 할당할 것인지의 차이 입니다. 

 

 GluonCV 함수에서는 Distributed 가 True 가 아닌 경우는 nn.DataParallel 함수를 사용하도록 되어 있네요. 저희는 여기에서 torch.nn.parallel.DistributedDataParallel 함수를 통해서 model을 감싸고, device_ids에 List 형태로 할당하고자 하는 GPU 의 목록을 넘겨주는 방법이 사용된다는 것만 알면 됩니다. 이 코드에서는 GPU는 1개의 값만이 할당되므로 모델 당 1개의 GPU에 할당이 되는 형태가 될 것입니다. 

 

def deploy_model(model, cfg):
    """
    Deploy model to multiple GPUs for DDP training.
    """
    if cfg.DDP_CONFIG.DISTRIBUTED:
        if cfg.DDP_CONFIG.GPU is not None:
            torch.cuda.set_device(cfg.DDP_CONFIG.GPU)
            model.cuda(cfg.DDP_CONFIG.GPU)
            model = torch.nn.parallel.DistributedDataParallel(model,
                                                              device_ids=[cfg.DDP_CONFIG.GPU],
                                                              find_unused_parameters=True)
        else:
            model.cuda()
            model = torch.nn.parallel.DistributedDataParallel(model, find_unused_parameters=True)
    elif cfg.DDP_CONFIG.GPU is not None:
        torch.cuda.set_device(cfg.DDP_CONFIG.GPU)
        model.cuda(cfg.DDP_CONFIG.GPU)
    else:
        # DataParallel will divide and allocate batch_size to all available GPUs
        model = torch.nn.DataParallel(model).cuda()

    return model

   deploy_model 함수의 다음은 build_dataloader 함수 입니다. 이 함수는 gluon-cv/gluoncv/torch/data/video_cls/dataset_classification.py 에 위치하고 있습니다. 

 

  해당 함수를 가보면 뭐가 많이 있지만 중요한 부분만 골라서 보도록 하겠습니다. (코드 전체를 붙이기에는 너무 기네요 ..)

   

  여기에서 중요한 부분은 DISTRIBUTED 가 True 일때 torch.utils.data.distributed.DistributedSampler 를 호출해서 데이터셋을 감싼다는 부분입니다. 그리고, 이 sampler를 DataLoader 를 선언할 때 sampler argument로 할당해줍니다. 

  

  이렇게만 하면 나머지는 pytorch가 알아서 DDP에 맞게 데이터 샘플링을 수행해줍니다. 

def build_dataloader(cfg):
	# ...
    # 데이터 로드 부분은  생략 
	# ...
    
    if cfg.DDP_CONFIG.DISTRIBUTED:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
        val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset)
    else:
        train_sampler = None
        val_sampler = None

    mg_sampler = None
    if cfg.CONFIG.TRAIN.MULTIGRID.USE_LONG_CYCLE or cfg.CONFIG.TRAIN.MULTIGRID.USE_SHORT_CYCLE:
        # ...
        # DISTRIBUTED 아닌 부분 생략 
        # ...
    else:
        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=cfg.CONFIG.TRAIN.BATCH_SIZE, shuffle=(train_sampler is None),
            num_workers=9, sampler=train_sampler, pin_memory=True)

    val_loader = torch.utils.data.DataLoader(
        val_dataset, batch_size=cfg.CONFIG.VAL.BATCH_SIZE, shuffle=(val_sampler is None),
        num_workers=9, sampler=val_sampler, pin_memory=True)

    return train_loader, val_loader, train_sampler, val_sampler, mg_sampler

 

  여기까지 보았다면 train_ddp_pytorch.py의 내용에서 DDP로 인해서 학습 시작인 epoch을 순회하기 이전까지 달라진 부분은 모두 본 것입니다.

 

  Open Source 엔진들도 처음 보면 엄청 어렵게 느껴지지만 이렇게 하나하나 흐름을 타고 따라가다보면 생각보다 쉽고 오히려 구조적인 인사이트를 얻을 수 있어서 좋다고 생각됩니다.

 

 이제 Train 을 시작하는 부분을 살펴보아야 하는데 이부분은 다음 포스팅에서 하도록 하겠습니다. 

 

 

References


[1] https://github.com/pytorch/examples/blob/3970e068c7f18d2d54db2afee6ddd81ef3f93c24/imagenet/main.py#L312

 

GitHub - pytorch/examples: A set of examples around pytorch in Vision, Text, Reinforcement Learning, etc.

A set of examples around pytorch in Vision, Text, Reinforcement Learning, etc. - GitHub - pytorch/examples: A set of examples around pytorch in Vision, Text, Reinforcement Learning, etc.

github.com

[2] https://cv.gluon.ai/build/examples_torch_action_recognition/ddp_pytorch.html

 

5. DistributedDataParallel (DDP) Framework — gluoncv 0.11.0 documentation

 

cv.gluon.ai

[3] https://algopoolja.tistory.com/56

 

torch.distributed

torch로 병렬화를 하기 위해서 torch에서 제안하는 몇가지 선택할 수 있는 선택지가 있다. 1. torch.nn.DataParallel DataParallel은 하나의 본체에서 multi-GPU 병렬화를 코딩을 많이 하지 않고 할 수 있는 선택.

algopoolja.tistory.com