본문 바로가기

OpenSource(Bigdata&distribution&Network)/Spark

[SPARK,KAFKA] spark stream 구성

* 목적 : 여러대의 hadoop data node 에서 발생하는 정보를 한눈으로 보고싶다!

* 내용 : hadoop 대수가 늘어나면서 직접 해당 노드에 들어가서 상태를 보는것이 

          사실상 안된다고 보는게 맞는것 같다. 그래서 한곳으로 몰빵하는 작업을 시작 하기로 했음.

* 설치 내용은 이제 빼도 될만큼 잘구성되어있어서 설치는 쉬우니 패스;;;;


준비물 


* 장비 : 서버 6대

* 소프트웨어 : 

1. spark 2.2.1 

2. kafka 0.10.x

3. zookeepr 3.4.11

4. flume 1.8


1. 구성 

개발목적으로 제공되는 서버가 6대가 있어서 해당 서버를 가지고 구성을 하기때문에 

여기저기에 중복설치를 하기로함. 


내가 사용중인 구성은 아래와같다. ( spak stream 처리에 대한 내용이니 그거관련 만 기술함 )




2. SPARK STREAM 

* 여러가지 테스트 케이스를 거쳐서 현재는 잘 작동 ( 원소스는 타자치기 귀찮아서 필요한것만 작성 ) 


* 개발하면서 발생한 문제및 참고사항 기록 

1. kafka 를 최신 버전을 선택했다. 그러다보니 관련 라이브러리가 필요하다. 

   0.8 과 0.10 대를 나눠서 되는거를 구분했던데 아무튼 import 를 보면 알듯 10 대 소스를 가져와서 사용 .


   참조 사이트 : http://spark.apache.org/docs/latest/streaming-kafka-integration.html 


2. spark-shell 을 가지고 그냥 돌리다보니 기본으로 sc 가 생겨나니 그걸가지고 다시 ssc 생성 


3. kafka에 topic을 노드수만큼 만들어두어서 모든 topic 에 대한 정보를 아래 rdd.foreach 부분에서 

   데이터를 처리한다.


4. 아래 소스중 accumulator 가 있다. 

고민을 많이 했었다. 이것때문에 

처리할 데이터가 많고 해서 spark를 cluster로 구성해서 여러노드에서 돌렸더니 문제가 발생했었다. 

처리하고 난 데이터를 아래 rdd.foreach 상위인 dstream.foreachRDD 안에다가 배열선언하고 배열값을 받았더니 에러도 나고 안되기도 하고 문제였다. 


그럴것이 각 다른노드에서 작업중인 데이터가 뭔가 특별한 장치가 없으면 값을 못받을것이다. 


spark 문서에보니 accumulator를 제공했다.


참조 사이트 : http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints



import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.streaming.kafka010._

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

import org.apache.kafka.common.TopicPartition


import scala.util.matching.Regex

import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions.asScalaBuffer

import scala.collection.mutable.ListBuffer

import scala.collection.mutable.{Map,SynchroizedMap,HashMap}

import scala.util.control.Breaks._


val ssc = new StreamingContext(sc, Seconds(2)) 

val kafkaParams = Map[String, Object](

"bootstrap.servers" -> "data1:9092,data2:9092,data3:9092,data4:9092",

"session.timeout.ms" -> 100000,

"key.deserializer" -> classOf[StringDeserializer],

"value.deserializer" -> classOf[StringDeserializer],

"enable.auto.commit" -> (true: java.lang.Boolean), 

"group.id" -> "MERONG", 

"auto.offset.reset" -> "latest" , 

"enable.auto.commit" -> (false : java.lang.Boolean)

)


val topicList = new ListBuffer[String]()

val topics = topicList.toList


val dstream = KafkaUtils.createDirectStream[String, String] (

ssc,

PreferConsistent,

ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)

)


dstream.foreachRDD { rdd => 


      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      

      val acObj = spark.sparkContext.collecitonAccumulator[String]("acObj")


      rdd.foreach { res => 

       val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)

 val vTopic = o.topic

 val vPartition = o.partition

 val vsoffset = o.fromOffset

 val veoffset = o.untilOffset 


 /* 데이터 처리하는 부분 시작 */

 acObj.add(vsoffset)

 println(res.value)

 /* 데이터 처리하는 부분 시작 */


      }


      dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

}


ssc.start()

ssc.awaitTermination()


* scala 작업이 끝나고 spark master 있는 노드에서 실행!

* 아래는 spark-shell 돌린것 

spark-shell --master spark://spark-master:7077 -i scala-file --verbose --executor-cores 4 --total-executor-core 32 --driver-memory 4G --executor-memory 4G --driver-java-options -Dcom.datastax.driver.FORCE_NIO=true