스터디/AI

[이론/Imple] Transformers 모델

_leezoee_ 2023. 4. 24. 17:13

NLP 모델 연혁

* 2014 RNN based sequence-to-sequence model 

* 2015 Attention Mechanism

* 2017 Transformer (Google)

* 2018 Embedding from Language Models(ELMO), OpenAI GPT, Bidirectional Encoder Representations from Transformers (BERT, Google)

* 2019 OpenAI GPT-2, XLNet(Google), T5 (Google), 

* 2020 OpenAI GPT-3

* 2021 Switch Transformer

* 2022 Data2Vec, InstructGPT, PaLM(Google)

* 2023 AdA, Muse, Bard, OpenAI GPT-4, PaLM-E

자연어 처리 커리큘럼에 대한 이해 구조도

https://mccormickml.com/2019/11/11/bert-research-ep-1-key-concepts-and-sources/

 

BERT Research - Ep. 1 - Key Concepts & Sources · Chris McCormick

BERT Research - Ep. 1 - Key Concepts & Sources 11 Nov 2019 1. Introduction In this “research notes” blog post, and the ones that follow it, I’ll be sharing what I am learning about BERT, as well as identifying the areas where I am perhaps confused or

mccormickml.com

 

transformer 이전까지는 한땀한땀 수작업으로 학습시키는 모델이였으나 가장 탑에 있는 BERT (+GPT, T5)의 등장으로 전이학습모델로 가져다가 fine tuning 이 가능하게 되어서 사실상 고유모델로 만드는게 무의미해졌다고 할 수 있게 되었다.

NLP를 공부하는 비기너들은 성과를 빨리 내야 하면 Transformer 부터 공부하면 되고

그 안에 있는 원리 부터 공부하겠다 하면 처음 레이어 쌓는 부분인 RNN LSTM 부터 공부하면 된다.

 

 

 

 

Transformer model 

 

LSTM을 이용한 NLP network의 한계점이 여전히 존재.

 

1. 단어가 많이 있는 긴 문단의 경우 LSTM 처리는 네트워크의 깊이가 매우 깊어서 기울기소실 문제가 여전히 발생

2. 전이학습이 어려움

3. 감성분석, 챗봇, 번역, NER 등등 특정 Task 마다 별도 label 된 dataset이 필요하다

4. RNN은 time step 순으로 순차처리하므로 GPU를 다 활용하지 못함

 

=> 이러한 한계점을 극복하고자 Transformer 모델이 등장

 

Transformer model 특징

 

1. RNN 혹은 CNN을 이용해 sequencd-align 하지 않은 최초의 self-attention 모델

2. 기존의 encoder-decoder 구조를 유지

3. RNN을 제거함으로써 병렬처리가 가능해졌고 GPU 효율을 극대화 할 수 있게 되었다.

 

Transformer 구조도

 

https://vaclavkosar.com/ml/transformers-self-attention-mechanism-simplified

 

Transformer’s Self-Attention Mechanism Simplified

How transformer models like BERT and GPT work?

vaclavkosar.com

 

 

encoder-decoder 구조에서 encoder 부분만 쓴게 BERT 모델의 기초고, decoder 부분만 쓴게 GPT 모델의 기초이다.

encoder를 많이 쓴다는 뜻은 문맥의 의미 파악에 중점, decoder를 많이 쓴다는 뜻은 text generation 즉 글쓰기에 중점을 두었다고 할 수 있다.

 

 

인코더 디코더 상세구조도

https://jalammar.github.io/illustrated-transformer/

 

The Illustrated Transformer

Discussions: Hacker News (65 points, 4 comments), Reddit r/MachineLearning (29 points, 3 comments) Translations: Arabic, Chinese (Simplified) 1, Chinese (Simplified) 2, French 1, French 2, Japanese, Korean, Persian, Russian, Spanish 1, Spanish 2, Vietnames

jalammar.github.io

 

인코더 디코더 구조를 상세히 살펴보면

encoder에 두개의 단어가 입력 된다고 했을때

1. 단어를 임베딩(단어를 벡터화)해서 입력한다. 위 그림에서는 Thinking과  Machines 각각 네개의 디멘션으로 된 임베딩 벡터를 만들었다.

2. Positional encoding 은 RNN이 없어지면서 시퀀스 즉 순서정보를 유실할 수 있으므로 순서정보를 강제로 주입한 개념이다.

3. 입력이 Self-Attention을 거쳐 바로 Normalize로 가는 기법은 ResNet 이라는 이미지처리 모듈에서 사용되는 방법으로 전처리 데이터에 원본 데이터를 더하는 방법으로 층이 계속 깊어져도 원래의 데이터값을 잊어버리지 않도록 한다.

