1. Query a MySQL table into a DataFrame and load it into Hive
import java.util.Properties
import org.apache.spark.sql.SQLContext

val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hostname"
val username = "user"
val password = "pass"

// JDBC connection properties for the MySQL source
val conn = new Properties()
conn.put("driver", driver)
conn.put("user", username)
conn.put("password", password)

val sqlContext = new SQLContext(sc)

// Load the MySQL table as a DataFrame and overwrite the target Hive table
val df = sqlContext.read.jdbc(url, "TABLE_NAME", conn)
df.write.mode("overwrite").saveAsTable("hive.table")
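For large tables the same read can be parallelized by giving Spark a numeric split column. A minimal sketch, assuming a numeric "id" column and rough min/max bounds (both assumptions, not part of the original post):

// Hypothetical partitioned read: 8 parallel JDBC connections split on "id"
val pdf = sqlContext.read.jdbc(url, "TABLE_NAME", "id", 0L, 1000000L, 8, conn)
pdf.write.mode("overwrite").saveAsTable("hive.table")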
2. Query Elasticsearch into a Spark temp table (DataFrame) and load it into Hive
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Spark and elasticsearch-hadoop settings
val conf = new SparkConf().setAppName("ela")
conf.set("spark.sql.catalogImplementation", "hive")
conf.set("spark.master", "spark://hostname:7077")
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("es.index.auto.create", "false")
conf.set("es.nodes.discovery", "true")
conf.set("es.nodes", "host,host,host")
val es_query = """
{
  "query": {
    "bool": {
      "must": [
        { "range": {
            "@timestamp": { "gte": "2017-01-01T00:30:00", "lte": "2017-02-01T00:30:00", "time_zone": "+09:00" }
        }},
        { "bool": {
            "should": [
              { "wildcard": { "msg.keyword": { "value": "message*" } } }
            ]
        }}
      ]
    }
  }
}
"""
conf.set("es.query", es_query)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("elastic_index")
df.registerTempTable("SPARK_TEMP_TABLE")
val ndf = sqlContext.sql("SELECT * FROM SPARK_TEMP_TABLE")
ndf.write.mode("overwrite").saveAsTable("hive_table")
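The Elasticsearch settings do not have to live on the SparkConf; they can also be passed per read, which avoids rebuilding the SparkContext for every query. A minimal sketch under the same host/index assumptions as above:

// Hypothetical per-read configuration: same query, options set on the reader
val esDf = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "host,host,host")
  .option("es.query", es_query)
  .load("elastic_index")
esDf.write.mode("overwrite").saveAsTable("hive_table")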
3. Query with Spark SQL (Hive) and load into HBase via Phoenix
val res_sql = """
SELECT * FROM HIVE_TABLE
"""
import org.apache.phoenix.spark._

val df = sqlContext.sql(res_sql)
// Collect the result to the driver as (col1, col2) pairs, then write them to Phoenix
val ar = df.collect.map(x => (x.getString(0), x.getString(1))).toList
sc.parallelize(ar).saveToPhoenix(
  "PHOENIX_TABLE",
  Seq("col1", "col2"),
  zkUrl = Some("zookeeper_host:2181")
)
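Collecting to the driver only works for small result sets. With the phoenix-spark DataFrame integration on the classpath, the result can be written directly from the executors; a sketch assuming the DataFrame columns already match the Phoenix table columns:

// Hypothetical direct DataFrame write via phoenix-spark (no collect on the driver)
df.write
  .format("org.apache.phoenix.spark")
  .mode("overwrite")
  .option("table", "PHOENIX_TABLE")
  .option("zkUrl", "zookeeper_host:2181")
  .save()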