스터디/AI

[이론/Imple] Variational autoencoder, VAE 변이형 오토인코더 2

_leezoee_ 2024. 4. 30. 20:53

 

이 게시물은 <만들면서 배우는 생성 AI 2판> 교재의 내용과 소스코드를 기반으로 실습한 내용을  기반으로 하고있다.

 

https://github.com/rickiepark/Generative_Deep_Learning_2nd_Edition/

 

GitHub - rickiepark/Generative_Deep_Learning_2nd_Edition: <만들면서 배우는 생성 AI 2판>의 코드 저장소

<만들면서 배우는 생성 AI 2판>의 코드 저장소. Contribute to rickiepark/Generative_Deep_Learning_2nd_Edition development by creating an account on GitHub.

github.com

 

 

얼굴 데이터 변이형 오토인코더 실습

 

실습 데이터는 kaggle 에 있는 CelebFace를 다운로드하여 진행한다.

 

https://www.kaggle.com/datasets/jessicali9530/celeba-dataset

 

CelebFaces Attributes (CelebA) Dataset

Over 200k images of celebrities with 40 binary attribute annotations

www.kaggle.com

 

VAE를 훈련할 때 레이블은 따로 필요치 않으나 나중에 다차원 잠재 공간에서 어떻게 특성이 감지되었는지 탐색할 때 레이블을 사용한다. 

VAE가 훈련되면 잠재 공간에서 샘플링하여 새로운 유명 인사 얼굴을 생성할 수 있다.

 

먼저 데이터를 불러오고 전처리하는 소스를 작성한다.

# 파라미터 정의
IMAGE_SIZE = 64
CHANNELS = 3
BATCH_SIZE = 128
NUM_FEATURES = 64
Z_DIM = 200
LEARNING_RATE = 0.0005
EPOCHS = 10
BETA = 2000
LOAD_MODEL = False