4. 다음으로 Feed Forward 레이어라고 Dense 레이어를 거치면서 Weight를 적용시켜 학습을 진행하고,

5. 최종 output이 decoder 쪽에 전달된다.

6. decoder에도 두개의 단어가 입력 되면 encoder 와 동일한 구조를 거친다. output을 다음 decoder로 계속 전달하는식.

7. 마지막에 Linear 를 거치고 Softmax 함수를 거쳐 확률분포로 표현된다.

 

Multi-Head Attention : 여러개의 attention을 병렬로 사용한 후 Attention Head를 연결

 다른 시각으로 단어 간 상관관계를 파악하는 방법으로, 각각 random 하게 초기화 되므로 학습 후 다른 subspace로 표시된다.

Multi-Head Attention의 크기는 encoder의 최초입력의 크기와 동일하다.

Transformer는  encoder를 6개 쌓은 형태로 입력의 크기가 출력에서 유지되어야 다음 encoder의 입력으로 사용이 가능하다.

 

 

Position-wise Feed Forward 네트워크

encoder와 decoder의 각각의 layer에있음, Fully-Connected 레이어라고 보면 됨

def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
    	tf.keras.layers.Dense(dff, activation='relu')
        tf.keras.layers.Dense(d_model)
    ])

일반적으로 쓰던 Dense 레이어와 동일

 

 

Residual Connection , Layer Normalization

트랜스포머는 서브층의 입력, 출력이 동일한 차원을 유지하므로 잔차 연결이 가능하다. (기울기소실문제 해결)

 

 

Batch Normalization

내부 공변량 변화 (Internal Covariance Shift) :  네트워크의 각 층이나 Activation 마다 input의 distribution이 달라지는 현상을 말한다. 최초 input layer의 normalization 효과가 hidden layer를 거치면서 희석이 된다.

=> 각 층의 input의 distribution을 평균 0, 표준편차가 1인 input으로 normalize 시켜준다

 

 

Hyper-parameter

* dmodel : encoder와 decoder의 정해진 입출력크기, embedding vector의 크기, 모든 encoder와 decoder 층에 공통사항

* num_layers : encoder와 decoder의 전체 층 수

* num_heads : 병렬처리 multi-head 갯수

*  dff : 트랜스포머 내부 feed forward 신경망의 은닉층 크기

 

 

 

Attention

 

Self-Attention(Intra-Attention) : Attention을 자기 자신에 대해 수행해 문장 내 단어들 간의 유사도를 구하는 방법, 

3개의 Vector : Query Vector, Key Vector, Value Vector 를 훈련과정에서 스스로 학습한다.

 

 

1) Q, K, V vector를 생성하는 단계 

2) Q, K, V를 이용한 scoring 단계 : 모든 K vector에 대해 attention score를 구한다

3) Softmax 함수로 Attention 분포를 구한다

4) V vector를 가중합하여 Attention Value를 구한다

 

위 과정을 행렬(Matrix)연산으로 일관 처리 해 GPU 성능을 최적화한다.

 

텐서플로우에서 scled_dot_product_attention을 사용하는 방법이 있다. 부가적인 설명에 좋은 블로그 글 있어서 참고용으로 올려두겠다.

https://velog.io/@cha-suyeon/%EC%8A%A4%EC%BC%80%EC%9D%BC%EB%93%9C-%EB%8B%B7-%ED%94%84%EB%A1%9C%EB%8D%95%ED%8A%B8-%EC%96%B4%ED%85%90%EC%85%98Scaled-dot-product-Attention

 

스케일드 닷-프로덕트 어텐션(Scaled dot-product Attention)

Q, K, V 벡터를 얻었다면 지금부터는 기존에 배운 어텐션 메커니즘과 동일합니다. 각 Q 벡터는 모든 K 벡터에 대해서 어텐션 스코어를 구하고, 어텐션 분포를 구한 뒤에 이를 사용하여 모든 V 벡터

velog.io

 

 

 

Positional Encoding

위치 정보 특성을 해결하기 위해 Embedding vector에 positional encoding 값을 추가한다.

짝수위치일때는 sin 함수, 홀수위치일때는 cos 함수를 적용한다. (sin은 90도일때 1이고 cos은 90도일때 0이라 엇갈리는 모양의 정현파가 나옴)

같은 단어라도 문장 내 위치에 따라 transformer의 입력으로 들어가는 embedding vector값이 달라진다 

=> 순서의 정보가 포함된 embedding vector

