스터디/AI

[실무] 쿠베플로우 ML 파이프라인 개념,예제

_leezoee_ 2023. 3. 1. 00:16

 

쿠버네티스를 사용하는 이유는

병목지점, 스케일링에 대한 경험이 부족으로 서버 장애 대응에 대한 어려움

서비스 개수에 따른 인프라 관리의 어려움

등을 줄이기 위해서이다.

 

오토스케일에 대한 조대협 남의 블로그 참조

https://bcho.tistory.com/1349

 

쿠버네티스 #26 - 오토스케일러

#26 쿠버네티스 오토 스케일러조대협 (http://bcho.tistory.com) 쿠버네티스에서는 리소스 부족을 처리하기 위해서, 오토 스케일러를 사용할 수 있다.. 쿠버네티스는 용도에 따라 몇가지 다른 오토스케

bcho.tistory.com

 


 

<쿠버네티스 사용방법>

 

로컬에서 쿠버네티스 사용 방법.

 

1. Dockr Desktop 설치

2. 설정에서 쿠버네티스 inable 처리

 

 

<쿠버네티스 클러스터 상태 확인>

$ kubectl get po -A

 

<쿠버네티스 대시보드 설치>

https://kubernetes.io/ko/docs/tasks/access-application-cluster/web-ui-dashboard/

 

쿠버네티스 대시보드를 배포하고 접속하기

웹 UI(쿠버네티스 대시보드)를 배포하고 접속한다.

kubernetes.io

 

<도커빌드>

$ docker build -t 이름~

도커 로그인(docker hub 회원가입 되어있어야 함) 후 push 진행.

 

 

<Pod 만들기>

쿠버네티스 대시보드에 신규리소스 생성에 추가하기

apiVersion:v1
kind:Pod
metadata:
	name:test-pod
    labels:
    	app : hellotest
spec:
	containers:
    - name : hello-container
      image : 도커허브이미지이름
      ports:
      - containerPort:8000

 

<포트포워딩>

$ kubectl port-forward serviece/hellotest 8200:8200

로컬 8200이랑 k8s 8200이랑 연결

 

 


 

<쿠베플로우>

 

기능

① 조합가능성 : 머신러닝 실무자들에게 익숙한 데이터 과학 도구를 사용. 기계학습의 특정 단게를 용이하게 하기 위해 독립적으로 사용되거나, 엔드 투 엔드 파이프라인을 형성하기 위해 함께 구성될 수 있음.

 

② 이식성 : 컨터에너 기반 설계를 갖추고, 쿠버네티스 및 클라우드 네이티브 아키텍쳐를 활용함으로써 쿠베플로우는 특정 개발환경에 종속될 필요가 없다.

 

③ 확장성 : 쿠버네티스를 사용하면 기본 컨테이너와 기계의 수, 크기를 변경하여 클러스터의 요구에 따라 동적으로 확장 할 수 있음.

 

쿠베플로우 아키텍쳐 (출처 : 쿠베플로우 공식문서)

 

 

ML 워크플로우 (출처 : 쿠베플로우 공식문서)

 

  • 실험 단계에서는 초기 가정을 기반으로 모델을 개발하고 모델을 반복적으로 테스트 및 업데이트.
    • ML 시스템으로 해결하려는 문제를 식별.
    • ML 모델을 교육하는 데 필요한 데이터를 수집하고 분석.
    • ML 프레임워크와 알고리즘을 선택하고 모델의 초기 버전을 코딩.
    • 데이터를 실험하고 모델을 훈련.
    • 모델 하이퍼파라미터를 조정.
  • 프로덕션 단계.
    • 훈련 시스템에 필요한 형식으로 데이터를 변환. 학습 및 예측 중에 모델이 일관되게 작동하도록 하려면 실험 및 생산 단계에서 변환 프로세스가 동일해야한다.
    • ML 모델 학습.
    • 온라인 예측 또는 배치 모드 실행을 위해 모델 제공.
    • 모델의 성능을 모니터링하고 모델 조정 또는 재교육을 위해 프로세스에 결과를 제공.

 

 

<쿠베플로우 개념>

- Central dashboard : 주요 인터페이스로 작업 관리, 모니터링, 디버깅, 데이터/모델/평가 시각화를 위한 통합 프론트

