* 목적 : 여러대의 hadoop data node 에서 발생하는 정보를 한눈으로 보고싶다!
* 내용 : hadoop 대수가 늘어나면서 직접 해당 노드에 들어가서 상태를 보는것이
사실상 안된다고 보는게 맞는것 같다. 그래서 한곳으로 몰빵하는 작업을 시작 하기로 했음.
* 설치 내용은 이제 빼도 될만큼 잘구성되어있어서 설치는 쉬우니 패스;;;;
준비물
* 장비 : 서버 6대
* 소프트웨어 :
1. spark 2.2.1
2. kafka 0.10.x
3. zookeepr 3.4.11
4. flume 1.8
1. 구성
개발목적으로 제공되는 서버가 6대가 있어서 해당 서버를 가지고 구성을 하기때문에
여기저기에 중복설치를 하기로함.
내가 사용중인 구성은 아래와같다. ( spak stream 처리에 대한 내용이니 그거관련 만 기술함 )
2. SPARK STREAM
* 여러가지 테스트 케이스를 거쳐서 현재는 잘 작동 ( 원소스는 타자치기 귀찮아서 필요한것만 작성 )
* 개발하면서 발생한 문제및 참고사항 기록
1. kafka 를 최신 버전을 선택했다. 그러다보니 관련 라이브러리가 필요하다.
0.8 과 0.10 대를 나눠서 되는거를 구분했던데 아무튼 import 를 보면 알듯 10 대 소스를 가져와서 사용 .
참조 사이트 : http://spark.apache.org/docs/latest/streaming-kafka-integration.html
2. spark-shell 을 가지고 그냥 돌리다보니 기본으로 sc 가 생겨나니 그걸가지고 다시 ssc 생성
3. kafka에 topic을 노드수만큼 만들어두어서 모든 topic 에 대한 정보를 아래 rdd.foreach 부분에서
데이터를 처리한다.
4. 아래 소스중 accumulator 가 있다.
고민을 많이 했었다. 이것때문에
처리할 데이터가 많고 해서 spark를 cluster로 구성해서 여러노드에서 돌렸더니 문제가 발생했었다.
처리하고 난 데이터를 아래 rdd.foreach 상위인 dstream.foreachRDD 안에다가 배열선언하고 배열값을 받았더니 에러도 나고 안되기도 하고 문제였다.
그럴것이 각 다른노드에서 작업중인 데이터가 뭔가 특별한 장치가 없으면 값을 못받을것이다.
spark 문서에보니 accumulator를 제공했다.
참조 사이트 : http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.TopicPartition
import scala.util.matching.Regex
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.{Map,SynchroizedMap,HashMap}
import scala.util.control.Breaks._
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "data1:9092,data2:9092,data3:9092,data4:9092",
"session.timeout.ms" -> 100000,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"enable.auto.commit" -> (true: java.lang.Boolean),
"group.id" -> "MERONG",
"auto.offset.reset" -> "latest" ,
"enable.auto.commit" -> (false : java.lang.Boolean)
)
val topicList = new ListBuffer[String]()
val topics = topicList.toList
val dstream = KafkaUtils.createDirectStream[String, String] (
ssc,
PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
dstream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val acObj = spark.sparkContext.collecitonAccumulator[String]("acObj")
rdd.foreach { res =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
val vTopic = o.topic
val vPartition = o.partition
val vsoffset = o.fromOffset
val veoffset = o.untilOffset
/* 데이터 처리하는 부분 시작 */
acObj.add(vsoffset)
println(res.value)
/* 데이터 처리하는 부분 시작 */
}
dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()
* scala 작업이 끝나고 spark master 있는 노드에서 실행!
* 아래는 spark-shell 돌린것
spark-shell --master spark://spark-master:7077 -i scala-file --verbose --executor-cores 4 --total-executor-core 32 --driver-memory 4G --executor-memory 4G --driver-java-options -Dcom.datastax.driver.FORCE_NIO=true
'OpenSource(Bigdata&distribution&Network) > Spark' 카테고리의 다른 글
[SPARK] scala 에서 데이처 처리한 것들 소스 ( MySQL , ES , Hive , Hbase(phoenix) ) (0) | 2018.01.09 |
---|---|
Spark2.1 Hive Auth Custom Test (0) | 2017.03.26 |
spark + cassandra 연동 (0) | 2016.04.08 |
spark + Hbase 연동에러 해결건. (0) | 2016.04.07 |