當前位置:
首頁 > 最新 > 深度解讀!時序資料庫HiTSDB:分散式流式聚合引擎

深度解讀!時序資料庫HiTSDB:分散式流式聚合引擎

阿里妹導讀:高性能時間序列資料庫 (High-Performance Time Series Database , 簡稱 HiTSDB) 是一種高性能,低成本,穩定可靠的在線時序資料庫服務, 提供高效讀寫,高壓縮比存儲、時序數據插值及聚合計算,時間線多維分析,主要服務於監控系統和IoT領域。 目前已在阿里巴巴集團多項內部業務中獲得廣泛運用,穩定服務於2016年雙11、2017年雙11,


HiTSDB時序資料庫引擎在服務於阿里巴巴集團內的客戶時,根據集團業務特性做了很多針對性的優化。 然而在HiTSDB雲產品的打磨過程中逐漸發現,很多針對性的優化很難在公有雲上針對特定用戶去實施。

於此同時, 在公有雲客戶使用HiTSDB的過程中,發現了越來越多由於聚合查詢導致的問題,比如: 返回數據點過多會出現棧溢出等錯誤,聚合點過多導致OOM, 或者無法完成聚合,實例完全卡死等等問題。這些問題主要由於原始的聚合引擎架構上的缺陷導致。

因此HiTSDB開發團隊評估後決定圍繞新的聚合引擎架構對HiTSDB引擎進行升級,包含: 存儲模型的改造,索引方式的升級,實現全新的流式聚合,數據遷移,性能評測。本文主要圍繞這5個方面進行梳理,重點在「全新的流式聚合部分」。

1.1 時序的數據存儲格式。

一個典型的時序數據由兩個維度來表示,一個維度表示時間軸,隨著時間的不斷流入,數據會不斷地追加。 另外一個維度是時間線,由指標和數據源組成,數據源就是由一系列的標籤標示的唯一數據採集點。例如指標cpu.usage的數據來自於機房,應用,實例等維度組合成的採集點。 這樣大家邏輯上就可以抽象出來一個id+的時序數據模型。這種數據模型的存儲是如何呢。一般有兩種典型的數據存儲思路:

1.2 時序模型的熱點問題處理

生產環境中業務方採集的指標類型多種多樣,對指標的採集周期各不相同。比如cpu.usage這個指標的變化頻率比較快,業務方關注度高,採集周期通常很短,1秒,5秒,10秒等等。 然而指標disk.usage這個指標變化趨勢相對平滑,採集周期通常為1分鐘,5分鐘, 10分鐘等。這種情況下,數據的存儲如果針對同一個指標不做特殊處理,容易形成熱點問題。 假設按照指標類型進行存儲資源的分片,想像一下如果有20個業務,每個業務10個集群,每個集群500台主機,採集周期是1秒的話,每秒就會有10萬個cpu.usage的指標數據點落到同一個存儲資源實例中, 而disk.usage採集周期為1分鐘,所以大約只有1666個指標數據點落到另外一個存儲資源上,這樣數據傾斜的現象非常嚴重。

1.2.1 分桶

這類問題的經典解法就是分桶。比如除了指標類型外,同時將業務名和主機名作為維度標識tags,把指標cpu.usage劃分到不同的桶裡面。 寫入時根據時間線哈希值分散寫入到不同的桶裡面。 OpenTSDB在處理熱點問題也是採用了分桶模式,但是需要廣播讀取,根本原因在於查詢方式需要在某個時間窗口內的全局掃描。 所以設置OpenTSDB的分桶數量需要一個平衡策略,如果數量太少,熱點還是有局部性的問題,如果太多,查詢時廣播讀帶來的開銷會非常大。

與其相比較,HiTSDB避免了廣播讀,提高了查詢效率。由於HiTSDB在查詢時,下發到底層存儲掃描數據之前,首先會根據查詢語句得到精確命中的時間線。 有了具體的時間線就可以確定桶的位置,然後到相應的塊區域取數據,不存在廣播讀。 關於HiTSDB如何在查詢數據的時候獲取命中的時間線,相信讀者這個疑問會在讀取完倒排這一節的時候消釋。

1.2.2 Region Pre-Split

