본문 바로가기

OpenSource(Bigdata&distribution&Network)/Storm

storm Hello World

Storm 을 소개하는 글을 보면 보통 실시간 데이터 처리용 이라고들 하더라.

아무튼 어찌되었든 쓰긴할것처럼 보여서 일단 Hello World 찍어보고 .기본사용법만 익혀본다.


완전 java 로 만들어져있는것처럼 

다른 opensource 마냥 무언가 application 을 설치하는 형태라고 보긴 모호한것같다.



사용 버전 : 0.9.6 

사용노드 수 : 6대 ( 1 : nimbus , 5 : supervisor )

주키퍼 버전 : 3.4.6 (원래쓰던거임으로 ~ 일단 이건 패스)


수정한 설정파일 내역

주키퍼설정, 디렉토리 , 호트 , 슬롯포트 정도만 넣었다.

conf 디렉토리의 storm.yaml 파일만 수정하고 모든 노드에 동일하게 적용. 


########### These MUST be filled in for a storm configuration

storm.zookeeper.servers:

     - "192.168.0.112"

     - "192.168.0.113"

     - "192.168.0.114"

     - "192.168.0.115"

     - "192.168.0.116"



storm.local.dir: "/apps/strm/data"


nimbus.host: "192.168.0.111"

supervisor.slots.ports:

    - 6700

    - 6701

    - 6702

    - 6703


ui.port: 6799



설정후 백그라운드 실행을 하면

자바프로세스명으로 nimbus ( 마스터 노드 ? )

나머지 5대노드에선 (supervisor ) 라고 보일것이다.    

그리고 마스터 노드에선 ( nimbus 가 있는 노드 ) ui 도 띄웠다. 


전부 백드라운드로 실행함.  