정현파는 일정한 cycle로 값이 규칙적으로 반복되므로, 신경망이 상대적 위치값을 쉽게 학습할 수 있다.

 

 

 


 

 

 

 

Decoder

top encoder의 output이 decoder의 attention vector의 key, value로 transform 되어 각 decoder의 "encoder-decoder attention" layer에서 사용된다.

=> input sequence의 적절한 위치를 focus 하도록 한다

 

query : training - groud truth label , inference - previous predictions

 

decoding result는 encoder 경우처럼 다음으로 전달한다

=> 각 step의 output은 next time step 에서 bottom decoder의 input으로 사용된다

 

decoder input에 positional encoding을 추가해준다

 

decoder의 self-attention layer는 ouput sequence의 self-attention 계산을 위한 softmax step 전에는 future position을 masking하여 이전 위치만 attend 할 수 있도록 한다. (-inf setting)

 

encoder-decoder attention은 query가 decoder의 아래쪽 layer에서 생성되는 것 외에는 encoder의 multi-headed attention과 동일하게 동작한다.

 

Final Linear & Softmax Layer에서 가장 큰 값의 index를 선택한다(argmax)

label smoothing 기법을 사용해 overfitting 방지를 위해 1보다 작고, 0보다 크도록 one-hot encoding을 조정한다. (0 혹은 1 외에도 여러가지가 언어모델에서는 정답이 될 수 있기 때문)

 

 

 

 

 

 


 

 

 

 

 

소스구현

 

구글에서 트랜스포머 모델 논문을 그대로 구현하는 튜토리얼을 제공중이다.

참고

https://www.tensorflow.org/text/tutorials/transformer?hl=ko 

 

언어 이해를 위한 변환기 모델  |  Text  |  TensorFlow

날짜를 저장하십시오! TensorFlow가 5월 10일 Google I/O에서 돌아왔습니다. 지금 등록하세요. 이 페이지는 Cloud Translation API를 통해 번역되었습니다. Switch to English 언어 이해를 위한 변환기 모델 컬렉션

www.tensorflow.org

 

아래 소스 구현 구조도 (번호로 순서 참조)

 

 

 

 

필요라이브러리 설치, 임포트

!pip install tensorflow_datasets
!pip install -U tensorflow-text
import collections
import logging

import os
import pathlib
import re
import string
import sys
import time

import numpy as np
import matplotlib.pyplot as plt

import tensorflow_datasets as tfds
import tensorflow_text as text
import tensorflow as tf

logging.getLogger('tensorflow').setLevel(logging.ERROR)

 

구글에서 제공하는 TED 강연포르투갈,영어 페어링 번역 데이터를 다운로드 받는다

https://github.com/neulab/word-embeddings-for-nmt

 

GitHub - neulab/word-embeddings-for-nmt: Supplementary material for "When and Why Are Pre-trained Word Embeddings Useful for Neu

Supplementary material for "When and Why Are Pre-trained Word Embeddings Useful for Neural Machine Translation?" at NAACL 2018 - GitHub - neulab/word-embeddings-for-nmt: Supplementary mat...

github.com

 

examples, metadata = tfds.load('ted_hrlr_translate/pt_to_en', with_info=True,
		as_supervised=True)