當一個表剛被創建的時候,HBase默認分配一個Region給新表。所有的讀寫請求都會訪問到同一個regionServer的同一個region中。 此時集群中的其他regionServer會處於比較空閑的狀態,這個時候就達不到負載均衡的效果了。 解決這個問題使用pre-split,在創建新表的時候根據分桶個數採用自定義的pre-split的演算法,生成多個region。 byte[][] splitKeys =new byte[bucketNumber-1][]; splitKeys[bucketIndex-1] = (bucketIndex&0xFF);

2.1 時序數據中的多維時間線

多維支持對於任何新一代時序資料庫都是極其重要的。 時序數據的類型多種多樣,來源更是非常複雜,不止有單一維度上基於時間的有序數值,還有多維時間線相關的大量組合。 舉個簡單例子,cpu的load可以有三個維度描述cpu core, host, app應用,每個維度可以有百級別甚至萬級別的標籤值。 sys.cpu.load cpu=1 host=ipA app=hitsdb,各個維度組合後時間線可以輕鬆達到百萬級別。 如何管理這些時間線,建立索引並且提供高效的查詢是時序資料庫裡面需要解決的重要問題。 目前時序領域比較主流的做法是採用倒排索引的方式。

2.2 倒排索引基本組合

基本的時間線在倒排中的組合思路如下:

時間線的原始輸入值:

倒排構建後:

查詢時間線 cpu=3 and host=ipB:

取交集後查詢結果為7:

2.3 倒排面臨的問題以及優化思路

倒排主要面臨的是內存膨脹的問題:

posting list過長, 對於高緯度的tag,比如「機房=杭州」,杭州可能會有千級別甚至萬級別的機器,這就意味著posting list需要存儲成千上萬個64-bit的id。 解決這個問題的思路是採用壓縮posting list的方式, 在構建posting list的時候對數組裡面的id進行排序,然後採用delta編碼的方式壓縮。

如果Tag鍵值對直接作為term使用,內存佔用取決於字元串的大小,採用字元串字典化,也可大大減少內存開銷。


3.1 HiTSDB聚合引擎的技術痛點

HiTSDB現有聚合引擎公有雲公測以及集體內部業務運行中,暴露發現了以下問題:

3.1.1 Materialization執行模式造成Heap內存易打爆

下圖顯示了原查詢引擎的架構圖。HiTSDB以HBase作為存儲,原引擎通過Async HBase client 從HBase獲取時序數據。由於HBase的數據讀取是一個耗時的過程,通常的解法是採用非同步HBase client的API,從而有效提高系統的並行性。但原聚合引擎採用了一種典型的materialization的執行方式:1)啟動多個非同步HBase API啟HBase讀,2)只有當查詢所涉及的全部時序數據讀入到內存中後,聚合運算才開始啟動。這種把HBase Scan結果先在內存中materialized再聚合的方式使得HiTSDB容易發生Heap內存打爆的現象。尤其當用戶進行大時間範圍查詢,或者查詢的時間線的數據非常多的時候,因為涉及的時序數據多,HiTSDB會發生Heap OOM而導致查詢失敗。

3.1.2 大查詢打爆HBase的問題

兩個原因造成HiTSDB處理聚合查詢的時候,容易發生將底層HBase打爆。

HBase 可能讀取多餘時間線數據。HiTSDB的時間線採用指標+時間窗口+標籤的編碼方式存儲在HBase。典型的查詢是用戶指定一個指標,時間範圍,以及空間維度上標籤要尋找的匹配值。空間維度的標籤查詢條件並不都是在標籤編碼前綴。當這種情況發生時,HiTSDB倒排索引不能根據空間維度的查詢條件,精確定位到具體的HBase的查詢條件,而是採用先讀取再過濾的方式。這意味著HBase有可能讀取很多冗餘數據,從而加重HBase的負載。

HiTSDB有可能在短時間內下發太多HBase讀請求。一方面,HiTSDB在HBase採用分片存儲方式,對每一個分片,都至少啟動一個讀請求,另一方面,因為上面提到的materialization的執行方式,一個查詢涉及到的HBase讀請求同時非同步提交,有可能在很短時間內向HBase下發大量的讀請求。這樣,一個大查詢就有可能把底層的HBase打爆。

當這種情況發生時,更糟糕的場景是HiTSDB無法處理時序數據的寫入請求,造成後續新數據的丟失。

3.1.3 執行架構高度耦合,修改或增加功能困難

聚合引擎主要針對應用場景是性能監控,查詢模式固定,所以引擎架構採用單一模式,把查詢,過濾,填值/插值,和聚合運算的邏輯高度耦合在一起。這種引擎架構對於監控應用的固定查詢沒有太多問題,但HiTSDB目標不僅僅是監控場景下的簡單查詢,而是著眼於更多應用場景下的複雜查詢。