UI 화면 ( http://192.168.0.111:6799/index.html )




HelloWorld 찍을 소스 ( 책보고 만듬 : 책이름 : 에이콘꺼 아파치 Storm을 이용한 분산 실시간 빅데이터 처리 - 도움된다. 적절히 ㅋ ) 

스톰의 HelloWorld 는 단어세기라고 한다. 우리 그 뭐시냐 빅데이터 RM 테스트할때도 단어세기가 대표적이니.이것도 마찬가지인듯.

(아래소스는 싱글노드용이나 마찬가지다. 클러스토로 할땐 다른클래스를 호출해서 써야하던데 그건 다음기록에;;;)


테스트한 단어 배열 ( 미국 국가라고 한다 - 검색해서 대충 복사함 )


Spout 부분

public class TestSpout extends BaseRichSpout 

{

private SpoutOutputCollector collector;

private String[] test_str = 

{

"Oh, say can you see, by the dawn’s early light,",

"What so proudly we hailed at the twilight’s last gleaming?",

"Whose broad stripes and bri",

"ght stars, through the perilous fight,",

"O’er the ramparts we watched, were so gallantly streaming?",

"And the rockets’red glare, the bombs bursting in air,",

"Gave proof through the night that our flag was still there.",

"Oh say, does that star spangled banner yet wave",

"O’er the land of the free and the home of the brave?"

};

private int index = 0;

/*  

    public static void main( String[] args )

    {

        System.out.println( "Hello World!" );

    }

*/

@Override

public void declareOutputFields(OutputFieldsDeclarer arg0) {

// TODO Auto-generaated method stub

System.out.println("sput declareoutputfields!!!");

arg0.declare(new Fields("sentence"));

}


@Override

public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {

// TODO Auto-generated method stub

System.out.println("sput open!!!");

this.collector = arg2;

}

@Override

public void nextTuple() {

// TODO Auto-generated method stub

System.out.println("sput nexttuple!!!");

this.collector.emit(new Values(test_str[index]));

index++;

System.out.println("sput nexttuple =======" + index);

System.out.println(index + "<<<<<<<<<<<<<<<<<<<=============================>>>>>>>>>>>>>>>>>>>>>" + test_str.length);

if ( index >= test_str.length)

{

index = 0;

System.out.println("sput nexttuple =======zero ===============" + 0);

}

waitForSeconds(1);

System.out.println("next tupple last");

}


public static void waitForSeconds(int seconds) {

        try {

            Thread.sleep(seconds * 1);

        } catch (InterruptedException e) {

        }

    }


}


Bolt부분 ( 문장배열 분리 )

public class TestBolt extends BaseRichBolt {


private OutputCollector collector;

@Override

public void declareOutputFields(OutputFieldsDeclarer arg0) {

// TODO Auto-generated method stub

System.out.println("==================== declare ====================");

arg0.declare(new Fields("word")); 

}

@Override

public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {

// TODO Auto-generated method stub

System.out.println("bolt prepare!!!");

this.collector = arg2;

}

@Override

public void execute(Tuple arg0) {

// TODO Auto-generated method stub

System.out.println("bolt execute!!!");

String sentence = arg0.getStringByField("sentence");

String[] words = sentence.split(" ");

for(String word: words)

{

this.collector.emit(new Values(word));

System.out.println("bolt execute!!! === >>>>>>>>>>>>>>>> " + word);

}

}


Bolt부분 두번째 ( 분리한 문장과 단어에서 단어 수 체크  작업 )

public class Test2Bolt extends BaseRichBolt {


private OutputCollector collector;

private HashMap<String,Long> counts = null;

@Override

public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {

// TODO Auto-generated method stub

System.out.println("test2 prepare");

this.collector = arg2;

this.counts = new HashMap<String, Long>();

}

@Override

public void execute(Tuple arg0) {

// TODO Auto-generated method stub

System.out.println("test2 execute");

String word = arg0.getStringByField("word");

Long count = this.counts.get(word);

if(count == null)

{

count = 0L;

}

count++;

this.counts.put(word, count);

this.collector.emit(new Values(word,count));

}



@Override

public void declareOutputFields(OutputFieldsDeclarer arg0) {

// TODO Auto-generated method stub

System.out.println("test2 declareoutput");

arg0.declare( new Fields("word","count"));

}


}



Bolt부분 세번째 ( 결과 출력 부분 )

public class ResultBolt extends BaseRichBolt {


private HashMap<String,Long> counts = null;

@Override

public void execute(Tuple arg0) {

// TODO Auto-generated method stub

String word = arg0.getStringByField("word");

Long count = arg0.getLongByField("count");

this.counts.put(word, count);

}


@Override

public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {

// TODO Auto-generated method stub

this.counts = new HashMap<String, Long>();

}


@Override

public void declareOutputFields(OutputFieldsDeclarer arg0) {

// TODO Auto-generated method stub

System.out.println("====================== This is result declare ================");

}

public void cleanup()

{

System.out.println("====================== Result ========================");

List<String> keys = new ArrayList<String>();

keys.addAll(this.counts.keySet());

Collections.sort(keys);

for (String key : keys)

{

System.out.println("======> " + key + "=======>>>>>>" + this.counts.get(key));

}

System.out.println("====================== Result ========================");

}


}




토폴로지 구현

이게 java 로 말하면 main 인듯.

토폴로지 부분을 보면 SPOUT 에들어간 인스턴스 넣고 , 이놈이 다음 볼트에 들어가고 , 그놈이 또 다음볼트 , 이놈이 또 다음볼트 이렇게 

들어가는것처럼 보인다.


해당 책을 가지고 있다면 글에도 써있겠지만 로컬테스트에선 이상없는데 난 일단 운영서버처럼 서버구성후 클러스터로 돌려서 그런지

이게 무한대로 돌더라~ 


그래서 아래 토폴로지 소스의 빨간색부분을 보면 이게 출력하는 매쏘드 인데.

이걸 직접 호출함. 아무튼 그런거같음 ㅋ


그러니 책있는 사람중에 노드 구현해서 테스트하는 사람은 . 저거 넣어줘야 출력이 될것임.


public class TestTolpo {


private static final String SENDENCE_SPOUT_ID = "test_spout_id";

private static final String SPLIT_BOLT_ID = "test_split_bolt_id";

private static final String COUNT_BOLT_ID = "test_count_bolt_id";

private static final String RESULT_BOLT_ID = "test_result_bolt_id";

private static final String TOPOL_NAME = "tpol_name";

public static void main(String[] args) throws Exception {

TestSpout ts = new TestSpout();

TestBolt tb1 = new TestBolt();

Test2Bolt tb2 = new Test2Bolt();

ResultBolt rs = new ResultBolt();

TopologyBuilder build = new TopologyBuilder();

System.out.println("======================== bolt set ========================== ");

build.setSpout(SENDENCE_SPOUT_ID, ts);

build.setBolt(SPLIT_BOLT_ID, tb1).shuffleGrouping(SENDENCE_SPOUT_ID);

build.setBolt(COUNT_BOLT_ID, tb2).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

build.setBolt(RESULT_BOLT_ID, rs).globalGrouping(COUNT_BOLT_ID);

Config cf = new Config();

System.out.println("======================== cluster regist ========================== ");

LocalCluster cluster = new LocalCluster();

cluster.submitTopology(TOPOL_NAME,cf,build.createTopology());

System.out.println("======================== submit ========================== ");

waitForSeconds(20);

System.out.println("======================== 10 s ========================== ");

cluster.killTopology(TOPOL_NAME);

rs.cleanup();

System.out.println("======================== kill ========================== ");

cluster.shutdown();

System.out.println("======================== shutdown ========================== ");

}

public static void waitForSeconds(int seconds) {

        try {

            Thread.sleep(seconds * 1000);

        } catch (InterruptedException e) {

        } 

    }

}




위의 소스를 가지고 jar로 만들어준다음 ( 난 maven 씀 )

storm 토폴로지에 등록시켜서 실행해본다.

 ./storm jar /home/hadoop/chapter1-0.0.1-SNAPSHOT.jar storm.blueprint.chapter1.TestTolpo 


이러면 자바로 뭐 막 이것저것 쭉쭉 등록하고 어쩌구 저쩌구 실행이 되는과정이 나온다. ㅋ 

======================== bolt set ==========================

======================== cluster regist ==========================

2126 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT

2130 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:host.name=os1.local

2130 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_77

2130 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.home=/apps/j2se/jre

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.class.path=/apps/strm/lib/storm-core-0.9.6.jar:/apps/strm/lib/ring-jetty-adapter-0.3.11.jar:/apps/strm/lib/clout-1.0.1.jar:/apps/strm/lib/joda-time-2.0.jar:/apps/strm/lib/

math.numeric-tower-0.0.1.jar:/apps/strm/lib/clojure-1.5.1.jar:/apps/strm/lib/jline-2.11.jar:/apps/strm/lib/compojure-1.1.3.jar:/apps/strm/lib/logback-core-1.0.13.jar:/apps/strm/lib/commons-codec-1.6.jar:/apps/strm/lib/jetty-6.1.26.jar:/apps/strm/lib/kr

yo-2.21.jar:/apps/strm/lib/ring-core-1.1.5.jar:/apps/strm/lib/disruptor-2.10.4.jar:/apps/strm/lib/commons-logging-1.1.3.jar:/apps/strm/lib/core.incubator-0.1.0.jar:/apps/strm/lib/logback-classic-1.0.13.jar:/apps/strm/lib/jetty-util-6.1.26.jar:/apps/str

m/lib/hiccup-0.3.6.jar:/apps/strm/lib/commons-fileupload-1.2.1.jar:/apps/strm/lib/objenesis-1.2.jar:/apps/strm/lib/commons-io-2.4.jar:/apps/strm/lib/json-simple-1.1.jar:/apps/strm/lib/commons-lang-2.5.jar:/apps/strm/lib/ring-devel-0.3.11.jar:/apps/strm

/lib/ring-servlet-0.3.11.jar:/apps/strm/lib/servlet-api-2.5.jar:/apps/strm/lib/tools.cli-0.2.4.jar:/apps/strm/lib/asm-4.0.jar:/apps/strm/lib/carbonite-1.4.0.jar:/apps/strm/lib/tools.logging-0.2.3.jar:/apps/strm/lib/tools.macro-0.1.0.jar:/apps/strm/lib/

slf4j-api-1.7.5.jar:/apps/strm/lib/snakeyaml-1.11.jar:/apps/strm/lib/clj-stacktrace-0.2.2.jar:/apps/strm/lib/clj-time-0.4.1.jar:/apps/strm/lib/chill-java-0.3.5.jar:/apps/strm/lib/reflectasm-1.07-shaded.jar:/apps/strm/lib/commons-exec-1.1.jar:/apps/strm

/lib/log4j-over-slf4j-1.6.6.jar:/apps/strm/lib/minlog-1.2.jar:/apps/strm/lib/jgrapht-core-0.9.0.jar:/home/hadoop/chapter1-0.0.1-SNAPSHOT.jar:/apps/strm/conf:/apps/strm/bin

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/tmp

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:os.name=Linux

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:os.arch=amd64

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:os.version=3.10.0-327.10.1.el7.x86_64

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:user.name=hadoop

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:user.home=/home/hadoop

2131 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Client environment:user.dir=/apps/strm/bin

2144 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT

2145 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:host.name=os1.local

2145 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.version=1.8.0_77

2145 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.vendor=Oracle Corporation

2145 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.home=/apps/j2se/jre

2148 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.class.path=/apps/strm/lib/storm-core-0.9.6.jar:/apps/strm/lib/ring-jetty-adapter-0.3.11.jar:/apps/strm/lib/clout-1.0.1.jar:/apps/strm/lib/joda-time-2.0.jar:/a

pps/strm/lib/math.numeric-tower-0.0.1.jar:/apps/strm/lib/clojure-1.5.1.jar:/apps/strm/lib/jline-2.11.jar:/apps/strm/lib/compojure-1.1.3.jar:/apps/strm/lib/logback-core-1.0.13.jar:/apps/strm/lib/commons-codec-1.6.jar:/apps/strm/lib/jetty-6.1.26.jar:/app

s/strm/lib/kryo-2.21.jar:/apps/strm/lib/ring-core-1.1.5.jar:/apps/strm/lib/disruptor-2.10.4.jar:/apps/strm/lib/commons-logging-1.1.3.jar:/apps/strm/lib/core.incubator-0.1.0.jar:/apps/strm/lib/logback-classic-1.0.13.jar:/apps/strm/lib/jetty-util-6.1.26.

jar:/apps/strm/lib/hiccup-0.3.6.jar:/apps/strm/lib/commons-fileupload-1.2.1.jar:/apps/strm/lib/objenesis-1.2.jar:/apps/strm/lib/commons-io-2.4.jar:/apps/strm/lib/json-simple-1.1.jar:/apps/strm/lib/commons-lang-2.5.jar:/apps/strm/lib/ring-devel-0.3.11.j

ar:/apps/strm/lib/ring-servlet-0.3.11.jar:/apps/strm/lib/servlet-api-2.5.jar:/apps/strm/lib/tools.cli-0.2.4.jar:/apps/strm/lib/asm-4.0.jar:/apps/strm/lib/carbonite-1.4.0.jar:/apps/strm/lib/tools.logging-0.2.3.jar:/apps/strm/lib/tools.macro-0.1.0.jar:/a

pps/strm/lib/slf4j-api-1.7.5.jar:/apps/strm/lib/snakeyaml-1.11.jar:/apps/strm/lib/clj-stacktrace-0.2.2.jar:/apps/strm/lib/clj-time-0.4.1.jar:/apps/strm/lib/chill-java-0.3.5.jar:/apps/strm/lib/reflectasm-1.07-shaded.jar:/apps/strm/lib/commons-exec-1.1.j

ar:/apps/strm/lib/log4j-over-slf4j-1.6.6.jar:/apps/strm/lib/minlog-1.2.jar:/apps/strm/lib/jgrapht-core-0.9.0.jar:/home/hadoop/chapter1-0.0.1-SNAPSHOT.jar:/apps/strm/conf:/apps/strm/bin

2148 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib

2148 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.io.tmpdir=/tmp

2149 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.compiler=<NA>

2149 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.name=Linux

2149 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.arch=amd64

2149 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.version=3.10.0-327.10.1.el7.x86_64

2149 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.name=hadoop

2149 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.home=/home/hadoop

2149 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.dir=/apps/strm/bin

2993 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /tmp/0ff07ea0-be81-44fb-a9e1-c813c71bce61/version-2 snapdir /tmp/0ff07ea0-be81-44fb-a9e1-c813

c71bce61/version-2

3007 [main] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:2000

3014 [main] INFO  backtype.storm.zookeeper - Starting inprocess zookeeper at port 2000 and dir /tmp/0ff07ea0-be81-44fb-a9e1-c813c71bce61

3284 [main] INFO  backtype.storm.daemon.nimbus - Starting Nimbus with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization"

 true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reas

sign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.e

xecutor.send.buffer.size" 1024, "storm.local.dir" "/tmp/6ab89940-328b-4454-b360-4dbe4e7dabc3", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.

secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "192.168.0.111", "

storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.interv

alceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size

" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "d

rpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.



중간은 열라 길어서 패스 ~ ㅡ_ㅡ;;;;;;



====================== Result ========================

======> And=======>>>>>>1213

======> Gave=======>>>>>>1213

======> Oh=======>>>>>>1212

======> Oh,=======>>>>>>1213

======> O셞r=======>>>>>>2425

======> What=======>>>>>>1213

======> Whose=======>>>>>>1213

======> air,=======>>>>>>1213

======> and=======>>>>>>2425

======> at=======>>>>>>1213

======> banner=======>>>>>>1212

======> bombs=======>>>>>>1213

======> brave?=======>>>>>>1212

======> bri=======>>>>>>1213

======> broad=======>>>>>>1213

======> bursting=======>>>>>>1213

======> by=======>>>>>>1213

======> can=======>>>>>>1213

======> dawn셲=======>>>>>>1213

======> does=======>>>>>>1212

======> early=======>>>>>>1213

======> fight,=======>>>>>>1213

======> flag=======>>>>>>1213

======> free=======>>>>>>1212

======> gallantly=======>>>>>>1213

======> ght=======>>>>>>1213

======> glare,=======>>>>>>1213

======> gleaming?=======>>>>>>1213

======> hailed=======>>>>>>1213

======> home=======>>>>>>1212

======> in=======>>>>>>1213

======> land=======>>>>>>1212

======> last=======>>>>>>1213

======> light,=======>>>>>>1213

======> night=======>>>>>>1213

======> of=======>>>>>>2424

======> our=======>>>>>>1213

======> perilous=======>>>>>>1213

======> proof=======>>>>>>1213

======> proudly=======>>>>>>1213

======> ramparts=======>>>>>>1213

======> rockets셱ed=======>>>>>>1213

======> say=======>>>>>>1213

======> say,=======>>>>>>1212

======> see,=======>>>>>>1213

======> so=======>>>>>>2426

======> spangled=======>>>>>>1212

======> star=======>>>>>>1212

======> stars,=======>>>>>>1213

======> still=======>>>>>>1213

======> streaming?=======>>>>>>1213

======> stripes=======>>>>>>1213

======> that=======>>>>>>2425

======> the=======>>>>>>13339

======> there.=======>>>>>>1213

======> through=======>>>>>>2426

======> twilight셲=======>>>>>>1213

======> was=======>>>>>>1213

======> watched,=======>>>>>>1213

======> wave=======>>>>>>1212

======> we=======>>>>>>2426

======> were=======>>>>>>1213

======> yet=======>>>>>>1212

======> you=======>>>>>>1213

====================== Result ========================

55332 [Thread-4] INFO  backtype.storm.daemon.executor - Shut down executor test_result_bolt_id:[3 3]

55333 [Thread-4] INFO  backtype.storm.daemon.executor - Shutting down executor test_split_bolt_id:[4 4]

55334 [Thread-12-disruptor-executor[4 4]-send-queue] INFO  backtype.storm.util - Async loop interrupted!

55334 [Thread-13-test_split_bolt_id] INFO  backtype.storm.util - Async loop interrupted!

55335 [Thread-4] INFO  backtype.storm.daemon.executor - Shut down executor test_split_bolt_id:[4 4]

55336 [Thread-4] INFO  backtype.storm.daemon.executor - Shutting down executor test_spout_id:[5 5]

55339 [Thread-15-test_spout_id] INFO  backtype.storm.util - Async loop interrupted!

55339 [Thread-14-disruptor-executor[5 5]-send-queue] INFO  backtype.storm.util - Async loop interrupted!

55341 [Thread-4] INFO  backtype.storm.daemon.executor - Shut down executor test_spout_id:[5 5]

55342 [Thread-4] INFO  backtype.storm.daemon.executor - Shutting down executor __system:[-1 -1]

55342 [Thread-17-__system] INFO  backtype.storm.util - Async loop interrupted!

55343 [Thread-16-disruptor-executor[-1 -1]-send-queue] INFO  backtype.storm.util - Async loop interrupted!

55343 [Thread-4] INFO  backtype.storm.daemon.executor - Shut down executor __system:[-1 -1]

Storm 은개념이 안서면 쓰기힘들듯.