Kicarussays

[Transformer 이해하기 3] Attention is all you need, Transformer 설명 및 코드리뷰 본문

Deep Learning

[Transformer 이해하기 3] Attention is all you need, Transformer 설명 및 코드리뷰

Kicarus 2022. 1. 25. 00:40

다음 포스팅을 참고하여 작성하였음을 밝힙니다.

https://wikidocs.net/31379

 

1) 트랜스포머(Transformer)

* 이번 챕터는 앞서 설명한 어텐션 메커니즘 챕터에 대한 사전 이해가 필요합니다. 트랜스포머(Transformer)는 2017년 구글이 발표한 논문인

wikidocs.net

 

 

이전 포스팅에서는 Attention이 어떻게 작동하는지 살펴보았습니다.

 

이번에 살펴볼 Transformer는 인코더-디코더 구조를 따르지만, RNN을 사용하지 않고 Attention만을 사용하여 설계된 모델입니다. 번역 성능의 우수성은 이미 아실 것이라 생각됩니다 (대표적 파생모델 BERT). 코드와 함께 Transformer를 살펴봅시다!

 

 

논문 링크: https://arxiv.org/abs/1706.03762

깃허브 링크: https://github.com/bentrevett/pytorch-seq2seq/blob/master/6%20-%20Attention%20is%20All%20You%20Need.ipynb

 

GitHub - bentrevett/pytorch-seq2seq: Tutorials on implementing a few sequence-to-sequence (seq2seq) models with PyTorch and Torc