train_examples, val_examples = examples['train'], examples['validation

print(examples)
print("=" * 80)
print(metadata)

 

데이터 상태를 확인한다

for k, v in metadata.splits.items():
	print(k, v.num_examples)

train 51785

validation 1193

test 1803

개수를 확인할 수 있다.

 

포르투칼어<->영어 문장 페어를 생성한다.

for pt_examples, en_examples in train_examples.batch(3).take(1):
	for pt in pt_examples.numpy():
		print(pt.decode('utf-8'))

	print()
    
    
	for en in en_examples.numpy():
		print(en.decode('utf-8'))

 

변환된 텍스트를 embedding에 대한 인덱스인 토큰 ID 수열로 변환한다.

구글이 제공하는 사전훈련 토크나이저를 다운로드해서 활용한다.

# 사전 훈련된 Subword Tokenizer download
model_name = "ted_hrlr_translate_pt_en_converter"

tf.keras.utils.get_file(
	f"{model_name}.zip",
	f"https://storage.googleapis.com/download.tensorflow.org/models/{mo
	cache_dir='.', cache_subdir='', extract=True
)
tokenizers = tf.saved_model.load(model_name)

 

tokenize를 통해 문자열 배치를 토큰 ID의 padded batch로 변환한다. 

구두점, 소문자를 분할하고 토큰화하기 전에 입력을 유니코드로 정규화하는 과정을 거친다.

detokenize를 통해 tokenized ID를 사람이 읽을 수 있는 텍스트로 다시 변환 한다.

for en in en_examples.numpy():
	print(en.decode('utf-8'))

encoded = tokenizers.en.tokenize(en_examples)
	for row in encoded.to_list():
		print(row)

round_trip = tokenizers.en.detokenize(encoded)
for line in round_trip.numpy():
	print(line.decode('utf-8'))

 

토큰화 해보기

"Transformer is awesome."을 tokenize.

sample_string = tf.constant(['Transformer is awesome.'])

tokenized_string = tokenizers.en.tokenize(sample_string)

print(tokenized_string.numpy())
print(tokenizers.en.lookup(tokenized_string))
print(tokenizers.en.detokenize(tokenized_string).numpy())

토큰화 결과

 


(1) 입력 파이프라인 설정

 

데이터를 페이링하기 위해 입력 파이프라인을 설정한다.

원시 텍스트 데이터의 비정형 텐서(rangged tensor)를 tensor로 변환한다.

#패딩해주는 역할, to_tensor가 패딩까지 다 됨
def tokenize_pairs(pt, en):
	pt = tokenizers.pt.tokenize(pt)
	# 비정형에서 밀도로 변환하고 패딩 0으로 진행
	pt = pt.to_tensor()

	en = tokenizers.en.tokenize(en)
	# 비정형에서 밀도로 변환하고 패딩 0으로 진행
	en = en.to_tensor()
	return pt, en

 

 

다음으로 데이터 처리, 셔플, 일괄 처리하는 입력 파이프라인을 만든다.,

학습, 검정 배치 파이프라인을 생성한다.

BUFFER_SIZE = 20000
BATCH_SIZE = 64

def make_batches(ds):
	return (
		ds
		.cache()
		.shuffle(BUFFER_SIZE)
		.batch(BATCH_SIZE)
		.map(tokenize_pairs, num_parallel_calls=tf.data.AUTOTUNE) # 병렬처리 시 최적화 num_parallel_calls=tf.data.AUTOTUNE
		.prefetch(tf.data.AUTOTUNE))

train_batches = make_batches(train_examples)
val_batches = make_batches(val_examples)
train_batches, val_batches

 

 

데이터가 잘 만들어졌나 확인해보기

pt_batch, en_batch = next(iter(val_batches))
pt_batch, en_batch

 

 

(2) positional encoding

 

데이터가 다 준비되면 

positional encoding을 진행한다.

 

def get_angles(pos, i, d_model):
	angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
	return pos * angle_rates
def positional_encoding(position, d_model):
	angle_rads = get_angles(np.arange(position)[:, np.newaxis],
	np.arange(d_model)[np.newaxis, :], d_model)

# 짝수 위치 (2i) 에는 sin 적용
angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])

# 홀수 위치 (2i) 에는 cos 적용
angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
pos_encoding = angle_rads[np.newaxis, ...]

return tf.cast(pos_encoding, dtype=tf.float32)

 

데이터가 위치에 따라 포지셔닝에 어떤 차이가 있는지 보기 위해 시각화 진행

n, d = 2048, 512
pos_encoding = positional_encoding(n, d)
print(pos_encoding.shape)