- Jypytor : 프로토타이핑, 실험

- Kubeflow pipelines Argo : 공유 구성 프레임워크 및 작업 오케스트레이션

 

: 쿠페를로우 파이프라인 만들기

my_python_func 함수에 @kfp.dsl.python_component 데코레이터를 적어주면 파이프라인 컴포넌트로 추상화가 됨.

dsl 데코레이터가 적힌 파이썬 함수를(추상화된) 도커 이미지로 패키징.

이 도커 이미지를 활용해 쿠베플로우 컴포넌트를 만들 수 있음.

마지막으로 @kfp.dsl.pipeline 데코레이터를 활용해 해당 쿠베플로우 컴포넌트를 파이프라인으로 패키징.

파이프라인 구성을 완료하면 yaml 형태로 패키징 (파이프라인 컴파일).

이 yaml 파일을 쿠베플로우 대시보드에 업로드하거나 kfctl 명령으로 업로드.

 

- Katlib : 모델튜닝(AutoML)을 담당. 

- TFJobs : 비동기 학습, 오프라인 추론 시 사용.

- KFServing : 온라인 인퍼런스 서버.

- MinIO : 파이프라인 간 저장소 기능, 컴포넌트간 통신, 학습 결과물 저장, 중간 체크포인트 저장 등.

 

 

 


 

<파이썬 파일 방식 컴포넌트 예제 코드>

import kfp

KUBEFLOW_HOST = "http://127.0.0.1:8080/pipeline"

def hello_world_component():
	ret = "Hello World!"
    print(ret)
    return ret
    
@kfp.dsl.pipeline(name="hello_pipeline", description="Hello World Pipeline!")
def hello_world_pipeline():
	hello_world_op = kfp.components.func_to_container_op(hello_world_component)
    _=hello_world_op()
    
if__name__=="__main__":
	kfp.compiler.Compiler().compile(hello_world_pipeline, 'hello-world-pipeline.zip')
    kfp.Client(host=KUBEFLOW_HOST).create_run_from_pipeline_func(
    	hello_world_pipeline,
        arguments={},
        experiment_name="hello-world-experiment")

 

<Bash 컴포넌트 예제코드>

import kfp
from kfp import dsl

BASE_IMAGE = "library/bash:4.4.23"
KUBEFLOW_HOST = "http://127.0.0.1:8080/pipeline"

def echo_op():
	return dsl.ContainerOP(
    	name="echo",
        image=BASE_IMAGE,
        command=["sh", "-c"],
        arguments=['echo "hello world"'],
     )
     
@dsl.pipeline(name="hello_world_bash_pipeline", description="A hello world pipeline.")
def hello_world_bash_pipeline():
	echo_task = echo_op()
    
if__name__=="__main__":
	kfp.compiler.Compiler().compile(hello_world_bash_pipeline,__file__+".zip")
    kfp.Client(host=KUBEFLOW_HOST).create_run_from_pipeline_func(
    	hello_world_bash_pipeline,
        arguments={},
        experiment_name="hello-world-bash-experiment",
    )

 

 

<Add 컴포넌트 예시>

import kfp
from kfp import components
from kfp import dsl

EXPERIMENT_NAME = 'Add number pipeline' #UI 실험 이름
BASE_IMAGE = "python:3.7"
KUBEFLOW_HOST="http://127.0.0.1:8080/pipeline"

@dsl.python_component(
	name='add_op',
    description='adds two numbers',
    base_image=BASE_IMAGE 
)

def.add(a:float, b:float) => float:
	'''두 인자 더하는 계산'''
    print(a,'+',b,'=',a+b)
    return a+b
    
 #함수를 파이프라인으로 변환
 add_op = components.func_to_container_op(
 	add,
    base_image = BASE_IMAGE,
 )
 
 @dsl.pipeline(
 	name = 'Calculation pipeline',
    description = 'A toy pipeline that performs arithmetic calculations.'
 )
 
 def calc_pipeline(
 	a:float = 0,
    b:float = 7
 ):
 
 	#작업 인수로 파이프라인 매개변수 및 상수 값 전달
    add_task = add_op(a,4)
    add_2_task = add_op(a,b)
    add_3_task = add_op(add_task.output, add_2_task.output)
    