Tutorials on implementing a few sequence-to-sequence (seq2seq) models with PyTorch and TorchText. - GitHub - bentrevett/pytorch-seq2seq: Tutorials on implementing a few sequence-to-sequence (seq2se...

github.com

 

 

 

 

 Introduction

 

아래 이미지는 Transformer 작동 모식도입니다. 우리는 Transformer의 핵심 메커니즘인 포지셔널 인코딩과 Multi-head Self-Attention을 살펴보고, 코드를 통해 어떤 방식으로 Transformer가 작동되는지 살펴볼 것입니다.

 

Transformer 작동 모식도

 

포지셔널 인코딩 (Positional Encoding)

Transformer는 Attention만으로 인코더와 디코더를 구성한 모델입니다. 번역 task에서 RNN을 사용했던 이유는 번역에서 중요한 순서 정보를 학습에 활용했기 때문입니다. 하지만 Transformer는 RNN을 사용하지 않기 때문에, 문장의 위치 정보를 따로 제공해줄 필요가 있습니다. 이를 위해 포지셔널 인코딩 (Postitional Encoding) 을 활용하게 됩니다.

 

 

위 이미지의 "I am a student"는 4 * 4 행렬로 토큰화 될 수 있습니다. 여기서 pos는 row (문장 내 토큰의 순서) 를, $i$는 column (임베딩 벡터 내의 순서) 입니다. $d_{model}$은 사용자가 정의한 "Transformer의 출력 차원"입니다. 여기서는 4의 값을 가집니다.

 

포지셔널 인코딩은 아래와 같은 함수를 사용합니다.

$$PE_{(pos, 2i)} = sin(pos/10000^{2i/d_{model}})$$

$$PE_{(pos, 2i+1)} = cos(pos/10000^{2i/d_{model}})$$

이 포지셔널 인코딩 행렬을 문장 행렬에 더해줍니다. 이를 통해 같은 단어라도 문장 내의 위치에 따라서 임베딩 벡터의 값이 달라집니다.

 

Multi-head Self-Attentions

Attention은 "주어진 쿼리(Query)에 대해서 모든 키(Key)와의 유사도를 각각 구해 키와 매핑되어 있는 값(Value)에 반영" 하는 메커니즘입니다. Seq2Seq 에서는 인코더/디코더를 통과하고 나온 임베딩 벡터들로 Attention을 계산했습니다. Q에는 디코더 임베딩 벡터를, K와 V에는 인코더 임베딩 벡터를 활용했습니다.

 

Transformer에서는 Attention으로써 Multi-head Self-Attention을 활용합니다. 미리 말하자면, Self-Attention에서 사용하는 Q, K, V는 모두 입력 문장을 활용하고, Multi-head는 Attention을 병렬로 처리한다는 의미입니다. 

 


 

Self-Attention?

 

 

Self-Attention을 설명하기에 앞서, 위 문장을 먼저 살펴봅시다.

 

"The animal didn't cross the street because it was too tired."

"그 동물은 길을 건너지 않았다, 왜냐하면 그것은 너무 피곤했기 때문에"

 

이 문장에서 it은 animal을 지칭한다는 것을 쉽게 판단할 수 있지만, 컴퓨터는 이런 판단을 하는 것이 쉽지 않습니다. Self-Attention은 이러한 문장 내의 단어들의 관계(유사도)를 파악하기 위한 메커니즘입니다. 그럼 Self-Attention이 어떤 메커니즘으로 작동하게 되는 것일까요?

 

 

1. Q, K, V 벡터 얻기

 

Self-Attention은 Q, K, V를 모두 입력문장으로부터 가져옵니다.

 

 

위와 같이 동일한 크기의 weight 행렬 $W^Q, W^K, W^V$를 토큰에 각각 곱하여 Q, K, V를 가져옵니다. 이 Q, K, V의 크기는 사용자가 직접 지정할 수 있는 하이퍼파라미터이고, $d_k$로 표현합니다. 여기서 $d_k = d_{model} / \text{num_heads}$ 인데, num_head는 multi-head (병렬) 로 처리할 숫자를 의미합니다.

 

위 이미지에서는

$d_{model} = 4$,

$\text{num_head} = 2$,

$d_k = d_{model} / \text{num_head} = 2$ 입니다.

 

 

2. Attention 계산

 

이후 Attention Value를 가져오는 과정은 Seq2Seq 에서의 Attention과 동일합니다. 벡터 간 내적으로 Attention score를 구합니다.

 

 

지금까지는 "I, am, a, student"를 따로따로 Attention value를 구했지만, 당연히 행렬 연산으로 일괄적으로 구할 수 있습니다. 그리고 이를 식으로 표현하면 다음과 같습니다.

$$\text{Attention} (Q, K, V) = \text{softmax} (\frac{QK^T}{\sqrt{d_k}})V$$

 

 

여기서 Attention Value Matrix 가 [문장의 길이, $d_k$] 크기로 산출된 것에 주목합시다. 앞서 우리는 num_head를 2로 설정하였습니다. 아래의 이미지는 num_head를 2보다 큰 수라고 가정하고 그린 그림이긴 하지만, 어쨌든 최종적으로는 입력문장의 크기인 [문장의 길이, d_model] 로 Multi-head Self-Attention 이 산출됩니다!!

 

 

트랜스포머를 처음 발표한 논문에서는 Multi-head 로 Attention을 처리한 것이 Single Attention 보다 더 좋은 성능을 보였다고 언급하고 있습니다.

 


 

여기까지 Transformer의 핵심 메커니즘은 포지셔널 인코딩과 Multi-head Self-Attention을 살펴보았습니다. 이제 코드를 통해서 Transformer가 어떻게 작동되는지 살펴봅시다!

 

 

 

 

 Building the Model

 

(Preparing Data 부분은 이전 [Transformer 이해하기 1] 과 동일합니다)

바로 인코더 코드를 봅시다!

 

인코더 모식도

 

class Encoder(nn.Module):
    def __init__(self, 
                 input_dim, 
                 hid_dim, 
                 n_layers, 
                 n_heads, 
                 pf_dim,
                 dropout, 
                 device,
                 max_length = 100):
        super().__init__()

        self.device = device
        
        self.tok_embedding = nn.Embedding(input_dim, hid_dim)
        self.pos_embedding = nn.Embedding(max_length, hid_dim)
        
        self.layers = nn.ModuleList([EncoderLayer(hid_dim, 
                                                  n_heads, 
                                                  pf_dim,
                                                  dropout, 
                                                  device) 
                                     for _ in range(n_layers)])
        
        self.dropout = nn.Dropout(dropout)
        
        self.scale = torch.sqrt(torch.FloatTensor([hid_dim])).to(device)
        
    def forward(self, src, src_mask):
        
        #src = [batch size, src len]
        #src_mask = [batch size, 1, 1, src len]
        
        batch_size = src.shape[0]
        src_len = src.shape[1]
        
        pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
        
        #pos = [batch size, src len]
        
        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))
        
        #src = [batch size, src len, hid dim]
        
        for layer in self.layers:
            src = layer(src, src_mask)
            
        #src = [batch size, src len, hid dim]
            
        return src

 

 

