이전(Lab3) 까지는 적절한 머신러닝 모델을 만드는 데 주력했습니다.
하지만, 데이터의 크기가 엄청 크다면 BigQuery를 통해서 데이터를 처리하고, 텐서플로우를 통해서 모델을 학습하는 것이 상당히 힘들 수 있습니다.
이번 시간에는 큰 데이터로부터 머신러닝 모델을 잘 조작할 수 있도록 하는 기반작업을 할 것입니다.
Operationalize the model
1-1. Steps to do
- Create data pipeline
- Train model & ML engine
- Serve model using rest API
- Deploy app & app engine
이번에 할 것 : creating two datasets, train & evaluation
=> 그것을 위해서 Cloud Dataflow라는 녀석을 사용할 것임.
1-2. Key benefits of using Cloud Dataflow
- 병렬로 데이터 처리 & 변환 가능
- streaming & batch job을 지원함.
=> 모델 로직을 변경하지 않고 모델 훈련과 서빙을 둘 다 할 수 있음.
1-3. Apache beam
아파치 빔은 다음 기능들을 전부 할 수 있는 녀석입니다.
-
batch and streaming data parallel processing pipelines
-
set of language specific SDKs for constructing
pipelines
-
runners for executing them on distributed processing backends
1-4. Cloud Dataflow
-
역할 : apache beam API의 코드를 작동시키게 함.
-
작동 로직 예시
ex) 데이터의 포맷을 다른 포맷으로 바꾸는 작업
- Data in data warehouse(ex,
BigQuery
) - 받은 데이터를 파이프라인을 이용하여 변환.
- 결과를 Google Cloud Storage에 저장
=> 이것을 가능하게 해주는 플랫폼이 Cloud Dataflow입니다.
- Data in data warehouse(ex,
-
자바와 파이썬 둘다 사용가능함.
-
Serverless framework
The open-source, application framework to easily build serverless architectures on AWS Lambda & more. Startups and Fortune 500 companies are using it to build incredibly efficient applications.
요약 : 쉽게 Serverless 아키텍쳐를 빌드할 수 있는 프레임워크
-
특정 작업을 수행하기 위해 컴퓨터 & 가상머신에 서버를 설정하는 것이 아니라, Baas, Faas에 의존하여 작업함.
-
Baas (Backend as a Service)
Firebase와 같이 백엔드 기능을 대신할 수 있는 서비스를 의미함. (데이터베이스, 소셜서비스 연동, 파일시스템 등을 지원받음)
-
FaaS (Function as a Service)
프로젝트를 여러 함수로 쪼개서, 이러한 함수들을 분산 컴퓨팅 자원에(ex, AWS lambda)등록하고 호출하는 방식
-
-
fully managed offering from Google that allows you to execute Data Processing Pipelines at scale
=> 구글이 서버 클러스터를 관리하기 때문에 프로그래밍에 전념할 수 이씁니다. (서버리스 구조의 장점)
-
1-5. Apache beam을 작성하는 방식
p = beam.Pipeline() # 파이프라인 정의
(p
| beam.io.ReadFromText('gs://..')
# 구글 스토리지에서 가져옴
| beam.Map(transform) # 데이터 변경 작업
# Parallel tasks에 의해서 자동으로 스케일링됨.
| beam.GroupByKey() # Groupization, merge operation
| beam.FlatMap(Filter) # Filter
| beam.io.WriteToText('gs://...') # 구글 스토리지에 저장
)
p.run();
이런 코드 구현 방식은 pipe implementation과 pipe execution이 구분되어 있음. run 메소드를 실행해야만 pipe가 실행됨.
real-time & batch 작업을 할 때도 코드는 유사합니다.
=> Dataflow를 이용하여 이러한 작업을 하면, 대시보드를 볼 수 있습니다.
p = beam.Pipeline() # 파이프라인 정의
(p
| beam.io.ReadStringsFromPubSub('project/topic')
| beam.WindowInto(SlidingWindows(60))
| beam.Map(transform) # 데이터 변경 작업
| beam.GroupByKey() # Groupization, merge operation
| beam.FlatMap(Filter) # Filter
| beam.io.WriteToBigQuery(table)
)
p.run();
beam.io 메서드에는 여러 connector로 부터 데이터의 io작업을 할 수 있습니다. (ex, 파일시스템, 클라우드, 스트림..)
ex) PubSub Conntector는 메시지를 스트림으로부터 읽을 수 있음
1-6. An example Beam pipeline for BigQuery -> CSV on cloud
import apache_beam as beam
def transform(rowdict):
import copy
result = copy.deepcopy(rowdict)
if rowdict['a'] > 0:
result['c'] = result['a'] + result['b']
yield '.'.join([str(result[k])
if k in result else 'None'
for k in ['a','b','c']])
if __name__ == '__main__':
p = beam.Pipeline(argv=sys.argv)
selquery = 'SELECT a,b FROM someds.sometable'
(p
| beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql=True))
| beam.Map(transform_data)
| beam.io.WriteToText('gs://...')
)
p.run()
- 임포트
- transform 함수를 제작
- beam pipeline
- BigQuery에서 sql 쿼리문을 통해서 데이터를 읽어옴.
- dataflow runner??
파이프라인을 실행하기 위해서 main() 메서드를 터미널에서 실행하면 됩니다.
python ./et1.py # 로컬에서 실행
# 로컬에서는 DirectflowRunner를 사용
python ./et1.py \ # cloud에서 실행 : cloud 파라미터
--project=$PROJECT \
--job_name=myjob \
--staging_location=gs://$BUCKET/staging/ \
--temp_location=gs://$BUCKET/staging \
# 임시 파일을 저장할 클라우드 stroage bucket 지정
--runner=DataflowRunner
# 클라우드에서 쓰려면 DataflowRunner를 써야함.