我們發現採用原有引擎的架構,很難在原有基礎上進行增加功能,或修改原來的實現。本質上的原因在於原有聚合引擎沒有採用傳統資料庫所通常採用的執行架構,執行層由可定製的多個執行運算元組成,查詢語義可以由不同的執行運算元組合而完成。這個問題在產品開發開始階段並不感受很深,但確是嚴重影響HiTSDB拓寬應用場景,增加新功能的一個重要因素。

3.1.4 聚合運算效率有待提高

原有引擎在執行聚合運算的時候,也和傳統資料庫所通常採用的iterative執行模式一樣,迭代執行聚合運算。問題在於每次iteration執行,返回的是一個時間點。Iterative 執行每次返回一條時間點,或者一條記錄,常見於OLTP這樣的場景,因為OLTP的查詢所需要訪問的記錄數很小。但對HiTSDB查詢有可能需要訪問大量時間線數據,這樣的執行方式效率上並不可取。

原因1)每次處理一個時間點,都需要一系列的函數調用,性能上有影響,2)iterative循環迭代所涉及到的函數調用,無法利用新硬體所支持的SIMD並行執行優化,也無法將函數代碼通過inline等JVM常用的hotspot的優化方式。在大數據量的場景下,目前流行的通用做法是引入Vectorization processing, 也就是每次iteration返回的不再是一條記錄,而是一個記錄集(batch of rows),比如Google Spanner 用batch-at-a-time 代替了row-at-a-time, Spark SQL同樣也在其執行層採用了Vectorization的執行模式。

3.2 流式聚合引擎設計思路

針對HiTSDB原有聚合運算引擎上的問題,為了優化HiTSDB,支持HiTSDB商業化運營,我們決定改造HiTSDB聚合運算引擎。下圖給出了新聚合查詢引擎的基本架構。

3.2.1 pipeline執行模式

借鑒傳統資料庫執行模式,引入pipeline的執行模式(aka Volcano / Iterator 執行模式)。Pipeline包含不同的執行計算運算元(operator), 一個查詢被物理計劃生成器解析分解成一個DAG或者operator tree, 由不同的執行運算元組成,DAG上的root operator負責驅動查詢的執行,並將查詢結果返回調用者。在執行層面,採用的是top-down需求驅動 (demand-driven)的方式,從root operator驅動下面operator的執行。這樣的執行引擎架構具有優點:

這種架構方式被很多資料庫系統採用並證明是有效;

介面定義清晰,不同的執行計算運算元可以獨立優化,而不影響其他運算元;

易於擴展:通過增加新的計算運算元,很容易實現擴展功能。比如目前查詢協議里只定義了tag上的查詢條件。如果要支持指標值上的查詢條件(cpu.usage >= 70% and cpu.usage

每個operator,實現如下介面:

Open : 初始化並設置資源

Next : 調用輸入operator的next()獲得一個batch of time series, 處理輸入,輸出batch of time series

Close : 關閉並釋放資源

我們在HiTSDB中實現了以下運算元:

ScanOp: 用於從HBase非同步讀取時間線數據

DsAggOp: 用於進行降採樣計算,並處理填值

AggOp:用於進行分組聚合運算,分成PipeAggOp, MTAggOp

RateOp: 用於計算時間線值的變化率

3.2.2 執行計算運算元一個batch的時間線數據為運算單位

在計算運算元之間以一個batch的時間線數據為單位,提高計算引擎的執行性能。其思想借鑒於OLAP系統所採用的Vectorization的處理模式。這樣Operator在處理一個batch的多條時間線,以及每條時間線的多個時間點,能夠減少函數調用的代價,提高loop的執行效率。

每個Operator以流式線的方式,從輸入獲得時間線batch, 經過處理再輸出時間線batch, 不用存儲輸入的時間線batch,從而降低對內存的要求。只有當Operator的語義要求必須將輸入materialize,才進行這樣的操作(參見下面提到的聚合運算元的不同實現)。

3.2.3. 區分不同查詢場景,採用不同聚合運算元分別優化

HiTSDB原來的聚合引擎採用materialization的執行模式,很重要的一個原因在於處理時序數據的插值運算,這主要是因為時序數據的一個典型特點是時間線上不對齊:不同的時間線在不同的時間戳上有數據。HiTSDB兼容OpenTSDB的協議,引入了插值(interpolation)的概念,目的在於聚合運算時通過指定的插值方式,在不對齊的時間戳上插入計算出來的值,從而將不對齊的時間線數據轉換成對齊的時間線。插值是在同一個group的所有時間線之間比較,來決定在哪個時間戳上需要進行插值 (參見OpenTSDB 文檔)。

為了優化聚合查詢的性能,我們引入了不同的聚合運算運算元。目的在於針對不同的查詢的語義,進行不同的優化。有些聚合查詢需要插值,而有些查詢並不要求插值;即使需要插值,只需要把同一聚合組的時間線數據讀入內存,就可以進行插值運算。

PipeAggOp: 當聚合查詢滿足以下條件時,

1)不需要插值: 查詢使用了降採樣(downsample),並且降採樣的填值採用了非null/NaN的策略。這樣的查詢,經過降採樣後,時間線的數據都是對齊補齊的,也就是聚合函數所用到的插值不再需要。

