
Spark + Hadoop + Python ~ HelloWorld

Introduction.

The first thing I used, wanting to access HBase and extract data in SQL form, was Phoenix ~

===> http://phoenix.apache.org/

Anyone who's interested can go take a look,

but for now I decided to switch.

Poking around here and there, I discovered Spark.

Those of a certain age will know there used to be a Korean adult magazine called SPARK.

Hmm~ -_-;;; I did once put on a brave face and buy it openly at a bookstore.

Of course I've never felt the need to check whether it still exists, and I'm not curious either.;;;;;;;


Anyway, Spark supports Java, and it also supports Python, which I've been meaning to study anyway, so I was looking at it with interest.

Then I saw researchers and developers fighting the good fight in this field praising it to the skies, went "what is this???", and started installing it.


Due to personal circumstances I only got as far as setting it up, and only now am I getting around to printing a HelloWorld.


Main Part 1

Prerequisite: Hadoop ~ lol

For reference, I'm on 2.7.2 (1 master + 5 data nodes).


Spark, naturally, you just grab from the Spark site.

As of when I wrote this, 1.6.1 was out. (When I first learned of it, it was 1.5~)

Anyway ~

Just download it and unpack it.

Since I'm going to hook it straight up to Hadoop ~ I went for the version pre-built for Hadoop.


1. Download

Spark site: http://spark.apache.org

Go to the Download menu~


You'll see a screen like the one below,


and in item 2 I picked the pre-built-for-Hadoop package, as shown below. (The Hadoop I installed is 2.7.2; another newer release had just gone up while I was writing this, so I upgraded right away.)



Main Part 2

1. Printing a HelloWorld.

If this were regular development, I'd just print something to the console,

but this is about data~ so: put one sample file into Hadoop, check that file's row count, and done.


2. Make any text file and put it into Hadoop.

Just in case, I'll record how to put a file into Hadoop too. Because I'm a blockhead lol


$vi testman.txt 

blah blah~

blah blah~


After saving,

$hadoop fs -mkdir /test <-- create the directory

$hadoop fs -put testman.txt /test <-- push the local OS file into HADOOP (the local file comes first, the HDFS destination second).
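
If you want to double-check that the file actually landed, listing the directory should show it (assuming the same /test path as above):

$hadoop fs -ls /test <-- testman.txt should show up here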



$cd <path to the Spark home directory>

$./bin/pyspark 

Python 2.7.5 (default, Nov 20 2015, 02:00:19) 

[GCC 4.8.5 20150623 (Red Hat 4.8.5-4)] on linux2

Type "help", "copyright", "credits" or "license" for more information.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

16/03/15 22:20:55 INFO SparkContext: Running Spark version 1.6.1

16/03/15 22:20:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

16/03/15 22:20:57 INFO SecurityManager: Changing view acls to: hadoop

16/03/15 22:20:57 INFO SecurityManager: Changing modify acls to: hadoop

16/03/15 22:20:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)

16/03/15 22:20:58 INFO Utils: Successfully started service 'sparkDriver' on port 49115.

16/03/15 22:20:59 INFO Slf4jLogger: Slf4jLogger started

16/03/15 22:20:59 INFO Remoting: Starting remoting

16/03/15 22:21:00 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.0.111:41015]

16/03/15 22:21:00 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 41015.

16/03/15 22:21:00 INFO SparkEnv: Registering MapOutputTracker

16/03/15 22:21:00 INFO SparkEnv: Registering BlockManagerMaster

16/03/15 22:21:00 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-b918f3eb-05d7-4f85-a0cb-678f3327e8b6

16/03/15 22:21:00 INFO MemoryStore: MemoryStore started with capacity 511.5 MB

16/03/15 22:21:00 INFO SparkEnv: Registering OutputCommitCoordinator

16/03/15 22:21:01 INFO Utils: Successfully started service 'SparkUI' on port 4040.

16/03/15 22:21:01 INFO SparkUI: Started SparkUI at http://192.168.0.111:4040

16/03/15 22:21:01 INFO Executor: Starting executor ID driver on host localhost

16/03/15 22:21:01 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37043.

16/03/15 22:21:01 INFO NettyBlockTransferService: Server created on 37043

16/03/15 22:21:01 INFO BlockManagerMaster: Trying to register BlockManager

16/03/15 22:21:01 INFO BlockManagerMasterEndpoint: Registering block manager localhost:37043 with 511.5 MB RAM, BlockManagerId(driver, localhost, 37043)

16/03/15 22:21:01 INFO BlockManagerMaster: Registered BlockManager

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1

      /_/


Using Python version 2.7.5 (default, Nov 20 2015 02:00:19)

SparkContext available as sc, HiveContext available as sqlContext.

>>> xx = sc.hadoopFile("hdfs://192.168.0.111:9000/test/testman.txt","org.apache.hadoop.mapred.TextInputFormat","org.apache.hadoop.io.Text","org.apache.hadoop.io.LongWritable") <-- the docs say to write it like this ;;; so for now I just followed along and tried it as-is

16/03/15 22:30:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 153.6 KB, free 153.6 KB)

16/03/15 22:30:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 167.5 KB)

16/03/15 22:30:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:37043 (size: 13.9 KB, free: 511.5 MB)

16/03/15 22:30:19 INFO SparkContext: Created broadcast 0 from hadoopFile at PythonRDD.scala:613

16/03/15 22:30:20 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 166.7 KB, free 334.2 KB)

16/03/15 22:30:20 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 13.0 KB, free 347.2 KB)

16/03/15 22:30:20 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:37043 (size: 13.0 KB, free: 511.5 MB)

16/03/15 22:30:20 INFO SparkContext: Created broadcast 1 from broadcast at PythonRDD.scala:570

16/03/15 22:30:21 INFO FileInputFormat: Total input paths to process : 1

16/03/15 22:30:21 INFO SparkContext: Starting job: take at SerDeUtil.scala:201

16/03/15 22:30:21 INFO DAGScheduler: Got job 0 (take at SerDeUtil.scala:201) with 1 output partitions

16/03/15 22:30:21 INFO DAGScheduler: Final stage: ResultStage 0 (take at SerDeUtil.scala:201)

16/03/15 22:30:21 INFO DAGScheduler: Parents of final stage: List()

16/03/15 22:30:21 INFO DAGScheduler: Missing parents: List()

16/03/15 22:30:21 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at PythonHadoopUtil.scala:181), which has no missing parents

16/03/15 22:30:21 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.3 KB, free 350.5 KB)

16/03/15 22:30:21 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1980.0 B, free 352.5 KB)

16/03/15 22:30:21 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:37043 (size: 1980.0 B, free: 511.5 MB)

16/03/15 22:30:21 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

16/03/15 22:30:21 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at PythonHadoopUtil.scala:181)

16/03/15 22:30:21 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/03/15 22:30:21 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2144 bytes)

16/03/15 22:30:21 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/03/15 22:30:21 INFO HadoopRDD: Input split: hdfs://192.168.0.111:9000/test/testman.txt:0+60

16/03/15 22:30:21 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/03/15 22:30:21 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/03/15 22:30:21 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/03/15 22:30:21 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/03/15 22:30:21 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/03/15 22:30:22 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2193 bytes result sent to driver

16/03/15 22:30:22 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 502 ms on localhost (1/1)

16/03/15 22:30:22 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 

16/03/15 22:30:22 INFO DAGScheduler: ResultStage 0 (take at SerDeUtil.scala:201) finished in 0.544 s

16/03/15 22:30:22 INFO DAGScheduler: Job 0 finished: take at SerDeUtil.scala:201, took 0.755415 s

>>> 16/03/15 22:30:45 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:37043 in memory (size: 1980.0 B, free: 511.5 MB)

16/03/15 22:30:45 INFO ContextCleaner: Cleaned accumulator 2


>>> xx.count() <-- command to print the count of what I just read

16/03/15 22:32:49 INFO SparkContext: Starting job: count at <stdin>:1

16/03/15 22:32:49 INFO DAGScheduler: Got job 1 (count at <stdin>:1) with 2 output partitions

16/03/15 22:32:49 INFO DAGScheduler: Final stage: ResultStage 1 (count at <stdin>:1)

16/03/15 22:32:49 INFO DAGScheduler: Parents of final stage: List()

16/03/15 22:32:49 INFO DAGScheduler: Missing parents: List()

16/03/15 22:32:49 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[3] at count at <stdin>:1), which has no missing parents

16/03/15 22:32:49 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.0 KB, free 353.2 KB)

16/03/15 22:32:49 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.7 KB, free 356.9 KB)

16/03/15 22:32:49 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:37043 (size: 3.7 KB, free: 511.5 MB)

16/03/15 22:32:49 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006

16/03/15 22:32:49 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (PythonRDD[3] at count at <stdin>:1)

16/03/15 22:32:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks

16/03/15 22:32:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,ANY, 2144 bytes)

16/03/15 22:32:49 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, localhost, partition 1,ANY, 2144 bytes)

16/03/15 22:32:49 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

16/03/15 22:32:49 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)

16/03/15 22:32:49 INFO HadoopRDD: Input split: hdfs://192.168.0.111:9000/test/testman.txt:0+60

16/03/15 22:32:49 INFO HadoopRDD: Input split: hdfs://192.168.0.111:9000/test/testman.txt:60+61

16/03/15 22:32:50 INFO PythonRunner: Times: total = 661, boot = 627, init = 33, finish = 1

16/03/15 22:32:50 INFO PythonRunner: Times: total = 633, boot = 616, init = 15, finish = 2

16/03/15 22:32:50 INFO Executor: Finished task 1.0 in stage 1.0 (TID 2). 2179 bytes result sent to driver

16/03/15 22:32:50 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 734 ms on localhost (1/2)

16/03/15 22:32:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2179 bytes result sent to driver

16/03/15 22:32:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 750 ms on localhost (2/2)

16/03/15 22:32:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 

16/03/15 22:32:50 INFO DAGScheduler: ResultStage 1 (count at <stdin>:1) finished in 0.752 s

16/03/15 22:32:50 INFO DAGScheduler: Job 1 finished: count at <stdin>:1, took 0.778757 s

27  <--- the result is printed.

>>>
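
By the way, per the Hadoop API, TextInputFormat produces (LongWritable, Text) key/value pairs, so the key and value classes in the hadoopFile call above look swapped (it ran anyway). For a plain line count, the simpler route should be sc.textFile -- a rough sketch, assuming the same HDFS address and path as above:

>>> yy = sc.textFile("hdfs://192.168.0.111:9000/test/testman.txt")  # read the file as an RDD of lines

>>> yy.count()  # should print the same line count as above (27)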


Conclusion

HelloWorld complete~ lol


Note:

In my case I'm running Spark on the same box as the Hadoop master, so the addresses are the same.

When you launch pyspark as above, you can see it going on about a web admin(?) at http://192.168.0.111:4040/stages.

It's labeled SparkUI, and it's a kind of monitoring tool.


In a clean UI like the one below, you can see the job that just read the file sitting in Hadoop.

Looking at the picture below, the count job shows up too~

Oh!!!!!!!!!! Nice~ lol




Details of the count command I ran~ it even shows you, quite prettily and in diagram form, "you did this, and this is what came of it~~~".

In the public materials floating around the internet you see results like "read ten billion rows on so many nodes in so many minutes", and that kind of result comes out neatly like the screenshot below.

Not a bad design to drop into a report either lol