본문 바로가기

OpenSource(Bigdata&distribution&Network)/Flume

[Flume]kafka를 Sink로 이용 & OS 변수 받아 오기

* 목적 : 여러대의 hadoop data node 에서 발생하는 정보를 한눈으로 보고싶다!

* 내용 : hadoop 대수가 늘어나면서 직접 해당 노드에 들어가서 상태를 보는것이 

          사실상 안된다고 보는게 맞는것 같다. 그래서 한곳으로 몰빵하는 작업을 시작 하기로 했음.


준비물 


* 장비 : 그냥 서버들 ( 내 상황은 운영중인 약 50대의 datanode 서버들 전부 ) 

* 소프트웨어 : 

flume 1.8



1. 서버에서 agent 형태로 실행해야하기때문에 shell 에서 설정한 변수를 가져올필요가 있었다

아래 ${~ } 이게 되네~ ㅋㅋ


하지만 알아둘것은 shell 내에서 변수를 선언하는게 아니라 export 까지 해야 인식한다.

그래서 conf 안에 있는 flume-env.sh 에다가 추가함.


* 1.6일때는 kafka가 없어서 custom 으로 만들어야 했으나 1.7부터는 kafka가 기본 제공

agent.source = seqGenSrc

agent.channels = memoryChannel

agent.sinks = k1


agent.sources.seqGenSrc.type = exec

agent.sources.seqGenSrc.command = tail -F 뭔가 떨어지는 로그

agent.sources.seqGenSrc.channels = memoryChannel


agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.k1.kafka.topic = topic-${HOSTNAME}

agent.sinks.k1.kafka.bootstrap.servers = data1:9092,data2:9092,data3:9092,data4:9092

agent.sink.k1.kafka.flumeBatchSize = 1000

agent.sink.k1.kafka.producer.acks = 1

agent.sinks.k1.kafka.producer.linger.ms = 10

agent.sinks.k1.channel = memoryChannel 

agent.channels.memoryChannel.type = memory 


agent.channels.memoryChannel.capacity = 10000

agent.channels.memoryChannel.transactionCapacity = 1000