當前位置:
首頁 > 知識 > 使用Spring Cloud和Reactor在微服務中實現Event Sourcing

使用Spring Cloud和Reactor在微服務中實現Event Sourcing

當在微服務架構中構建應用時,狀態管理成為分散式系統的問題,相比於傳統monolithic應用,將狀態管理通過事務機制實現,微服務能夠通過一種跨多個不同應用和資料庫的新的分散式事務來管理一致性。

這篇文章中,我們將看看微服務中的數據一致性和高可用性的權衡。本文以一個在線電子商務網站為案例,使用Spring Boot和Spring Cloud實現,展示如何使用Reactor在微服務中實現Reactive流,從而通過event sourcing事件溯源實現分散式事務,最後,我們會使用Docker 和 Maven構建、運行、協調多容器 應用。

由於每個微服務可能擁有自己的專有資料庫,微服務本身雖然是無狀態的,但是微服務管理的狀態是保存在自己資料庫中,如果不同微服務之間需要共享狀態,也就是說需要訪問同一個資料庫的數據表怎麼辦?或者說,多個微服務需要訪問同一個微服務以便獲得同一個狀態怎麼辦?

最終一致性Eventual Consistency

當建立微服務時,我們被強迫面對狀態的最終一致性問題,這是因為每個微服務都擁有自己的資料庫資源,每個資料庫都配置了不同的一致性和可用性權衡策略。

最終一致性是一種用於描述在分散式系統中數據的操作模型,在分散式系統中狀態是被複制然後跨網路多節點保存,在關係資料庫集群中,最終一致性被用來在集群多個節點之間協調數據複製的寫操作,資料庫集群中這種寫操作挑戰是:各個節點接受到的寫操作必須嚴格按照複製的次序進行,這個次序是有時間損耗的,從這個角度看,資料庫在集群節點之間的這種狀態複製還是可以被認為是一種最終一致性,所有節點狀態在未來某個時刻最終匯聚到一個一致性狀態,也就是說,最終達成狀態一致性。

當構建微服務時,最終一致性是開發者 DBA和架構師頻繁打交道的問題,當開始在分散式系統中進行狀態處理時,頭疼問題更加嚴重。核心問題是:

如何在保證數據一致性基礎上保證高可用性呢?

事務日誌

幾乎所有資料庫都支持高可用性集群,大多數資料庫對系統一致性模型提供一個易於理解的方式,保證強一致性模型的安全方式是維持資料庫事務操作的有序日誌,理論上理由非常簡單,一個事務日誌是一系列數據更新操作的動作有序記錄集合,當其他節點從主節點獲得這個事務日誌時,能夠按照這種有序動作集合重新播放這些操作,從而更新自己所在節點的資料庫狀態,當這個事務日誌完成後,次節點的狀態最終會和主節點狀態一致,

這種事務日誌非常類似於財務中記賬模型,或者類似銀行儲蓄卡列印出來的流水賬,哪天存入一筆鈔票(更新操作),哪天又提取了一筆鈔票(更新操作),最後當前餘額是多少(代表資料庫當前狀態)。

使用Spring Cloud和Reactor在微服務中實現Event Sourcing

Event Sourcing

Event sourcing事件溯源是借鑒資料庫事務日誌的一種數據持久方式,在ES中,事務單元變得更細粒度,使用一系列有序的事件來代表存儲在資料庫中的領域模型狀態,一旦一個事件被加入事件日誌,它就不能被移走或重新排序,事件被認為是不可變的,事件序列只能被追加方式存儲。

因為微服務將系統切分成一個個松耦合的小系統,每個系統後面都獨佔自己的資料庫,雖然,微服務是無態的,但是它需要操作自己資料庫的狀態,如何保證微服務之間操作資料庫數據的一致性成了微服務實踐中重要問題,使用ES能夠幫助我們實現這點。.

聚合可以被認為是產生任何對象的一致性狀態,它提供校訂方法用來進行重播產生對象中狀態變化的歷史。它能使用事件流提供分析數據許多必要輸入,能夠採取補償方式對不一致應用狀態實現事件回滾。

Reactor

Project Reactor 是一個開源基於JVM實現Reactive流規範的開發框架,是Spring生態系統一個成員,在微服務中,經常在一個上下文下需要和其他微服務交互操作,由於微服務架構天然屬性是最終一致性,而最終一致性並不保證數據的安全性。它提供我們一個使用非同步非堵塞方式進行通訊的方式,這裡正是使用Reactor目的所在。

很少情況下,領域模型狀態會被跨微服務共享,但是如果在微服務之間需要共享狀態怎麼辦?或者說多個微服務需要訪問同一個資料庫數據表怎麼辦?在微服務中ES只保存有序事件的日誌,使用事件流取代領域模型在資料庫中存儲,我們可以存儲有序事件流來代表對象的狀態,這樣,意味著我們就不再使用基於HTTP的RESTful進行微服務之間同步通訊,這些同步會造成堵塞和性能延遲。

代碼案例

我們以電子商務商店中購物車服務為案例,展示Reactor + ES是如何實現的:

購物車服務Shopping Cart Service是一個MYSQL資料庫擁有者,有一個數據表稱為cart_event. 這個表包含用戶操作動作產生的有序事件日誌,用戶操作就是反覆將商品加入購物車或去除等各種購物車管理操作。

購物車事件有如下:

// These will be the events that are stored in the event log for a cart
public enum CartEventType {
ADD_ITEM,
REMOVE_ITEM,
CLEAR_CART,
CHECKOUT
}