* self.scale 은 $\sqrt{d_{model}}$ 역할을 합니다

* src_mask는 padding된 부분을 관리하기 위한 객체입니다

* (논문의 경우 6개의 인코더레이어를 활용) 인코더 내부의 인코더레이어에 입력문장 src를 통과시킵니다.

* max_length를 100으로 설정하여 토큰의 개수가 최대 100개인 문장을 학습합니다.

 

 

인코더 내부를 구성하는 인코더레이어는 Multi-head Self-Attention과 FFNN으로 구성되어 있습니다.

class EncoderLayer(nn.Module):
    def __init__(self, 
                 hid_dim, 
                 n_heads, 
                 pf_dim,  
                 dropout, 
                 device):
        super().__init__()
        
        self.self_attn_layer_norm = nn.LayerNorm(hid_dim)
        self.ff_layer_norm = nn.LayerNorm(hid_dim)
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hid_dim, 
                                                                     pf_dim, 
                                                                     dropout)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, src, src_mask):
        
        #src = [batch size, src len, hid dim]
        #src_mask = [batch size, 1, 1, src len] 
                
        #self attention
        _src, _ = self.self_attention(src, src, src, src_mask)
        
        #dropout, residual connection and layer norm
        src = self.self_attn_layer_norm(src + self.dropout(_src))
        
        #src = [batch size, src len, hid dim]
        
        #positionwise feedforward
        _src = self.positionwise_feedforward(src)
        
        #dropout, residual and layer norm
        src = self.ff_layer_norm(src + self.dropout(_src))
        
        #src = [batch size, src len, hid dim]
        
        return src

 

* self_attention 에서 (Q, K, V) 로 입력문장인 (src, src, src) 를 받습니다

* Multi-head Self-Attention 수행 -> 입력문장에 더하고 Normalization -> FFNN 통과 -> 입력문장에 더하고 Normalization

 

 

Multi-head Self-Attention, FFNN 코드를 봅시다.

