본문 바로가기
카테고리 없음

[ML] Normalizing Flow 완전 분석 (RealNVP)

by ML_MJSHIN 2021. 12. 16.

Introduction

PytorchTS는 Gluon-TS를 기반으로 Pytorch로 구현된 time series를 전문적으로 다루는 오픈 소스 입니다. 제법 다양한 시계열을 위한 ML 모델들을 담고 있어서 실제로 논문을 보고 구현할 때 많은 도움을 받고 있습니다. 제가 이전 포스팅 중에서 RealNVP의 image net 버전을 올렸었는데, 시계열 데이터를 위한 모델이 PytorchTS에 있어서 오늘은 코드에 대해서 자세히 살펴보면서 간간히 이론도 다시 정리하는 시간을 가져볼 것 같습니다. 저도 코드를 보면서 공부할 겸, 시계열 모델에 대한 정보는 잘 없다보니 누군가 보고 도움이 되었으면 해서 포스팅을 하게 되었습니다.

Code

기본적으로 PytorchTS는 TrainingNetwork와 PredictionNetwork를 두개로 분리하여 사용하는데, 학습할때 사용된 TrainingNetwork는 학습이 종료될때 학습된 weight를 PredictionNetwork로 넘기고 소멸합니다. 그러면, TrainingNetwork 부터 바로 살펴보도록 하겠습니다. > 아래 코드는 https://github.com/zalandoresearch/pytorch-ts/blob/master/pts/model/tempflow/tempflow_network.py 에서 확인하실 수 있습니다. 우선, PytorchTS의 TempFlow는 'flow_type' 이라는 변수로 'RealNVP' 혹은 'MAF'를 입력받아서 둘 중 하나의 normalizing flow 모델을 사용합니다. 이 포스트에서는 RealNVP를 다루고 추후에 MAF도 이론부터 포스팅해볼 계획입니다 ㅎㅎ 전체 생성자 부분은 다음과 같으며, 아래에서 한 부분씩 분석해보도록 하겠습니다.

