當前位置:
首頁 > 最新 > 簡約的JAVA版本MapReduce和日常No.25

簡約的JAVA版本MapReduce和日常No.25

昨天做了一個小調查,說看看想看些啥。大概的分布是這樣的,一個1代表一個投票。看來還是2、3比較多。

11111 希望看到"演算法"回復1。

還好多人說想看我長啥樣,嘛,在我比較正經的時候,就長下面這樣。

大圖預警!!!!

日常呢,就長這樣。

長這樣。

好了切入正題,今天開始挖一個新坑,就是實現一些基於MapReduce的一些圖演算法,比如Pregel啊,PageRank啊,LPA啊,SLPA啊等等,坑很大,非常大,慢慢寫吧,都不會講非常難的理論問題,以代碼細節為主。。

先上一個我思維拓展的時候寫得java實現的MapReduce的基礎版本吧,寫得不是很好,我也在慢慢完善,Go語言版本的還在寫,真是慚愧感覺一直在吃老本。

今天實現的一個內容是,將一個List進行map操作變成另外一個List,然後通過reduce進行加和。

靈感來源來自於《MapReduce: Simplified Data Processing on Large Clusters 》這篇論文,大家可以看看我之前的文章,在了解完什麼是Mapreduce。然後先去看看這篇論文,啟發很多。

首先我們從兩個介面入手,MapFunction和ReduceFunction,這是MapReduce的兩個靈魂介面,由使用者去定義,這裡我定義的都是最最簡單的版本,暫時並沒有進行泛化的能力。

MapFunction定義了一個介面,類型為V,然後通過一個叫map的方法,輸出一個類型為V的值。

public interfaceMapFunction {

Vmap(Vtarget);

}

ReduceFunction定義了一個介面,類型為V,然後通過一個叫reduce的方法,通過聚合兩個V類型的值,輸出一個類型為V的值。

public interfaceReduceFunction {

Vreduce(VA,VB);

}

上面兩個方法定義了MapReduce的核心內容,就是任務切分和任務聚合。有小夥伴不理解這裡為什麼使用泛型,因為作為一個框架來說,我是不知道使用者想使用什麼樣的類型進行計算的(雖然這裡我知道我接下來就要用Integer進行計算了),所以必須不能指定類型,否則這個框架就永遠只能用Integer類型了。

那我們的map和reduce任務要跑在哪裡呢?有小夥伴說跑在分布式環境里。對沒錯,最終目的是跑在分布式環境里。但是在這裡,咱就偷個懶,先用多線程來模擬這個過程,並且使用內存來作為消息機制。

我是i5雙核的CPU,經驗值下面,只有兩個cpu的話,創建4個線程對於性能來說比單線程好。(畢竟線程切換存在開銷,控制得不好多線程肯定是比單線程慢的,不服來辯)

好了,MapFunction有了,CPUs也有了,接下來可以開始寫提交器了。任務提交器是什麼東西呢,就是把一個map任務進行切分,並且交給多個線程去非同步執行,然後最終把結果匯總還給客戶端的一個類。下面的類都比較大,建議在電腦端看。

這個類做了什麼事呢?就是把List封裝起來,然後把任務分發給多個線程去執行,使用CountDownLatch來保證所有的線程都已經完成計算,然後再把結果返回給客戶端。

public classMapSubmitter {

privateListtarget;

private intlength;

publicMapSubmitter(List target){

this.target= target;

this.length= target.size();

}

publicList map(finalMapFunction mapFunction){

finalCountDownLatch countDownLatch =newCountDownLatch(length);

finalList result =newArrayList();

for(inti =; i

finalVcurrent =target.get(i);

final intcurrentIndex = i;

try{

Future future = CPUs.submit(newCallable() {

publicVcall()throwsException {

Vresult =mapFunction.map(current);

//Printer.println(currentIndex);

returnresult;

}

});

result.add(i,future.get());

countDownLatch.countDown();

}

catch(InterruptedException e) {

e.printStackTrace();

}catch(ExecutionException e) {

e.printStackTrace();

}

}

try{

countDownLatch.await();

}catch(InterruptedException e) {

}

finally{

returnresult;

}

}

}

這個類又做了什麼事呢?List封裝起來,交給很多線程去執行,然後維護一個最終的結果類V,並為這個結果提供線程安全的保護,避免因為多線程操作同一個結果造成結果錯誤。

