當前位置:
首頁 > 知識 > spark分析某國氣象站平均氣溫實戰

spark分析某國氣象站平均氣溫實戰

一、數據集分析

數據文件按照氣象站和日期進行組織,每個氣象站都是一個總目錄,而且每個氣象站下面從 1980 年到 2010 年,每一年又都作為一個子目錄。 因為某國有成千上萬個氣象站,所以整個數據集由大量的小文件組成。通常情況下,處理少量的大型文件更容易、更有效,因此,這些數據需要經過預處理,將每個氣象站的數據文件拼接成一個單獨的文件。 預處理過的數據文件示例如下所示:

30yr_03103.dat

30yr_03812.dat

30yr_03813.dat

30yr_03816.dat

30yr_03820.dat

30yr_03822.dat

30yr_03856.dat

30yr_03860.dat

30yr_03870.dat

30yr_03872.dat

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

其中03103、03812、03813、03816、03820、03822、03856、03860、03870、03872代表的是氣象站編號。

這些數據按行並以 ASCII 格式存儲,其中每一行是一條記錄。 下面我們展示一行採樣數據,其中重要的欄位被突出顯示。該行數據被分割成很多行以突出每個欄位,但在實際文件中,這些欄位被整合成一行且沒有任何分隔符。

數據 含義 所佔位數

1998 #year 4

03 #month 3

09 #day 3

17 #hour 3

11 #temperature 6

-100 #dew 6

10237 #pressure 6

60 #wind_direction 6

72 #wind_speed 6

0 #sky_condition 6

0 #rain_1h 6

-9999 #rain_6h 6

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

真實數據如下所示:

2010 09 08 21 200 83 10091 230 72 -9999 0 -9999

  • 1
  • 2

數據集下載:請點擊這裡.

二、將數據導入到HDFS上

通過HDFS Java API封裝了一個本地上傳文件到HDFS的工具類HDFSUploadFileUtil,目前支持將本地目錄下的所有文件上傳到HDFS的某個目錄,也可以通過正則表達式去過濾本地目錄的文件,將不需要的文件過濾掉。實現代碼如下:

private static FileSystem fileSystem = null;

private static FileSystem localFileSystem = null;

public static void uploadFile(String srcDirectory, String tagDirectory, String hdfsUrl, String regex) throws URISyntaxException, IOException {

Configuration conf = new Configuration();

URI uri = new URI(hdfsUrl);

if (fileSystem == null) {

fileSystem = FileSystem.get(uri, conf);

}

if (localFileSystem == null) {

localFileSystem = FileSystem.getLocal(conf);

}

Path path = new Path(hdfsUrl + tagDirectory);

if (fileSystem.exists(path)) {

if (fileSystem.listStatus(path).length > 0) {

fileSystem.close();

throw new IllegalArgumentException("目標目錄存在其他文件!");

}

} else {

fileSystem.mkdirs(path);

}

FileStatus[] listStatus = null;

if (regex.length() > 0) {

listStatus = localFileSystem.globStatus(new Path(srcDirectory), new RegexAcceptPathFilter(regex));

} else {

listStatus = localFileSystem.globStatus(new Path(srcDirectory));

}

Path[] sources = FileUtil.stat2Paths(listStatus);

for (Path p : sources) {

fileSystem.copyFromLocalFile(p, path);

System.out.println("文件[" + p.toString() + "]上傳成功!");

}

fileSystem.close();

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

三、spark分析數據

我們要計算每個氣象站的平均氣溫,那首先要得到兩個東西:氣象站編號、氣溫值大小

氣溫值大小:通過觀察數據,我們知道截取每行數據的14~19位就可以獲取到氣溫值

氣象站編號:我們發現,在數據中是獲取不到我們所需要的氣象站編號,只能通過輸入文件的文件名去截取,所以這裡的解決方案是通過hadoop的InputSplit去獲取文件名,具體代碼如下:

val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)

val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]

val lines = hadoopRDD.mapPartitionsWithInputSplit((inputSplit : InputSplit, iterator : Iterator[(LongWritable, Text)]) =>{

val file = inputSplit.asInstanceOf[FileSplit]

iterator.map(x => x._2 + " " + file.getPath.toString)

})

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

這裡通過InputSplit獲取到輸入文件的名稱之後,將文件名拼在每行數據的後面,產生新的RDD

然後就可以用截取每行數據獲取我們所需要的信息,同時在這裡我們可以對數據做校驗,過濾掉沒用的數據:

val groupRDD = lines.filter(line => line.substring(14, 19).trim().toInt != 9999).map(line => (line.substring(line.length - 9, line.length - 4), line.substring(14, 19).trim().toInt))

  • 1
  • 2

獲取到的數據為:("03812", 22),key為氣象站編號,value為氣溫值

最後,通過強大的combineByKey進行平均值的計算,並列印出來,也可以將輸出結果保存到文件中:

val resultRDD = groupRDD.combineByKey(

(temp) => (1, temp),

(tmp : (Int, Int), temp) => (tmp._1 + 1, tmp._2 + temp),

(tmp1 : (Int, Int), tmp2 : (Int, Int)) => (tmp1._1 + tmp2._1, tmp1._2 + tmp2._2)

).map{case(name, (num, temp)) => (name, temp/num)}.collect().foreach(println)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

四、完整代碼

import org.apache.hadoop.io.{LongWritable, Text}

import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}

import org.apache.spark.rdd.HadoopRDD

import org.apache.spark.{SparkConf, SparkContext}

object WeatherAverage {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("weather-average")

val sc = new SparkContext(conf)

val input = "/data/spark-example/weather/"

val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)

val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]

val lines = hadoopRDD.mapPartitionsWithInputSplit((inputSplit : InputSplit, iterator : Iterator[(LongWritable, Text)]) =>{

val file = inputSplit.asInstanceOf[FileSplit]

iterator.map(x => x._2 + " " + file.getPath.toString)

})

val groupRDD = lines.filter(line => line.substring(14, 19).trim().toInt != 9999).map(line => (line.substring(line.length - 9, line.length - 4), line.substring(14, 19).trim().toInt))

val resultRDD = groupRDD.combineByKey(

(temp) => (1, temp),

(tmp : (Int, Int), temp) => (tmp._1 + 1, tmp._2 + temp),

(tmp1 : (Int, Int), tmp2 : (Int, Int)) => (tmp1._1 + tmp2._1, tmp1._2 + tmp2._2)

).map{case(name, (num, temp)) => (name, temp/num)}.collect().foreach(println)

sc.stop()

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

五、打包,提交spark任務

通過maven將寫好的代碼打成jar包,並上傳到裝有spark的伺服器的某個目錄上,並運行

bin/spark-submit

--class cn.oldsix.spark.weather.WeatherAverage

--master local

/home/oldsix/app/app-jars/spark/spark-example-0.0.1.jar

  • 1
  • 2
  • 3
  • 4
  • 5

從控制台可以看到如下結果:

spark分析某國氣象站平均氣溫實戰

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

Python函數的參數 默認參數 可變參數 關鍵字參數 命名關鍵字參數
Vue.js 入門

TAG:程序員小新人學習 |