pos_encoding = pos_encoding[0]
#플롯의 차원 저글링
pos_encoding = tf.reshape(pos_encoding, (n, d//2, 2))
pos_encoding = tf.transpose(pos_encoding, (2, 1, 0))
pos encoding = tf.reshape(pos encoding, (d, n))

plt.pcolormesh(pos_encoding, cmap='RdBu')
plt.ylabel('Depth')
plt.xlabel('Position')
plt.colorbar()
plt.show()

시각화 결과

 

 

(3) Making

 

다음으로 두 가지 마스크를 생성한다. (PAD mask, look ahead mask)

- PAD mask : encoder, decoder 의 sequence batch 내의 token 위치 표시

- look ahead mask : decoder 에서 뒤쪽 단어를 보지 않기 위해 mask

 

먼저 PAD Mask

- sequence batch 내의 모든 패드 토큰 (0) 을 마스킹.

- 이렇게 하면 모델이 패딩을 입력으로 처리하지 않는다.

- 마스크는 패드 값 '0'이있는 위치를 나타낸다.

- 해당 위치에서 '1'을 출력하고 그렇지 않으면 '0'을 출력한다.

def create_padding_mask(seq):
	seq = tf.cast(tf.math.equal(seq, 0), tf.float32) # integer 0 을 float32 type으로 변환
	# attention logit 에 padding 을 더하기 위해 extra dimension 을 추가
	return seq[:, tf.newaxis, tf.newaxis, :] # (batch_size, 1, 1, seq_len)

look-ahead 마스크

- sequence 의 뒤쪽(오른쪽) 토큰을 마스킹하는 데 사용.

- 즉, 마스크는 예측에 사용하지 말아야 할 단어를 표시

- 예를 들어 세 번째 단어를 예측하기 위해 첫 번째와 두 번째 단어 만 사용 됨을 의미

- 네 번째 단어를 예측하려면 마찬가지로 첫 번째, 두 번째 및 세 번째 단어 만 사용

 

def create_look_ahead_mask(size):
	mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
	return mask # (seq_len, seq_len)

 

 

 

(4) Scaled dot product attention

 

def scaled_dot_product_attention(q, k, v, mask):
	matmul_qk = tf.matmul(q, k, transpose_b=True) # shape (..., seq_len_q, seq_len_k), k를 transpose
	
    # scale matmul_qk
	dk = tf.cast(tf.shape(k)[-1], tf.float32)
	scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
	
    # scaled_attention_logit 에 (mask * -1e9) 를 더하여 padding 위치 (masked 1)의 값이 0 이 되도록 함
	if mask is not None:
		scaled_attention_logits += (mask * -1e9)
    
    # last axis (seq_len_k) 를 기준으로 softmax normalize 되어 scores 의 합이 1이 되도록 함.
	attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
	output = tf.matmul(attention_weights, v) # (..., seq_len_q, depth_v
	return output, attention_weights

 

 

(5) Multi-Head Attention

 

- linear layer 와 헤드 분할.

- scaled dot-product attention

- multi-head 의 concatenate.

- 최종 linear layer

 

class MultiHeadAttention(tf.keras.layers.Layer):
	def __init__(self, d_model, num_heads):
            super(MultiHeadAttention, self).__init__()
            self.num_heads = num_heads
            self.d_model = d_model

            assert d_model % self.num_heads == 0

            self.depth = d_model // self.num_heads

            # Linear Layer
            self.wq = tf.keras.layers.Dense(d_model)
            self.wk = tf.keras.layers.Dense(d_model)
            self.wv = tf.keras.layers.Dense(d_model)

            self.dense = tf.keras.layers.Dense(d_model)

	def split_heads(self, x, batch_size):
            x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
            return tf.transpose(x, perm=[0, 2, 1, 3])

	#메인로직
	def call(self, v, k, q, mask):
            batch_size = tf.shape(q)[0]
        
            q = self.wq(q) # (batch_size, seq_len, d_model)
            k = self.wk(k) # (batch_size, seq_len, d_model)
            v = self.wv(v) # (batch_size, seq_len, d_model)

            q = self.split_heads(q, batch_size) # (batch_size, num_heads, seq_len_q, depth)
            k = self.split_heads(k, batch_size) # (batch_size, num_heads, seq_len_k depth)
            v = self.split_heads(v, batch_size) # (batch_size, num_heads, seq_len_v depth)

            # scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
            # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
            scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
            scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3]) # (batch_size, seq_len_q, num_heads, depth)
            concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model)) # (batch_size, seq_len_q, d_model)
            output = self.dense(concat_attention) # (batch_size, seq_len_q, d_model)

            return output, attention_weights

 

 

(6) Point wise feed forward network

 

Pointwise feed forward network 은 ReLU 활성화를 통해 두 개의 완전 연결 층으로 구성된다.

def point_wise_feed_forward_network(d_model, dff):
	return tf.keras.Sequential([
		tf.keras.layers.Dense(dff, activation='relu'), #(batch_size,seq_len,dff)
		tf.keras.layers.Dense(d_model) # (batch_size, #(batch_size,seq_len,d_model)
])

shape 확인해보기

sample_ffn = point_wise_feed_forward_network(512, 2048)
sample_ffn(out).shape

 

 

 

(7) encoder layer 작성

 

각 encoder layer 는 sublayer 들로 구성

1. Multi-head attention (with padding mask)

2. Point wise feed forward networks.

 

class EncoderLayer(tf.keras.layers.Layer):
	def __init__(self, d_model, num_heads, dff, rate=0.1):
            super(EncoderLayer, self).__init__()
		
            self.mha = MultiHeadAttention(d_model, num_heads)
            self.ffn = point_wise_feed_forward_network(d_model, dff)
            self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        
            self dropout1 = tf keras layers Dropout(rate)
            self.dropout2 = tf.keras.layers.Dropout(rate)


	def call(self, x, training, mask):
            attn_output, _ = self.mha(x, x, x, mask) # (batch_size, input_seq_len, d_model)
            attn_output = self.dropout1(attn_output, training=training)
        
            out1 = self.layernorm1(x + attn_output) # (batch_size, input_seq_len, d_model)
        
            ffn_output = self.ffn(out1) # (batch_size, input_seq_len, d_model)
            ffn_output = self.dropout2(ffn_output, training=training)
            out2 = self.layernorm2(out1 + ffn_output) # (batch_size, input_seq_len, d_model)
        
            return out2

 

 

