spark分析某國氣象站平均氣溫實戰
一、數據集分析
數據文件按照氣象站和日期進行組織,每個氣象站都是一個總目錄,而且每個氣象站下面從 1980 年到 2010 年,每一年又都作為一個子目錄。 因為某國有成千上萬個氣象站,所以整個數據集由大量的小文件組成。通常情況下,處理少量的大型文件更容易、更有效,因此,這些數據需要經過預處理,將每個氣象站的數據文件拼接成一個單獨的文件。 預處理過的數據文件示例如下所示:
30yr_03103.dat
30yr_03812.dat
30yr_03813.dat
30yr_03816.dat
30yr_03820.dat
30yr_03822.dat
30yr_03856.dat
30yr_03860.dat
30yr_03870.dat
30yr_03872.dat
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
其中03103、03812、03813、03816、03820、03822、03856、03860、03870、03872代表的是氣象站編號。
這些數據按行並以 ASCII 格式存儲,其中每一行是一條記錄。 下面我們展示一行採樣數據,其中重要的欄位被突出顯示。該行數據被分割成很多行以突出每個欄位,但在實際文件中,這些欄位被整合成一行且沒有任何分隔符。
數據 含義 所佔位數
1998 #year 4
03 #month 3
09 #day 3
17 #hour 3
11 #temperature 6
-100 #dew 6
10237 #pressure 6
60 #wind_direction 6
72 #wind_speed 6
0 #sky_condition 6
0 #rain_1h 6
-9999 #rain_6h 6
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
真實數據如下所示:
2010 09 08 21 200 83 10091 230 72 -9999 0 -9999
- 1
- 2
數據集下載:請點擊這裡.
二、將數據導入到HDFS上
通過HDFS Java API封裝了一個本地上傳文件到HDFS的工具類HDFSUploadFileUtil,目前支持將本地目錄下的所有文件上傳到HDFS的某個目錄,也可以通過正則表達式去過濾本地目錄的文件,將不需要的文件過濾掉。實現代碼如下:
private static FileSystem fileSystem = null;
private static FileSystem localFileSystem = null;
public static void uploadFile(String srcDirectory, String tagDirectory, String hdfsUrl, String regex) throws URISyntaxException, IOException {
Configuration conf = new Configuration();
URI uri = new URI(hdfsUrl);
if (fileSystem == null) {
fileSystem = FileSystem.get(uri, conf);
}
if (localFileSystem == null) {
localFileSystem = FileSystem.getLocal(conf);
}
Path path = new Path(hdfsUrl + tagDirectory);
if (fileSystem.exists(path)) {
if (fileSystem.listStatus(path).length > 0) {
fileSystem.close();
throw new IllegalArgumentException("目標目錄存在其他文件!");
}
} else {
fileSystem.mkdirs(path);
}
FileStatus[] listStatus = null;
if (regex.length() > 0) {
listStatus = localFileSystem.globStatus(new Path(srcDirectory), new RegexAcceptPathFilter(regex));
} else {
listStatus = localFileSystem.globStatus(new Path(srcDirectory));
}
Path[] sources = FileUtil.stat2Paths(listStatus);
for (Path p : sources) {
fileSystem.copyFromLocalFile(p, path);
System.out.println("文件[" + p.toString() + "]上傳成功!");
}
fileSystem.close();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
三、spark分析數據
我們要計算每個氣象站的平均氣溫,那首先要得到兩個東西:氣象站編號、氣溫值大小
氣溫值大小:通過觀察數據,我們知道截取每行數據的14~19位就可以獲取到氣溫值
氣象站編號:我們發現,在數據中是獲取不到我們所需要的氣象站編號,只能通過輸入文件的文件名去截取,所以這裡的解決方案是通過hadoop的InputSplit去獲取文件名,具體代碼如下:
val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]
val lines = hadoopRDD.mapPartitionsWithInputSplit((inputSplit : InputSplit, iterator : Iterator[(LongWritable, Text)]) =>{
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(x => x._2 + " " + file.getPath.toString)
})
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
這裡通過InputSplit獲取到輸入文件的名稱之後,將文件名拼在每行數據的後面,產生新的RDD
然後就可以用截取每行數據獲取我們所需要的信息,同時在這裡我們可以對數據做校驗,過濾掉沒用的數據:
val groupRDD = lines.filter(line => line.substring(14, 19).trim().toInt != 9999).map(line => (line.substring(line.length - 9, line.length - 4), line.substring(14, 19).trim().toInt))
- 1
- 2
獲取到的數據為:("03812", 22),key為氣象站編號,value為氣溫值
最後,通過強大的combineByKey進行平均值的計算,並列印出來,也可以將輸出結果保存到文件中:
val resultRDD = groupRDD.combineByKey(
(temp) => (1, temp),
(tmp : (Int, Int), temp) => (tmp._1 + 1, tmp._2 + temp),
(tmp1 : (Int, Int), tmp2 : (Int, Int)) => (tmp1._1 + tmp2._1, tmp1._2 + tmp2._2)
).map{case(name, (num, temp)) => (name, temp/num)}.collect().foreach(println)
- 1
- 2
- 3
- 4
- 5
- 6
四、完整代碼
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkConf, SparkContext}
object WeatherAverage {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("weather-average")
val sc = new SparkContext(conf)
val input = "/data/spark-example/weather/"
val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]
val lines = hadoopRDD.mapPartitionsWithInputSplit((inputSplit : InputSplit, iterator : Iterator[(LongWritable, Text)]) =>{
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(x => x._2 + " " + file.getPath.toString)
})
val groupRDD = lines.filter(line => line.substring(14, 19).trim().toInt != 9999).map(line => (line.substring(line.length - 9, line.length - 4), line.substring(14, 19).trim().toInt))
val resultRDD = groupRDD.combineByKey(
(temp) => (1, temp),
(tmp : (Int, Int), temp) => (tmp._1 + 1, tmp._2 + temp),
(tmp1 : (Int, Int), tmp2 : (Int, Int)) => (tmp1._1 + tmp2._1, tmp1._2 + tmp2._2)
).map{case(name, (num, temp)) => (name, temp/num)}.collect().foreach(println)
sc.stop()
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
五、打包,提交spark任務
通過maven將寫好的代碼打成jar包,並上傳到裝有spark的伺服器的某個目錄上,並運行
bin/spark-submit
--class cn.oldsix.spark.weather.WeatherAverage
--master local
/home/oldsix/app/app-jars/spark/spark-example-0.0.1.jar
- 1
- 2
- 3
- 4
- 5
從控制台可以看到如下結果:
![](https://pic.pimg.tw/zzuyanan/1488615166-1259157397.png)
![](https://pic.pimg.tw/zzuyanan/1482887990-2595557020.jpg)
※Python函數的參數 默認參數 可變參數 關鍵字參數 命名關鍵字參數
※Vue.js 入門
TAG:程序員小新人學習 |