class MultiHeadAttentionLayer(nn.Module):
    def __init__(self, hid_dim, n_heads, dropout, device):
        super().__init__()
        
        assert hid_dim % n_heads == 0
        
        self.hid_dim = hid_dim
        self.n_heads = n_heads
        self.head_dim = hid_dim // n_heads
        
        self.fc_q = nn.Linear(hid_dim, hid_dim)
        self.fc_k = nn.Linear(hid_dim, hid_dim)
        self.fc_v = nn.Linear(hid_dim, hid_dim)
        
        self.fc_o = nn.Linear(hid_dim, hid_dim)
        
        self.dropout = nn.Dropout(dropout)
        
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
        
    def forward(self, query, key, value, mask = None):
        
        batch_size = query.shape[0]
        
        #query = [batch size, query len, hid dim]
        #key = [batch size, key len, hid dim]
        #value = [batch size, value len, hid dim]
                
        Q = self.fc_q(query)
        K = self.fc_k(key)
        V = self.fc_v(value)
        
        #Q = [batch size, query len, hid dim]
        #K = [batch size, key len, hid dim]
        #V = [batch size, value len, hid dim]
                
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        
        #Q = [batch size, n heads, query len, head dim]
        #K = [batch size, n heads, key len, head dim]
        #V = [batch size, n heads, value len, head dim]
                
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
        
        #energy = [batch size, n heads, query len, key len]
        
        if mask is not None:
            energy = energy.masked_fill(mask == 0, -1e10)
        
        attention = torch.softmax(energy, dim = -1)
                
        #attention = [batch size, n heads, query len, key len]
                
        x = torch.matmul(self.dropout(attention), V)
        
        #x = [batch size, n heads, query len, head dim]
        
        x = x.permute(0, 2, 1, 3).contiguous()
        
        #x = [batch size, query len, n heads, head dim]
        
        x = x.view(batch_size, -1, self.hid_dim)
        
        #x = [batch size, query len, hid dim]
        
        x = self.fc_o(x)
        
        #x = [batch size, query len, hid dim]
        
        return x, attention
        
        
class PositionwiseFeedforwardLayer(nn.Module):
    def __init__(self, hid_dim, pf_dim, dropout):
        super().__init__()
        
        self.fc_1 = nn.Linear(hid_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, hid_dim)
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        
        #x = [batch size, seq len, hid dim]
        
        x = self.dropout(torch.relu(self.fc_1(x)))
        
        #x = [batch size, seq len, pf dim]
        
        x = self.fc_2(x)
        
        #x = [batch size, seq len, hid dim]
        
        return x

 

코드가 간단하여 따로 설명은 생략합니다.

 


 

이제 디코더를 살펴봅시다.

 

디코더 모식도

 

class Decoder(nn.Module):
    def __init__(self, 
                 output_dim, 
                 hid_dim, 
                 n_layers, 
                 n_heads, 
                 pf_dim, 
                 dropout, 
                 device,
                 max_length = 100):
        super().__init__()
        
        self.device = device
        
        self.tok_embedding = nn.Embedding(output_dim, hid_dim)
        self.pos_embedding = nn.Embedding(max_length, hid_dim)
        
        self.layers = nn.ModuleList([DecoderLayer(hid_dim, 
                                                  n_heads, 
                                                  pf_dim, 
                                                  dropout, 
                                                  device)
                                     for _ in range(n_layers)])
        
        self.fc_out = nn.Linear(hid_dim, output_dim)
        
        self.dropout = nn.Dropout(dropout)
        
        self.scale = torch.sqrt(torch.FloatTensor([hid_dim])).to(device)
        
    def forward(self, trg, enc_src, trg_mask, src_mask):
        
        #trg = [batch size, trg len]
        #enc_src = [batch size, src len, hid dim]
        #trg_mask = [batch size, 1, trg len, trg len]
        #src_mask = [batch size, 1, 1, src len]
                
        batch_size = trg.shape[0]
        trg_len = trg.shape[1]
        
        pos = torch.arange(0, trg_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
                            
        #pos = [batch size, trg len]
            
        trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos))
                
        #trg = [batch size, trg len, hid dim]
        
        for layer in self.layers:
            trg, attention = layer(trg, enc_src, trg_mask, src_mask)
        
        #trg = [batch size, trg len, hid dim]
        #attention = [batch size, n heads, trg len, src len]
        
        output = self.fc_out(trg)
        
        #output = [batch size, trg len, output dim]
            
        return output, attention

 

* 인코더와 구조가 거의 비슷하고, 마지막에 fc (fully-connected) 레이어를 통과시켜 output을 가져옵니다.

* 디코더는 두 번의 Attention 을 거치는데, 두 번째 Attention에서 인코더의 어텐션을 참조하게 되고 이를 위해 output과 attention을 함께 리턴하게 됩니다.

 

 