(8) decoder layer 작성

 

각 decoder layer 는 다음의 sublayer 들로 구성

1. Masked multi-head attention (look ahead mask 와 padding mask 로 구성)

2. Multi-head attention (with padding mask).

3. Point wise feed forward networks

class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()
        
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)
        
    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # enc_output.shape == (batch_size, input_seq_len, d_model)
        
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask) #(batch_size, target_seq_len, d_model)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask) #(batch_size, target_seq_len, d_model)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)  # (batch_size, target_seq_len, d_model)
        
        ffn_output = self.ffn(out2) # (batch_size, target_seq_len, d_model)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2) # (batch_size, target_seq_len, d_model)

        return out3, attn_weights_block1, attn_weights_block2

 

 

 

(9) encoder 전체

 

전체 Encoder 는 다음으로 구성

1. Input Embedding

2. Positional Encoding

3. N encoder layers

 

class Encoder(tf.keras.layers.Layer):
  def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
               maximum_position_encoding, rate=0.1):
    super(Encoder, self).__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
    self.pos_encoding = positional_encoding(maximum_position_encoding,
                                            self.d_model)

    self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                       for _ in range(num_layers)]

    self.dropout = tf.keras.layers.Dropout(rate)

  def call(self, x, training, mask):

    seq_len = tf.shape(x)[1]

    # 임베딩 및 위치 인코딩 추가
    x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    x += self.pos_encoding[:, :seq_len, :]

    x = self.dropout(x, training=training)

    for i in range(self.num_layers):
      x = self.enc_layers[i](x, training, mask)

    return x  # (batch_size, input_seq_len, d_model)

 

 

(10) decoder 전체

 

전체 Decoder 는 다음으로 구성

1. Output Embedding

2. Positional Encoding

3. N decoder layers

class Decoder(tf.keras.layers.Layer):
  def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
               maximum_position_encoding, rate=0.1):
    super(Decoder, self).__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
    self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

    self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                       for _ in range(num_layers)]
    self.dropout = tf.keras.layers.Dropout(rate)

  def call(self, x, enc_output, training,
           look_ahead_mask, padding_mask):

    seq_len = tf.shape(x)[1]
    attention_weights = {}

    x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    x += self.pos_encoding[:, :seq_len, :]

    x = self.dropout(x, training=training)

    for i in range(self.num_layers):
      x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                             look_ahead_mask, padding_mask)

      attention_weights[f'decoder_layer{i+1}_block1'] = block1
      attention_weights[f'decoder_layer{i+1}_block2'] = block2

    # x.shape == (batch_size, target_seq_len, d_model)
    return x, attention_weights

 

 

 

(11) Transformer 생성

 

Transformer 는 인코더, 디코더 및 최종 linear layer 로 구성.

디코더의 출력은 linear layer 에 대한 입력이며 출력이 반환된다

class Transformer(tf.keras.Model):
  def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
               target_vocab_size, pe_input, pe_target, rate=0.1):
    super().__init__()
    self.encoder = Encoder(num_layers, d_model, num_heads, dff,
                             input_vocab_size, pe_input, rate)

    self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                           target_vocab_size, pe_target, rate)

    self.final_layer = tf.keras.layers.Dense(target_vocab_size)

  def call(self, inputs, training):
    # Keras 모델은 첫 번째 인수에 모든 입력을 전달
    inp, tar = inputs

    enc_padding_mask, look_ahead_mask, dec_padding_mask = self.create_masks(inp, tar)

    enc_output = self.encoder(inp, training, enc_padding_mask)  # (batch_size, inp_seq_len, d_model)

    # dec_output.shape == (batch_size, tar_seq_len, d_model)
    dec_output, attention_weights = self.decoder(
        tar, enc_output, training, look_ahead_mask, dec_padding_mask)

    final_output = self.final_layer(dec_output)  # (batch_size, tar_seq_len, target_vocab_size)

    return final_output, attention_weights

  def create_masks(self, inp, tar):
    # 인코더 패딩 마스크
    enc_padding_mask = create_padding_mask(inp)

    # 디코더의 두 번째 어텐션 블록에서 사용
    # 이 패딩 마스크는 인코더 출력을 마스크하는 데 사용됨
    dec_padding_mask = create_padding_mask(inp)

    # 디코더의 첫 번째 어텐션 블록에서 사용
    # 입력에서 향후 토큰을 채우고 마스킹하는 데 사용됨
    # the decoder.
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    look_ahead_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

    return enc_padding_mask, look_ahead_mask, dec_padding_mask

 

 