2)聚合函數可以支持漸進式迭代計算模式 (Incremental iterative aggregation), 比如sum, count ,avg, min, max, zerosum, mimmim, mimmax,我們可以採用incremental聚合的方式,而不需要把全部輸入數據讀入內存。這個執行運算元採用了流水線的方式,每次從輸入的operator獲得一系列時間線,計算分組並更新聚合函數的部分值,完成後可以清理輸入的時間線,其自身只用保留每個分組的聚合函數的值。

MTAgOp: 需要插值,並且輸入運算元無法幫助將時間線ID預先分組,這種方式回退到原來聚合引擎所採用的執行模式。

對於MTAggOp, 我們可以引入分組聚合的方法進行優化:

GroupedAggOp: 需要插值,但是輸入運算元能夠保證已經將時間線的ID根據標識(tags)進行排序分組,這樣在流水線處理中,只要materialize最多一個組的數據,這樣的運算元比起內存保留所有分組時間線,內存要求要低,同時支持不同組之間的並行聚合運算。

3.2.4 查詢優化器和執行器

引入執行運算元和pipeline執行模式後,我們可以在HiTSDB分成兩大模塊,查詢優化器和執行器。優化器根據查詢語義和執行運算元的不同特點,產生不同的執行計劃,優化查詢處理。例如HiTSDB可以利用上面討論的三個聚合運算運算元,在不同的場景下,使用不同的執行運算元,以降低查詢執行時的內存開銷和提高執行效率為目的。這樣的處理方式相比於原來聚合引擎單一的執行模式,更加優化。


HiTSDB新的聚合引擎採用的底層存儲格式與以前的版本並不兼容。 公有雲公測期間運行在舊版本實例的數據,需要遷移至新的聚合引擎。 同時熱升級出現了問題,數據遷移還應回滾功能,將新版本的數據點轉換成舊的數據結構,實現版本回滾。 整體方案對於用戶的影響做到:寫入無感知,升級過程中,歷史數據不可讀。

4.1 數據遷移架構

並發轉換和遷移數據: 原有的HiTSDB數據點已經在寫入的時候進行了分片。默認有20個Salts。數據遷移工具會對每個Salt的數據點進行並發處理。 每個「Salt」都有一個Producer和一個Consumer。Producer負責開啟HBase Scanner獲取數據點。 每個Scanner非同步對HBase進行掃描,每次獲取HBASE_MAX_SCAN_SIZE行數的數據點。然後將HBase的Row Key轉換成新的結構。

最後將該Row放到所有的一個Queue上等待Consumer消費。 Consumer每次會處理HBASE_PUT_BATCHSIZE或者HBASE_PUT_MIN_DATAPOINTS的數據量。 每次Consumer順利寫入該Batch的時候,我們會在UID表中記錄對應「Salt」的數據處理位置。 這樣便於故障重啟時Producer從最後一次成功的地方重新開始獲取數據點進行轉換。 數據遷移工具對HBase的操作都採用非同步的讀寫。當掃描數據或者寫入數據失敗的時候,我們會進行有限制的嘗試。 如果超出嘗試次數,我們就終止該「Salt」的數據遷移工作,其他」Salt「的工作不受到任何影響。 當下次工具自動重啟時,我們會出現問題的」Salt「數據繼續進行遷移,直到所有數據全部順利轉換完成。

