My Profile Photo

DongChanS's blog


수학과 학생의 개발일지


Studyjam 2) Lab 4 - Operationalize the model

이전(Lab3) 까지는 적절한 머신러닝 모델을 만드는 데 주력했습니다.

하지만, 데이터의 크기가 엄청 크다면 BigQuery를 통해서 데이터를 처리하고, 텐서플로우를 통해서 모델을 학습하는 것이 상당히 힘들 수 있습니다.

이번 시간에는 큰 데이터로부터 머신러닝 모델을 잘 조작할 수 있도록 하는 기반작업을 할 것입니다.

Operationalize the model

1-1. Steps to do

  • Create data pipeline
  • Train model & ML engine
  • Serve model using rest API
  • Deploy app & app engine

이번에 할 것 : creating two datasets, train & evaluation

=> 그것을 위해서 Cloud Dataflow라는 녀석을 사용할 것임.

1-2. Key benefits of using Cloud Dataflow

  • 병렬로 데이터 처리 & 변환 가능
  • streaming & batch job을 지원함.

=> 모델 로직을 변경하지 않고 모델 훈련과 서빙을 둘 다 할 수 있음.

1-3. Apache beam

아파치 빔은 다음 기능들을 전부 할 수 있는 녀석입니다.

  • batch and streaming data parallel processing pipelines

  • set of language specific SDKs for constructing

    pipelines

  • runners for executing them on distributed processing backends

1-4. Cloud Dataflow

  • 역할 : apache beam API의 코드를 작동시키게 함.

  • 작동 로직 예시

    ex) 데이터의 포맷을 다른 포맷으로 바꾸는 작업

    1. Data in data warehouse(ex, BigQuery)
    2. 받은 데이터를 파이프라인을 이용하여 변환.
    3. 결과를 Google Cloud Storage에 저장

    => 이것을 가능하게 해주는 플랫폼이 Cloud Dataflow입니다.

  • 자바와 파이썬 둘다 사용가능함.

  • Serverless framework

    The open-source, application framework to easily build serverless architectures on AWS Lambda & more. Startups and Fortune 500 companies are using it to build incredibly efficient applications.

    요약 : 쉽게 Serverless 아키텍쳐를 빌드할 수 있는 프레임워크

    • Serverless 아키텍쳐

      특정 작업을 수행하기 위해 컴퓨터 & 가상머신에 서버를 설정하는 것이 아니라, Baas, Faas에 의존하여 작업함.

      • Baas (Backend as a Service)

        Firebase와 같이 백엔드 기능을 대신할 수 있는 서비스를 의미함. (데이터베이스, 소셜서비스 연동, 파일시스템 등을 지원받음)

      • FaaS (Function as a Service)

        프로젝트를 여러 함수로 쪼개서, 이러한 함수들을 분산 컴퓨팅 자원에(ex, AWS lambda)등록하고 호출하는 방식

    • fully managed offering from Google that allows you to execute Data Processing Pipelines at scale

      => 구글이 서버 클러스터를 관리하기 때문에 프로그래밍에 전념할 수 이씁니다. (서버리스 구조의 장점)

1-5. Apache beam을 작성하는 방식

p = beam.Pipeline() # 파이프라인 정의
(p
	| beam.io.ReadFromText('gs://..') 
 		# 구글 스토리지에서 가져옴
	| beam.Map(transform) # 데이터 변경 작업
 		# Parallel tasks에 의해서 자동으로 스케일링됨.
 	| beam.GroupByKey() # Groupization, merge operation
 	| beam.FlatMap(Filter) # Filter
 	| beam.io.WriteToText('gs://...') # 구글 스토리지에 저장
)
p.run();

이런 코드 구현 방식은 pipe implementation과 pipe execution이 구분되어 있음. run 메소드를 실행해야만 pipe가 실행됨.

real-time & batch 작업을 할 때도 코드는 유사합니다.

=> Dataflow를 이용하여 이러한 작업을 하면, 대시보드를 볼 수 있습니다.

p = beam.Pipeline() # 파이프라인 정의
(p
	| beam.io.ReadStringsFromPubSub('project/topic')
 	| beam.WindowInto(SlidingWindows(60))
	| beam.Map(transform) # 데이터 변경 작업
 	| beam.GroupByKey() # Groupization, merge operation
 	| beam.FlatMap(Filter) # Filter
 	| beam.io.WriteToBigQuery(table)
)
p.run();

beam.io 메서드에는 여러 connector로 부터 데이터의 io작업을 할 수 있습니다. (ex, 파일시스템, 클라우드, 스트림..)

ex) PubSub Conntector는 메시지를 스트림으로부터 읽을 수 있음

1-6. An example Beam pipeline for BigQuery -> CSV on cloud

import apache_beam as beam
def transform(rowdict):
    import copy
    result = copy.deepcopy(rowdict)
    if rowdict['a'] > 0:
        result['c'] = result['a'] + result['b']
        yield '.'.join([str(result[k])
                       if k in result else 'None'
                       for k in ['a','b','c']])

if __name__ == '__main__':
    p = beam.Pipeline(argv=sys.argv)
    selquery = 'SELECT a,b FROM someds.sometable'
    (p
    	| beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql=True))
    	| beam.Map(transform_data)
     	| beam.io.WriteToText('gs://...')
    )
    p.run()
  1. 임포트
  2. transform 함수를 제작
  3. beam pipeline
    • BigQuery에서 sql 쿼리문을 통해서 데이터를 읽어옴.
    • dataflow runner??

파이프라인을 실행하기 위해서 main() 메서드를 터미널에서 실행하면 됩니다.

python ./et1.py # 로컬에서 실행
	# 로컬에서는 DirectflowRunner를 사용
python ./et1.py \ # cloud에서 실행 : cloud 파라미터
	 --project=$PROJECT \
	 --job_name=myjob \
	 --staging_location=gs://$BUCKET/staging/ \
	 --temp_location=gs://$BUCKET/staging \
	 # 임시 파일을 저장할 클라우드 stroage bucket 지정
	 --runner=DataflowRunner
	 # 클라우드에서 쓰려면 DataflowRunner를 써야함.
comments powered by Disqus