하이퍼 파라미터 설정

num_layers = 4
d_model = 128
dff = 512
num_heads = 8
dropout_rate = 0.1

 

옵티마이저 설정

러닝레이트를 바꿔줌 , Adam 옵티마이저 쓰는데 처음엔 러닝레이트를 크게 줘서 학습이 빨리 되게 만들고 뒤로 가면 확 줄여서 천천히 찾아갈 수 있도록 조정

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()

    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)

학습과정 시각화 (러닝레이트)

temp_learning_rate_schedule = CustomSchedule(d_model)

plt.plot(temp_learning_rate_schedule(tf.range(40000, dtype=tf.float32)))
plt.ylabel("Learning Rate")
plt.xlabel("Train Step")

 

학습과정 시각화 결과

 

 

손실 및 측정항목(Loss and metrics)

target sequences 가 padding 되어 있으므로, loss 계산 시 padding mask 를 적용하는 것이 중요.

tf.keras.losses.SparseCategoricalCrossentropy 의 reduction=NONE 으로 지정할 경우는 shape 이 [batch_size, d0, .. dN1] 이고, 그렇지 않은 경우는 scalar 이다.

 

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
def loss_function(real, pred):
  mask = tf.math.logical_not(tf.math.equal(real, 0))
  loss_ = loss_object(real, pred)

  mask = tf.cast(mask, dtype=loss_.dtype)
  loss_ *= mask

  return tf.reduce_sum(loss_)/tf.reduce_sum(mask)


def accuracy_function(real, pred):
  accuracies = tf.equal(real, tf.argmax(pred, axis=2))

  mask = tf.math.logical_not(tf.math.equal(real, 0))
  accuracies = tf.math.logical_and(mask, accuracies)

  accuracies = tf.cast(accuracies, dtype=tf.float32)
  mask = tf.cast(mask, dtype=tf.float32)
  return tf.reduce_sum(accuracies)/tf.reduce_sum(mask)
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.Mean(name='train_accuracy')

 

 

훈련 및 체크포인트 인스턴스 생성

transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=tokenizers.pt.get_vocab_size().numpy(),
    target_vocab_size=tokenizers.en.get_vocab_size().numpy(),
    pe_input=1000,
    pe_target=1000,
    rate=dropout_rate)

 

중간중간 체크포인트 저장

checkpoint_path = "./checkpoints/train"

