Spark + Cassandra integration

Getting Cassandra hooked up was a real pain lol

I first tried to get HBase integration working, failed, got annoyed, and switched to Cassandra. It worked, so here are my notes.


Spark version: 1.6.1 (I run Hadoop 2.7.2, so I used the build compiled for Hadoop)

Cassandra version: 3.4

Cassandra Spark Java connector version: 1.5

Number of nodes: 6


Installing Cassandra is easy, so I only record the parts I want to remember.

I keep testing on the same 6 machines.


For reference, all nodes have the spec below. (VMware VMs lol)


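Cassandra configuration files.

Each of the VMs above gets its own IP address; only the resulting settings are recorded here.


Files that need to be touched to set up Cassandra distribution and replication: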


cassandra.yaml

* What to change:

- listen_address : this node's IP

- rpc_address : this node's IP

- seeds under seed_provider : the IPs of the connected nodes, comma-separated, e.g. "111.111.111.111,222.222.222.222".


cassandra-env.sh

* What to change:

- the hostname part of JVM_OPTS : this node's IP


With only these changes, starting Cassandra brought up all 6 nodes in 1 datacenter with 1 rack.
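The per-node values listed above can be sketched in a few lines of Java. This is only an illustration of which values change on each node, not a drop-in cassandra.yaml. The class NodeSettings, its method names, and the choice of seed nodes are hypothetical. It also assumes the "hostname part of JVM_OPTS" means the -Djava.rmi.server.hostname flag in cassandra-env.sh.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that prints the values to edit on one node.
class NodeSettings {

    // seeds is a single comma-separated string of node IPs.
    static String seeds(List<String> seedIps) {
        return String.join(",", seedIps);
    }

    // The three cassandra.yaml values that differ per node or per cluster.
    static String yamlValues(String nodeIp, List<String> seedIps) {
        return "listen_address: " + nodeIp + "\n"
             + "rpc_address: " + nodeIp + "\n"
             + "seeds: \"" + seeds(seedIps) + "\"\n";
    }

    // Assumption: the JVM_OPTS hostname is the JMX/RMI hostname flag.
    static String jvmOpt(String nodeIp) {
        return "-Djava.rmi.server.hostname=" + nodeIp;
    }

    public static void main(String[] args) {
        // Which nodes act as seeds is an assumption made for this example.
        List<String> seedIps = Arrays.asList("192.168.0.111", "192.168.0.112");
        System.out.print(yamlValues("192.168.0.113", seedIps));
        System.out.println(jvmOpt("192.168.0.113"));
    }
}
```

Running it for each node's IP gives the values to paste into that node's files. The seeds string is the same on every node.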


The two files below control how many datacenters and racks the cluster is split into.


cassandra-rackdc.properties


cassandra-topology.properties
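Both files are plain Java properties files. As far as I know, cassandra-rackdc.properties is read by GossipingPropertyFileSnitch (each node declares its own dc and rack), while cassandra-topology.properties is read by PropertyFileSnitch (one file maps every node IP to DC:RACK). Which one applies depends on endpoint_snitch in cassandra.yaml. Below is a minimal sketch of the two formats, parsed with java.util.Properties. The DC and rack names are made up, and TopologySketch is a hypothetical class.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class TopologySketch {

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    // Splits a "DC:RACK" value into {dc, rack}.
    static String[] dcAndRack(String value) {
        return value.split(":", 2);
    }

    public static void main(String[] args) {
        // cassandra-rackdc.properties style: one dc/rack pair for the local node.
        Properties rackdc = parse("dc=DC1\nrack=RAC1\n");
        System.out.println("local node: " + rackdc.getProperty("dc") + " / " + rackdc.getProperty("rack"));

        // cassandra-topology.properties style: every node IP mapped to DC:RACK.
        Properties topology = parse(
                "192.168.0.111=DC1:RAC1\n"
              + "192.168.0.114=DC1:RAC2\n"
              + "default=DC1:RAC1\n");
        String[] pair = dcAndRack(topology.getProperty("192.168.0.114"));
        System.out.println("192.168.0.114: " + pair[0] + " / " + pair[1]);
    }
}
```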


Checking that the Cassandra nodes are connected.

[hadoop@os1 bin]$ ./nodetool status

Datacenter: datacenter1

=======================

Status=Up/Down

|/ State=Normal/Leaving/Joining/Moving

--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack

UN  192.168.0.111  201.03 KB  256          49.5%             e13656b7-1436-4011-bd1d-01042a2d75fc  rack1

UN  192.168.0.112  205.89 KB  256          47.7%             09ef262e-9013-4292-9ea8-98a32ebdadc1  rack1

UN  192.168.0.113  253.59 KB  256          47.7%             bad6d57a-3135-4dff-b546-21887d77aca6  rack1

UN  192.168.0.114  198.71 KB  256          49.3%             c2e5c7bc-b9a1-4751-9e6b-6179b20fc76f  rack1

UN  192.168.0.115  214 KB      256          53.8%             9f9d28e9-7e03-4653-a329-7aba65aaf5f0  rack1

UN  192.168.0.116  219.55 KB  256          52.1%             cffba9e0-19b3-4070-93ba-ecb4e6263e47  rack1


[hadoop@os1 bin]$ 


Note that when a node dies, its status changes to DN.
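Since the first column encodes Up/Down plus the state, down nodes can be picked out of the nodetool status text mechanically. Below is a rough sketch that scans the output for rows whose code starts with D. NodetoolStatusSketch is a hypothetical class, and the sample input reuses two rows from above with one of them changed to DN for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class NodetoolStatusSketch {

    // Returns the addresses of all rows whose status code starts with 'D' (e.g. DN).
    static List<String> downNodes(String statusOutput) {
        List<String> down = new ArrayList<>();
        for (String line : statusOutput.split("\\R")) {
            String[] cols = line.trim().split("\\s+");
            // Node rows start with a two-letter code: U or D, then N, L, J or M.
            if (cols.length >= 2 && cols[0].matches("[UD][NLJM]") && cols[0].charAt(0) == 'D') {
                down.add(cols[1]);
            }
        }
        return down;
    }

    public static void main(String[] args) {
        // Sample input: the second node is marked DN for this example only.
        String sample =
                "--  Address        Load       Tokens       Owns (effective)  Host ID                               Rack\n"
              + "UN  192.168.0.111  201.03 KB  256          49.5%             e13656b7-1436-4011-bd1d-01042a2d75fc  rack1\n"
              + "DN  192.168.0.112  205.89 KB  256          47.7%             09ef262e-9013-4292-9ea8-98a32ebdadc1  rack1\n";
        System.out.println(downNodes(sample)); // prints [192.168.0.112]
    }
}
```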


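Now let's put a few rows of test data into Cassandra.

* Because the config files use the node's IP instead of localhost, running plain cqlsh cannot find the node; pass the IP explicitly lol

* I only read the bold text in the manual, then just created things and ran them lol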


[hadoop@os1 bin]$ ./cqlsh 192.168.0.111

Connected to CASDR at 192.168.0.111:9042.

[cqlsh 5.0.1 | Cassandra 3.4 | CQL spec 3.4.0 | Native protocol v4]

Use HELP for help.

cqlsh> create keyspace keytest with replication = {'class' :'SimpleStrategy','replication_factor' : 3};

cqlsh>

cqlsh> use keytest;

cqlsh:keytest> create table test_table ( id varchar,name varchar,primary key(id));          

cqlsh:keytest> insert into test_table (id,name) values('2','fasdlkffasdffajfkls');

cqlsh:keytest> select *from test_table;


 id | name

----+-------------------------------

  3 | fasdlkfasdfasdfaffasdffajfkls

  2 |           fasdlkffasdffajfkls

  1 |     fasdlkfjafkljfsdklfajfkls


(3 rows)

cqlsh:keytest> 
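A side note on replication_factor 3: with 6 nodes, each node should hold roughly 3/6 of the data, which lines up with the "Owns (effective)" column in the nodetool status output shown earlier. The sketch below only does that arithmetic. It assumes the status output reflects a keyspace with replication_factor 3, and OwnershipSketch is a hypothetical class.

```java
import java.util.Locale;

class OwnershipSketch {

    // Expected effective ownership per node, in percent, for evenly spread tokens.
    static double expectedPercentPerNode(int replicationFactor, int nodeCount) {
        return 100.0 * replicationFactor / nodeCount;
    }

    static double sum(double[] values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // "Owns (effective)" values copied from the nodetool status output.
        double[] owns = {49.5, 47.7, 47.7, 49.3, 53.8, 52.1};
        System.out.println(String.format(Locale.ROOT,
                "expected per node: %.1f%%", expectedPercentPerNode(3, 6)));
        System.out.println(String.format(Locale.ROOT,
                "sum over nodes: %.1f%%", sum(owns)));
    }
}
```

The column sums to about 300%, i.e. 100% times the replication factor, so each row is stored on three of the six nodes.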



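* Current setup: there are nodes 1 through 6, and Spark is installed on node 1.

* I pieced together source code floating around the internet, changed a few things, and ran it.

* Below is the source, written on the console, that counts the rows of a Cassandra table.

* The source shows traces of both connector 1.6 and 1.5 because I was testing both. Either one works on its own, so don't worry about it.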

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.CassandraRow;

public class test3 {

    public static void main(String[] args) {
        // The driver is started with plain `java test3`, so everything is configured in code.
        SparkConf conf = new SparkConf(true)
                .set("spark.cassandra.connection.host", "192.168.0.111") // Cassandra contact point
                .setMaster("spark://192.168.0.111:7077")                 // standalone Spark master
                .setAppName("casr")
                // Connector jars that must be shipped to the executors
                .setJars(new String[] {
                        "/apps/spark/lib/spark-cassandra-connector-java_2.10-1.6.0-M1.jar",
                        "/apps/spark/lib/spark-cassandra-connector_2.10-1.6.0-M1.jar"
                })
                .setSparkHome("/apps/spark");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Dependencies the connector needs at runtime
        sc.addJar("/home/hadoop/classes/cassandra-driver-core-3.0.0.jar");
        sc.addJar("/home/hadoop/classes/guava-19.0.jar");

        // Read only the "name" column of keytest.test_table as an RDD
        JavaRDD<CassandraRow> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
                .cassandraTable("keytest", "test_table")
                .select("name");

        System.out.println("row count:" + cassandraRdd.count()); // once it is in RDD, we can use RDD
    }
}


After compiling the above, running it produces the following.

[hadoop@os1 ~]$ java test3

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/hadoop/classes/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/hadoop/classes/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

16/04/08 00:53:19 INFO SparkContext: Running Spark version 1.6.1

16/04/08 00:53:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

16/04/08 00:53:20 INFO SecurityManager: Changing view acls to: hadoop

16/04/08 00:53:20 INFO SecurityManager: Changing modify acls to: hadoop

16/04/08 00:53:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)

16/04/08 00:53:21 INFO Utils: Successfully started service 'sparkDriver' on port 54330.

16/04/08 00:53:22 INFO Slf4jLogger: Slf4jLogger started

16/04/08 00:53:22 INFO Remoting: Starting remoting

16/04/08 00:53:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.0.111:34441]

16/04/08 00:53:22 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 34441.

16/04/08 00:53:22 INFO SparkEnv: Registering MapOutputTracker

16/04/08 00:53:22 INFO SparkEnv: Registering BlockManagerMaster

16/04/08 00:53:22 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-e91fb4d5-8f1e-45b5-9631-2af967164aff

16/04/08 00:53:22 INFO MemoryStore: MemoryStore started with capacity 1077.8 MB

16/04/08 00:53:22 INFO SparkEnv: Registering OutputCommitCoordinator

16/04/08 00:53:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.

16/04/08 00:53:23 INFO SparkUI: Started SparkUI at http://192.168.0.111:4040

16/04/08 00:53:23 INFO HttpFileServer: HTTP File server directory is /tmp/spark-d02efafe-ae00-4eaa-9cfc-0700e5848dd3/httpd-c8f30120-f92a-4ec9-922f-c201e94998af

16/04/08 00:53:23 INFO HttpServer: Starting HTTP Server

16/04/08 00:53:23 INFO Utils: Successfully started service 'HTTP file server' on port 48452.

16/04/08 00:53:23 INFO SparkContext: Added JAR /apps/spark/lib/spark-cassandra-connector-java_2.10-1.6.0-M1.jar at http://192.168.0.111:48452/jars/spark-cassandra-connector-java_2.10-1.6.0-M1.jar with timestamp 1460044403426

16/04/08 00:53:23 INFO SparkContext: Added JAR /apps/spark/lib/spark-cassandra-connector_2.10-1.6.0-M1.jar at http://192.168.0.111:48452/jars/spark-cassandra-connector_2.10-1.6.0-M1.jar with timestamp 1460044403435

16/04/08 00:53:23 INFO AppClient$ClientEndpoint: Connecting to master spark://192.168.0.111:7077...

16/04/08 00:53:23 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20160408005323-0013

16/04/08 00:53:23 INFO AppClient$ClientEndpoint: Executor added: app-20160408005323-0013/0 on worker-20160407175148-192.168.0.111-57563 (192.168.0.111:57563) with 8 cores

16/04/08 00:53:23 INFO SparkDeploySchedulerBackend: Granted executor ID app-20160408005323-0013/0 on hostPort 192.168.0.111:57563 with 8 cores, 1024.0 MB RAM

16/04/08 00:53:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44940.

16/04/08 00:53:23 INFO NettyBlockTransferService: Server created on 44940

16/04/08 00:53:23 INFO BlockManagerMaster: Trying to register BlockManager

16/04/08 00:53:23 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.111:44940 with 1077.8 MB RAM, BlockManagerId(driver, 192.168.0.111, 44940)

16/04/08 00:53:23 INFO BlockManagerMaster: Registered BlockManager

16/04/08 00:53:23 INFO AppClient$ClientEndpoint: Executor updated: app-20160408005323-0013/0 is now RUNNING

16/04/08 00:53:24 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0

16/04/08 00:53:24 INFO SparkContext: Added JAR /home/hadoop/classes/cassandra-driver-core-3.0.0.jar at http://192.168.0.111:48452/jars/cassandra-driver-core-3.0.0.jar with timestamp 1460044404233

16/04/08 00:53:24 INFO SparkContext: Added JAR /home/hadoop/classes/guava-19.0.jar at http://192.168.0.111:48452/jars/guava-19.0.jar with timestamp 1460044404244

16/04/08 00:53:24 INFO NettyUtil: Found Netty's native epoll transport in the classpath, using it

16/04/08 00:53:25 INFO Cluster: New Cassandra host /192.168.0.111:9042 added

16/04/08 00:53:25 INFO Cluster: New Cassandra host /192.168.0.112:9042 added

16/04/08 00:53:25 INFO LocalNodeFirstLoadBalancingPolicy: Added host 192.168.0.112 (datacenter1)

16/04/08 00:53:25 INFO Cluster: New Cassandra host /192.168.0.113:9042 added

16/04/08 00:53:25 INFO LocalNodeFirstLoadBalancingPolicy: Added host 192.168.0.113 (datacenter1)

16/04/08 00:53:25 INFO Cluster: New Cassandra host /192.168.0.114:9042 added

16/04/08 00:53:25 INFO LocalNodeFirstLoadBalancingPolicy: Added host 192.168.0.114 (datacenter1)

16/04/08 00:53:25 INFO Cluster: New Cassandra host /192.168.0.115:9042 added

16/04/08 00:53:25 INFO LocalNodeFirstLoadBalancingPolicy: Added host 192.168.0.115 (datacenter1)

16/04/08 00:53:25 INFO Cluster: New Cassandra host /192.168.0.116:9042 added

16/04/08 00:53:25 INFO LocalNodeFirstLoadBalancingPolicy: Added host 192.168.0.116 (datacenter1)

16/04/08 00:53:25 INFO CassandraConnector: Connected to Cassandra cluster: CASDR

16/04/08 00:53:26 INFO SparkContext: Starting job: count at test3.java:44

16/04/08 00:53:26 INFO DAGScheduler: Got job 0 (count at test3.java:44) with 7 output partitions

16/04/08 00:53:26 INFO DAGScheduler: Final stage: ResultStage 0 (count at test3.java:44)

16/04/08 00:53:26 INFO DAGScheduler: Parents of final stage: List()

16/04/08 00:53:26 INFO DAGScheduler: Missing parents: List()

16/04/08 00:53:26 INFO DAGScheduler: Submitting ResultStage 0 (CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:15), which has no missing parents

16/04/08 00:53:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 7.3 KB, free 7.3 KB)

16/04/08 00:53:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.8 KB, free 11.1 KB)

16/04/08 00:53:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.111:44940 (size: 3.8 KB, free: 1077.7 MB)

16/04/08 00:53:26 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006

16/04/08 00:53:26 INFO DAGScheduler: Submitting 7 missing tasks from ResultStage 0 (CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:15)

16/04/08 00:53:26 INFO TaskSchedulerImpl: Adding task set 0.0 with 7 tasks

16/04/08 00:53:27 INFO SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (os1.local:56503) with ID 0

16/04/08 00:53:27 INFO BlockManagerMasterEndpoint: Registering block manager os1.local:53103 with 511.1 MB RAM, BlockManagerId(0, os1.local, 53103)

16/04/08 00:53:27 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, os1.local, partition 0,NODE_LOCAL, 26312 bytes)

16/04/08 00:53:27 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, os1.local, partition 1,NODE_LOCAL, 26222 bytes)

16/04/08 00:53:27 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 2, os1.local, partition 5,NODE_LOCAL, 26352 bytes)

16/04/08 00:53:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on os1.local:53103 (size: 3.8 KB, free: 511.1 MB)

16/04/08 00:53:31 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 3, os1.local, partition 2,ANY, 26312 bytes)

16/04/08 00:53:31 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 4, os1.local, partition 3,ANY, 21141 bytes)

16/04/08 00:53:31 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 5, os1.local, partition 4,ANY, 23882 bytes)

16/04/08 00:53:31 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, os1.local, partition 6,ANY, 10818 bytes)

16/04/08 00:53:32 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 1106 ms on os1.local (1/7)

16/04/08 00:53:33 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 5530 ms on os1.local (2/7)

16/04/08 00:53:33 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 4) in 1945 ms on os1.local (3/7)

16/04/08 00:53:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6015 ms on os1.local (4/7)

16/04/08 00:53:33 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 2) in 5959 ms on os1.local (5/7)

16/04/08 00:53:33 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 5) in 2150 ms on os1.local (6/7)

16/04/08 00:53:33 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 3) in 2286 ms on os1.local (7/7)

16/04/08 00:53:33 INFO DAGScheduler: ResultStage 0 (count at test3.java:44) finished in 7.049 s

16/04/08 00:53:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 

16/04/08 00:53:33 INFO DAGScheduler: Job 0 finished: count at test3.java:44, took 7.435243 s

16/04/08 00:53:34 INFO CassandraConnector: Disconnected from Cassandra cluster: CASDR

16/04/08 00:53:35 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.0.111:44940 in memory (size: 3.8 KB, free: 1077.8 MB)

row count:3 <--------- this line right here is the answer lol

16/04/08 00:53:35 INFO BlockManagerInfo: Removed broadcast_0_piece0 on os1.local:53103 in memory (size: 3.8 KB, free: 511.1 MB)

16/04/08 00:53:35 INFO SparkContext: Invoking stop() from shutdown hook

16/04/08 00:53:35 INFO ContextCleaner: Cleaned accumulator 1

16/04/08 00:53:35 INFO SparkUI: Stopped Spark web UI at http://192.168.0.111:4040

16/04/08 00:53:35 INFO SparkDeploySchedulerBackend: Shutting down all executors

16/04/08 00:53:35 INFO SparkDeploySchedulerBackend: Asking each executor to shut down

16/04/08 00:53:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/04/08 00:53:35 INFO MemoryStore: MemoryStore cleared

16/04/08 00:53:35 INFO BlockManager: BlockManager stopped

16/04/08 00:53:35 INFO BlockManagerMaster: BlockManagerMaster stopped

16/04/08 00:53:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/04/08 00:53:35 INFO SparkContext: Successfully stopped SparkContext

16/04/08 00:53:35 INFO ShutdownHookManager: Shutdown hook called

16/04/08 00:53:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-d02efafe-ae00-4eaa-9cfc-0700e5848dd3/httpd-c8f30120-f92a-4ec9-922f-c201e94998af

16/04/08 00:53:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/04/08 00:53:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-d02efafe-ae00-4eaa-9cfc-0700e5848dd3

16/04/08 00:53:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

[hadoop@os1 ~]$


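Caveats

I tried all sorts of things just to get that row count line above to show up. lol

As far as I can tell, the Spark and Cassandra sessions started from this class are fresh connections to both systems, and they run with whatever configuration is set in the Java code.

That is why the Java source passes the jar file locations through setJars.

Once I added that, the long, long struggle finally ended in success.

No matter how much I work with servers, this is the part I keep forgetting.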
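Because the jars are named by hand in setJars and addJar, a mistyped path is easy to miss. Here is a small sketch of a check that could run before building the SparkConf. JarCheckSketch and missingJars are hypothetical names, not part of Spark or the connector; the paths are the ones used in the source above.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class JarCheckSketch {

    // Returns every path that does not point to an existing regular file.
    static List<String> missingJars(String[] jarPaths) {
        List<String> missing = new ArrayList<>();
        for (String path : jarPaths) {
            if (!Files.isRegularFile(Paths.get(path))) {
                missing.add(path);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String[] jars = {
                "/apps/spark/lib/spark-cassandra-connector-java_2.10-1.6.0-M1.jar",
                "/apps/spark/lib/spark-cassandra-connector_2.10-1.6.0-M1.jar",
                "/home/hadoop/classes/cassandra-driver-core-3.0.0.jar",
                "/home/hadoop/classes/guava-19.0.jar"
        };
        List<String> missing = missingJars(jars);
        if (missing.isEmpty()) {
            System.out.println("all jars found");
        } else {
            System.out.println("missing jars: " + missing);
        }
    }
}
```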