class TempFlowTrainingNetwork(nn.Module):
    @validated()
    def __init__(
        self,
        input_size: int,
        num_layers: int,
        num_cells: int,
        cell_type: str,
        history_length: int,
        context_length: int,
        prediction_length: int,
        dropout_rate: float,
        lags_seq: List[int],
        target_dim: int,
        conditioning_length: int,
        flow_type: str,
        n_blocks: int,
        hidden_size: int,
        n_hidden: int,
        dequantize: bool,
        cardinality: List[int] = [1],
        embedding_dimension: int = 1,
        scaling: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.target_dim = target_dim
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.history_length = history_length
        self.scaling = scaling

        assert len(set(lags_seq)) == len(lags_seq), "no duplicated lags allowed!"
        lags_seq.sort()
        self.lags_seq = lags_seq

        self.cell_type = cell_type
        rnn_cls = {"LSTM": nn.LSTM, "GRU": nn.GRU}[cell_type]
        self.rnn = rnn_cls(
            input_size=input_size,
            hidden_size=num_cells,
            num_layers=num_layers,
            dropout=dropout_rate,
            batch_first=True,
        )

        flow_cls = {
            "RealNVP": RealNVP,
            "MAF": MAF,
        }[flow_type]
        self.flow = flow_cls(
            input_size=target_dim,
            n_blocks=n_blocks,
            n_hidden=n_hidden,
            hidden_size=hidden_size,
            cond_label_size=conditioning_length,
        )
        self.dequantize = dequantize

        self.distr_output = FlowOutput(
            self.flow, input_size=target_dim, cond_size=conditioning_length
        )

        self.proj_dist_args = self.distr_output.get_args_proj(num_cells)

        self.embed_dim = 1
        self.embed = nn.Embedding(
            num_embeddings=self.target_dim, embedding_dim=self.embed_dim
        )

        if self.scaling:
            self.scaler = MeanScaler(keepdim=True)
        else:
            self.scaler = NOPScaler(keepdim=True)

PytorchTS는 List로 된 lags_seq을 입력받아서 time lag를 자동으로 생성해줍니다. 이때 같은 lag (ex, [1, 1]) 과 같이 1-lag를 여러번 생성하는 것은 막고자 set(lags_seq)으로 중복없는 값들을 얻고나서 그 길이로 중복 여부를 판단하는것을 알 수 있습니다. input으로 생성되는 lags_seq가 작은 lag부터 생성되어 모델의 입력으로 들어갈 수 있도록 sort를 해주는 것을 알 수 있습니다.

assert len(set(lags_seq)) == len(lags_seq), "no duplicated lags allowed!"
        lags_seq.sort()
        self.lags_seq = lags_seq

그리고, cell_type이라는 parameter를 입력받아, lstm 혹은 gru를 사용할지 정합니다. rnn이 왜 사용되는지 궁금하실 텐데, 저희가 아는 Transformer의 Encoder의 기능으로 생각하시면 될 것 같습니다.

이전 RealNVP 포스팅에서도 보여드렸던 사진인데, 왼쪽의 Input으로부터 z 를 생성하는 Flow를 self.rnn이 수행합니다. 두가지는 기본 pytorch의 모듈을 사용하므로, 각각의 파라미터는 다음의 의미를 가집니다. + input_size – input 'x' 의 feature dimension으로 입력이 (B x T x C)라면 C를 의미합니다. + hidden_size – lstm 혹은 gru가 중간중간 intermediate output을 만들어 냅니다. (h, c)로 표현되며 hidden 과 cell state인데 LSTM만 cell state가 존재합니다. + batch_first - output의 차원이 (batch, time_step, hidden_size) 이 되도록 할지를 결정합니다. self.rnn의 output은 (output, (h, c))가 된다는 것은 모두 아실거라고 생각합니다. bidirectional 을 쓸 것이냐에 따라서 크기가 변하지만 hidden state 의 차원은 (num_layer*num_direction, batch, hidden_size) 가 됩니다.

    self.cell_type = cell_type
    rnn_cls = {"LSTM": nn.LSTM, "GRU": nn.GRU}[cell_type]
    self.rnn = rnn_cls(
        input_size=input_size,
        hidden_size=num_cells,
        num_layers=num_layers,
        dropout=dropout_rate,
        batch_first=True,
    )

그리고 제일 중요한 normalizing flow 모델을 설정하는 부분이 등장합니다.위에서 말씀드린 대로 'RealNVP'와 'MAF' 중 하나를 선택하며, 'RealNVP'를 선택한 경우 어떤 코드가 불러지는 지를 따라가서 살펴보도록 하겠습니다.

    flow_cls = {"RealNVP": RealNVP, "MAF": MAF,}[flow_type]
    self.flow = flow_cls(
        input_size=target_dim,
        n_blocks=n_blocks,
        n_hidden=n_hidden,
        hidden_size=hidden_size,
        cond_label_size=conditioning_length,
    )

처음에 저도 RealNVP 코드를 보고 이렇게 간단하다고? 라고 생각했는데, 알고보니 상속받은 Flow가 normalizing flow의 핵심 코드 부분을 다 담고 있고 복잡하였습니다. 이 포스팅에서는 이것까지 다 살펴보도록 할 예정입니다. 남의 코드를 보는 연습은 언제나 도움이 된다라고 생각하니까 .... 우선, input_size라는 parameter는 TempFlow의 target_dim입니다. target_dim이 의미하는 것은 input 시계열의 차원입니다. 예를들어 multivariate인 경우에는 time series가 여러개가 합쳐져서 입력으로 들어오게 될 것이며 이때 합쳐지는 time series data의 갯수가 target_dim 이 됩니다. 즉, univariate 모델이면 target_dim 은 1이 됩니다. RealNVP 코드에서 mask와 LinearMaskedCoupling function이 RealNVP 의 핵심 요소입니다. 그리고, 생성자 parameter들은 다음과 같은 의미를 가집니다. + n_blocks : z=f(x) 역할의 LinearMaskedCouping layer의 층 수 + input_size : target_dim으로 시계열 몇개를 입력으로 할 것 인지 + hidden_size : LinearMaskedCoupling layer에서 중간에 차원을 몇차원으로 만들었다가 x==z 인 차원으로 축소/확대 하게 할 것인지 + n_hidden : LinearMaskedCoupling layer내의 layer 의 층 수 + cond_label_size : 시계열 데이터 외에 입력으로 들어오는 feature의 크기. 예를들어 시계열 4개와 static feature 2 차원이 들어온다면 cond_label_size로 2를 주면 됨.

class RealNVP(Flow):
    def __init__(
        self,
        n_blocks,
        input_size,
        hidden_size,
        n_hidden,
        cond_label_size=None,
        batch_norm=True,
    ):
        super().__init__(input_size, cond_label_size)

        # construct model         modules = []
        mask = torch.arange(input_size).float() % 2
        for i in range(n_blocks):
            modules += [
                LinearMaskedCoupling(
                    input_size, hidden_size, n_hidden, mask, cond_label_size
                )
            ]
            mask = 1 - mask
            modules += batch_norm * [BatchNorm(input_size)]

        self.net = FlowSequential(*modules)

먼저, LinearMaskedCoupling 코드부터 살펴보도록 하겠습니다. 이 LinearMaskedCoupling layer는 normalizing flow의 z=f(x)에서의 함수 f를 의미합니다. 그러므로, 가장 필요한 과정을 코드를 보기 전에 미리 생각해보면, x→z를 만들고, 그와 동시에 dzdx 를 계산하는 과정입니다. mask는 시계열 데이터를 짝수 & 홀수 데이터만을 키고 끄는 방법을 쓰는 RealNVP의 방법입니다. imagenet의 경우에는 짝수 pixel 과 홀수 pixel을 번갈아 가면서 CouplingLayer에서 x→z로 변환을 하지만 시계열에서는 multivariate 인 경우 짝수 번째 와 홀수 번째 시계열 데이터를 보게 됩니다. 여기에서, 중요한 코드 구현 부분은 log_s = torch.tanh(s) * (1 - self.mask) 부분입니다. tanh를 넣어준 이유는 normalizing flow에서 x→z 과정의 f가 bijective 함수, 즉 invertible 함수여야 하므로 tanh가 invertible한 특징을 가고 있기 때문에 사용해 준 것이라고 생각하면 됩니다. 구현상, 해당 코드처럼 log_s인 논문 수식의 |log⁡dzdx로 판단하고 loss 계산 시에 sum만 해도 되지만, dzdx라고 가정한 다음 loss 계산시에 log 를 취한 후 sum을 해도 됩니다. PytorchTS에서는 위의 값을 log_s로 가정하였네요 ㅎㅎ 그리고, u = x * torch.exp(log_s) + t 이 부분에 대해서는 normalizing flow의 개념상으로 살펴보면 u  z 라고 볼 수 있고, log_s 는 log⁡dzdx 이므로, z=x∗dzdx+t 가 됩니다. 즉, x∗dzdx  z 가 될 텐데, 문제는 이때 dzdx 가 추정한 값이므로 비정확하니까 t 라는 neural network를 통과한 값이 보정해줘서 z로 만들어 준다! 라는 개념으로 생각하시면 코드를 이해하시는데 문제 없을 거라고 생각합니다. 저는 위의 부분 때문에 PytorchTS의 아래 코드는 사실 RealNVP 논문을 그대로 구현했다기 보다는 normalizing flow의 개념을 코딩으로 구현하는데 집중한 느낌이었습니다. 그래서 이 부분만 살펴보시면 사실상 LinearMaskedCoupling 코드는 다 되었다고 볼 수 있습니다. inverse는 코드만 보셔도 위의 forward 설명을 이해하셨다면 쉽게 보실 수 있을 거라고 생각합니다. 이때 inverse에서의 log_s는 사실상 의미가 없어지므로 중요하게 생각하지 않으셔도 됩니다.

class LinearMaskedCoupling(nn.Module):
    """ Modified RealNVP Coupling Layers per the MAF paper """

    def __init__(self, input_size, hidden_size, n_hidden, mask, cond_label_size=None):
        super().__init__()

        self.register_buffer("mask", mask)

        # scale function         s_net = [
            nn.Linear(
                input_size + (cond_label_size if cond_label_size is not None else 0),
                hidden_size,
            )
        ]
        for _ in range(n_hidden):
            s_net += [nn.Tanh(), nn.Linear(hidden_size, hidden_size)]
        s_net += [nn.Tanh(), nn.Linear(hidden_size, input_size)]
        self.s_net = nn.Sequential(*s_net)

        # translation function         self.t_net = copy.deepcopy(self.s_net)
        # replace Tanh with ReLU's per MAF paper         for i in range(len(self.t_net)):
            if not isinstance(self.t_net[i], nn.Linear):
                self.t_net[i] = nn.ReLU()

    def forward(self, x, y=None):
        mx = x * self.mask

        s = self.s_net(mx if y is None else torch.cat([y, mx], dim=-1))
        t = self.t_net(mx if y is None else torch.cat([y, mx], dim=-1)) * (
            1 - self.mask
        )

        log_s = torch.tanh(s) * (1 - self.mask)
        u = x * torch.exp(log_s) + t
        log_abs_det_jacobian = log_s

        return u, log_abs_det_jacobian

    def inverse(self, u, y=None):
        mu = u * self.mask

        s = self.s_net(mu if y is None else torch.cat([y, mu], dim=-1))
        t = self.t_net(mu if y is None else torch.cat([y, mu], dim=-1)) * (
            1 - self.mask
        )

        log_s = torch.tanh(s) * (1 - self.mask)
        x = (u - t) * torch.exp(-log_s)
        log_abs_det_jacobian = -log_s

        return x, log_abs_det_jacobian