ckpt = tf.train.Checkpoint(transformer=transformer,
                           optimizer=optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# 체크포인트가 있으면 최신 체크포인트를 복원
if ckpt_manager.latest_checkpoint:
  ckpt.restore(ckpt_manager.latest_checkpoint)
  print('Latest checkpoint restored!!')

 

teacher forcing을 위해 target 은 tar_inp와 tar_real로 나뉜다.

sentence = "SOS A lion in the jungle is sleeping EOS"

tar_inp = "SOS A lion in the jungle is sleeping"

tar_real = "A lion in the jungle is sleeping EOS

이렇게 데이터 만들기

EPOCHS = 20
train_step_signature = [
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
]


@tf.function(input_signature=train_step_signature)
def train_step(inp, tar):
  tar_inp = tar[:, :-1]
  tar_real = tar[:, 1:]

  with tf.GradientTape() as tape:
    predictions, _ = transformer([inp, tar_inp],
                                 training = True)
    loss = loss_function(tar_real, predictions)

  gradients = tape.gradient(loss, transformer.trainable_variables)
  optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

  train_loss(loss)
  train_accuracy(accuracy_function(tar_real, predictions))

 

 

모델학습 시작

 

for epoch in range(EPOCHS):
  start = time.time()

  train_loss.reset_states()
  train_accuracy.reset_states()

  #inp -> 포르투갈어, tar -> 영어
  for (batch, (inp, tar)) in enumerate(train_batches):
    train_step(inp, tar)

    if batch % 50 == 0:
      print(f'Epoch {epoch + 1} Batch {batch} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')

  if (epoch + 1) % 5 == 0:
    ckpt_save_path = ckpt_manager.save()
    print(f'Saving checkpoint for epoch {epoch+1} at {ckpt_save_path}')

  print(f'Epoch {epoch + 1} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')

  print(f'Time taken for 1 epoch: {time.time() - start:.2f} secs\n')

 

 

모델 추론 실행

class Translator(tf.Module):
  def __init__(self, tokenizers, transformer):
    self.tokenizers = tokenizers
    self.transformer = transformer

  def __call__(self, sentence, max_length=20):
    # 입력 문장은 포르투갈어이므로 시작 및 종료 토큰을 추가
    assert isinstance(sentence, tf.Tensor)
    if len(sentence.shape) == 0:
      sentence = sentence[tf.newaxis]

    sentence = self.tokenizers.pt.tokenize(sentence).to_tensor()

    encoder_input = sentence

    # 대상이 영어이므로 변환기에 대한 첫 번째 토큰은 영어로 시작
    start_end = self.tokenizers.en.tokenize([''])[0]
    start = start_end[0][tf.newaxis]
    end = start_end[1][tf.newaxis]

    # `tf.TensorArray`필요, 동적 루프는 `tf.function`으로 추적할 수 있음
    output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
    output_array = output_array.write(0, start)

    for i in tf.range(max_length):
      output = tf.transpose(output_array.stack())
      predictions, _ = self.transformer([encoder_input, output], training=False)

      # seq_len 차원에서 마지막 토큰 선택
      predictions = predictions[:, -1:, :]  # (batch_size, 1, vocab_size)

      predicted_id = tf.argmax(predictions, axis=-1)

      # 디코더에 입력으로 제공되는 출력에 예측 ID를 연결
      output_array = output_array.write(i+1, predicted_id[0])

      if predicted_id == end:
        break

    output = tf.transpose(output_array.stack())
    # output.shape (1, tokens)
    text = tokenizers.en.detokenize(output)[0]  # shape: ()

    tokens = tokenizers.en.lookup(output)[0]

     #`tf.function`은 주의 가중치를 사용하는 것을 방지.
     # 루프의 마지막 반복에서 계산됨, 외부에서 다시 계산필요
     # 루프.
    _, attention_weights = self.transformer([encoder_input, output[:,:-1]], training=False)

    return text, tokens, attention_weights
translator = Translator(tokenizers, transformer)
def print_translation(sentence, tokens, ground_truth):
  print(f'{"Input:":15s}: {sentence}')
  print(f'{"Prediction":15s}: {tokens.numpy().decode("utf-8")}')
  print(f'{"Ground truth":15s}: {ground_truth}')
sentence = "este é um problema que temos que resolver."
ground_truth = "this is a problem we have to solve ."

translated_text, translated_tokens, attention_weights = translator(
    tf.constant(sentence))
print_translation(sentence, translated_text, ground_truth)

 

 

Attention plots

Translator 클래스는 모델의 내부 작업을 시각화하는 데 사용할 수 있는 어텐션 맵의 dictionary를 반환

def plot_attention_head(in_tokens, translated_tokens, attention):
   # 플롯은 토큰이 생성될 때 주목.
   # 모델이 출력에서 `<START>`를 생성하지 않았을때 skip
  translated_tokens = translated_tokens[1:]

  ax = plt.gca()
  ax.matshow(attention)
  ax.set_xticks(range(len(in_tokens)))
  ax.set_yticks(range(len(translated_tokens)))

  labels = [label.decode('utf-8') for label in in_tokens.numpy()]
  ax.set_xticklabels(
      labels, rotation=90)

  labels = [label.decode('utf-8') for label in translated_tokens.numpy()]
  ax.set_yticklabels(labels)
head = 0
# shape: (batch=1, num_heads, seq_len_q, seq_len_k)
attention_heads = tf.squeeze(
  attention_weights['decoder_layer4_block2'], 0)
attention = attention_heads[head]
attention.shape
in_tokens = tf.convert_to_tensor([sentence])
in_tokens = tokenizers.pt.tokenize(in_tokens).to_tensor()
in_tokens = tokenizers.pt.lookup(in_tokens)[0]

plot_attention_head(in_tokens, translated_tokens, attention)

시각화 결과

 

 

모델 저장 후 내보내기

class ExportTranslator(tf.Module):
  def __init__(self, translator):
    self.translator = translator

  @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.string)])
  def __call__(self, sentence):
    (result, 
     tokens,
     attention_weights) = self.translator(sentence, max_length=100)

    return result
translator = ExportTranslator(translator)

tf.saved_model.save(translator, export_dir='translator')

 

 

해당 소스는 텐서플로우에서 제공하는 튜토리얼을 정리한 소스이다. 

더 자세한 설명은 텐서플로우 튜토리얼을 참고하면 좋을 듯 하다.