if__name == "__main__":
	arguments = {'a':'7', 'b':'8'}
    #파이프라인 함수 정의가 지정된 파이프라인 실행 시작
    kfp.Client(host=KUBEFLOW_HOST).create_run_from_pipeline_func(
    	calc_pipeline,
        arguments=arguments,
        experiment_name = EXPERIMENT_NAME
    )

 

<문장이어붙이기 예제>

import kfp
from kfp import dsl

EXPERIMENT_NAME = 'Parallel execution'
BASE_IMAGE = "python:3.7"
KUBEFLOW_HOST = "http://127.0.0.1:8080/pipeline"

def gcs_download_op(url):
	return dsl.ContainerOp(
    	name = 'GCS-Download', #Google Cloud Storage
        image='google/cloud-sdk:272.0.0',
        command = ['sh', '-c'],
        arguments = ['gsutil cat $0 | tee $1', url, '/tmp/results.txt'],
        file_outputs={
        	'data':'/tmp/results.txt'
        }
    )
    
def echo2_op(text1, text2):
	return dsl.ContainerOp(
    	name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "Text 1:$0"; echo "Text 2:$1"',text1,text2]
    )
    
@dsl.pipline(
	name = 'Parallel pipeline',
    decription ='Download two messages in parallel and prints the concatenated result.'
)

def download_and_join(
	url1='gs://ml-pipeline-playgroud/shakespeare1.txt',
    url2='gs://ml-pipeline-playgroud/shakespeare2.txt',
):
	'''세 단계 파이프라인 첫번째 두번째 문장 이어붙이기'''
    
    download1_task = gcs_download_op(url1)
    download2_task = gcs_download_op(url2)
    
    echo_task = echo2_op(download1_task.output, download2_task.output)
    
if__name__=='__main__':
	#kfp.compiler.Compiler().compile(download_and_join, __file__+'.zip')
    kfp.Client(host=KUBEFLOW_HOST).create_run_from_pipeline_func(
    	download_and_join,
        arguments={},
        experiment_name=EXPERIMENT_NAME
    )

 

 

 

 


 

 

<동전던지기 방식을 통한 랜덤 조건 예제>

EXPERIMENT_NAME='Control Structure'
BASE_IMAGE="python:3.7"
KUBEFLOW_HOST="http://127.0.0.1:8080/pipeline"

import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath

@func_to_container_op
def get_random_int_op(mininum:int, maximum:int)->int:
	'''최소 최대 사이 랜덤 값 생성'''
    import random
    result=random.randint(mininum, maximum)
    print(result)
    return result
    
@func_to_container_op
#동전 앞뒷면
def flip_coin_op()=>str:
	import random
    result=random.choice(['heads','tails'])
    print(result)
    return result
    
@func_to_container_op
def print_op(message:str):
	print(message)
    

@dsl.pipeline(
	name='Conditional execution pipeline'
    description='Show how to use dsl.Condition().'
)

#컴포넌트간 조건이라서 if elif 말고 다른방식
def flipcoin_pipeline():
	flip=flip_coin_op()
    with dsl.Condition(flip.output=='heads'):
    	random_num_head = get_random_int_op(0,9)
        with dsl.Condition(random_num_head.output > 5):
        	print_op('heads and %s > 5!' % random_num_head.output)
        with dsl.Condition(random_num_head.output <= 5):
        	print_op('heads and %s <= 5!' % random_num_head.output)
    
    with dsl.Condition(flip.output=='tails'):
    random_num_tail = get_random_int_op(10,19)
        with dsl.Condition(random_num_tail.output > 15):
        	print_op('tails and %s > 15!' % random_num_tail.output)
        with dsl.Condition(random_num_tail.output <= 5):
        	print_op('tails and %s <= 15!' % random_num_tail.output)
           
           
@func_to_container_op
def fail_op(message):
    import sys
    print(message)
    sys.exit(1)

 

 

'스터디 > AI' 카테고리의 다른 글

[실무] bentoML 라이브러리  (0) 2023.03.04
[실무] 쿠베플로우 ML 파이프라인 예제2  (0) 2023.03.03
[실무] WIT 모델 분석  (2) 2023.02.26
[실무] TFDV 데이터 검증  (0) 2023.02.24
[실무] 코드관리  (0) 2023.02.23