디코더 내부는 2번의 Attention, FFNN으로 구성됩니다.

class DecoderLayer(nn.Module):
    def __init__(self, 
                 hid_dim, 
                 n_heads, 
                 pf_dim, 
                 dropout, 
                 device):
        super().__init__()
        
        self.self_attn_layer_norm = nn.LayerNorm(hid_dim)
        self.enc_attn_layer_norm = nn.LayerNorm(hid_dim)
        self.ff_layer_norm = nn.LayerNorm(hid_dim)
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        self.encoder_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hid_dim, 
                                                                     pf_dim, 
                                                                     dropout)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, trg, enc_src, trg_mask, src_mask):
        
        #trg = [batch size, trg len, hid dim]
        #enc_src = [batch size, src len, hid dim]
        #trg_mask = [batch size, 1, trg len, trg len]
        #src_mask = [batch size, 1, 1, src len]
        
        #self attention
        _trg, _ = self.self_attention(trg, trg, trg, trg_mask)
        
        #dropout, residual connection and layer norm
        trg = self.self_attn_layer_norm(trg + self.dropout(_trg))
            
        #trg = [batch size, trg len, hid dim]
            
        #encoder attention
        _trg, attention = self.encoder_attention(trg, enc_src, enc_src, src_mask)
        
        #dropout, residual connection and layer norm
        trg = self.enc_attn_layer_norm(trg + self.dropout(_trg))
                    
        #trg = [batch size, trg len, hid dim]
        
        #positionwise feedforward
        _trg = self.positionwise_feedforward(trg)
        
        #dropout, residual and layer norm
        trg = self.ff_layer_norm(trg + self.dropout(_trg))
        
        #trg = [batch size, trg len, hid dim]
        #attention = [batch size, n heads, trg len, src len]
        
        return trg, attention

 

 

* 첫 번째 Self-Attention은 Masked Multi-head Attention으로, 이는 트랜스포머의 특성상 디코더가 학습 과정에서 토큰을 문장 순서대로 가져오지 않고 한꺼번에 가져오기 때문에, 매 스텝마다 미래의 단어들은 참고하지 못하도록 Masking하는 기능을 추가한 것입니다.

* 두 번째 Attention은 인코더의 부분과 동일한데, 이 부분은 Q에 디코더의 output을 할당하고, K, V에 인코더의 output을 할당하기 때문에 "self" 가 빠져있습니다.

* Masked Attention을 수행하기 위해 trg_mask와 trg를 함께 self_attention에 진입시킵니다.

* 이후 부분은 차례로 add & normalization, 두 번째 attention, add & normalization, FFNN 을 통과합니다.

 

 

마지막 Seq2Seq를 봅시다!!

class Seq2Seq(nn.Module):
    def __init__(self, 
                 encoder, 
                 decoder, 
                 src_pad_idx, 
                 trg_pad_idx, 
                 device):
        super().__init__()
        
        self.encoder = encoder
        self.decoder = decoder
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device
        
    def make_src_mask(self, src):
        
        #src = [batch size, src len]
        
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

        #src_mask = [batch size, 1, 1, src len]

        return src_mask
    
    def make_trg_mask(self, trg):
        
        #trg = [batch size, trg len]
        
        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        
        #trg_pad_mask = [batch size, 1, 1, trg len]
        
        trg_len = trg.shape[1]
        
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device = self.device)).bool()
        
        #trg_sub_mask = [trg len, trg len]
            
        trg_mask = trg_pad_mask & trg_sub_mask
        
        #trg_mask = [batch size, 1, trg len, trg len]
        
        return trg_mask

    def forward(self, src, trg):
        
        #src = [batch size, src len]
        #trg = [batch size, trg len]
                
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        
        #src_mask = [batch size, 1, 1, src len]
        #trg_mask = [batch size, 1, trg len, trg len]
        
        enc_src = self.encoder(src, src_mask)
        
        #enc_src = [batch size, src len, hid dim]
                
        output, attention = self.decoder(trg, enc_src, trg_mask, src_mask)
        
        #output = [batch size, trg len, output dim]
        #attention = [batch size, n heads, trg len, src len]
        
        return output, attention

 