그래서, 위의 RealNVP 코드가 구현되면 최종 작동은 Flow가 담당하게 됩니다. 여기서 갑자기 base_dist_mean과 base_dist_var가 등장하게 되는데 이 부분도 RealNVP 보다는 normalizing flow의 수학적 개념에서 등장하게 된 것입니다. log⁡p(z)+log⁡dzdx 를 maximize 하는게 목적인 normalizing flow 문제에서 p(z)를 우리가 알 수 없기 때문에 특정한 distribution 으로 가정하게 됩니다. 이떄, 이 distribution을 PytorchTS 에서는 mean 이 0, std가 1인 Normal distribution이라고 가정한 것입니다. 학습에서는 forward함수와 log_prob 함수만을 사용하면 되며, flow 를 decoder로 사용하고자 하면 inverse와 sample을 사용하게 됩니다. 그러므로, log_prob의 입력은 input 데이터인 x가 되고, sample의 입력은 z가 된다는 것을 아실 수 있습니다.

class Flow(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.__scale = None
        self.net = None

        # base distribution for calculation of log prob under the model         self.register_buffer("base_dist_mean", torch.zeros(input_size))
        self.register_buffer("base_dist_var", torch.ones(input_size))

    @property
    def base_dist(self):
        return Normal(self.base_dist_mean, self.base_dist_var)

    @property
    def scale(self):
        return self.__scale

    @scale.setter
    def scale(self, scale):
        self.__scale = scale

    def forward(self, x, cond):
        if self.scale is not None:
            x /= self.scale
        u, log_abs_det_jacobian = self.net(x, cond)
        return u, log_abs_det_jacobian

    def inverse(self, u, cond):
        x, log_abs_det_jacobian = self.net.inverse(u, cond)
        if self.scale is not None:
            x *= self.scale
            log_abs_det_jacobian += torch.log(torch.abs(self.scale))
        return x, log_abs_det_jacobian

    def log_prob(self, x, cond):
        u, sum_log_abs_det_jacobians = self.forward(x, cond)
        return torch.sum(self.base_dist.log_prob(u) + sum_log_abs_det_jacobians, dim=-1)

    def sample(self, sample_shape=torch.Size(), cond=None):
        if cond is not None:
            shape = cond.shape[:-1]
        else:
            shape = sample_shape

        u = self.base_dist.sample(shape)
        sample, _ = self.inverse(u, cond)
        return sample

FlowSequential이 왜 들어갔는지는 사실 inverse 때문입니다. 위에서 RealNVP는 layer를 순차적으로 생성해서 nn.Sequenctial에 담는데 inverse 과정은 x→z 과정의 반대로 layer를 거쳐야하므로, 아래에서 보면 self 를 reversed 한 것을 볼 수 있습니다.

class FlowSequential(nn.Sequential):
    """ Container for layers of a normalizing flow """

    def forward(self, x, y):
        sum_log_abs_det_jacobians = 0
        for module in self:
            x, log_abs_det_jacobian = module(x, y)
            sum_log_abs_det_jacobians += log_abs_det_jacobian
        return x, sum_log_abs_det_jacobians

    def inverse(self, u, y):
        sum_log_abs_det_jacobians = 0
        for module in reversed(self):
            u, log_abs_det_jacobian = module.inverse(u, y)
            sum_log_abs_det_jacobians += log_abs_det_jacobian
        return u, sum_log_abs_det_jacobians

여기 까지가 RealNVP 코드의 전체이며 다시 TempFlow의 생성자 남은 부분을 살펴보도록 하겠습니다. 이 부분은 다음 포스트에서 진행하도록 하겠습니다.