public classReduceSubmitter {

privateListtarget;

private intlength;

privateVresult;

Locklock=newReentrantLock();

publicReduceSubmitter(List target){

this.target= target;

this.length= target.size();

this.result= target.get();

}

publicVreduce(finalReduceFunction reduceFunction){

finalCountDownLatch countDownLatch =newCountDownLatch(length);

countDownLatch.countDown();

for(inti =1; i

finalVcurrent =target.get(i);

CPUs.execute(newRunnable() {

public voidrun() {

lock.lock();

Vnext =reduceFunction.reduce(ReduceSubmitter.this.result,current);

ReduceSubmitter.this.result= next;

lock.unlock();

countDownLatch.countDown();

}

});

}

try{

countDownLatch.await();

}catch(InterruptedException e) {

}

finally{

return this.result;

}

}

}

好咯,寫完了就開始測試了,主要就創建一個長度為10的數組,然後進行map操作把每一個值都進行平方,然後通過reduce操作進行求和,代碼比較簡單就不一一細說了,有啥問題後台留言交流。

public classTestMapReduce {

public static voidmain(String[] args){

//僅僅是為了耗時而模擬的一個好像很複雜的操作,不然太快了。

final intjunkTime =1000000;

//初始化一個想進行操作的數組

List integerList =newArrayList();

for(inti =; i

integerList.add(i);

}

intlength = integerList.size();

// printer.printList(integerList);

Longstart= System.currentTimeMillis();

//進行map操作並返回結果

MapSubmitter mapSubmitter =newMapSubmitter(integerList);

integerList = mapSubmitter.map(newMapFunction() {

publicInteger map(Integer target) {

Double b =0D;

for(inti =; i

b += Math.exp(i);

}

returntarget * target;

}

});

Printer.println("mapreduce cost time:"+ (System.currentTimeMillis() -start));

start= System.currentTimeMillis();

//進行reduce操作並返回結果

ReduceSubmitter reduceSubmitter =newReduceSubmitter(integerList);

Integer resultInteger = reduceSubmitter.reduce(newReduceFunction() {

publicInteger reduce(Integer A, Integer B) {

Double b =0D;

for(inti =; i

b += Math.exp(i);

}

returnA+B;

}

});

Printer.println("reduce cost time:"+ (System.currentTimeMillis() -start));

CPUs.shutdown();

}

}

好啦,今天的MapReduce就說到這裡。經過我的實驗,無論多少次實驗,都是比單線程快那麼一丟丟的,這都要得益於那個耗時的操作,模糊了線程切換帶來的時間損耗,畢竟不怎麼耗時的操作來說,單線程其實是絕對比多線程快的。

細心的同學會發現,好像這個並不符合論文裡面的標準吖。嗯吶是的,這個只是我心血來潮寫的簡單版本。問題有諸如,我們上面的map操作好像不能變成其他類型吖,怎麼實現WordCount呢?以及Driver好像沒有進行任務切分和分發吖?好像也沒有suffle操作啊?好像整個過程也不是嚴格多線程的吖,怎麼辦呢?下一次給大家分享一個更加完整的MapReduce。

希望大家都能在自己的機器上跑成功。源碼都在上面了我就不放鏈接了。

好了,如果有任務問題請後台留言,我會看的。如果對您有一點點的幫助或者啟發的話,幫忙轉發或者點個贊都是對我很大的支持喔,么么噠。

賞一個唄。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 一名叫大蕉的程序員 的精彩文章:

TAG:一名叫大蕉的程序員 |

您可能感興趣

簡約無縫,New Balance 574 Sport 「Decon」 登場!
Reebok 推出加強簡約版 Aztec ANTQ
110㎡簡約—Soft Dream
Note Design Studio:瑞典簡約小公寓設計
J Apostrophe,均衡-簡約
設計更加簡約!Balenciaga Triple S 2.0 現已正式發售
Nike AirMax Plus97新配色來襲!簡約復古最愛
至薄簡約の小鋼炮,Fractal Design Node 202 Slim 裝機作業
adidas EQT Support ADV 展示簡約黑白魅力
簡約時尚造型!Ian Paley x Reebok 3D OP 聯名系列現已發售
簡約時尚必備!NIke x Jcrew 「Kill shot 2」 美網補貨
簡約復古式大愛!Nike Air Max Plus 97新配色來襲!
簡約黑白更街頭! NIKE Air VaporMax 2.0 黑白版本市售在即!
簡約黑白更街頭! NIKE Air VaporMax 2.0 黑白版本市售在即!
簡約復古!全新聯名鞋款 size? x adidas Kegker Super 「Beer」 即將來襲
簡約高街風!兩款 Air Jordan 1 「High Zip」 下周發售!
150㎡簡約—MANI Red 歸
A BATHING APE 推出簡約迷彩布鞋 1st CAMO Yank Sta
簡約大氣!John Elliott x Nike LeBron Icon 即將來襲!
簡約且不失細節!JUICE x adidas Consortium Gazalle 即將發售!