2015년 7월 24일 금요일

Spark summit 2015의 Netflix 개선 사례 모의 구현하기

HOT cluster computing framework 인 SPARK에 대한 사용 경험을 바탕으로,
최근 spark summit 2015에 소개된 Netflix 개선 사례 모의 구현을 목표로 지속 갱신할 문서를 만든다.

필자가 참고한 architecture 문서는 다음과 같다.
https://spark-summit.org/2015/events/spark-and-spark-streaming-at-netflix/

빅(BIG)한 message event들이 발생하는 Neflix, LinkIn에서 검증된 메세지 브로커(message broker) Kafka와,
AWS(Amazon Web Service)의 HOT Storage Service S3 그리고 Spark Streaming에 대한 모의 기능 구현을 예상한다.

1. Kafka 의 message event 처리하기.

Producer를 통한 메세지 전송하기

In [1]:
from kafka import SimpleProducer, KafkaClient

# 생산자를 생성한다.
kafka = KafkaClient('cook:9092')
producer = SimpleProducer(kafka)

# 메세지를 바이트 코드로 던진다.
point = producer.send_messages(b'frog', b'fall in well.')
producer.send_messages(b'frog', '우물에 빠지다.'.encode('utf-8'))
Out[1]:
[ProduceResponse(topic=b'frog', partition=0, error=0, offset=101)]

Consumer를 통한 메세지 수신하기

In [2]:
from kafka import KafkaConsumer

# 소비자를 만든다.
consumer = KafkaConsumer('frog',
                         bootstrap_servers=['cook:9092'],consumer_timeout_ms=10000)
consumer.set_topic_partitions(("frog", 0, point[0].offset))
for i in consumer.fetch_messages():
    print(i.value.decode('utf-8'))
fall in well.
우물에 빠지다.

2. AWS S3 Bucket 사용하기

boto API를 위한 사전 정보 저장하기
In [3]:
!ls ~/.aws
config credentials

버킷 만들기

In [4]:
import boto3
client = boto3.client('s3')
# 버켓 생성
client.create_bucket(Bucket='frog1nwell')
Out[4]:
{'Location': '/frog1nwell',
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'HostId': 'inuHkVNZZa3OdU1YqijMZXo1d01pFr0OC7QDnAOogkrCv+y9iwvEBoDrrSkVQU9czgE=',
  'RequestId': '09E1B2023C66504A'}}
In [5]:
# 버켓 리스트 보기
client.list_buckets()
Out[5]:
{'Buckets': [{'CreationDate': datetime.datetime(2015, 7, 24, 9, 8, 57, tzinfo=tzutc()),
   'Name': 'frog1nwell'},
  {'CreationDate': datetime.datetime(2015, 7, 1, 7, 47, 36, tzinfo=tzutc()),
   'Name': 'koala1nwell'}],
 'Owner': {'DisplayName': 'jerry',
  'ID': 'b31d91a3e2d0512c901d7fb8e0a7b60702478af922f2f08921237b16'},
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'HostId': 'bBmMrGujXa5Stm3ZAuExl+ZQPo3O5h6ZcS8nHTslNHxU18BFj37pezxJsEOWW++kqFXg=',
  'RequestId': 'B20589EE8D9'}}

오브젝트 추가하기

파일 오브젝트를 버킷으로 업로드 하기.

In [6]:
%%writefile quote_file.txt
You make me to be complete.
당신은 나를 완성시켜요.
Overwriting quote_file.txt
In [7]:
with open('quote_file.txt', 'rb') as f:
    client.put_object(Bucket='frog1nwell',Key='quote_file.txt',Body=f)

스트링을 ByteIO를 이용해서, 오프젝트처럼 처리 하기

In [8]:
from io import BytesIO
bs = BytesIO('You make me to be complete.\n당신은 나를 완성시켜요.'.encode('utf-8'))
In [9]:
client.put_object(Bucket='frog1nwell',Key='quote_byteIO.txt',Body=bs)
Out[9]:
{'ETag': '"372ff0057f46e3d8302856328b3ebba5"',
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'HostId': '+yI5rXDeAsXpc2gWTUWR23aWzlkNRFbBFXf5NHq0+k0bTvqJArfMu5M606VVgeeufnnseM=',
  'RequestId': '5847A4880F82B'}}

오프젝트 읽기.

In [10]:
c1 = client.get_object(Bucket='frog1nwell',Key='quote_file.txt')
c2 = client.get_object(Bucket='frog1nwell',Key='quote_byteIO.txt')
In [11]:
print('File:\n',c1['Body'].read().decode('utf-8'))
print('ByteIO:\n',c2['Body'].read().decode('utf-8'))
File:
 You make me to be complete.
당신은 나를 완성시켜요.
ByteIO:
 You make me to be complete.
당신은 나를 완성시켜요.

추가할 내용으로 다음을 주제를 구상 중에 있다.
* Kafka 와 Spark Streaming 의 처리 소개
* AWS의 S3 와 EMR 의 처리 소개
* Netflix 의 모의 구현을 바탕으로 실 데이터 분석 사례

0 개의 댓글:

댓글 쓰기