* src, trg를 받고 매 스텝마다 Masking 된 trg를 활용하여 학습을 수행합니다.

* Masking에는 torch.tril 함수를 활용합니다. 이 함수는 행렬의 대각성분 위쪽의 요소들을 모두 0으로 바꿉니다.

* 리턴된 output과 실제 trg 간의 loss 함수를 적용하여 학습을 수행합니다.

 

 

그렇다면 이렇게 구성된 함수들로 어떻게 번역 작업이 이루어지는 것일까요? 위의 Seq2Seq에는 trg가 input으로 들어가게 되는데, 이렇게 되면 실제로 번역이 잘 이루어지는 것인지 모르겠습니다. src만을 input으로 받고, 적절히 번역된 output을 만드는 함수가 필요할 것 같습니다.

 

이제 Seq2Seq 모델이 잘 학습되었다고 가정하고 만든 번역 함수를 한 번 살펴봅시다.

def translate_sentence(sentence, src_field, trg_field, model, device, max_len = 50):
    
    model.eval()
        
    if isinstance(sentence, str):
        nlp = spacy.load('de_core_news_sm')
        tokens = [token.text.lower() for token in nlp(sentence)]
    else:
        tokens = [token.lower() for token in sentence]

    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
        
    src_indexes = [src_field.vocab.stoi[token] for token in tokens]

    src_tensor = torch.LongTensor(src_indexes).unsqueeze(0).to(device)
    
    src_mask = model.make_src_mask(src_tensor)
    
    with torch.no_grad():
        enc_src = model.encoder(src_tensor, src_mask)

    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]

    for i in range(max_len):

        trg_tensor = torch.LongTensor(trg_indexes).unsqueeze(0).to(device)

        trg_mask = model.make_trg_mask(trg_tensor)
        
        with torch.no_grad():
            output, attention = model.decoder(trg_tensor, enc_src, trg_mask, src_mask)
        
        pred_token = output.argmax(2)[:,-1].item()
        
        trg_indexes.append(pred_token)

        if pred_token == trg_field.vocab.stoi[trg_field.eos_token]:
            break
    
    trg_tokens = [trg_field.vocab.itos[i] for i in trg_indexes]
    
    return trg_tokens[1:], attention

 

* 먼저 입력문장을 토큰화합니다.

* 토큰화한 객체 앞뒤로 <sos>, <eos>를 추가합니다.

* 입력문장을 마스킹 객체와 함께 인코더에 통과시켜서 최종적으로 임베딩된 output을 enc_src에 할당합니다.

* trg_indexes에 <sos>를 할당합니다.

* trg_indexes를 디코더에 통과시켜서 얻는 output에서 예측 토큰을 가져오고, trg_indexes에 추가합니다.

* 예측 토큰이 <eos> 가 나올 때까지 반복합니다.

* itos함수로 trg_indexes에 있는 index들을 모두 매핑된 단어로 변환합니다. (index to source)

* trg_tokens[1:] -> <sos>를 제외하고 번역된 문장을 리턴합니다.

 


 

여기까지 Transformer가 코드 상으로 어떻게 작동하는지 살펴보았습니다. 내용과 코드 자체가 복잡해서 설명에 부족함이 많았을 것 같습니다.. 어쨌든 RNN을 활용하지 않고, 오로지 Attention과 Dense 레이어만을 활용하여 모델을 구성했고, 현재 번역을 수행하는 수많은 모델들의 기반이 되는 Transformer을 얕게나마 살펴보았습니다.

 

감사합니다!

 

 

 

 

 

Comments