當前位置:
首頁 > 知識 > 在scala中使用spark sql解決特定需求(2)

在scala中使用spark sql解決特定需求(2)

接著上篇文章,本篇來看下如何在scala中完成使用spark sql將不同日期的數據導入不同的es索引裡面。

首下看下用到的依賴包有哪些:


elasticsearch-spark-20_2.11 5.3.2
elasticsearch 2.3.4
spark-sql_2.11 2.1.0
spark-hive_2.11 2.1.0
spark-core_2.11 2.1.0
hadoop-client 2.7.3
scala-library 2.11.8

下面看相關的代碼,代碼可直接在跑在win上的idea中,使用的是local模式,數據是模擬造的:

import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{Row, SparkSession}//導入Row對象

/**
* spark sql to es 本地測試例子
*/
object SparkGroupES {

def main(args: Array[String]): Unit = {

//構建spark session
val spark = SparkSession
.builder.master("local[1]")
.appName("Spark SQL basic example")
.config("es.nodes","192.168.10.125").config("es.port","9200")
.getOrCreate

//導入es-spark的包
import org.elasticsearch.spark.sql._
import spark.implicits._

//使用Seq造數據,四列數據
val df = spark.sparkContext.parallelize(Seq(
(0,"p1",30.9,"2017-03-04"),
(0,"u",22.1,"2017-03-05"),
(1,"r",19.6,"2017-03-04"),
(2,"cat40",20.7,"2017-03-05"),
(3,"cat187",27.9,"2017-03-04"),
(4,"cat183",11.3,"2017-03-06"),
(5,"cat8",35.6,"2017-03-08"))

).toDF("id", "name", "price","dt")//轉化df的四列數據s
//創建表明為pro
df.createTempView("pro")

import spark.sql //導入sql函數

//按照id分組,統計每組數量,統計每組裡面最小的價格,然後收集每組裡面的數據
val ds=sql("select dt, count(*) as c ,collect_list(struct(id,name, price)) as res from pro group by dt ")
//需要多次查詢的數據,可以緩存起來
ds.cache
//獲取查詢的結果,遍歷獲取結果集
ds.select("dt","c","res").collect.foreach(line=>{
val dt=line.getAs[String]("dt") //獲取日期
val count=line.getAs[Long]("c")//獲取數量
val value=line.getAs[Seq[Row]]("res")//獲取每組內的數據集合,注意是一個Row實體
println("日期:"+dt+" 銷售數量: "+count)

//創建一個schema針對struct結構
val schema = DataTypes
.createStructType( Array[StructField](
DataTypes.createStructField("id", DataTypes.IntegerType, false), //不允許為null
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("price", DataTypes.DoubleType, true)
))
//將value轉化成rdd
val rdd=spark.sparkContext.makeRDD(value)
//將rdd註冊成DataFrame
val df =spark.createDataFrame(rdd,schema)
//保存每一個分組的數據到es索引裡面
EsSparkSQL.saveToEs(df,"spark"+dt+"/spark",Map("es.mapping.id" -> "id"))
// value.foreach(row=>{//遍歷組內數據集合,然後列印
// println(row.getAs[String]("name")+" "+row.getAs[Double]("price"))
// })

})
println("索引成功")
spark.stop
}

}

分析下,代碼執行過程:

(1)首先創建了一個SparkSession對象,注意這是新版本的寫法,然後加入了es相關配置

(2)導入了隱式轉化的es相關的包

(3)通過Seq+Tuple創建了一個DataFrame對象,並註冊成一個表

(4)導入spark sql後,執行了一個sql分組查詢

(5)獲取每一組的數據

(6)處理組內的Struct結構

(7)將組內的Seq[Row]轉換為rdd,最終轉化為df

(8)執行導入es的方法,按天插入不同的索引裡面

(9)結束

需要注意的是必須在執行collect方法後,才能在循環內使用sparkContext,否則會報錯的,在服務端是不能使用sparkContext的,只有在Driver端才可以。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 科技優家 的精彩文章:

OAuth2 Provider的最簡單實現 (Spring Boot + Spring Security OAuth2)
C++學習(三)入門篇——函數
設計模式解密(10)-迭代器模式
架構之路 之 Nginx實現負載均衡

TAG:科技優家 |

您可能感興趣

kali 使用apt-get update報錯GPG error的解決辦法
如何解決 「mount.nfs:Stale file handle」錯誤
Win7系統提示steam client not found解決方法
HikariCP源碼分析之leakDetectionThreshold及實戰解決Spark/Scala連接池泄漏
Kanye West 已解決 Saint Pablo 巡演訴訟案
解決Electra越獄顯示Error:topanga錯誤的方法!
Belle&Sebastian:如何解決人類的問題?
小米Pro/Air筆記本重裝系統教程及開機No Bootable Devices解決方案
聯想、OPPO、vivo和小米分別與Qualcomm Technologies 簽署射頻前端解決方案跨年度採購諒解備忘錄
Win7使用Msconfig.exe解決電腦啟動慢的方法
殭屍毀滅工程steam is not enabled錯誤解決方法
Cadence Innovus助力Realtek成功開發DTV SoC解決方案
Realtek藉助Cadence Innovus成功開發DTV SoC解決方案
絕地求生大逃殺BattlEye Launcher 998解決方法
智能營銷計劃的解決方案Gagapay Network
解決SSD問題後,Windows 10似乎跟Avast Antivirus過不去
Nordic Semiconductor推出支持nRF52840多協議SoC的ZigBee解決方案,拓展其智能家居應用產品
Antycip Simulatio推出最新VR和3D沉浸式解決方案
OmniVision和Smart Eye推出200萬像素解決方案支持VR/AR
Win7解決werfault.exe應用程序錯誤方法