當前位置:
首頁 > 知識 > spark中dataFrame的一些方法回顧

spark中dataFrame的一些方法回顧

1.文本轉dataframe

//加入spark隱士轉換
import spark.implicits._
def textData2DataFrame(): Unit ={
val text:List[(String, String, String)]=Source.fromFile("C:\Users\91BGJK2\Desktop\天府新區.txt","utf-8").getLines().map{ line=>
val strings = line.split(" ")
(strings(0),strings(1),strings(2))//通過構造元組進行轉換
}.toList
//需要加入spark隱士轉換,才能使用toDF強轉
val testDataFram=text.toDF("id","name_patient","birthday")
testDataFram.select("name_patient").show(10)
//第二種--通過構造schema
val textRdd:RDD[Row]=spark.read.text("file:///C:\Users\91BGJK2\Desktop\天府新區.txt").rdd
val textRdd2:RDD[String]=spark.sparkContext.textFile("file:///C:\Users\91BGJK2\Desktop\天府新區.txt")
val rowRDD:RDD[Row] = textRdd2.map(_.split(" ").map(_.toString)).map(p => Row(p: _*))
val colNames=List("id","name_patient","birthday")
val schema:StructType = StructType(colNames.map(fieldName => StructField(fieldName,StringType)))//統一轉換成string格式
// 將數據和結構合成,創建為DataFrame
val data = spark.createDataFrame(rowRDD, schema)
data.select("name_patient").show(10)
//第三中:使用csv封裝的讀取器
val text2:DataFrame=spark.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header", true)
.option("delimiter", " ")//指定定界符
.load("file:///C:\Users\91BGJK2\Desktop\天府新區.txt")
text2.select("name_patient").show(10)
}

2.dataframe的一些函數

val t_kc21k1=spark.read.format("jdbc")

.options(properties).option("dbtable","t_kc21k1").load()

--where

t_kc21k1.where($"fprn" === "1" && $"ftimes" === 2).show()

--order by

t_kc21k1.sort(asc("id")).show(10)

--group by

//grouby必須結合聚合函數使用才有效

t_kc21k1.groupBy("code","name").count().where($"count"===591).show()

//groupBy之後,對每組數據進行聚合,一些聚合操作包括max, min, count等。結合包括了column1及agg中指定的各列

t_kc21k1.groupBy("id_depa").agg(min("money_sum")as "minFee",max("money_sum") as "maxFee" ,count("ack190") as "timeNums").show(10)

--join

val rw_mapping = spark.read.format("jdbc")

.options(properties).option("dbtable", "rw_mapping").load()

//使用Seq()剔除重複列

t_kc21k1.join(rw_mapping,Seq("id_drg", "id_drg"),"inner").select("id_drg","rw").show(10)

--filter

t_kc21k1.filter($"id_drg"==="914").join(rw_mapping,Seq("id_drg","id_drg"),"inner").select("id_drg","rw").show(10)

3.dataFrame欄位拆分

val t_kc21k1=spark.read.format("jdbc")
.options(properties).option("dbtable","t_kc21k1").load()
//方法一、實現欄位拆分為多行----使用rdd中的flatmap+toDf
val sdxFrame=t_kc21k1.select("zdbm","zdmc").rdd.filter(str=>str.get(0)!=null).flatMap(str=>delemiterName(str.getAs[String]("zdbm"),str.getAs[String]("zdmc"))).toDF("code","name")
.write.mode(SaveMode.Overwrite)
.format("jdbc")
.options(properties)
.option("dbtable","t_zd_statistic")
.save()
def delemiterName(code:String,name:String) ={
val codes = code.split("\|")
val names=name.split("\|")
val tuples:Array[(String, String)] = codes.zip(names)
tuples
}
---也可以不用轉rdd,直接dataframe操作,再重新複製欄位名稱
t_kc21k1.select("id","ssmc").where($"id"===137).flatMap(str=>delemiterName(str.getAs[String]("ssmc"),str.getAs[String]("ssmc")))
.toDF("ssmc1","ssmc2")
.show(10)
//方法二--使用spark sql中的explode函數---配合自定義函數udf
spark.udf.register("testArr",(x:String)=>testArr(x))
spark.sql("select id,explode(testArr(ssmc)) from t_kc21k1 where ssbm is not null").show(10)
def testArr(code:String)={
val res=code.split("\|")
res
}
//方法三---直接使用dataframe中的functions.expolde()
t_kc21k1.select("id","ssmc").where($"id"===137).explode( "ssmc" , "ssmc_new" ){time: String => time.split("\|")}.show(10)
//備註:explode已經棄用了,現在使用functions.expolde()。
Dataset and DataFrame API explode has been deprecated, alternatively, use functions.explode() with select or flatMap。
前提是該欄位為Array或Map數據類型。如果滿足上述類型,需要進行轉換---先轉換為Array再進行操作。
t_kc21k1.select("id","ssmc").where($"id"===137).select(functions.split($"ssmc","\|") as "ssmc")
.select(functions.explode($"ssmc")).show(10)
---dataframe的過濾null值 filter
t_kc21k1.filter("ssmc is not null").show(10)

spark中dataFrame的一些方法回顧

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

Spring核心——字元串到實體轉換
分散式框架spring-session實現session一致性使用問題

TAG:程序員小新人學習 |