# 데이터 로드
train_data = utils.image_dataset_from_directory(
    "경로입력",
    labels=None,
    color_mode="rgb",
    image_size=(IMAGE_SIZE, IMAGE_SIZE),
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=42,
    interpolation="bilinear",

# 데이터 전처리
def preprocess(img):
    img = tf.cast(img, "float32") / 255.0
    return img


train = train_data.map(lambda x: preprocess(x))
train_sample = sample_batch(train)

 

이미지 크기를 64*64로 조정하고 픽셀 사이를 보간한다.

원본 데이터는 [0, 255] 범위로 픽셀 강도를 나타낸다, 이를 [0, 1] 범위로 다시 조정해준다.

 

다으으로 VAE를 훈련시키는 소스를 작성해야한다.

 

그 전에 앞 게시물인 MNIST 예제와의 차이점을 비교해보고자 한다.

 

1. 얼굴 데이터는 입력 채널이 1개 (흑백)이 아니라 3개 (RGB)이다. 이에 따라 디코더 마지막에 있는 전치 합성곱 층 채널의 수를 맞추어야 한다.

2. 사용할 잠재 공간의 차원 수는 2개가 아니라 200개이다. 얼굴 데이터는 MNIST 이미지보다 복잡하기 때문에 이미지에 있는 상세 정보를 충분히 인코딩하려고 잠재공간의 차원을 늘려주어야 한다.

3. 안정적인 훈련을 위해 각 합성곱 층 뒤에 배치 정규화 층을 둠. 각 배치의 실행 속도는 느려지지만 동일 수준 손실에 도달하는데 필요한 배치 횟수는 크게 줄어든다.

4. KL 발산을 위한 베타인수를 2000으로 상향한다.

 

 

#샘플링 함수 제작
class Sampling(layers.Layer):
    def call(self, inputs):
        #입력값으로 잠재 변수의 평균, 로그 분산을 받음
        z_mean, z_log_var = inputs
        
        #잠재 변수의 배치 크기, 차원을 가져옴
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        
        #평균이 0, 표준편차가 1인 정규분포에서 epsilon을 샘플링
        epsilon = K.random_normal(shape=(batch, dim))
        
        #재매개변수화 사용해 잠재변수 샘플링
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

 

# 인코더
encoder_input = layers.Input(
    shape=(IMAGE_SIZE, IMAGE_SIZE, CHANNELS), name="encoder_input"
)
x = layers.Conv2D(NUM_FEATURES, kernel_size=3, strides=2, padding="same")(
    encoder_input
)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Conv2D(NUM_FEATURES, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Conv2D(NUM_FEATURES, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Conv2D(NUM_FEATURES, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Conv2D(NUM_FEATURES, kernel_size=3, strides=2, padding="same")(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
shape_before_flattening = K.int_shape(x)[1:]  # 디코더에 필요합니다!

x = layers.Flatten()(x)
z_mean = layers.Dense(Z_DIM, name="z_mean")(x)
z_log_var = layers.Dense(Z_DIM, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])

encoder = models.Model(encoder_input, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

인코더 summary() 결과

 

 

# 디코더
decoder_input = layers.Input(shape=(Z_DIM,), name="decoder_input")
x = layers.Dense(np.prod(shape_before_flattening))(decoder_input)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Reshape(shape_before_flattening)(x)
x = layers.Conv2DTranspose(
    NUM_FEATURES, kernel_size=3, strides=2, padding="same"
)(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Conv2DTranspose(
    NUM_FEATURES, kernel_size=3, strides=2, padding="same"
)(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Conv2DTranspose(
    NUM_FEATURES, kernel_size=3, strides=2, padding="same"
)(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Conv2DTranspose(
    NUM_FEATURES, kernel_size=3, strides=2, padding="same"
)(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
x = layers.Conv2DTranspose(
    NUM_FEATURES, kernel_size=3, strides=2, padding="same"
)(x)
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)
decoder_output = layers.Conv2DTranspose(
    CHANNELS, kernel_size=3, strides=1, activation="sigmoid", padding="same"
)(x)
decoder = models.Model(decoder_input, decoder_output)
decoder.summary()

 

디코더 summary() 결과

 

 

class VAE(models.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        #인코더, 디코더 모델 초기화
        self.encoder = encoder
        self.decoder = decoder
        #훈련 중 손실을 추적하기 위한 메트릭스 초기화
        self.total_loss_tracker = metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = metrics.Mean(name="kl_loss")

    @property
    def metrics(self): #훈련 중 추적할 메트릭스 반환
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, inputs): #특정 입력에서 모델 호출
        #입력을 인코더에 전달해 잠재 변수를 추출
        z_mean, z_log_var, z = encoder(inputs)
        #추출된 잠재 변수를 디코더에 전달해 재구성 수행
        reconstruction = decoder(z)
        return z_mean, z_log_var, reconstruction

    def train_step(self, data): #훈련 스텝 실행
        with tf.GradientTape() as tape: #자동 미분을 위한 GadientTape을 사용해 훈련 과정을 기록
            #입력 데이터를 모델에 전달해 잠재 변수와 재구성을 얻음
            z_mean, z_log_var, reconstruction = self(data, training=True)
            #재구성 손실을 계산
            reconstruction_loss = tf.reduce_mean(
                BETA * losses.mean_squared_error(data, reconstruction)
            )
            #KL 발산 손실 계산
            kl_loss = tf.reduce_mean(
                tf.reduce_sum(
                    -0.5
                    * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)),
                    axis=1,
                )
            )
            #총 손실은 재구성 손실 + KL 발산 손실의 합
            total_loss = reconstruction_loss + kl_loss

        #총 손실에 대한 그래디언트를 계산
        grads = tape.gradient(total_loss, self.trainable_weights)
        #계산된 그래디언트를 모델의 가중치에 적용해 모델을 업데이트
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        #훈련 중 손실 메트릭스를 업데이트
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }
#변이형 오토인코더 생성
vae = VAE(encoder, ecoder)

 

 

다음으로 변이형 오토인코더 훈련을 진행한다.

 

# 변이형 오토인코더 컴파일
optimizer = optimizers.Adam(learning_rate=LEARNING_RATE)
vae.compile(optimizer=optimizer)

#이미지 생성기 콜백 정의
class ImageGenerator(callbacks.Callback):
    def __init__(self, num_img, latent_dim):
        #생성할 이미지의 개수, 잠재변수의 차원 초기화
        self.num_img = num_img
        self.latent_dim = latent_dim

    #각 에포크가 끝날 때마다 호출되는 콜백함수
    #임의의 잠재 변수를 생성해 디코더에 전달해 이미지를 생성, 저장
    def on_epoch_end(self, epoch, logs=None): 
        random_latent_vectors = tf.random.normal(
            shape=(self.num_img, self.latent_dim)
        )
        #임의의 잠재 변수로부터 이미지를 생성
        generated_images = self.model.decoder(random_latent_vectors)
        # 이미지를 0~255 사이 값으로 스케일링
        generated_images *= 255
        # 이미지 배열을 넘파이 배열로 변환
        generated_images.numpy()
        # 생성된 이미지를 저장
        for i in range(self.num_img):
            img = utils.array_to_img(generated_images[i])
            img.save("./output/generated_img_%03d_%d.png" % (epoch, i))
            
#모델훈련            
vae.fit(
    train,
    epochs=EPOCHS,
    callbacks=[
        model_checkpoint_callback, #모델 체크포인트 콜백, 실습에서는 사용X
        tensorboard_callback, #텐서보드 콜백
        ImageGenerator(num_img=10, latent_dim=Z_DIM), #이미지 생성기 콜백
    ],
)

 

 

다음으로 테스트 세트에서 이미지를 선택, VAE 모델을 사용해 이미지를 재구성, 출력해본다

# 테스트 세트에서 일부분을 선택합니다.
batches_to_predict = 1
example_images = np.array(
    list(train.take(batches_to_predict).get_single_element())
)

# 오토인코더 예측을 생성하고 출력합니다.
z_mean, z_log_var, reconstructions = vae.predict(example_images)

재구성 이미지 출력 예제

 

 

 

이제 잠재 공간의 포인트 분포가 다변량 표준 정규 분포와 비슷한지 확인한다.

표준 정규 분포와 크게 다르면 KL 발산 항이 충분히 영향을 못미친다는 뜻으로 재구성 손실 가중치를 줄여야한다.

 

먼저 잠재 공간의 처음 50개 차원을 그래프화한다.

 

_, _, z = vae.encoder.predict(example_images)

x = np.linspace(-3, 3, 100)

fig = plt.figure(figsize=(20, 5))
fig.subplots_adjust(hspace=0.6, wspace=0.4)

for i in range(50):
    ax = fig.add_subplot(5, 10, i + 1)
    ax.hist(z[:, i], density=True, bins=20)
    ax.axis("off")
    ax.text(
        0.5, -0.35, str(i), fontsize=10, ha="center", transform=ax.transAxes
    )
    ax.plot(x, norm.pdf(x))

plt.show()

 

잠재 공간 처음 50개차원 포인트 분포도

 

 

결과에서 알 수 있듯이 50개 전부 표준 정규 분포를 크게 벗어나는 부분이 없음을 볼 수 있다.

 

이제 새로운 얼굴을 생성하는 코드를 적는다.

 

# 표준 정규 분포에서 잠재 공간의 일부 포인트 샘플링, Z_DIM 파라미터 사용
grid_width, grid_height = (10, 3)
z_sample = np.random.normal(size=(grid_width * grid_height, Z_DIM))

# 샘플링된 포인트 디코딩
reconstructions = decoder.predict(z_sample)

# 디코딩된 이미지의 그리기
fig = plt.figure(figsize=(18, 5))
fig.subplots_adjust(hspace=0.4, wspace=0.4)

# 얼굴 그리드 출력
for i in range(grid_width * grid_height):
    ax = fig.add_subplot(grid_height, grid_width, i + 1)
    ax.axis("off")
    ax.imshow(reconstructions[i, :, :])

 

이미지 생성 결과 일부

 

 

 

여기까지 인코더 디코더를 이용한 변이형 오토인코더 얼굴 이미지 생성을 해보았다.

이미지를 인코딩 시 저차원 잠재 공간으로 매핑하면 벡터에 대한 연산을 수행할 수 있다는 장점이 생긴다.

여기서 연산의 결과를 디코딩하면 시각적으로 비슷한 효과를 만들 수 있다.

 

예를 들어 잠재 공간에서 "Blond_Hair" 이라는 벡터를 찾고, 이 벡터를 잠재 공간에 있는 원본 이미지의 인코딩에 더해준다.

이후 이 새로운 포인트를 디코딩하면 원본보다 더 금발의 이미지를 얻을 수 있게 되는 것이다.

 

 

# 레이블이 부착된 얼굴 데이터 로드
LABEL = "Blond_Hair" #금발 레이블 설정
labelled_test = utils.image_dataset_from_directory(
    "./img_align_celeba",
    labels=attributes[LABEL].tolist(),
    color_mode="rgb",
    image_size=(IMAGE_SIZE, IMAGE_SIZE),
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=42,
    validation_split=0.2,
    subset="validation",
    interpolation="bilinear",
)

labelled = labelled_test.map(lambda x, y: (preprocess(x), y))

# 이미지에 벡터 추가
add_vector_to_images(labelled, vae, attribute_vec)
# 얼굴 형태 변형 수행
morph_faces(labelled, vae)

 

벡터 추가한 이미지

 

얼굴 형태 변형 수행 결과

 

 

 

 

 

추가학습

 

여기까지 VAE(Variational Autoencoder)를 학습했다.

추가적으로 CVAE와 AAE를 언급하고자 한다.

 

CVAE(Conditional Variational Vutoencoder)는 VAE의 확장으로 조건부 생성(conditional generation)에 중점을 둔 모델이다. 우리나라 말로는 조건부 오토인코더라고도 불린다.

 

여기에서 말하는 조건은 주로 입력 데이터와 함께 적용되어, 생성된 출력이 특정 조건을 만족하도록 하며 이는 훈련 데이터에서 원하는 특성을 재현하거나 특정한 출력을 생성하는데 사용될 수 있다.

CVAE는 이러한 추가적인 정보를 바탕으로 특정 목표에 대해 고정된 분포에서 샘플링하는 역할을 한다.

 

CVAE는 기본적으로 VAE와 비슷하게 인코더 디코더 구조를 따른다. 인코더 디코더 모두에 추가 정보를 제공하는 조건이 적용되는 차이점이 있다.

조건은 인코더에게 입력 데이터와 함께 제공되며, 디코더에게는 잠재 표현과 함께 제공된다.

 

이러한 방식으로 CVAE는 VAE가 제공하는 확률적이고 연속적인 잠재공간을 유지하며, 조건부 생성 과정을 가능하게 한다.

 

더보기

위키독스 인공지능(AI) & 머신러닝(ML) 사전 발췌

 

AAE(Adversarial Autoencoder)는 AE의 한 유형으로, 생성적 적대 신경망 (GAN, Generative Adversarial Network)의 개념을 AE에 통합한 모델이다.

 

AAE 역시 오토인코더 기본 구조인 인코딩, 디코딩 구조를 따른다. 입력 데이터를 저차워느이 잠재 공간으로 인코딩하고, 이를 디코딩해 입력을 재구성한다.

또한 GAN의 아이디어를 이용해 생성된 이미지가 실제 이미지와 구별하기 어렵게 한다. 이를 위해 생성된 이미지와 실제 이미지 간 차이를 최소화하고, 구분자(discriminator)를 사용해 두 이미지 유사도를 평가한다.

 

 

다음 게시글로 GAN에 대해 다뤄보고자 한다. 기회가 된다면 GAN까지 공부하고 난 다음에 AAE를 더 자세히 살펴보고자 한다.