當前位置:
首頁 > 科技 > 資源 | Pandas on Ray:僅需改動一行代碼,即可讓Pandas加速四倍

資源 | Pandas on Ray:僅需改動一行代碼,即可讓Pandas加速四倍


選自UC Berkeley Rise Lab


作者:Devin Petersohn

機器之心編譯


參與:Nurhachu Null、路雪




本文中,來自 UC Berkeley 的 Devin Petersohn 發布文章介紹了其參與的項目 Pandas on Ray,使用這款工具,無需對代碼進行太多改動即可加速 Pandas,遇到大型數據集也不怕。作者還對 Pandas on Ray、Pandas 進行了對比評估。機器之心對此文進行了編譯介紹。




項目鏈接:https://github.com/ray-project/ray




最近,我和一位使用 100 多 TB 生物數據的朋友討論了數據科學庫的一些局限性。當面臨這種規模的數據時,Pandas 成了最受喜愛的工具;然而,當你開始處理 TB 級別的基因數據時,單核運行的 Pandas 就會變得捉襟見肘。如果我們擁有更多的處理器核,或者要打開數十 TB 規模的文件時,我們希望 Pandas 運行得更快。目前,Apache Spark 是最高性能的分散式選擇了,但是如果未對 Pandas 代碼做出足夠多的修改,你無法使用 Apache Spark 運行 Pandas 代碼。



大規模數據科學任務向來都是丟給分散式計算專家來做的,或者至少是熟悉此類概念的人員。大多數分散式系統的設計者給用戶提供了調節「旋鈕」,並留下了大量的系統配置。因此,高系統性能需要用明顯更加陡峭的學習曲線來折中。大多數現有用戶可能只是想讓 Pandas 運行得更快,並不希望在特定的硬體環境中優化他們的工作流。在我的案例中,我想在 10KB 和 10TB 的數據上使用相同的 Pandas 腳本,並且希望 Pandas 在處理這兩種不同量級的數據時速度一樣快(如果我有足夠的硬體資源的話)。為了完成這些目標,我們開啟了一個 Pandas on Ray 項目。




我們對系統進行了初步測評,Pandas on Ray 可以在一台 8 核的機器上將 Pandas 的查詢速度提高了四倍,而這僅需用戶在 notebooks 中修改一行代碼。我們為現在的 Pandas 用戶設計了該系統,旨在幫助他們的程序運行得更快,並且無需大量代碼改動就能夠進行更好的擴展。這項工作的最終目標就是在雲環境中使用 Pandas。




簡介



Pandas on Ray 是 DataFrame 庫的早期階段,DataFrame 庫封裝了 Pandas,並且透明地分配數據和計算。使用 Pandas on Ray,用戶不需要知道他們的系統或集群有多少個核心,也不需要指定如何分配數據。事實上,在 Pandas on Ray 上體驗可觀的加速時,用戶可以繼續使用之前的 Pandas notebook,甚至是在同一台機器上。僅僅需要按照下面描述的修改 import 語句。一旦修改了 import 語句,你就可以像使用 Pandas 一樣使用 Pandas on Ray 了。




Pandas on Ray 主要針對的是希望在不切換 API 的情況下提高性能和運行速度的 Pandas 用戶。我們正在積極實現與 Pandas 所有 API 的對等功能,並且已經實現了 API 的一個子集。我們會介紹目前進展的一些細節,並且給出一些使用示例。




使用的數據集



