簡約的JAVA版本MapReduce和日常No.25
昨天做了一個小調查,說看看想看些啥。大概的分布是這樣的,一個1代表一個投票。看來還是2、3比較多。
11111 希望看到"演算法"回復1。
還好多人說想看我長啥樣,嘛,在我比較正經的時候,就長下面這樣。
大圖預警!!!!
日常呢,就長這樣。
長這樣。
好了切入正題,今天開始挖一個新坑,就是實現一些基於MapReduce的一些圖演算法,比如Pregel啊,PageRank啊,LPA啊,SLPA啊等等,坑很大,非常大,慢慢寫吧,都不會講非常難的理論問題,以代碼細節為主。。
先上一個我思維拓展的時候寫得java實現的MapReduce的基礎版本吧,寫得不是很好,我也在慢慢完善,Go語言版本的還在寫,真是慚愧感覺一直在吃老本。
今天實現的一個內容是,將一個List進行map操作變成另外一個List,然後通過reduce進行加和。
靈感來源來自於《MapReduce: Simplified Data Processing on Large Clusters 》這篇論文,大家可以看看我之前的文章,在了解完什麼是Mapreduce。然後先去看看這篇論文,啟發很多。
首先我們從兩個介面入手,MapFunction和ReduceFunction,這是MapReduce的兩個靈魂介面,由使用者去定義,這裡我定義的都是最最簡單的版本,暫時並沒有進行泛化的能力。
MapFunction定義了一個介面,類型為V,然後通過一個叫map的方法,輸出一個類型為V的值。
public interfaceMapFunction {
Vmap(Vtarget);
}
ReduceFunction定義了一個介面,類型為V,然後通過一個叫reduce的方法,通過聚合兩個V類型的值,輸出一個類型為V的值。
public interfaceReduceFunction {
Vreduce(VA,VB);
}
上面兩個方法定義了MapReduce的核心內容,就是任務切分和任務聚合。有小夥伴不理解這裡為什麼使用泛型,因為作為一個框架來說,我是不知道使用者想使用什麼樣的類型進行計算的(雖然這裡我知道我接下來就要用Integer進行計算了),所以必須不能指定類型,否則這個框架就永遠只能用Integer類型了。
那我們的map和reduce任務要跑在哪裡呢?有小夥伴說跑在分布式環境里。對沒錯,最終目的是跑在分布式環境里。但是在這裡,咱就偷個懶,先用多線程來模擬這個過程,並且使用內存來作為消息機制。
我是i5雙核的CPU,經驗值下面,只有兩個cpu的話,創建4個線程對於性能來說比單線程好。(畢竟線程切換存在開銷,控制得不好多線程肯定是比單線程慢的,不服來辯)
好了,MapFunction有了,CPUs也有了,接下來可以開始寫提交器了。任務提交器是什麼東西呢,就是把一個map任務進行切分,並且交給多個線程去非同步執行,然後最終把結果匯總還給客戶端的一個類。下面的類都比較大,建議在電腦端看。
這個類做了什麼事呢?就是把List封裝起來,然後把任務分發給多個線程去執行,使用CountDownLatch來保證所有的線程都已經完成計算,然後再把結果返回給客戶端。
public classMapSubmitter {
privateListtarget;
private intlength;
publicMapSubmitter(List target){
this.target= target;
this.length= target.size();
}
publicList map(finalMapFunction mapFunction){
finalCountDownLatch countDownLatch =newCountDownLatch(length);
finalList result =newArrayList();
for(inti =; i
finalVcurrent =target.get(i);
final intcurrentIndex = i;
try{
Future future = CPUs.submit(newCallable() {
publicVcall()throwsException {
Vresult =mapFunction.map(current);
//Printer.println(currentIndex);
returnresult;
}
});
result.add(i,future.get());
countDownLatch.countDown();
}
catch(InterruptedException e) {
e.printStackTrace();
}catch(ExecutionException e) {
e.printStackTrace();
}
}
try{
countDownLatch.await();
}catch(InterruptedException e) {
}
finally{
returnresult;
}
}
}
這個類又做了什麼事呢?List封裝起來,交給很多線程去執行,然後維護一個最終的結果類V,並為這個結果提供線程安全的保護,避免因為多線程操作同一個結果造成結果錯誤。
public classReduceSubmitter {
privateListtarget;
private intlength;
privateVresult;
Locklock=newReentrantLock();
publicReduceSubmitter(List target){
this.target= target;
this.length= target.size();
this.result= target.get();
}
publicVreduce(finalReduceFunction reduceFunction){
finalCountDownLatch countDownLatch =newCountDownLatch(length);
countDownLatch.countDown();
for(inti =1; i
finalVcurrent =target.get(i);
CPUs.execute(newRunnable() {
public voidrun() {
lock.lock();
Vnext =reduceFunction.reduce(ReduceSubmitter.this.result,current);
ReduceSubmitter.this.result= next;
lock.unlock();
countDownLatch.countDown();
}
});
}
try{
countDownLatch.await();
}catch(InterruptedException e) {
}
finally{
return this.result;
}
}
}
好咯,寫完了就開始測試了,主要就創建一個長度為10的數組,然後進行map操作把每一個值都進行平方,然後通過reduce操作進行求和,代碼比較簡單就不一一細說了,有啥問題後台留言交流。
public classTestMapReduce {
public static voidmain(String[] args){
//僅僅是為了耗時而模擬的一個好像很複雜的操作,不然太快了。
final intjunkTime =1000000;
//初始化一個想進行操作的數組
List integerList =newArrayList();
for(inti =; i
integerList.add(i);
}
intlength = integerList.size();
// printer.printList(integerList);
Longstart= System.currentTimeMillis();
//進行map操作並返回結果
MapSubmitter mapSubmitter =newMapSubmitter(integerList);
integerList = mapSubmitter.map(newMapFunction() {
publicInteger map(Integer target) {
Double b =0D;
for(inti =; i
b += Math.exp(i);
}
returntarget * target;
}
});
Printer.println("mapreduce cost time:"+ (System.currentTimeMillis() -start));
start= System.currentTimeMillis();
//進行reduce操作並返回結果
ReduceSubmitter reduceSubmitter =newReduceSubmitter(integerList);
Integer resultInteger = reduceSubmitter.reduce(newReduceFunction() {
publicInteger reduce(Integer A, Integer B) {
Double b =0D;
for(inti =; i
b += Math.exp(i);
}
returnA+B;
}
});
Printer.println("reduce cost time:"+ (System.currentTimeMillis() -start));
CPUs.shutdown();
}
}
好啦,今天的MapReduce就說到這裡。經過我的實驗,無論多少次實驗,都是比單線程快那麼一丟丟的,這都要得益於那個耗時的操作,模糊了線程切換帶來的時間損耗,畢竟不怎麼耗時的操作來說,單線程其實是絕對比多線程快的。
細心的同學會發現,好像這個並不符合論文裡面的標準吖。嗯吶是的,這個只是我心血來潮寫的簡單版本。問題有諸如,我們上面的map操作好像不能變成其他類型吖,怎麼實現WordCount呢?以及Driver好像沒有進行任務切分和分發吖?好像也沒有suffle操作啊?好像整個過程也不是嚴格多線程的吖,怎麼辦呢?下一次給大家分享一個更加完整的MapReduce。
希望大家都能在自己的機器上跑成功。源碼都在上面了我就不放鏈接了。
好了,如果有任務問題請後台留言,我會看的。如果對您有一點點的幫助或者啟發的話,幫忙轉發或者點個贊都是對我很大的支持喔,么么噠。
賞一個唄。
TAG:一名叫大蕉的程序員 |
※簡約無縫,New Balance 574 Sport 「Decon」 登場!
※Reebok 推出加強簡約版 Aztec ANTQ
※110㎡簡約—Soft Dream
※Note Design Studio:瑞典簡約小公寓設計
※J Apostrophe,均衡-簡約
※設計更加簡約!Balenciaga Triple S 2.0 現已正式發售
※Nike AirMax Plus97新配色來襲!簡約復古最愛
※至薄簡約の小鋼炮,Fractal Design Node 202 Slim 裝機作業
※adidas EQT Support ADV 展示簡約黑白魅力
※簡約時尚造型!Ian Paley x Reebok 3D OP 聯名系列現已發售
※簡約時尚必備!NIke x Jcrew 「Kill shot 2」 美網補貨
※簡約復古式大愛!Nike Air Max Plus 97新配色來襲!
※簡約黑白更街頭! NIKE Air VaporMax 2.0 黑白版本市售在即!
※簡約黑白更街頭! NIKE Air VaporMax 2.0 黑白版本市售在即!
※簡約復古!全新聯名鞋款 size? x adidas Kegker Super 「Beer」 即將來襲
※簡約高街風!兩款 Air Jordan 1 「High Zip」 下周發售!
※150㎡簡約—MANI Red 歸
※A BATHING APE 推出簡約迷彩布鞋 1st CAMO Yank Sta
※簡約大氣!John Elliott x Nike LeBron Icon 即將來襲!
※簡約且不失細節!JUICE x adidas Consortium Gazalle 即將發售!