
[SPARK] Scala source for data processing (MySQL, ES, Hive, HBase (Phoenix))

1. Query a MySQL table into a DataFrame and load it into Hive


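import java.util.Properties
import org.apache.spark.sql.SQLContext

val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hostname"   // a full MySQL URL usually also carries port and database
val username = "user"
val password = "pass"

// JDBC connection properties, including the driver class Spark should load
val conn = new Properties()
conn.put("user", username)
conn.put("password", password)
conn.put("driver", driver)

// sc: an existing SparkContext (e.g. the one spark-shell provides)
val sqlContext = new SQLContext(sc)
// read the whole MySQL table into a DataFrame
val df = sqlContext.read.jdbc(url, "TABLE_NAME", conn)
// overwrite the Hive table with the DataFrame contents
df.write.mode("overwrite").saveAsTable("hive.table")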
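The connection settings above can be factored into small helpers so the URL and `Properties` are built in one place. This is a minimal sketch: the `JdbcSettings` object, its method names and the example database name are hypothetical, and only the `jdbc:mysql://host:port/database` URL shape and the `user`/`password`/`driver` keys come from standard MySQL JDBC and Spark usage.

```scala
import java.util.Properties

// Hypothetical helpers for assembling the MySQL JDBC settings used in step 1.
object JdbcSettings {
  // Builds a MySQL JDBC URL of the form jdbc:mysql://host:port/database
  def mysqlUrl(host: String, port: Int, database: String): String =
    s"jdbc:mysql://$host:$port/$database"

  // Packs credentials and the driver class into the Properties object
  // that sqlContext.read.jdbc(url, table, props) expects.
  def connectionProps(user: String, password: String,
                      driver: String = "com.mysql.jdbc.Driver"): Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props.setProperty("driver", driver)
    props
  }
}
```

Usage would look like `sqlContext.read.jdbc(JdbcSettings.mysqlUrl("hostname", 3306, "mydb"), "TABLE_NAME", JdbcSettings.connectionProps("user", "pass"))`, where `mydb` is a placeholder and 3306 is MySQL's default port. `setProperty` is used instead of `put` so only `String` values can end up in the properties.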




2. Query Elasticsearch, register the result as a Spark temp table (DataFrame), and load it into Hive


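import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("ela")
conf.set("spark.sql.catalogImplementation", "hive")   // use the Hive catalog so saveAsTable writes to Hive
conf.set("spark.master", "spark://hostname:7077")
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("es.index.auto.create", "false")   // do not create the index automatically
conf.set("es.nodes.discovery", "true")      // discover the other nodes of the cluster
conf.set("es.nodes", "host,host,host")      // comma-separated seed nodes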


val es_query = """
{
  "query": {
    "bool": {
      "must": [
        { "range": {
            "@timestamp": {
              "gte": "2017-01-01T00:30:00",
              "lte": "2017-02-01T00:30:00",
              "time_zone": "+09:00"
            }
        }},
        { "bool": {
            "should": [
              { "wildcard": { "msg.keyword": { "value": "message*" } } }
            ]
        }}
      ]
    }
  }
}
"""


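conf.set("es.query", es_query)   // the query is sent to Elasticsearch when the index is read

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// read the index through the elasticsearch-spark data source
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("elastic_index")
// table names cannot contain spaces; registerTempTable is deprecated since Spark 2.0 (createOrReplaceTempView)
df.registerTempTable("spark_temp_table")

val ndf = sqlContext.sql("SELECT * FROM spark_temp_table")
ndf.write.mode("overwrite").saveAsTable("hive_table")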
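Hand-written query JSON is easy to get wrong: a malformed timestamp such as `2017-02-01T30:00` is only rejected by Elasticsearch at query time. The sketch below builds the same bool/range/wildcard query from parameters and validates both bounds with `java.time.LocalDateTime.parse` first. The `EsQuery` object and its method are hypothetical; the query structure is the one used in step 2.

```scala
import java.time.LocalDateTime

// Hypothetical helper that builds the same bool/range/wildcard query as step 2.
object EsQuery {
  // LocalDateTime.parse accepts ISO-8601 local date-times such as 2017-01-01T00:30:00
  // and throws DateTimeParseException for malformed values like 2017-02-01T30:00.
  private def checked(ts: String): String = {
    LocalDateTime.parse(ts)
    ts
  }

  def rangeWithWildcard(gte: String, lte: String, timeZone: String, pattern: String): String = {
    val from = checked(gte)
    val to = checked(lte)
    s"""{
       |  "query": {
       |    "bool": {
       |      "must": [
       |        { "range": { "@timestamp": { "gte": "$from", "lte": "$to", "time_zone": "$timeZone" } } },
       |        { "bool": { "should": [ { "wildcard": { "msg.keyword": { "value": "$pattern" } } } ] } }
       |      ]
       |    }
       |  }
       |}""".stripMargin
  }
}
```

The result can be passed straight to the connector, e.g. `conf.set("es.query", EsQuery.rangeWithWildcard("2017-01-01T00:30:00", "2017-02-01T00:30:00", "+09:00", "message*"))`. Values are interpolated without JSON escaping, so this sketch assumes the pattern contains no quotes.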




3. Query Hive with Spark SQL and load the result into HBase through Phoenix


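import org.apache.phoenix.spark._   // adds saveToPhoenix to RDDs of tuples

val res_sql = """
SELECT * FROM HIVE_TABLE
"""

val df = sqlContext.sql(res_sql)

// turn the first two columns of each row into a (String, String) tuple;
// collect pulls every row to the driver, so this only suits small tables
val ar = df.collect.map(x => (x.getString(0), x.getString(1))).toList

sc.parallelize(ar).saveToPhoenix(
  "PHOENIX_TABLE",
  Seq("col1", "col2"),
  zkUrl = Some("zookeeper_host:2181")
)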
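Phoenix has no `INSERT`; rows are written with `UPSERT`, which inserts a new row or overwrites the existing one with the same primary key. If you write through plain JDBC instead of `saveToPhoenix`, the statement can be generated from the same table and column list. This is a hypothetical helper, not part of phoenix-spark, and it does not claim to mirror what the plugin does internally.

```scala
// Hypothetical helper: builds the parameterised UPSERT for one row of a Phoenix table.
object PhoenixUpsert {
  def statement(table: String, columns: Seq[String]): String = {
    require(columns.nonEmpty, "at least one column is required")
    val cols = columns.mkString(", ")
    // one JDBC placeholder per column
    val params = columns.map(_ => "?").mkString(", ")
    s"UPSERT INTO $table ($cols) VALUES ($params)"
  }
}
```

With a connection from `DriverManager.getConnection("jdbc:phoenix:zookeeper_host:2181")`, the statement would be prepared once, bound per row and executed. Phoenix connections do not auto-commit by default, so `commit()` has to be called at the end.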