標普 500 股市數據:29.6MB(https://www.kaggle.com/camnugent/sandp500/data)




導入 Pandas on Ray



  1. # import pandas as pd

  2. import

    ray.dataframe

    as

    pd




Waiting for redis server at 127.0.0.1:21844 to respond...


Waiting for redis server at 127.0.0.1:41713 to respond...


Starting local scheduler with the following resources: {"GPU": 0, "CPU": 8}.




======================================================================


View the web UI at http://localhost:8890/notebooks/ray_ui62630.ipynb?token=bcf6d5b6cb9c2c478207f025384869100d7a25dcc27d7a56


======================================================================



Ray 將根據可用內核的數量進行自動初始化。現在你可以開始運行 Pandas 命令,它們將被並行化。



  1. stocks_df = pd.read_csv(

    "all_stocks_5yr.csv"

    )

  2. print

    (type(stocks_df))




我們也可以開始檢查數據。讓我們來看一下坐標軸。



  1. print

    (stocks_df.axes)




[RangeIndex(start=0, stop=619040, step=1), Index(["date", "open", "high", "low", "close", "volume", "Name"], dtype="object")]




讓我們運行一個簡單的數據查詢(just for fun),看看有多少天是以正收益結束的。



  1. positive_stocks_df = stocks_df.query(

    "close > open"

    )

  2. print

    (positive_stocks_df[

    "date"

    ].head(n=

    10

    ))

  3. print

    (

    "
    Number of positive days:"

    , positive_stocks_df.size)

  4. print

    (

    "
    Ratio of positive days to total days:"

    , positive_stocks_df.size/stocks_df.size)




0 2013-02-13


1 2013-02-15


2 2013-02-26


3 2013-02-27


4 2013-03-01


5 2013-03-04


6 2013-03-05


7 2013-03-06


8 2013-03-07


9 2013-03-11


Name: date, dtype: object




Number of positive days: 2232790




Ratio of positive days to total days: 0.5152655724993538




我不喜歡使用默認索引,那麼讓我們來看一下「date」是不是一個好的索引。



  1. print

    (stocks_df[

    "date"

    ].head(n=

    10

    ))




0 2013-02-08


1 2013-02-11


2 2013-02-12


3 2013-02-13


4 2013-02-14


5 2013-02-15


6 2013-02-19


7 2013-02-20


8 2013-02-21


9 2013-02-22




Name: date, dtype: object




看上去是個正確的選擇,因為我可能希望基於日期查詢。讓我們修改一下 DataFrame 中的索引,以便設置基於日期的查詢。



  1. stocks_df.set_index(

    "date"

    , inplace=

    True

    )

  2. print

    (stocks_df.axes)




[Index(["2013-02-08", "2013-02-11", "2013-02-12", "2013-02-13", "2013-02-14",


 "2013-02-15", "2013-02-19", "2013-02-20", "2013-02-21", "2013-02-22",


 ...


 "2018-01-25", "2018-01-26", "2018-01-29", "2018-01-30", "2018-01-31",


 "2018-02-01", "2018-02-02", "2018-02-05", "2018-02-06", "2018-02-07"],


 dtype="object", name="date", length=619040), Index(["open", "high", "low", "close", "volume", "Name"], dtype="object")]




我們可以查詢數據來收集更多的信息。我們可以找到股票收益為正的日期。




這個小例子旨在演示一些 Pandas 操作,這些操作作為並行實現可在 Pandas on Ray 上找到。下面,我們會展示一些性能對比,以及我們可以利用機器上更多的資源來實現更快的運行速度,甚至是在很小的數據集上。




轉置




分散式轉置是 DataFrame 操作所需的更複雜的功能之一。在以後的博客中,我們將討論我們的實現和一些優化。目前,轉置功能相對粗糙,也不是特別快,但是我們可以實現一些簡單優化來獲得更好的性能。



  1. print

    (stocks_df.T[:])




date   2013-02-08 2013-02-11 2013-02-12 2013-02-13 2013-02-14 2013-02-15 


open        15.07      14.89      14.45       14.3      14.94      13.93   


high        15.12      15.01      14.51      14.94      14.96      14.61   


low         14.63      14.26       14.1      14.25      13.16      13.93   


close       14.75      14.46      14.27      14.66      13.99       14.5   


volume    8407500    8882000    8126000   10259500   31879900   15628000   


Name          AAL        AAL        AAL        AAL        AAL        AAL   




date   2013-02-19 2013-02-20 2013-02-21 2013-02-22    ...     2018-01-25 


open        14.33      14.17      13.62      13.57    ...          78.47   


high        14.56      14.26      13.95       13.6    ...          79.38   


low         14.08      13.15       12.9      13.21    ...         78.345   


close       14.26      13.33      13.37      13.57    ...          79.25   


volume   11354400   14725200   11922100    6071400    ...        2327262   


Name          AAL        AAL        AAL        AAL    ...            ZTS   




date   2018-01-26 2018-01-29 2018-01-30 2018-01-31 2018-02-01 2018-02-02 


open        79.49      79.81      78.44      78.49      76.84      77.53   


high        80.13      79.95      78.69      78.77      78.27      78.12   


low         79.38      79.11      77.91      76.54      76.69      76.73   


close       80.09      79.18      78.35      76.73      77.82      76.78   


volume    2532808    2662383    3808707    4136360    2982259    2595187   


Name          ZTS        ZTS        ZTS        ZTS        ZTS        ZTS   




date   2018-02-05 2018-02-06 2018-02-07  


open        76.64      72.74       72.7  


high        76.92      74.56         75  


low         73.18      72.13      72.69  


close       73.83      73.27      73.86  


volume    2962031    4924323    4534912  


Name          ZTS        ZTS        ZTS  




[6 rows x 619040 columns]




基準測試




接下來,我們要比較一下 Pandas 和 Ray on Pandas。儘管我們目前還沒有支持完整的 Pandas 功能 API,但是我們展示了一些初步的基準測試,證明我們的方法是有潛力的。我們會在以下對比中做到儘可能的公平。需要注意的是,我們沒有在 Pandas on Ray 上做任何特殊的優化,一切都使用默認設置。還需要注意的是,Ray 使用了 eager execution,因此我們無法進行任何查詢規劃,也無法掌握計算給定工作流的最佳方法。




所用的數據集




全球健康數據:1.79GB(https://www.kaggle.com/census/international-data/data)



  1. # we are importing Pandas to benchmark against it

  2. import

    pandas

    as

    old_pd




首先我們要檢查載入一個 CSV 文件所需的時間。這個文件相對較大(1.7GB),所以使用 Pandas 和使用 Pandas on Ray 的載入時間會有所不同。



  1. # Pandas on Ray

  2. print

    (

    "Pandas on Ray:"

    )

  3. %time pandas_on_ray = pd.read_csv(

    "midyear_population_age_country_code.csv"

    )

  4. # Pandas

  5. print

    (

    "
    Pandas:"

    )

  6. %time pandas_native = old_pd.read_csv(

    "midyear_population_age_country_code.csv"

    )




Pandas on Ray:


CPU times: user 48.5 ms, sys: 19.1 ms, total: 67.6 ms


Wall time: 68 ms




Pandas:


CPU times: user 49.3 s, sys: 4.09 s, total: 53.4 s


Wall time: 54.3 s




我們看到的結果是:Pandas on Ray 的載入速度大約是 Pandas 的 675 倍。儘管這些數字令人印象深刻,但是 Pandas on Ray 的很多實現將工作從主線程轉移到更非同步的線程。文件是並行讀取的,運行時間的很多改進可以通過非同步構建 DataFrame 組件來解釋。讓我們將所有線程的結果匯總到一起,看看它需要多長時間。



  1. # Pandas on Ray

  2. print

    (

    "Pandas on Ray:"

    )

  3. %time entire_df = pandas_on_ray[:]

  4. # Pandas

  5. print

    (

    "
    Pandas:"

    )

  6. %time entire_df = pandas_native[:]




Pandas on Ray:


CPU times: user 2.59 s, sys: 2.52 s, total: 5.11 s


Wall time: 9.09 s




Pandas:


CPU times: user 16 ms, sys: 240 ms, total: 257 ms


Wall time: 256 ms




這裡我們可以看到,如果我們使用 [:] 運算符將所有的數據收集到一起,Pandas on Ray 速度大約是之前的 1/36。這是因為並行化。所有的線程以並行的方式讀取文件,然後將讀取結果串列化。主線程又對這些值進行去串列化,這樣它們又變得可用了,所以(去)串列化就是我們在這裡看到的主要開銷。熟悉 Spark 的人可能會記得,這類似於一個.collect() 調用。它使任務不再並行執行,將它們轉移動單獨的線程中。所以,儘管它讀取文件更快,但是將這些片段重新組合在一起的開銷意味著 Pandas on Ray 應該不僅僅被用於文件讀取。讓我們看一下文件載入完成後索引會發生什麼。



  1. # Pandas on Ray

  2. print

    (

    "Pandas on Ray:"

    )

  3. %time pandas_on_ray.index

  4. print

    (

    "
    Pandas:"

    )

  5. # Pandas

  6. %time pandas_native.index




Pandas on Ray:


CPU times: user 12 μs, sys: 1 μs, total: 13 μs


Wall time: 16 μs




Pandas:


CPU times: user 4 μs, sys: 0 ns, total: 4 μs


Wall time: 7.15 μs




RangeIndex(start=0, stop=3058280, step=1)




請注意,兩種方法都在緩存.index 調用的結果,所以我們調用一次 .index 之後看到的是原始時間,再一次調用的時候看到的是緩存訪問時間。Pandas on Ray 大約慢了 10 μs,但是維持一個分散式索引的複雜度更高。這顯示了底層 Ray 基礎架構的效率,它能夠快速檢索數據。




現在讓我們嘗試加速一次示例查詢,看看 Pandas 和 Pandas on Ray 的性能對比。



  1. # Pandas on Ray

  2. print

    (

    "Pandas on Ray:"

    )

  3. %timeit q0 = pandas_on_ray.query(

    "max_age > 100"

    )

  4. # Pandas

  5. print

    (

    "
    Pandas:"

    )

  6. %timeit q1 = pandas_native.query(

    "max_age > 100"

    )




Pandas on Ray:


100 loops, best of 3: 4.14 ms per loop




Pandas:


The slowest run took 32.21 times longer than the fastest. This could mean that an intermediate result is being cached.


1 loop, best of 3: 17.3 ms per loop




在這次 timeit 調用中,我們看到 Pandas on Ray 的速度大約是 Pandas 的 4 倍。這是在一台 8 核的機器上運行的,由於開銷的因素,加速並不是特別完美。儘管如此,通過僅僅修改 import 語句,原始 Pandas 上的運行時間和 Pandas on Ray 上的運行時間還是有顯著差別的。




在 Dask 上進行實驗




DataFrame 庫 Dask 提供可在其並行處理框架上運行的分散式 DataFrame,Dask 還實現了 Pandas API 的一個子集。一般來說,目前 Dask 在絕大多數操作上都比 Pandas on Ray 快一些。Dask 為 Pandas 用戶提供精細調整的定製,而 Pandas on Ray 則提供一種以最少的工作量實現更快性能的方法,且不需要多少分散式計算的專業知識。Pandas on Ray 針對的不是目前的 Dask(或 Spark)用戶,而是希望在無需學習新 API 的情況下提升現有和未來工作負載的性能和可擴展性的 Pandas 用戶。在 columnar operation 上,Dask 比 Pandas on Ray 快,但是它需要一些超出傳統 Pandas 之外的知識。




Dask 中存在兩個主要的差別,而 Pandas on Ray 則嘗試解決這兩個差別:




1. 用戶需要一直意識到:數據是分散式的,計算是懶惰的。


2. 多線程和多進程之間的權衡是可擴展性和性能之間的權衡。




數據科學家應該用 DataFrame 來思考,而不是動態的任務圖




Dask 用戶一直這樣問自己:






  • 我什麼時候應該通過 .compute() 觸發計算,我什麼時候應該調用一種方法來創建動態任務圖?



  • 我什麼時候應該調用 .persist() 將 DataFrame 保存在內存中?



  • 這個調用在 Dask 的分散式數據幀中是不是有效的?



  • 我什麼時候應該重新分割數據幀?



  • 這個調用返回的是 Dask 數據幀還是 Pandas 數據幀?




使用 Pandas 的數據科學家不一定非得是分散式計算專家,才能對數據進行高效分析。Dask 要求用戶不斷了解為計算而構建的動態任務圖。此外,默認情況下,懶惰計算使每個熟悉的 Pandas 調用返回一個意外的結果。這些差異為 Dask 提供了更好的性能配置,但對於某些用戶來說,學習新 API 的開銷太高。




使用 Pandas on Ray 的時候,用戶看到的數據幀就像他們在看 Pandas 數據幀一樣。




我們要速度,也要擴展性




Dask 默認是以多線程的模式運行的,這意味著一個 Dask 數據幀的所有分割部分都在一個單獨的 Python 進程中。儘管多線程模式讓一些計算變得更快,但是一個單獨的 Python 進程並不能利用機器的多個核心。




或者,Dask 數據幀可以以多進程模式運行,這種模式能夠生成多個 Python 進程。然而,如果一個 Python 進程需要將一個小的 Pandas 數據幀發送到另一個進程,則該數據幀必須通過 Pickle 進行串列化處理,然後在另一個進程中進行去串列化處理,因為這兩個進程沒有共享內存。串列化、拷貝以及去串列化,這三步會帶來高性能損失。即使這個解決方案可以擴展到多個核心,但是高昂的通信成本會對整體性能造成影響。







如上圖所示,由於串列化和拷貝操作,Dask 的多進程模式損傷了 read_csv 操作的性能。




Pandas on Ray 既可以以多線程模式運行,也可以以多進程模式運行。Ray 的默認模式是多進程,因此它可以從一台本地機器的多個核心擴展到一個機器集群上。至於通信方面,Ray 使用共享內存,並且通過 Apache Arrow 實現零拷貝串列化,顯著降低了進程之間的通信代價。




使用 Pandas on Ray,你的 Pandas 工作流可以同時實現快速運行和可擴展性。




read_csv 案例研究




在 AWS m5.2x 大型實例(8 個虛擬核、32GB 內存)上,我們使用 Pandas、Ray 和 Dask(多線程模式)進行了 read_csv 實驗。




我們採用了從 60KB 到 2GB 大小不等的四個數據集:






  • 泰坦尼克數據集:60KB(https://www.kaggle.com/c/titanic/data)



  • Yelp 數據集:31MB(https://www.kaggle.com/c/titanic/data)



  • Kiva Loan 數據集:187MB(https://www.kaggle.com/kiva/data-science-for-good-kiva-crowdfunding/data)



  • NYC Parking Tickets 數據集:2GB(https://www.kaggle.com/new-york-city/nyc-parking-tickets/data)




結果顯示 Ray 的性能是快速且可擴展的,在多個數據集上都優於 Dask。







註:第一個圖表明,在像泰坦尼克數據集這樣的小數據集上,分發數據會損害性能,因為並行化的開銷很大。




MAX 案例研究




為了查看逐行操作和逐列操作時三者的對比結果,我們繼續在相同的環境中進行實驗。







除了在最小的文件上 Pandas 是最快的以外,Pandas on Ray 的逐行操作速度大約是 Pandas 和 Dask 的三倍。在逐列操作上,它大約慢了 2.5 倍,這是因為目前的 Pandas on Ray 實現尚未針對 columnar operation 進行優化。值得注意的是,Dask 的惰性計算和查詢執行規劃不能在單個操作中使用。




通常情況下,Pandas on Ray 是非同步運行的,但是出於實驗目的,我們強制執行同步,以便對 Pandas 和 Dask 進行正確的評估。




結論




我們已經開始構建 Pandas on Ray,這是一個僅更改 import 語句就可以使 Pandas 工作流並行化的庫。到今天為止,我們已經在大約 45 天內實現了 Pandas DataFrame API 的 25%。目前,我們僅在單個節點上加速 Pandas,但很快我們將具備在集群環境中運行 Pandas 的功能。




如果您想試用 Pandas on Ray,請按照 readthedocs 文檔說明(http://ray.readthedocs.io/)從源代碼開始構建。此處使用的代碼目前位於 Ray 的主分支上,但尚未將其轉換為發布版本。 




原文鏈接:https://rise.cs.berkeley.edu/blog/pandas-on-ray/






本文為機器之心編譯,

轉載請聯繫本公眾號獲得授權


?------------------------------------------------


加入機器之心(全職記者/實習生):hr@jiqizhixin.com


投稿或尋求報道:editor@jiqizhixin.com


廣告&商務合作:bd@jiqizhixin.com

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 機器之心 的精彩文章:

從七橋問題開始:全面介紹圖論及其應用
解放程序員,讓AI自行編寫程序

TAG:機器之心 |