當前位置:
首頁 > 知識 > spark streaming DStream運算元大全

spark streaming DStream運算元大全

DStream作為spark 流處理的數據抽象,有三個主要的特徵:

1.依賴的DStream的列表

2.DStream生成RDD的時間間隔

3.用來生成RDD的方法

本篇pom.xml文件spark streaming版本為1.6.0

目錄

window()

reduceByWindow()

countByWindow()

countByValueAndWindow()

reduceByKeyAndWindow()

updatestateByKey()

window()

生成新的DStream

def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)

def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {

new WindowedDStream(this, windowDuration, slideDuration)

}

兩個重載方法,第一個只傳窗口長度(滑動間隔默認為實例化ssc時的時間間隔),第二個傳窗口長度與滑動間隔

val ssc=new StreamingContext(sc,Seconds(1))//時間間隔為1s

val stream=xxxx //非重點,省略

stream.print()

stream.window(Seconds(4),Seconds(4)).print()

stream.window(Seconds(4),Seconds(5)).print()

第一次print():是每秒列印一次這1秒內接收的數據

第二次print():每4秒列印前4秒接收的數據

第三次print():每5秒列印最近4秒接收的數據 ,上個5秒間隔,第一秒內的數據不會列印

reduceByWindow()

生成新的DStream,作用於key-value類型

def reduceByWindow(

reduceFunc: (T, T) => T,

windowDuration: Duration,

slideDuration: Duration

): DStream[T] = ssc.withScope {

this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)

}

需要傳3個參數,依次為reduce()方法,窗口長度,滑動長度。

該方法的主要過程是:先將時間間隔內的數據調用reduce()運算元聚合,然後調window()生成新的DStream,再將各間隔聚合完的結果聚合。

val ssc=new StreamingContext(sc,Seconds(1))//時間間隔

val stream=xxxx //類型為DStream[String]

stream.print()

stream.reduce((s1,s2)=>{

s1+":"+s2

}).print()

stream.reduceByWindow((s1,s2)=>{

s1+":"+s2

},Seconds(60),Seconds(10)).print()

第一次print():每秒列印一次接收的數據

第二次print():每秒列印一次,會將每秒接收到的數據拼接成起來

第三次print():每10秒列印一次,列印最近一分鐘接收的數據,並拼接

countByWindow()

生成新的DStream

def countByWindow(

windowDuration: Duration,

slideDuration: Duration): DStream[Long] = ssc.withScope {

//reduceByWindow()第二各方法_+_為逆函數

this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)

}

需要兩個參數,依次為:窗口長度,滑動間隔

該方法的主要過程為:先將每個元素生成長整數1,然後調用reduceByWindow()運算元,將每個元素值相加。

val ssc=new StreamingContext(sc,Seconds(1))//時間間隔為1s

val stream=xxxx

stream.print()

stream.count().print()

stream.countByWindow(Seconds(10),Seconds(2)).print()

第一次print():每秒列印一次接收的數據

第二次print():每秒列印一次接收到元素的數量

第三次print():每2秒列印一次最近10秒接收到元素的數量

countByValueAndWindow()

生成新的DStream

def countByValueAndWindow(

windowDuration: Duration,

slideDuration: Duration,

numPartitions: Int = ssc.sc.defaultParallelism)

(implicit ord: Ordering[T] = null)

: DStream[(T, Long)] = ssc.withScope {

this.map(x => (x, 1L)).reduceByKeyAndWindow(

(x: Long, y: Long) => x + y,

(x: Long, y: Long) => x - y,

windowDuration,

slideDuration,

numPartitions,

(x: (T, Long)) => x._2 != 0L

)

}

需要三個參數,依次為:窗口長度,滑動間隔,分區數(有默認值,可不傳)

該方法的主要過程為:先將每個元素生成(元素,1L),然後調用reduceByKeyAndWindow(),可以理解為按key聚合,統計每個key的次數,也就是統計每個元素的次數

val ssc=new StreamingContext(sc,Seconds(1))

val stream=xxxx

stream.print()

stream.countByValue().print()

data.countByValueAndWindow(Seconds(10),Seconds(2)).print()

第一次print():每秒列印一次接收的數據

第二次print():每秒列印一次,會統計每個元素的次數

第三次print():每2秒列印最近10秒的數據,統計每個元素次數

reduceByKeyAndWindow()

生成新的DStream

def reduceByKeyAndWindow(

reduceFunc: (V, V) => V,

windowDuration: Duration

): DStream[(K, V)] = ssc.withScope {

reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())

}

該方法有6個重載方法,就不一一粘了,參數為:聚合函數,窗口長度

該方法主要過程為:調reduceByKey()函數

val ssc=new StreamingContext(sc,Seconds(1))

val stream=xxxx //stream類型為[String,Long]

stream.print()

data.reduceByKey((c1,c2)=>{

c1+c2

}).print()

data.reduceByKeyAndWindow((c1,c2)=>{

c1+c2

},Seconds(10)).print()

第一次print():每秒列印一次接收的數據

第二次print():每秒列印一次,計算每個key的次數

第三次print():每秒列印一次最近10秒每個key的次數

updatestateByKey()

生成新的DStream

所有的window操作都是計算長度為窗口長度的數據,非window操作都是計算設置的時間間隔內的數據,而updateBykey可以理解成在無限長的時間裡,為每個key保存一個狀態,這個時間長度可以通過ssc.awaitTerminationOrTimeout()來控制,一般來說長度每天或每小時。

def updateStateByKey[S: ClassTag](

updateFunc: (Seq[V], Option[S]) => Option[S]

): DStream[(K, S)] = ssc.withScope {

updateStateByKey(updateFunc, defaultPartitioner())

}

當然,該方法重載方法也6個,這裡只討論上面的,傳入一個更新方法,該方法兩個參數:一個為當前時間間隔內數據,類型為Seq,一個為之前的數據,可能無數據(第一次提交計算的時候),類型為Option,返回值也為Option類型

下面是兩個實例,求pv和uv

//pv

val stream=xxxx//類型得轉化為[當前日期,1L]

stream.updateStateByKey((curvalues:Seq[Long],prevalue:Option[Long])=>{

val sum=curvalues.sum

val pre=prevalue.getOrElse(0L)

Some(sum+pre)

})

//uv 因為uv涉及到去重,故將userid放入Set里

val stream=xxxx //類型為[當前日期,userid]

stream.updateStateByKey((curvalues:Seq[Set[String]],prevalue:Option[Set[String]])=>{

var curs=Set[String]()

if(!curvalues.isEmpty){

curs=curvalues.reduce(_++_)//將兩個Set集合合併

}

Some(curs++prevalue.getOrElse(Set[String]()))

})

未完待續。

spark streaming DStream運算元大全

打開今日頭條,查看更多精彩圖片
喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

互聯網公司為什麼普遍996而不是666
我們一直談論「寫代碼」,但你會「讀代碼」嗎?

TAG:程序員小新人學習 |