CartEventType是枚舉類型,已經列出了4種不同的事件類型。這些事件類型中的每一個都代表用戶在購物車上執行的動作。根據ES,這些購物車事件可以影響用戶的購物車的最終狀態結果。當用戶添加或刪除一個商品條碼到他們的購物車時,一個動作產生一個事件,會對購物車中進行遞增或遞減一行條目。當這些事件使用同樣順序進行回放時,同樣一系列的條目會被重新創建或刪除:

使用Spring Cloud和Reactor在微服務中實現Event Sourcing

idcreated_atlast_modifiedcart_event_typeproduct_idquantityuser_id1146099097164514609909716450SKU-12464202146099281639814609928163981SKU-12464103146099282647414609928264740SKU-12464204146099283287214609928328720SKU-12464205146099283602714609928360271SKU-1246450

我們看到每行都有一個唯一時間戳來確保嚴格順序,使用整數來代表4個購物車事件類型,product_id 和數據quantity都是每次加入購物車的商品條碼信息。

那麼使用什麼存儲庫保存事件流?目前ES方面標準的存儲庫是 Apache Kafka。我們使用它來存儲我們的事件序列。也就是說,微服務之間共享狀態是通過共享Kafka的事件日誌實現的。


下面我們回到購物車,購物車微服務提供一個REST API方法接受來自Web端的事件。Web端發出事件的控制器 ShoppingCartControllerV1.java:

@RequestMapping(path = "/events", method = RequestMethod.POST)
public ResponseEntity addCartEvent(@RequestBody CartEvent cartEvent) throws Exception {
return Optional.ofNullable(shoppingCartService.addCartEvent(cartEvent))
.map(event -> new ResponseEntity(HttpStatus.NO_CONTENT))
.orElseThrow(() -> new Exception("Could not find shopping cart"));
}

在上面的代碼示例,我們定義了一個用於收集來自客戶端新的CartEvent對象的控制器方法。這種方法的目的是在向事件日誌追加事件。當客戶端調用REST API檢索用戶的購物車,它將產生一個購物車聚合,使用Reactive流合併了所有購物車事件流。

下面在ShoppingCartServiceV1.java中使用Reactor產生購物車事件流:

public ShoppingCart aggregateCartEvents(User user, Catalog catalog) throws Exception {

// 從kafka獲得某個用戶的購物車操作事件流
Flux<CartEvent> cartEvents =
Flux.fromStream(cartEventRepository.getCartEventStreamByUser(user.getId()));

//執行事件流的事件直至最後一個事件發生的最終狀態。也就是購物車的最終狀態
ShoppingCart shoppingCart = cartEvents
.takeWhile(cartEvent -> !ShoppingCart.isTerminal(cartEvent.getCartEventType()))
.reduceWith(() -> new ShoppingCart(catalog), ShoppingCart::incorporate)
.get();

// Generate the list of line items in the cart from the aggregate
shoppingCart.getLineItems();

return shoppingCart;
}

在上面的代碼示例中,我們可以看到三個步驟來生成購物車對象,然後返回到客戶端。第一步是從事件存儲的數據源中創建一個Reactive流。一旦流建立,我們可以從事件流中產生我們的聚合。這些事件流不斷改變購物車狀態直至到最終狀態,然後就可以將最終購物返回給用戶客戶端。

在reactive流的聚合的reduce中,我們使用了一個稱為incorporate方法,這個方法是接受CartEvent對象,而CartEvent對象是用來改變購物車狀態的

下面是ShoppingCart.java:

public ShoppingCart incorporate(CartEvent cartEvent) {

// Remember that thing about safety properties in microservices?
Flux<CartEventType> validCartEventTypes =
Flux.fromStream(Stream.of(CartEventType.ADD_ITEM,
CartEventType.REMOVE_ITEM));

// The CartEvent"s type must be either ADD_ITEM or REMOVE_ITEM
if (validCartEventTypes.exists(cartEventType ->
cartEvent.getCartEventType().equals(cartEventType)).get()) {

// Update the aggregate view of each line item"s quantity from the event type
productMap.put(cartEvent.getProductId(),
productMap.getOrDefault(cartEvent.getProductId(), 0) +
(cartEvent.getQuantity() * (cartEvent.getCartEventType()
.equals(CartEventType.ADD_ITEM) ? 1 : -1)));
}

// Return the updated state of the aggregate to the reactive stream"s reduce method
return this;
}

在上面代碼中我們看到ShoppingCart的incorporate方法實現,我們接受一個CartEvent對象然後確保事件類型是正確的,我們確保事件類型是 ADD_ITEM 或 REMOVE_ITEM.

下一步是更新購物車中每個條目的聚合視圖,通過映射相應的事件類型到商品條目的數量遞增或遞減。最後我們返回這樣一個帶有最終可變狀態的購物車給客戶端。

整個源碼案例:

https://github.com/kbastani/spring-cloud-event-sourcing-example

你可以使用 Docker Compose 運行這個案例,步驟如下:

首先下載Docker, 使用下面命令初始化一個虛擬VM:

$ docker-machine create env event-source-demo --driver virtualbox --virtualbox-memory "11000" --virtualbox-disk-size "100000"
$ eval "$(docker-machine env event-source-demo)"

安裝好下面必備組件:

  • Maven 3
  • Java 8
  • Docker
  • Docker Compose

在下載源碼項目根目錄執行:

sh run.sh

系統會自動下載依賴和安裝。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

sprintboot+mybatis踩坑:查詢不到數據list「null」——支持駝峰配置
Spring Cloud服務發現與服務註冊Eureka + Eureka Server的搭建

TAG:程序員小新人學習 |