流控限制: 大部分情況下,Producer對HBase的掃描數據要快於Consumer對HBase的寫入。 為了防止Queue的數據積壓對內存造成壓力同時為了減少Producer掃描數據時對HBase的壓力,我們設置了流控。 當Queue的大小達到HBASE_MAX_REQUEST_QUEUE_SIZE時候,Producer會暫時停止對HBase的數據掃描等待Consumer消費。 當Queue的大小減少到HBASE_RESUME_SCANNING_REQUEST_QUEUE_SIZE時候,Producer會重新恢復。

Producer和Consumer進程的退出

順利完成時候如何退出: 當一切進展順利時候,當Producer完成數據掃描之後,會在Queue上放一個EOS(End of Scan),然後退出。 Consumer遇到EOS就會知道該Batch為最後一批,成功處理完該Batch之後就會自動退出。

失敗後如何關閉: Consumer遇到問題時:當Consumer寫入HBase失敗之後,consumer會設置一個Flag,然後退出線程。 每當Producer準備進行下一個HBASE_MAX_SCAN_SIZE的掃描時候,他會先檢查該Flag。 如果被設置,他會知道對應的Consumer線程已經失敗並且退出。Producer也會停止掃描並且退出。 Producer遇到問題時:當Producer掃描數據失敗時,處理方式和順利完成時候類似。都是通過往Queue上EOS來完成通知。 下次重啟時,Producer會從上次記錄的數據處理位置開始重新掃描。

4.2 數據遷移的一致性

由於目前雲上版本HiTSDB為雙節點,在結點升級結束後會自動重啟HiTSDB。自動啟動腳本會自動運行數據遷移工具。 如果沒有任何預防措施,此時兩個HiTSDB節點會同時進行數據遷移。雖然數據上不會造成任何丟失或者損壞, 但是會對HBase造成大量的寫入和讀取壓力從而嚴重影響用戶的正常的寫入和查詢性能。

為了防止這樣的事情發生,我們通過HBase的Zoo Keeper實現了類似FileLock鎖,我們稱為DataLock,的機制保證只有一個結點啟動數據遷移進程。 在數據遷移進程啟動時,他會通過類似非阻塞的tryLock()的形式在Zoo Keeper的特定路徑創建一個暫時的節點。 如果成功創建節點則代表成果獲得DataLock。如果該節點已經存在,即被另一個HiTSDB創建,我們會收到KeeperException。這樣代表未獲得鎖,馬上返回失敗。 如果未成功獲得DataLock,該節點上的數據遷移進程就會自動退出。成果獲得DataLock的節點則開始進行數據遷移。

4.3 數據遷移中的"執行一次"

當所有「Salt」的數據點全部順利完成遷移之後,我們會在HBase的舊錶中插入一行新數據,data_conversion_completed。 此行代表了數據遷移工程全部順利完成。同時自動腳本會每隔12個小時啟動數據遷移工具,這樣是為了防止上次數據遷移沒有全部完成。 每次啟動時,我們都會先檢查「data_conversion_completed」標誌。如果標誌存在,工具就會馬上退出。 此項操作只會進行一次HBase的查詢,比正常的健康檢查成本還要低。所以周期性的啟動數據遷移工具並不會對HiTSDB或者HBase產生影響。

4.4. 數據遷移的評測

測試機型: 4core,8G,SSD

效果:上線後無故障完成100+實例數據的遷移,熱升級。


測試環境配置

192.168.12.3 2.1.5版本

192.168.12.4 2.2.0版本(Pipelined Engine)

測試數據 - 1萬條時間,不同的採集頻率和時間窗口,還有查詢命中的時間線數量。

Case 1: 數據採集頻率5s, 查詢命中1000條,時間窗口3600s

Case 2: 數據採集頻率1s,查詢命中1條,時間窗口36000s

總結: 新的查詢聚合引擎將查詢速度提高了10倍以上。


本文介紹了高性能時間序列資料庫HiTSDB引擎在商業化運營之前進行的優化升級,目的是提高HiTSDB引擎的穩定性,數據寫入和查詢性能以及新功能的擴展性。HiTSDB已經在阿里雲正式商業化運營,我們將根據用戶反饋,進一步提高HiTSDB引擎,更好服務於HiTSDB的客戶。


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 阿里技術 的精彩文章:

開放下載!《阿里巴巴Android開發手冊》正式發布
首次公開!深度學習在知識圖譜構建中的應用

TAG:阿里技術 |