可靠的微服務資料交換與發件箱模式 ·Debezium
作為其業務邏輯的一部分,微服務通常不僅需要更新自己的本地資料儲存,而且還需要向其他服務通知發生的資料更改。發件箱模式描述了一種讓服務以安全和一致的方式執行這兩項任務的方法; 它為源服務提供即時“讀取您自己的寫入”語義,同時提供跨服務邊界的可靠,最終一致的資料交換。
如果你已經構建了幾個微服務,你可能會同意 最困難的部分是資料 :微服務不是孤立存在的,而且往往需要在彼此之間傳播資料和資料變化。
例如,考慮一個管理採購訂單的微服務:當下新訂單時,可能必須將有關該訂單的資訊轉發給貨運服務(因此它可以組裝一個或多個訂單的貨件)和客戶服務(因此它可以根據新訂單更新客戶的總貸方餘額等事項。
有不同的方法讓訂單服務瞭解其他兩個關於新採購訂單的方法; 例如,它可以呼叫這些服務提供的一些 REST , grpc 或其他(同步)API。但是,這可能會產生一些不希望的耦合:傳送服務必須知道要呼叫哪些其他服務以及在哪裡找到它們。它也必須準備好暫時無法使用這些服務。通過提供請求路由,重試,斷路器等功能, Istio 等服務 網路 可以在這裡提供幫助。
任何同步方法的一般問題是,如果沒有它呼叫的其他服務,一個服務就無法真正執行。雖然緩衝和重試可能有助於僅需要通知某些事件的其他服務,但如果服務實際上需要查詢其他服務以獲取資訊,則情況並非如此。例如,當下訂單時,訂單服務可能需要從庫存服務獲得所購買商品庫存的次數。
這種同步方法的另一個缺點是它缺乏可重玩性,即新消費者在事件傳送後到達並且仍然能夠從頭開始消費整個事件流的可能性。
這兩個問題都可以通過使用非同步資料交換方法來解決:即讓訂單,庫存和其他服務通過持久的訊息日誌(如 Apache Kafka) 傳播事件。通過訂閱這些事件流,將通知每個服務有關其他服務的資料更改。它可以對這些事件做出反應,並且如果需要,可以使用針對其自身需求定製的表示在其自己的資料儲存中建立該資料的本地表示。例如,這種檢視可以被非規範化以有效地支援特定的訪問模式,或者它可以僅包含與消費服務相關的原始資料的子集。
持久日誌還支援可重複播放性,即可以根據需要新增新的消費者,從而實現您最初可能沒有想到的用例,而無需觸及源服務。例如,考慮一個數據倉庫,該資料倉庫應保留有關所有訂單的資訊,或基於 Elasticsearch的 採購訂單上的一些全文搜尋功能。一旦採購訂單事件出現在Kafka主題中(Kafka主題的保留策略設定可用於確保事件保留在主題中,只要它對於給定的用例和業務要求是必需的),新的消費者可以訂閱,處理主題從一開始就實現了微服務資料庫,搜尋索引,資料倉庫等所有資料的檢視。
處理主題增長
根據資料量(記錄的數量和大小,變化的頻率),將事件長時間或甚至無限期地保留在主題中可能是也可能不可行。通常,在給定時間點之後,與給定資料項(例如,特定購買訂單)有關的一些或甚至所有事件可能有資格從商業角度刪除。請參閱下面的“從Kafka主題中刪除事件”框,瞭解有關從Kafka主題中刪除事件的更多想法,以便將其大小保持在範圍內
雙重寫的問題
微服務為了提供它們的業務功能,微服務通常具有它們自己的本地資料儲存。例如,訂單服務可以使用關係資料庫來持久儲存關於採購訂單的資訊。放置新訂單時,這可能會導致服務資料庫INSERT中的表PurchaseOrder中的操作。同時,服務可能希望向Apache Kafka傳送有關新訂單的事件,以便將該資訊傳播給其他感興趣的服務。
但是,簡單地發出這兩個請求可能會導致潛在的不一致。原因是我們不能擁有一個跨越服務資料庫和Apache Kafka的共享事務,因為後者不支援在分散式(XA)事務中加入。因此,在不幸的情況下,我們最終可能會在本地資料庫中保留新的採購訂單,但沒有將相應的訊息傳送給Kafka(例如由於某些網路問題)。或者,反過來說,我們可能已將訊息傳送給Kafka但未能在本地資料庫中保留採購訂單。兩種情況都是不可取的 這可能導致無法為看似成功下單的訂單建立貨件。或者貨物被建立,
那麼如何避免這種情況呢?答案是隻修改兩個資源(資料庫或Apache的卡夫卡)中一個,然後以最終一致的方式驅動第二個的更新。
讓我們首先考慮只寫入Apache Kafka的情況。
當收到新的採購訂單時,訂單服務不會INSERT同步進入其資料庫; 相反,它只會向Kafka主題傳送描述新訂單的事件。因此,一次只能修改一個資源,如果出現問題,我們會立即發現它並向訂單服務的呼叫方報告請求失敗。
同時,服務本身將訂閱該Kafka主題。這樣,當新訊息到達主題時它將被通知,並且它可以在其資料庫中保留新的採購訂單。
但是,這裡有一個微妙的挑戰,那就是缺乏“讀你自己的寫入資料”語義:例如,我們假設訂單服務還有一個API,用於搜尋給定客戶的所有采購訂單。在放置新訂單後立即呼叫該API時,由於處理來自Kafka主題的訊息的非同步性質,可能會發生採購訂單尚未保留在服務的資料庫中,因此該查詢不會返回該訂單。這可能導致非常混亂的使用者體驗,因為使用者例如可能錯過他們的購物歷史中新放置的訂單。
有辦法處理這種情況,例如,服務可以將新放置的採購訂單保留在記憶體中並基於此資料響應後續查詢。儘管在實現更復雜的查詢或考慮訂單服務可能還包括群集設定中的多個節點時,就需要在群集內傳播該資料。
現在,我們看看另外一個方式,只是同步寫入資料庫並基於此驅動向Apache Kafka匯出訊息的情況會怎樣?這是發件箱模式的用武之地。
發件箱模式
這種方法的想法是在服務的資料庫中有一個“發件箱”表。當接收到下訂單的請求時,不僅INSERT進入PurchaseOrder表中,而且,作為同一事務的一部分,還將表示要傳送的事件的記錄插入該發件箱表中。
該記錄描述了服務中發生的事件,例如它可能是一個JSON結構,表示已經放置了新的採購訂單,包括訂單本身的資料,訂單行以及上下文資訊(如使用情況)案例識別符號。通過通過發件箱表中的記錄顯式地發出事件,可以確保以適合外部消費者的方式構造事件。這也有助於確保事件使用者在例如更改內部域模型或PurchaseOrder表時不會中斷。
非同步程序監視該表以查詢新條目。如果有,它會將事件作為訊息傳播到Apache Kafka。這為我們提供了非常好的特性平衡:通過同步寫入PurchaseOrder表,源服務受益於“讀取您自己的寫入”語義。一旦提交了第一個交易,後續的採購訂單查詢將返回新的持久訂單。與此同時,我們通過Apache Kafka獲得可靠,非同步,最終一致的資料傳播到其他服務。
現在,發件箱模式實際上並不是一個新想法。它已經使用了相當長的一段時間。實際上,即使使用實際上可以參與分散式事務的JMS樣式的訊息代理,也可以避免任何耦合以及遠端資源(如訊息代理)的停機時間的潛在影響。您還可以在Chris Richardson優秀的 microservices.io 網站上找到該模式的描述。
然而,該模式得到的關注遠遠少於它應得的,並且在微服務環境中尤其有用。正如我們所看到的,可以使用變更資料捕獲和Debezium以非常優雅和有效的方式實現發件箱模式。在下面,讓我們探討如何。
基於變更資料捕獲的實現
基於日誌的變更資料捕獲 (CDC)非常適合捕獲發件箱表中的新條目並將其流式傳輸到Apache Kafka。與任何基於輪詢的方法相反,事件捕獲在近實時中以非常低的開銷發生。Debezium附帶了幾個資料庫的 CDC聯結器 ,如MySQL,Postgres和SQL Server。以下示例將使用 Postberes的Debezium聯結器 。
您可以在GitHub上找到 該示例 的完整 原始碼 。有關構建和執行示例程式碼的詳細資訊,請參閱 README.md 。該示例以兩個微服務, 訂單服務 和 發貨服務為中心 。兩者都是用Java實現的,使用 CDI 作為元件模型,使用JPA / Hibernate訪問各自的資料庫。訂單服務在 WildFly上 執行,並公開一個簡單的REST API,用於下訂單和取消特定訂單行。它使用Postgres資料庫作為其本地資料儲存。裝運服務基於 Thorntail ; 通過Apache Kafka,它接收訂單服務匯出的事件,並在自己的MySQL資料庫中建立相應的貨件條目。Debezium對訂單服務的Postgres資料庫的事務日誌(“預寫日誌”,WAL)進行了定製,以便捕獲發件箱表中的任何新事件並將它們傳播到Apache Kafka。
解決方案的整體架構如下圖所示:
請注意,該模式與這些特定的實現選擇無關。使用Spring Boot(例如利用Spring Data 對域事件 的 支援 ),普通JDBC或除Java之外的其他程式語言等替代技術同樣可以實現。
現在讓我們仔細看看解決方案的一些相關元件。
發件箱表
該outbox表位於訂單服務的資料庫中,具有以下結構:
Column|Type| Modifiers --------------+------------------------+----------- id| uuid| not <b>null</b> aggregatetype | character varying(255) | not <b>null</b> aggregateid| character varying(255) | not <b>null</b> type| character varying(255) | not <b>null</b> payload| jsonb| not <b>null</b>
它的列是這些:
- id:每條訊息的唯一ID; 消費者可以使用它來檢測任何重複事件,例如在故障後重新啟動以讀取訊息時。在建立新事件時生成。
- aggregatetype:與給定事件相關的聚合根的型別; 理念是,依賴於領域驅動設計的相同概念,匯出事件應該引用聚合( “可以被視為單個單元的域物件叢集”
),其中聚合根提供唯一的入口點用於訪問聚合中的任何實體。例如,這可以是“採購訂單”或“客戶”。
此值將用於將事件路由到Kafka中的相應主題,因此會有與採購訂單相關的所有事件的主題,所有與客戶相關的事件的一個主題等。請注意,還包含與子實體相關的事件一個這樣的聚合應該使用相同的型別。因此,例如,表示取消單個訂單行(它是採購訂單彙總的一部分)的事件也應該使用其聚合根的型別“訂單”,以確保此事件也將進入“訂單”Kafka主題。
- aggregateid:受給定事件影響的聚合根的id; 例如,這可以是採購訂單的ID或客戶ID; 與聚合型別類似,與聚合中包含的子實體相關的事件應使用包含聚合根的id,例如訂單行取消事件的採購訂單ID。此ID將在以後用作Kafka訊息的金鑰。這樣,與一個聚合根或其任何包含的子實體相關的所有事件都將進入該Kafka主題的同一分割槽,這將確保該主題的使用者將消耗與該主題中的同一聚合相關的所有事件。生產時的確切順序。
- type:事件型別,例如“訂單已建立”或“訂單行已取消”。允許消費者觸發合適的事件處理程式。
- payload:具有實際事件內容的JSON結構,例如包含採購訂單,有關購買者的資訊,包含的訂單行,其價格等。
將事件傳送到發件箱
為了“傳送”事件到發件箱,訂單服務中的程式碼通常只能INSERT進入發件箱表。但是,最好採用稍微抽象的API,如果需要,可以在以後更輕鬆地調整發件箱的實現細節。 CDI活動 非常方便。它們可以在應用程式程式碼中引發,並由發件箱事件傳送者同步處理,它將INSERT在發件箱表中執行所需操作。
所有發件箱事件型別都應實現以下合同,類似於之前顯示的發件箱表的結構:
<b>public</b> <b>interface</b> ExportedEvent { String getAggregateId(); String getAggregateType(); JsonNode getPayload(); String getType(); }
為了產生這樣的事件,應用程式程式碼使用注入的Event例項,例如在OrderService類中:
@ApplicationScoped <b>public</b> <b>class</b> OrderService { @PersistenceContext <b>private</b> EntityManager entityManager; @Inject <b>private</b> Event<ExportedEvent> event; @Transactional <b>public</b> PurchaseOrder addOrder(PurchaseOrder order) { order = entityManager.merge(order); event.fire(OrderCreatedEvent.of(order)); event.fire(InvoiceCreatedEvent.of(order)); <b>return</b> order; } @Transactional <b>public</b> PurchaseOrder updateOrderLine(<b>long</b> orderId, <b>long</b> orderLineId, OrderLineStatus newStatus) { <font><i>// ...</i></font><font> } } </font>
在該addOrder()方法中,JPA實體管理器用於在資料庫中保留傳入的訂單,並且注入event用於觸發相應的OrderCreatedEvent和InvoiceCreatedEvent。同樣,請記住,儘管存在“事件”的概念,但這兩件事情發生在同一個事務中。即在此交易中,將在資料庫中插入三條記錄:一張在帶有采購訂單的表中,另一張在發件箱表中。
實際的事件實現是直截了當的; 例如,這是OrderCreatedEvent類:
<b>public</b> <b>class</b> OrderCreatedEvent implements ExportedEvent { <b>private</b> <b>static</b> ObjectMapper mapper = <b>new</b> ObjectMapper(); <b>private</b> <b>final</b> <b>long</b> id; <b>private</b> <b>final</b> JsonNode order; <b>private</b> OrderCreatedEvent(<b>long</b> id, JsonNode order) { <b>this</b>.id = id; <b>this</b>.order = order; } <b>public</b> <b>static</b> OrderCreatedEvent of(PurchaseOrder order) { ObjectNode asJson = mapper.createObjectNode() .put(<font>"id"</font><font>, order.getId()) .put(</font><font>"customerId"</font><font>, order.getCustomerId()) .put(</font><font>"orderDate"</font><font>, order.getOrderDate().toString()); ArrayNode items = asJson.putArray(</font><font>"lineItems"</font><font>); <b>for</b> (OrderLine orderLine : order.getLineItems()) { items.add( mapper.createObjectNode() .put(</font><font>"id"</font><font>, orderLine.getId()) .put(</font><font>"item"</font><font>, orderLine.getItem()) .put(</font><font>"quantity"</font><font>, orderLine.getQuantity()) .put(</font><font>"totalPrice"</font><font>, orderLine.getTotalPrice()) .put(</font><font>"status"</font><font>, orderLine.getStatus().name()) ); } <b>return</b> <b>new</b> OrderCreatedEvent(order.getId(), asJson); } @Override <b>public</b> String getAggregateId() { <b>return</b> String.valueOf(id); } @Override <b>public</b> String getAggregateType() { <b>return</b> </font><font>"Order"</font><font>; } @Override <b>public</b> String getType() { <b>return</b> </font><font>"OrderCreated"</font><font>; } @Override <b>public</b> JsonNode getPayload() { <b>return</b> order; } } </font>
請注意 Jackson 如何ObjectMapper用於建立事件有效負載的JSON表示。
現在讓我們看看消耗任何被啟用的程式碼ExportedEvent並對outbox表進行相應的寫操作:
@ApplicationScoped <b>public</b> <b>class</b> EventSender { @PersistenceContext <b>private</b> EntityManager entityManager; <b>public</b> <b>void</b> onExportedEvent(@Observes ExportedEvent event) { OutboxEvent outboxEvent = <b>new</b> OutboxEvent( event.getAggregateType(), event.getAggregateId(), event.getType(), event.getPayload() ); entityManager.persist(outboxEvent); entityManager.remove(outboxEvent); } }
它相當簡單:對於每個事件,CDI執行時將呼叫該onExportedEvent()方法。OutboxEvent實體的一個例項持久儲存在資料庫中 - 並立即刪除!
起初這可能會令人驚訝。但是,在記住基於日誌的CDC如何工作時,它是有意義的:它不會檢查資料庫中表的實際內容,而是會關閉僅附加事務日誌。一旦事務提交,呼叫persist()並將在日誌中remove()建立一個INSERT和一個DELETE條目。之後,Debezium將處理這些事件:對於任何事件INSERT,具有事件有效負載的訊息將被髮送到Apache Kafka。DELETE另一方面,事件可以被忽略,因為從發件箱表中刪除僅僅是技術性,不需要任何傳播到訊息代理。因此,我們可以通過CDC捕獲新增到發件箱表中的事件,但是當查看錶本身的內容時,它將始終為空。這意味著表格不需要額外的磁碟空間(除了將在某個時刻自動丟棄的日誌檔案元素),也不需要單獨的管理過程來阻止它無限增長。
註冊Debezium聯結器
有了outbox實現,就可以註冊Debezium Postgres聯結器了,這樣它就可以捕獲發件箱表中的任何新事件並將它們轉發給Apache Kafka。這可以通過將以下JSON請求釋出到Kafka Connect的REST API來完成:
{ <font>"name"</font><font>: </font><font>"outbox-connector"</font><font>, </font><font>"config"</font><font>: { </font><font>"connector.class"</font><font> : </font><font>"io.debezium.connector.postgresql.PostgresConnector"</font><font>, </font><font>"tasks.max"</font><font> : </font><font>"1"</font><font>, </font><font>"database.hostname"</font><font> : </font><font>"order-db"</font><font>, </font><font>"database.port"</font><font> : </font><font>"5432"</font><font>, </font><font>"database.user"</font><font> : </font><font>"postgresuser"</font><font>, </font><font>"database.password"</font><font> : </font><font>"postgrespw"</font><font>, </font><font>"database.dbname"</font><font> : </font><font>"orderdb"</font><font>, </font><font>"database.server.name"</font><font> : </font><font>"dbserver1"</font><font>, </font><font>"schema.whitelist"</font><font> : </font><font>"inventory"</font><font>, </font><font>"table.whitelist"</font><font> : </font><font>"inventory.outboxevent"</font><font>, </font><font>"tombstones.on.delete"</font><font> : </font><font>"false"</font><font>, </font><font>"transforms"</font><font> : </font><font>"router"</font><font>, </font><font>"transforms.router.type"</font><font> : </font><font>"io.debezium.examples.outbox.routingsmt.EventRouter"</font><font> } } </font>
這將設定一個例項io.debezium.connector.postgresql.PostgresConnector,從指定的Postgres例項捕獲更改。請注意,通過表白名單,僅outboxevent捕獲表中的更改。它還應用了名為的單個訊息轉換(SMT)EventRouter。
刪除Kafka主題中的事件
通過設定tombstones.on.deleteto false,當從發件箱表中刪除事件記錄時,聯結器將不會發出刪除標記(“tombstones”)。這是有道理的,因為從發件箱表中刪除不應影響相應Kafka主題中事件的保留。相反,可以在Kafka中配置事件主題的特定保留時間,例如,將所有采購訂單事件保留30天。
或者,可以使用壓縮的主題。這需要對發件箱表中的事件設計進行一些更改:
他們必須描述整個集合; 因此,例如,表示取消單個訂單行的事件也應描述包含採購訂單的完整當前狀態; 這樣,在日誌壓縮執行之後,當只看到與給定訂單有關的最後一個事件時,消費者也能夠獲得採購訂單的整個狀態。
它們必須還有一個boolean屬性,指示特定事件是否表示刪除事件的聚合根。OrderDeleted然後,可以由下一節中描述的事件路由SMT使用這樣的事件(例如型別)來為該聚合根生成刪除標記。然後,當OrderDeleted事件已寫入主題時,日誌壓縮將刪除與給定採購訂單相關的所有事件。
當然,在刪除事件時,事件流將不再從一開始就可以重新播放。根據具體的業務需求,僅保留給定採購訂單,客戶等的最終狀態可能就足夠了。這可以通過使用壓縮的主題和主題delete.retention.ms設定的足夠值來實現。另一個選擇可能是將歷史事件移動到某種冷儲存(例如Amazon S3儲存桶),如果需要可以從中檢索它們,然後從Kafka主題中讀取最新事件。採用哪種方法取決於開發和執行解決方案的團隊的具體要求,預期資料量和專業知識。
主題路由
預設情況下,Debezium聯結器會將源自一個給定表的所有更改事件傳送到同一主題,即我們最終會得到一個名為Kafka的主題dbserver1.inventory.outboxevent,該主題將包含所有事件,包括訂單事件,客戶事件等。
為了簡化僅對特定事件型別感興趣的消費者的實現,更有意義的是,具有多個主題,例如OrderEvents,CustomerEvents等等。例如,裝運服務可能對任何客戶事件不感興趣。通過僅訂閱該OrderEvents主題,它將確保永遠不會收到任何客戶事件。
為了將從發件箱表捕獲的更改事件路由到不同的主題,使用該自定義SMT EventRouter。以下是其apply()方法的程式碼,Kafka Connect將為Debezium聯結器發出的每條記錄呼叫它:
@Override <b>public</b> R apply(R record) { <font><i>// Ignoring tombstones just in case</i></font><font> <b>if</b> (record.value() == <b>null</b>) { <b>return</b> record; } Struct struct = (Struct) record.value(); String op = struct.getString(</font><font>"op"</font><font>); </font><font><i>// ignoring deletions in the outbox table</i></font><font> <b>if</b> (op.equals(</font><font>"d"</font><font>)) { <b>return</b> <b>null</b>; } <b>else</b> <b>if</b> (op.equals(</font><font>"c"</font><font>)) { Long timestamp = struct.getInt64(</font><font>"ts_ms"</font><font>); Struct after = struct.getStruct(</font><font>"after"</font><font>); String key = after.getString(</font><font>"aggregateid"</font><font>); String topic = after.getString(</font><font>"aggregatetype"</font><font>) + </font><font>"Events"</font><font>; String eventId = after.getString(</font><font>"id"</font><font>); String eventType = after.getString(</font><font>"type"</font><font>); String payload = after.getString(</font><font>"payload"</font><font>); Schema valueSchema = SchemaBuilder.struct() .field(</font><font>"eventType"</font><font>, after.schema().field(</font><font>"type"</font><font>).schema()) .field(</font><font>"ts_ms"</font><font>, struct.schema().field(</font><font>"ts_ms"</font><font>).schema()) .field(</font><font>"payload"</font><font>, after.schema().field(</font><font>"payload"</font><font>).schema()) .build(); Struct value = <b>new</b> Struct(valueSchema) .put(</font><font>"eventType"</font><font>, eventType) .put(</font><font>"ts_ms"</font><font>, timestamp) .put(</font><font>"payload"</font><font>, payload); Headers headers = record.headers(); headers.addString(</font><font>"eventId"</font><font>, eventId); <b>return</b> record.newRecord(topic, <b>null</b>, Schema.STRING_SCHEMA, key, valueSchema, value, record.timestamp(), headers); } </font><font><i>// not expecting update events, as the outbox table is "append only",</i></font><font> </font><font><i>// i.e. event records will never be updated</i></font><font> <b>else</b> { <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Record of unexpected op type: "</font><font> + record); } } </font>
當收到刪除事件(op= d)時,它將丟棄該事件,因為從發件箱表中刪除事件記錄與下游消費者無關。收到建立事件(op= c)時,事情變得更有趣。這樣的記錄將傳播到Apache Kafka。
Debezium的更改事件具有複雜的結構,包含所表示行的old(before)和new(after)狀態。要傳播的事件結構是從after狀態獲得的。在aggregatetype從捕獲的事件記錄值被用來構建主題的名稱將事件傳送到。例如,aggregatetype設定為的事件Order將傳送到OrderEvents主題。aggregateid用作訊息金鑰,確保該聚合的所有訊息都將進入該主題的同一分割槽。訊息值是包含原始事件有效負載(編碼為JSON)的結構,指示事件何時生成的時間戳和事件型別。最後,事件UUID作為Kafka頭欄位傳播。這允許消費者進行有效的重複檢測,而不必檢查實際的訊息內容。
Apache Kafka中的事件
現在讓我們來看看OrderEvents和CustomerEvents主題。
如果您已經檢查了示例源並通過Docker Compose啟動了所有元件(請參閱示例專案中的README.md檔案以獲取更多詳細資訊),您可以通過訂單服務的REST API下載採購訂單,如下所示:
cat resources/data/create-order-request.json | http POST http:<font><i>//localhost:8080/order-service/rest/orders</i></font><font> </font>
同樣,可以取消特定的訂單行:
cat resources/data/cancel-order-line-request.json | http PUT http:<font><i>//localhost:8080/order-service/rest/orders/1/lines/2</i></font><font> </font>
當使用諸如非常實用的 kafkacat 實用程式之類的工具時,您現在應該在OrderEvents主題中看到類似這樣的訊息:
kafkacat -b kafka:9092 -C -o beginning -f 'Headers: %h\nKey: %k\nValue: %s\n' -q -t OrderEvents
Headers: eventId=d03dfb18-8af8-464d-890b-09eb8b2dbbdd Key: <font>"4"</font><font> Value: {</font><font>"eventType"</font><font>:</font><font>"OrderCreated"</font><font>,</font><font>"ts_ms"</font><font>:1550307598558,</font><font>"payload"</font><font>:</font><font>"{\"id\": 4, \"lineItems\": [{\"id\": 7, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 8, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \㠃-01-31T12:13:01\", \"customerId\": 123}"</font><font>} Headers: eventId=49f89ea0-b344-421f-b66f-c635d212f72c Key: </font><font>"4"</font><font> Value: {</font><font>"eventType"</font><font>:</font><font>"OrderLineUpdated"</font><font>,</font><font>"ts_ms"</font><font>:1550308226963,</font><font>"payload"</font><font>:</font><font>"{\"orderId\": 4, \"newStatus\": \"CANCELLED\", \"oldStatus\": \"ENTERED\", \"orderLineId\": 7}"</font><font>} </font>
payload具有訊息值的欄位是原始事件的字串ified JSON表示。Debezium Postgres聯結器將JSONB列作為字串發出(使用io.debezium.data.Json邏輯型別名稱),這就是引號被轉義的原因。 JQ 用處,更具體地說,它是fromjson操作者,用於顯示在一個更可讀的方式事件負載:
kafkacat -b kafka:9092 -C -o beginning -t Order | jq '.payload | fromjson'
{ <font>"id"</font><font>: 4, </font><font>"lineItems"</font><font>: [ { </font><font>"id"</font><font>: 7, </font><font>"item"</font><font>: </font><font>"Debezium in Action"</font><font>, </font><font>"status"</font><font>: </font><font>"ENTERED"</font><font>, </font><font>"quantity"</font><font>: 2, </font><font>"totalPrice"</font><font>: 39.98 }, { </font><font>"id"</font><font>: 8, </font><font>"item"</font><font>: </font><font>"Debezium for Dummies"</font><font>, </font><font>"status"</font><font>: </font><font>"ENTERED"</font><font>, </font><font>"quantity"</font><font>: 1, </font><font>"totalPrice"</font><font>: 29.99 } ], </font><font>"orderDate"</font><font>: </font><font>"2019-01-31T12:13:01"</font><font>, </font><font>"customerId"</font><font>: 123 } { </font><font>"orderId"</font><font>: 4, </font><font>"newStatus"</font><font>: </font><font>"CANCELLED"</font><font>, </font><font>"oldStatus"</font><font>: </font><font>"ENTERED"</font><font>, </font><font>"orderLineId"</font><font>: 7 } </font>
您還可以檢視CustomerEvents主題,以便在新增採購訂單時檢查表示建立發票的事件。
消費服務中的重複檢測
此時,我們實現的發件箱模式功能齊全; 當訂單服務收到下訂單(或取消訂單行)的請求時,它將在其資料庫的purchaseorder和orderline表中保持相應的狀態。同時,在同一事務中,相應的事件條目將新增到同一資料庫中的發件箱表中。Debezium Postgres聯結器捕獲對該表的任何插入,並將事件路由到與給定事件所代表的聚合型別相對應的Kafka主題。
為了總結,讓我們探討另一種微服務(例如貨運服務)如何使用這些訊息。該服務的切入點是常規的Kafka消費者實現,這不是太令人興奮,因此為了簡潔起見在此省略。您可以在示例儲存庫中找到其 原始碼 。對於Order主題上的每個傳入訊息,消費者呼叫OrderEventHandler:
@ApplicationScoped <b>public</b> <b>class</b> OrderEventHandler { <b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(OrderEventHandler.<b>class</b>); @Inject <b>private</b> MessageLog log; @Inject <b>private</b> ShipmentService shipmentService; @Transactional <b>public</b> <b>void</b> onOrderEvent(UUID eventId, String key, String event) { <b>if</b> (log.alreadyProcessed(eventId)) { LOGGER.info(<font>"Event with UUID {} was already retrieved, ignoring it"</font><font>, eventId); <b>return</b>; } JsonObject json = Json.createReader(<b>new</b> StringReader(event)).readObject(); JsonObject payload = json.containsKey(</font><font>"schema"</font><font>) ? json.getJsonObject(</font><font>"payload"</font><font>) :json; String eventType = payload.getString(</font><font>"eventType"</font><font>); Long ts = payload.getJsonNumber(</font><font>"ts_ms"</font><font>).longValue(); String eventPayload = payload.getString(</font><font>"payload"</font><font>); JsonReader payloadReader = Json.createReader(<b>new</b> StringReader(eventPayload)); JsonObject payloadObject = payloadReader.readObject(); <b>if</b> (eventType.equals(</font><font>"OrderCreated"</font><font>)) { shipmentService.orderCreated(payloadObject); } <b>else</b> <b>if</b> (eventType.equals(</font><font>"OrderLineUpdated"</font><font>)) { shipmentService.orderLineUpdated(payloadObject); } <b>else</b> { LOGGER.warn(</font><font>"Unkown event type"</font><font>); } log.processed(eventId); } } </font>
完成的第一件事onOrderEvent()是檢查之前是否已處理具有給定UUID的事件。如果是這樣,將忽略對該同一事件的任何進一步呼叫。這是為了防止由此資料管道的“至少一次”語義引起的任何重複事件處理。例如,在確認分別使用源資料庫或訊息傳遞代理檢索特定事件之前,可能會發生Debezium聯結器或使用服務失敗。在這種情況下,在重新啟動Debezium或消費服務之後,可能會再次處理一些事件。將事件UUID傳播為Kafka訊息頭允許有效地檢測和排除消費者中的重複。
如果第一次收到訊息,則解析訊息值,並ShippingService使用事件有效負載呼叫與特定事件型別對應的方法的業務方法。最後,訊息被標記為使用訊息日誌處理。
這MessageLog只是跟蹤服務的本地資料庫中表中所有消耗的事件:
@ApplicationScoped <b>public</b> <b>class</b> MessageLog { @PersistenceContext <b>private</b> EntityManager entityManager; @Transactional(value=TxType.MANDATORY) <b>public</b> <b>void</b> processed(UUID eventId) { entityManager.persist(<b>new</b> ConsumedMessage(eventId, Instant.now())); } @Transactional(value=TxType.MANDATORY) <b>public</b> <b>boolean</b> alreadyProcessed(UUID eventId) { <b>return</b> entityManager.find(ConsumedMessage.<b>class</b>, eventId) != <b>null</b>; } }
這樣,如果由於某種原因回滾事務,原始訊息也不會被標記為已處理,並且異常將冒泡到Kafka事件消費者迴圈。這允許稍後重新嘗試處理該訊息。
請注意,在將任何不可處理的訊息重新路由到死信佇列或類似訊息之前,更完整的實現應該只負責重試給定訊息一定次數。訊息日誌表上也應該有一些內容; 週期性地,可以刪除早於消費者與代理提交的當前偏移的所有事件,因為它確保這些訊息不會再次傳播給消費者。
總結
發件箱模式是在不同微服務之間傳播資料的好方法。
通過僅修改單個資源(源服務自己的資料庫),它避免了在不共享一個公共事務上下文(資料庫和Apache Kafka)的情況下同時更改多個資源的任何潛在不一致。通過首先寫入資料庫,源服務立即“讀取您自己的寫入”語義,這對於一致的使用者體驗很重要,允許在寫入後呼叫的查詢方法立即反映任何資料更改。
同時,該模式使非同步事件傳播到其他微服務。Apache Kafka是服務之間訊息傳遞的高度可擴充套件和可靠的主幹。給定正確的主題保留設定,新的消費者可能在最初生成事件後很長時間內出現,並根據事件歷史建立自己的本地狀態。
將Apache Kafka置於整體架構的中心也可確保所涉及服務的分離。例如,如果解決方案的單個元件失效或在一段時間內不可用,例如在更新期間,事件將在稍後處理:在重新啟動之後,Debezium聯結器將繼續從它離開的位置拖出發件箱表。之前。同樣,任何消費者都將繼續處理其先前偏移的主題。通過跟蹤已經成功處理的訊息,可以檢測重複項並從重複處理中排除重複項。
當然,不同服務之間的這種事件管道最終是一致的,即諸如運輸服務之類的消費者可能落後於諸如訂單服務之類的生產者。通常,這很好,並且可以根據應用程式的業務邏輯進行處理。例如,通常不需要在下訂單的同一秒內建立貨件。此外,整體解決方案的端到端延遲通常很低(幾秒甚至亞秒範圍),這要歸功於基於日誌的變更資料捕獲,它允許近實時傳送事件。
要記住的最後一件事是,通過發件箱公開的事件的結構應該被視為發射服務的API的一部分。即在需要時,應仔細調整其結構並考慮相容性因素。這是為了確保在升級生產服務時不會意外破壞任何消費者。同時,消費者在處理訊息時應該寬容,例如在遇到接收事件中的未知屬性時不會失敗。
非常感謝Hans-Peter Grahsl,Jiri Pechanec,Justin Holmes和RenéKerner在撰寫這篇文章時的反饋!
關於Debezium
Debezium是一個開源分散式平臺,可將現有資料庫轉換為事件流,因此應用程式幾乎可以立即檢視和響應資料庫中每個已提交的行級更改。Debezium構建於 Kafka 之上,提供 Kafka Connect 相容聯結器,可監控特定的資料庫管理系統。Debezium記錄了Kafka日誌中資料更改的歷史記錄,因此您的應用程式可以隨時停止和重新啟動,並且可以輕鬆地使用它在未執行時丟失的所有事件,從而確保正確且完整地處理所有事件。Debezium是 開源 的下 Apache許可證,版本2.0 。