從未如此簡單:10分鐘帶你逆襲Kafka!

51cto 發佈 2020-03-17T01:49:55+00:00

@KafkaListener > kafkaMessage = Optional.ofNullable); if ) {

【51CTO.com原創稿件】Apache Kafka 是一個快速、可擴展的、高吞吐的、可容錯的分布式「發布-訂閱」消息系統, 使用 Scala 與 Java 語言編寫,能夠將消息從一個端點傳遞到另一個端點。

較之傳統的消息中間件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、內置分區、支持消息副本和高容錯的特性,非常適合大規模消息處理應用程式。

Kafka 官網:http://kafka.apache.org/

Kafka 主要設計目標如下:

  • 以時間複雜度為 O(1) 的方式提供消息持久化能力,即使對 TB 級以上數據也能保證常數時間的訪問性能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒 100K 條消息的傳輸。
  • 支持 Kafka Server 間的消息分區,及分布式消費,同時保證每個 Partition 內的消息順序傳輸。
  • 同時支持離線數據處理和實時數據處理。
  • 支持在線水平擴展。

Kafka 通常用於兩大類應用程式:

  • 建立實時流數據管道,以可靠地在系統或應用程式之間獲取數據。
  • 構建實時流應用程式,以轉換或響應數據流。

要了解 Kafka 如何執行這些操作,讓我們從頭開始深入研究 Kafka 的功能。

首先幾個概念:

  • Kafka 在一個或多個可以跨越多個數據中心的伺服器上作為集群運行。
  • Kafka 集群將記錄流存儲在稱為主題的類別中。
  • 每個記錄由一個鍵,一個值和一個時間戳組成。

Kafka 架構體系如下圖:

Kafka 的應用場景非常多, 下面我們就來舉幾個我們最常見的場景:

①用戶的活動跟蹤:用戶在網站的不同活動消息發布到不同的主題中心,然後可以對這些消息進行實時監測、實時處理。

當然,也可以加載到 Hadoop 或離線處理數據倉庫,對用戶進行畫像。像淘寶、天貓、京東這些大型電商平台,用戶的所有活動都要進行追蹤的。

②日誌收集如下圖:

③限流削峰如下圖:

④高吞吐率實現:Kafka 與其他 MQ 相比,最大的特點就是高吞吐率。為了增加存儲能力,Kafka 將所有的消息都寫入到了低速大容量的硬碟。

按理說,這將導致性能損失,但實際上,Kafka 仍然可以保持超高的吞吐率,並且其性能並未受到影響。

其主要採用如下方式實現了高吞吐率:

  • 順序讀寫:Kafka 將消息寫入到了分區 Partition 中,而分區中的消息又是順序讀寫的。順序讀寫要快於隨機讀寫。
  • 零拷貝:生產者、消費者對於 Kafka 中的消息是採用零拷貝實現的。
  • 批量發送:Kafka 允許批量發送模式。
  • 消息壓縮:Kafka 允許對消息集合進行壓縮。

Kafka的優點如下:

①解耦:在項目啟動之初來預測將來項目會碰到什麼需求,是極其困難的。

消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。

這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

②冗餘(副本):有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。

消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。

許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

③擴展性:因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。

④靈活性&峰值處理能力:在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。

使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

⑤可恢復性:系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復後被處理。

⑥順序保證:在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。Kafka 保證一個 Partition 內的消息的有序性。

⑦緩衝:在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。

消息隊列通過一個緩衝層來幫助任務最高效率的執行,寫入隊列的處理會儘可能的快速。該緩衝有助於控制和優化數據流經過系統的速度。

⑧異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。

Kafka 於其他 MQ 對比如下:

①RabbitMQ:RabbitMQ 是使用 Erlang 編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP,SMTP,STOMP,也正因如此,它非常重量級,更適合於企業級的開發。

同時實現了 Broker 構架,這意味著消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。

②Redis:Redis 是一個基於 Key-Value 對的 NoSQL 資料庫,開發維護很活躍。

雖然它是一個 Key-Value 資料庫存儲系統,但它本身支持 MQ 功能,所以完全可以當做一個輕量級的隊列服務來使用。

對於 RabbitMQ 和 Redis 的入隊和出隊操作,各執行 100 萬次,每 10 萬次記錄一次執行時間。測試數據分為 128Bytes、512Bytes、1K 和 10K 四個不同大小的數據。

實驗表明:入隊時,當數據比較小時 Redis 的性能要高於 RabbitMQ,而如果數據大小超過了 10K,Redis 則慢的無法忍受;出隊時,無論數據大小,Redis 都表現出非常好的性能,而 RabbitMQ 的出隊性能則遠低於 Redis。

③ZeroMQ:ZeroMQ 號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。

ZeroMQ 能夠實現 RabbitMQ 不擅長的高級/複雜的隊列,但是開發人員需要自己組合多種技術框架,技術上的複雜度是對這 MQ 能夠應用成功的挑戰。

ZeroMQ 具有一個獨特的非中間件的模式,你不需要安裝和運行一個消息伺服器或中間件,因為你的應用程式將扮演這個伺服器角色。

你只需要簡單的引用 ZeroMQ 程序庫,可以使用 NuGet 安裝,然後你就可以愉快的在應用程式之間發送消息了。

但是 ZeroMQ 僅提供非持久性的隊列,也就是說如果宕機,數據將會丟失。其中,Twitter 的 Storm 0.9.0 以前的版本中默認使用 ZeroMQ 作為數據流的傳輸(Storm 從 0.9 版本開始同時支持 ZeroMQ 和 Netty 作為傳輸模塊)。

④ActiveMQ:ActiveMQ 是 Apache 下的一個子項目。類似於 ZeroMQ,它能夠以代理人和點對點的技術實現隊列。同時類似於 RabbitMQ,它少量代碼就可以高效地實現高級應用場景。

⑤Kafka/Jafka:Kafka 是 Apache 下的一個子項目,是一個高性能跨語言分布式發布/訂閱消息隊列系統,而 Jafka 是在 Kafka 之上孵化而來的,即 Kafka 的一個升級版。

具有以下特性:

  • 快速持久化,可以在 O(1) 的系統開銷下進行消息持久化。
  • 高吞吐,在一台普通的伺服器上既可以達到 10W/s 的吞吐速率。
  • 完全的分布式系統,Broker、Producer、Consumer 都原生自動支持分布式,自動實現負載均衡。
  • 支持 Hadoop 數據並行加載,對於像 Hadoop 的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。

Kafka 通過 Hadoop 的並行加載機制統一了在線和離線的消息處理。Apache Kafka 相對於 ActiveMQ 是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。

Kafka的幾種重要角色如下:

①Kafka 作為存儲系統:任何允許發布與使用無關的消息發布的消息隊列都有效地充當了運行中消息的存儲系統。Kafka 的不同之處在於它是一個非常好的存儲系統。

寫入 Kafka 的數據將寫入磁碟並進行複製以實現容錯功能。Kafka 允許生產者等待確認,以便直到完全複製並確保即使寫入伺服器失敗的情況下寫入也不會完成。

Kafka 的磁碟結構可以很好地擴展使用-無論伺服器上有 50KB 還是 50TB 的持久數據,Kafka 都將執行相同的操作。

由於認真對待存儲並允許客戶端控制其讀取位置,因此您可以將 Kafka 視為一種專用於高性能,低延遲提交日誌存儲,複製和傳播的專用分布式文件系統。

②Kafka 作為消息傳遞系統:Kafka 的流概念與傳統的企業消息傳遞系統相比如何?

傳統上,消息傳遞具有兩種模型:排隊和發布訂閱。在隊列中,一組使用者可以從伺服器中讀取內容,並且每條記錄都將轉到其中一個。

在發布-訂閱記錄中廣播給所有消費者。這兩個模型中的每一個都有優點和缺點。

排隊的優勢在於,它允許您將數據處理劃分到多個使用者實例上,從而擴展處理量。

不幸的是,隊列不是多用戶的—一次進程讀取了丟失的數據。發布-訂閱允許您將數據廣播到多個進程,但是由於每條消息都傳遞給每個訂閱者,因此無法擴展處理。

Kafka 的消費者群體概念概括了這兩個概念。與隊列一樣,使用者組允許您將處理劃分為一組進程(使用者組的成員)。與發布訂閱一樣,Kafka 允許您將消息廣播到多個消費者組。

Kafka 模型的優點在於,每個主題都具有這些屬性-可以擴展處理範圍,並且是多訂閱者,無需選擇其中一個。

與傳統的消息傳遞系統相比,Kafka 還具有更強的訂購保證。傳統隊列將記錄按順序保留在伺服器上,如果多個使用者從隊列中消費,則伺服器將按記錄的存儲順序分發記錄。

但是,儘管伺服器按順序分發記錄,但是這些記錄是異步傳遞給使用者的,因此它們可能在不同的使用者上亂序到達。

這實際上意味著在並行使用的情況下會丟失記錄的順序。消息傳遞系統通常通過「專有使用者」的概念來解決此問題,該概念僅允許一個進程從隊列中使用,但是,這當然意味著在處理中沒有並行性。

Kafka 做得更好,通過在主題內具有並行性(即分區)的概念,Kafka 能夠在用戶進程池中提供排序保證和負載均衡。

這是通過將主題中的分區分配給消費者組中的消費者來實現的,以便每個分區都由組中的一個消費者完全消費。

通過這樣做,我們確保使用者是該分區的唯一讀取器,並按順序使用數據。由於存在許多分區,因此仍然可以平衡許多使用者實例上的負載。但是請注意,使用者組中的使用者實例不能超過分區。

③Kafka 用作流處理:僅讀取,寫入和存儲數據流是不夠的,目的是實現對流的實時處理。

在 Kafka 中,流處理器是指從輸入主題中獲取連續數據流,對該輸入進行一些處理並生成連續數據流以輸出主題的任何東西。

例如,零售應用程式可以接受銷售和裝運的輸入流,並輸出根據此數據計算出的重新訂購和價格調整流。

可以直接使用生產者和消費者 API 進行簡單處理。但是,對於更複雜的轉換,Kafka 提供了完全集成的 Streams API。

這允許構建執行非重要處理的應用程式,這些應用程式計算流的聚合或將流連接在一起。

該功能有助於解決此類應用程式所面臨的難題:處理無序數據,在代碼更改時重新處理輸入,執行狀態計算等。

流 API 建立在 Kafka 提供的核心原語之上:它使用生產者和使用者 API 進行輸入,使用 Kafka 進行狀態存儲,並使用相同的組機制來實現流處理器實例之間的容錯。

Kafka 中的關鍵術語解釋

Topic:主題。在 Kafka 中,使用一個類別屬性來劃分消息的所屬類,劃分消息的這個類稱為 Topic。Topic 相當於消息的分類標籤,是一個邏輯概念。

物理上不同 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然保存於一個或多個 Broker 上但用戶只需指定消息的 Topic 即可生產或消費數據而不必關心數據存於何處。

Partition:分區。Topic 中的消息被分割為一個或多個 Partition,其是一個物理概念,對應到系統上 就是一個或若干個目錄。Partition 內部的消息是有序的,但 Partition 間的消息是無序的。

Segment 段。將 Partition 進一步細分為了若干的 Segment,每個 Segment 文件的大小相等。

Broker:Kafka 集群包含一個或多個伺服器,每個伺服器節點稱為一個 Broker。

Broker 存儲 Topic 的數據。如果某 Topic 有 N 個 Partition,集群有 N 個 Broker,那麼每個 Broker 存儲該 Topic 的一個 Partition。

如果某 Topic 有 N 個 Partition,集群有(N+M)個 Broker,那麼其中有 N 個 Broker 存儲該 Topic 的一個 Partition,剩下的 M 個 Broker 不存儲該 Topic 的 Partition 數據。

如果某 Topic 有 N 個 Partition,集群中 Broker 數目少於 N 個,那麼一個 Broker 存儲該 Topic 的一個或多個 Partition。

在實際生產環境中,儘量避免這種情況的發生,這種情況容易導致 Kafka 集群數據不均衡。

Producer:生產者。即消息的發布者,生產者將數據發布到他們選擇的主題。

生產者負責選擇將哪個記錄分配給主題中的哪個分區。即:生產者生產的一條消息,會被寫入到某一個 Partition。

Consumer:消費者。可以從 Broker 中讀取消息。一個消費者可以消費多個 Topic 的消息;一個消費者可以消費同一個 Topic 中的多個 Partition 中的消息;一個 Partiton 允許多個 Consumer 同時消費。

Consumer Group:Consumer Group 是 Kafka 提供的可擴展且具有容錯性的消費者機制。

組內可以有多個消費者,它們共享一個公共的 ID,即 Group ID。組內的所有消費者協調在一起來消費訂閱主題 的所有分區。

Kafka 保證同一個 Consumer Group 中只有一個 Consumer 會消費某條消息。

實際上,Kafka 保證的是穩定狀態下每一個 Consumer 實例只會消費某一個或多個特定的 Partition,而某個 Partition 的數據只會被某一個特定的 Consumer 實例所消費。

下面我們用官網的一張圖, 來標識 Consumer 數量和 Partition 數量的對應關係。

由兩台伺服器組成的 Kafka 群集,其中包含四個帶有兩個使用者組的分區(P0-P3)。消費者組 A 有兩個消費者實例,組 B 有四個。

對於這個消費組, 以前一直搞不明白,我自己的總結是:Topic 中的 Partitoin 到 Group 是發布訂閱的通信方式。

即一條 Topic 的 Partition 的消息會被所有的 Group 消費,屬於一對多模式;Group 到 Consumer 是點對點通信方式,屬於一對一模式。

舉個例子:不使用 Group 的話,啟動 10 個 Consumer 消費一個 Topic,這 10 個 Consumer 都能得到 Topic 的所有數據,相當於這個 Topic 中的任一條消息被消費 10 次。

使用 Group 的話,連接時帶上 groupid,Topic 的消息會分發到 10 個 Consumer 上,每條消息只被消費 1 次。

Replizcas of partition:分區副本。副本是一個分區的備份,是為了防止消息丟失而創建的分區的備份。

Partition Leader:每個 Partition 有多個副本,其中有且僅有一個作為 Leader,Leader 是當前負責消息讀寫 的 Partition。即所有讀寫操作只能發生於 Leader 分區上。

Partition Follower:所有 Follower 都需要從 Leader 同步消息,Follower 與 Leader 始終保持消息同步。Leader 與 Follower 的關係是主備關係,而非主從關係。

ISR:

  • ISR,In-Sync Replicas,是指副本同步列表。ISR 列表是由 Leader 負責維護。
  • AR,Assigned Replicas,指某個 Partition 的所有副本, 即已分配的副本列表。
  • OSR,Outof-Sync Replicas,即非同步的副本列表。
  • AR=ISR+OSR

Offset:偏移量。每條消息都有一個當前 Partition 下唯一的 64 字節的 Offset,它是相當於當前分區第一條消息的偏移量。

Broker Controller:Kafka集群的多個 Broker 中,有一個會被選舉 Controller,負責管理整個集群中 Partition 和 Replicas 的狀態。

只有 Broker Controller 會向 Zookeeper 中註冊 Watcher,其他 Broker 及分區無需註冊。即 Zookeeper 僅需監聽 Broker Controller 的狀態變化即可。

HW 與 LEO:

  • HW,HighWatermark,高水位,表示 Consumer 可以消費到的最高 Partition 偏移量。HW 保證了 Kafka 集群中消息的一致性。確切地說,是保證了 Partition 的 Follower 與 Leader 間數 據的一致性。
  • LEO,Log End Offset,日誌最後消息的偏移量。消息是被寫入到 Kafka 的日誌文件中的, 這是當前最後一個寫入的消息在 Partition 中的偏移量。
  • 對於 Leader 新寫入的消息,Consumer 是不能立刻消費的。Leader 會等待該消息被所有 ISR 中的 Partition Follower 同步後才會更新 HW,此時消息才能被 Consumer 消費。

我相信你看完上面的概念還是懵逼的,好吧!下面我們就用圖來形象話的表示兩者的關係吧:

Zookeeper:Zookeeper 負責維護和協調 Broker,負責 Broker Controller 的選舉。在 Kafka 0.9 之前版本,Offset 是由 ZK 負責管理的。

總結:ZK 負責 Controller 的選舉,Controller 負責 Leader 的選舉。

Coordinator:一般指的是運行在每個 Broker 上的 Group Coordinator 進程,用於管理 Consumer Group 中的各個成員,主要用於 Offset 位移管理和 Rebalance。一個 Coordinator 可以同時管理多個消費者組。

Rebalance:當消費者組中的數量發生變化,或者 Topic 中的 Partition 數量發生了變化時,Partition 的所有權會在消費者間轉移,即 Partition 會重新分配,這個過程稱為再均衡 Rebalance。

再均衡能夠給消費者組及 Broker 帶來高性能、高可用性和伸縮,但在再均衡期間消費者是無法讀取消息的,即整個 Broker 集群有小一段時間是不可用的。因此要避免不必要的再均衡。

Offset Commit:Consumer 從 Broker 中取一批消息寫入 Buffer 進行消費,在規定的時間內消費完消息後,會自動將其消費消息的 Offset 提交給 Broker,以記錄下哪些消息是消費過的。當然,若在時限內沒有消費完畢,其是不會提交 Offset 的。

Kafka的工作原理和過程

①消息寫入算法

消息發送者將消息發送給 Broker, 並形成最終的可供消費者消費的 log,是已給比較複雜的過程:

  • Producer 先從 Zookeeper 中找到該 Partition 的 Leader。
  • Producer將消息發送給該 Leader。
  • Leader 將消息接入本地的 log,並通知 ISR 的 Followers。
  • ISR 中的 Followers 從 Leader 中 Pull 消息, 寫入本地 log 後向 Leader 發送 Ack。
  • Leader 收到所有 ISR 中的 Followers 的 Ack 後,增加 HW 並向 Producer 發送 Ack,表示消息寫入成功。

②消息路由策略

在通過 API 方式發布消息時,生產者是以 Record 為消息進行發布的。

Record 中包含 Key 與 Value,Value 才是我們真正的消息本身,而 Key 用於路由消息所要存放的 Partition。

消息要寫入到哪個 Partition 並不是隨機的,而是有路由策略的:

  • 若指定了 Partition,則直接寫入到指定的 Partition。
  • 若未指定 Partition 但指定了 Key,則通過對 Key 的 Hash 值與 Partition 數量取模,該取模。
  • 結果就是要選出的 Partition 索引。
  • 若 Partition 和 Key 都未指定,則使用輪詢算法選出一個 Partition。

③HW 截斷機制

如果 Partition Leader 接收到了新的消息, ISR 中其它 Follower 正在同步過程中,還未同步完畢時 leader 宕機。

此時就需要選舉出新的 Leader。若沒有 HW 截斷機制,將會導致 Partition 中 Leader 與 Follower 數據的不一致。

當原 Leader 宕機後又恢復時,將其 LEO 回退到其宕機時的 HW,然後再與新的 Leader 進行數據同步,這樣就可以保證老 Leader 與新 Leader 中數據一致了,這種機制稱為 HW 截斷機制。

④消息發送的可靠性

生產者向 Kafka 發送消息時,可以選擇需要的可靠性級別。通過 request.required.acks 參數的值進行設置。

0 值:異步發送。生產者向 Kafka 發送消息而不需要 Kafka 反饋成功 Ack。該方式效率最高,但可靠性最低。

其可能會存在消息丟失的情況:

  • 在傳輸過程中會出現消息丟失。
  • 在 Broker 內部會出現消息丟失。
  • 會出現寫入到 Kafka 中的消息的順序與生產順序不一致的情況。

1 值:同步發送。生產者發送消息給 Kafka,Broker 的 Partition Leader 在收到消息後馬上發送成功 Ack(無需等等 ISR 中的 Follower 同步)。

生產者收到後知道消息發送成功,然後會再發送消息。如果一直未收到 Kafka 的 Ack,則生產者會認為消息發送失敗,會重發消息。

該方式對於 Producer 來說,若沒有收到 Ack,一定可以確認消息發送失敗了,然後可以重發。

但是,即使收到了 ACK,也不能保證消息一定就發送成功了。故,這種情況,也可能會發生消息丟失的情況。

-1 值:同步發送。生產者發送消息給 Kafka,Kafka 收到消息後要等到 ISR 列表中的所有副本都 同步消息完成後,才向生產者發送成功 Ack。

如果一直未收到 Kafka 的 Ack,則認為消息發送 失敗,會自動重發消息。該方式會出現消息重複接收的情況。

⑤消費者消費過程解析

生產者將消息發送到 Topitc 中,消費者即可對其進行消費,其消費過程如下:

  • Consumer 向 Broker 提交連接請求,其所連接上的 Broker 都會向其發送Broker Controller 的通信 URL,即配置文件中的 Listeners 地址。
  • 當 Consumer 指定了要消費的 Topic 後,會向 Broker Controller 發送消費請求。
  • Broker Controller 會為 Consumer 分配一個或幾個 Partition Leader,並將該 Partition 的當前 Offset 發送給 Consumer。
  • Consumer 會按照 Broker Controller 分配的 Partition 對其中的消息進行消費。
  • 當 Consumer 消費完該條消息後,Consumer 會向 Broker 發送一個消息已經被消費反饋,即該消息的 Offset。
  • 在 Broker 接收到 Consumer 的 Offset 後,會更新相應的 __consumer_offset 中。
  • 以上過程會一直重複,知道消費者停止請求消費。
  • Consumer 可以重置 Offset,從而可以靈活消費存儲在 Broker 上的消息。

⑥Partition Leader 選舉範圍

當 Leader 宕機後,Broker Controller 會從 ISR 中挑選一個 Follower 成為新的 Leader。

如果 ISR 中沒有其他副本怎麼辦?可以通過 unclean.leader.election.enable 的值來設置 Leader 選舉範圍。

False:必須等到 ISR 列表中所有的副本都活過來才進行新的選舉。該策略可靠性有保證,但可用性低。

True:在 ISR 列表中沒有副本的情況下,可以選擇任意一個沒有宕機的主機作為新的 Leader,該策略可用性高,但可靠性沒有保證。

⑦重複消費問題的解決方案

同一個 Consumer 重複消費:當 Consumer 由於消費能力低而引發了消費超時,則可能會形成重複消費。

在某數據剛好消費完畢,但是正準備提交 Offset 時候,消費時間超時,則 Broker 認為這條消息未消費成功。這時就會產生重複消費問題。其解決方案:延長 Offset 提交時間。

不同的 Consumer 重複消費:當 Consumer 消費了消息,但還沒有提交 Offset 時宕機,則這些已經被消費過的消息會被重複消費。其解決方案:將自動提交改為手動提交。

⑧從架構設計上解決 Kafka 重複消費的問題

我們在設計程序的時候,比如考慮到網絡故障等一些異常的情況,我們都會設置消息的重試次數,可能還有其他可能出現消息重複,那我們應該如何解決呢?下面提供三個方案:

方案一:保存並查詢

給每個消息都設置一個獨一無二的 uuid,所有的消息,我們都要存一個 uuid。

我們在消費消息的時候,首先去持久化系統中查詢一下看這個看是否以前消費過,如沒有消費過,在進行消費,如果已經消費過,丟棄就好了。

下圖表明了這種方案:

方案二:利用冪等

冪等(Idempotence)在數學上是這樣定義的,如果一個函數 f(x) 滿足:f(f(x)) = f(x),則函數 f(x) 滿足冪等性。

這個概念被拓展到計算機領域,被用來描述一個操作、方法或者服務。一個冪等操作的特點是,其任意多次執行所產生的影響均與一次執行的影響相同。

一個冪等的方法,使用同樣的參數,對它進行多次調用和一次調用,對系統產生的影響是一樣的。所以,對於冪等的方法,不用擔心重複執行會對系統造成任何改變。

我們舉個例子來說明一下。在不考慮並發的情況下,「將 X 老師的帳戶餘額設置為 100 萬元」,執行一次後對系統的影響是,X 老師的帳戶餘額變成了 100 萬元。

只要提供的參數 100 萬元不變,那即使再執行多少次,X 老師的帳戶餘額始終都是 100 萬元,不會變化,這個操作就是一個冪等的操作。

再舉一個例子,「將 X 老師的餘額加 100 萬元」,這個操作它就不是冪等的,每執行一次,帳戶餘額就會增加 100 萬元,執行多次和執行一次對系統的影響(也就是帳戶的餘額)是不一樣的。

所以,通過這兩個例子,我們可以想到如果系統消費消息的業務邏輯具備冪等性,那就不用擔心消息重複的問題了,因為同一條消息,消費一次和消費多次對系統的影響是完全一樣的。也就可以認為,消費多次等於消費一次。

那麼,如何實現冪等操作呢?最好的方式就是,從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作。

但是,不是所有的業務都能設計成天然冪等的,這裡就需要一些方法和技巧來實現冪等。

下面我們介紹一種常用的方法:利用資料庫的唯一約束實現冪等。

例如,我們剛剛提到的那個不具備冪等特性的轉帳的例子:將 X 老師的帳戶餘額加 100 萬元。在這個例子中,我們可以通過改造業務邏輯,讓它具備冪等性。

首先,我們可以限定,對於每個轉帳單每個帳戶只可以執行一次變更操作,在分布式系統中,這個限制實現的方法非常多,最簡單的是我們在資料庫中建一張轉帳流水錶。

這個表有三個欄位:轉帳單 ID、帳戶 ID 和變更金額,然後給轉帳單 ID 和帳戶 ID 這兩個欄位聯合起來創建一個唯一約束,這樣對於相同的轉帳單 ID 和帳戶 ID,表里至多只能存在一條記錄。

這樣,我們消費消息的邏輯可以變為:「在轉帳流水錶中增加一條轉帳記錄,然後再根據轉帳記錄,異步操作更新用戶餘額即可。」

在轉帳流水錶增加一條轉帳記錄這個操作中,由於我們在這個表中預先定義了「帳戶 ID 轉帳單 ID」的唯一約束,對於同一個轉帳單同一個帳戶只能插入一條記錄,後續重複的插入操作都會失敗,這樣就實現了一個冪等的操作。

方案三:設置前提條件

為更新的數據設置前置條件另外一種實現冪等的思路是,給數據變更設置一個前置條件,如果滿足條件就更新數據,否則拒絕更新數據,在更新數據的時候,同時變更前置條件中需要判斷的數據。

這樣,重複執行這個操作時,由於第一次更新數據的時候已經變更了前置條件中需要判斷的數據,不滿足前置條件,則不會重複執行更新數據操作。

比如,剛剛我們說過,「將 X 老師的帳戶的餘額增加 100 萬元」這個操作並不滿足冪等性,我們可以把這個操作加上一個前置條件,變為:「如果 X 老師的帳戶當前的餘額為 500 萬元,將餘額加 100 萬元」,這個操作就具備了冪等性。

對應到消息隊列中的使用時,可以在發消息時在消息體中帶上當前的餘額,在消費的時候進行判斷資料庫中,當前餘額是否與消息中的餘額相等,只有相等才執行變更操作。

但是,如果我們要更新的數據不是數值,或者我們要做一個比較複雜的更新操作怎麼辦?用什麼作為前置判斷條件呢?

更加通用的方法是,給你的數據增加一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致,如果不一致就拒絕更新數據,更新數據的同時將版本號 +1,一樣可以實現冪等。

Kafka 集群搭建

我們在工作中,為了保證環境的高可用,防止單點,Kafka 都是以集群的方式出現的,下面就帶領大家一起搭建一套 Kafka 集群環境。

我們在官網下載 Kafka,下載地址為:http://kafka.apache.org/downloads,下載我們需要的版本,推薦使用穩定的版本。

搭建集群

①下載並解壓

cd /usr/local/src 

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz 

mkdir /data/servers 

tar xzvf kafka_2.11-2.4.0.tgz -C /data/servers/ 

cd /data/servers/kafka_2.11-2.4.0 

②修改配置文件

Kafka 的配置文件 $KAFKA_HOME/config/server.properties,主要修改一下下面幾項:

確保每個機器上的id不一樣 
 broker.id=0 
  配置服務端的監控地址 
 listeners=PLAINTEXT://192.168.51.128:9092 
  kafka 日誌目錄 
 log.dirs=/data/servers/kafka_2.11-2.4.0/logs 
 #kafka設置的partitons的個數 
 num.partitions=1 
 
  zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群 
 zookeeper.connect=192.168.51.128:2181 

因為我自己是本機做實驗,所有使用的是一個主機的不同埠,在線上,就是不同的機器,大家參考即可。

我們這裡使用 Kafka 的 Zookeeper,只啟動一個節點,但是正真的生產過程中,是需要 Zookeeper 集群,自己搭建就好,後期我們也會出 Zookeeper 的教程,大家請關注就好了。

③拷貝 3 份配置文件

#創建對應的日誌目錄 
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092 
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093 
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094 
 
#拷貝三份配置文件 
cp server.properties server_9092.properties  
cp server.properties server_9093.properties  
cp server.properties server_9094.properties  

④修改不同埠對應的文件

#9092的id為0, 9093的id為1, 9094的id為2 
 broker.id=0 
 # 配置服務端的監控地址, 分別在不通的配置文件中寫入不同的埠 
 listeners=PLAINTEXT://192.168.51.128:9092 
 # kafka 日誌目錄, 目錄也是對應不同的埠 
 log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092 
 # kafka設置的partitons的個數 
 num.partitions=1 
 # zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群 
 zookeeper.connect=192.168.51.128:2181 

修改 Zookeeper 的配置文件:

dataDir=/data/servers/zookeeper 
server.1=192.168.51.128:2888:3888 

然後創建 Zookeeper 的 myid 文件:

echo "1"> /data/servers/zookeeper/myid 

⑤啟動 Zookeeper

使用 Kafka 內置的 Zookeeper:

cd /data/servers/kafka_2.11-2.4.0/bin 
zookeeper-server-start.sh -daemon ../config/zookeeper.properties  
netstat -anp |grep 2181 

啟動 Kafka:

./kafka-server-start.sh -daemon ../config/server_9092.properties    
./kafka-server-start.sh -daemon ../config/server_9093.properties    
./kafka-server-start.sh -daemon ../config/server_9094.properties    

Kafka 的操作

①Topic

我們先來看一下創建 Topic 常用的參數吧:

  • --create:創建 topic
  • --delete:刪除 topic
  • --alter:修改 topic 的名字或者 partition 個數
  • --list:查看 topic
  • --describe:查看 topic 的詳細信息
  • --topic
  • --zookeeper

示例:

cd /data/servers/kafka_2.11-2.4.0/bin 
# 創建topic  test1 
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1 
# 創建topic test2 
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test2 
# 查看topic 
kafka-topics.sh --list --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094  

②自動創建 Topic

我們在工作中,如果我們不想去管理 Topic,可以通過 Kafka 的配置文件來管理。

我們可以讓 Kafka 自動創建 Topic,需要在我們的 Kafka 配置文件中加入如下配置文件:

auto.create.topics.enable=true 

如果刪除 Topic 想達到物理刪除的目的,也是需要配置的:

delete.topic.enable=true 

③發送消息

他們可以通過客戶端的命令生產消息,先來看看 kafka-console-producer.sh 常用的幾個參數吧:

  • --topic <String: topic>:指定 topic
  • --timeout <Integer: timeout_ms>:超時時間
  • --sync:異步發送消息
  • --broker-list <String: broker-list>:官網提示: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.

這個參數是必須的:

kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 

④消費消息

我們也還是先來看看 kafka-console-consumer.sh 的參數吧:

  • --topic <String: topic>:指定 topic
  • --group <String: consumer group id>:指定消費者組
  • --from-beginning:指定從開始進行消費, 如果不指定, 就從當前進行消費
  • --bootstrap-server:Kafka 的連接地址
kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning 

Kafka 的日誌

Kafka 的日誌分兩種:

  • 第一種日誌是我們的 Kafka 的啟動日誌,就是我們排查問題,查看報錯信息的日誌。
  • 第二種日誌就是我們的數據日誌,Kafka 是我們的數據是以日誌的形式存在存檔中的,我們第二種所說的日誌就是我們的 Partiton 與 Segment。

那我們就來說說備份和分區吧:我們創建一個分區,一個備份,那麼 test 就應該在三台機器上或者三個數據目錄只有一個 test-0。(分區的下標是從 0 開始的)

如果我們創建 N 個分區,我們就會在三個伺服器上發現,test_0-n,如果我們創建 M 個備份,我們就會在發現,test_0 到 test_n 每一個都是 M 個。

Kafka API

使用 Kafka 原生的 API

①消費者自動提交

定義自己的生產者:

import org.apache.kafka.clients.producer.Callback; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.clients.producer.RecordMetadata; 
 
import java.util.Properties; 
 
/** 
 * @ClassName MyKafkaProducer 
 * @Description TODO 
 * @Author lingxiangxiang 
 * @Date 3:37 PM 
 * @Version 1.0 
 **/ 
public class MyKafkaProducer { 
    private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer; 
 
    public MyKafkaProducer() { 
        Properties properties = new Properties(); 
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); 
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); 
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
        // 設置批量發送 
        properties.put("batch.size", 16384); 
        // 批量發送的等待時間50ms, 超過50ms, 不足批量大小也發送 
        properties.put("linger.ms", 50); 
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties); 
    } 
 
    public boolean sendMsg() { 
        boolean result = true; 
        try { 
            // 正常發送, test2是topic, 0代表的是分區, 1代表的是key, hello world是發送的消息內容 
            final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world"); 
            producer.send(record); 
            // 有回調函數的調用 
            producer.send(record, new Callback() { 
                @Override 
                public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
                    System.out.println(recordMetadata.topic()); 
                    System.out.println(recordMetadata.partition()); 
                    System.out.println(recordMetadata.offset()); 
                } 
            }); 
          // 自己定義一個類 
            producer.send(record, new MyCallback(record)); 
        } catch (Exception e) { 
            result = false; 
        } 
        return result; 
    } 
} 

定義生產者發送成功的回調函數:

import org.apache.kafka.clients.producer.Callback; 
import org.apache.kafka.clients.producer.RecordMetadata; 
 
/** 
 * @ClassName MyCallback 
 * @Description TODO 
 * @Author lingxiangxiang 
 * @Date 3:51 PM 
 * @Version 1.0 
 **/ 
public class MyCallback implements Callback { 
    private Object msg; 
 
    public MyCallback(Object msg) { 
        this.msg = msg; 
    } 
 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception e) { 
        System.out.println("topic = " + metadata.topic()); 
        System.out.println("partiton = " + metadata.partition()); 
        System.out.println("offset = " + metadata.offset()); 
        System.out.println(msg); 
    } 
} 

生產者測試類:在生產者測試類中,自己遇到一個坑,就是最後自己沒有加 sleep,就是怎麼檢查自己的代碼都沒有問題,但是最後就是沒法發送成功消息,最後加了一個 sleep 就可以了。

因為主函數 main 已經執行完退出,但是消息並沒有發送完成,需要進行等待一下。當然,你在生產環境中可能不會遇到這樣問題,呵呵!

代碼如下:

import static java.lang.Thread.sleep; 
 
/** 
 * @ClassName MyKafkaProducerTest 
 * @Description TODO 
 * @Author lingxiangxiang 
 * @Date 3:46 PM 
 * @Version 1.0 
 **/ 
public class MyKafkaProducerTest { 
    public static void main(String[] args) throws InterruptedException { 
        MyKafkaProducer producer = new MyKafkaProducer(); 
        boolean result = producer.sendMsg(); 
        System.out.println("send msg " + result); 
        sleep(1000); 
    } 
} 

消費者類:

import kafka.utils.ShutdownableThread; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
 
import java.util.Arrays; 
import java.util.Collections; 
import java.util.Properties; 
 
/** 
 * @ClassName MyKafkaConsumer 
 * @Description TODO 
 * @Author lingxiangxiang 
 * @Date 4:12 PM 
 * @Version 1.0 
 **/ 
public class MyKafkaConsumer extends ShutdownableThread { 
 
    private KafkaConsumer<Integer, String> consumer; 
 
    public MyKafkaConsumer() { 
        super("KafkaConsumerTest", false); 
        Properties properties = new Properties(); 
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); 
        properties.put("group.id", "mygroup"); 
        properties.put("enable.auto.commit", "true"); 
        properties.put("auto.commit.interval.ms", "1000"); 
        properties.put("session.timeout.ms", "30000"); 
        properties.put("heartbeat.interval.ms", "10000"); 
        properties.put("auto.offset.reset", "earliest"); 
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        this.consumer = new KafkaConsumer<Integer, String>(properties); 
    } 
 
    @Override 
    public void doWork() { 
        consumer.subscribe(Arrays.asList("test2")); 
        ConsumerRecords<Integer, String>records = consumer.poll(1000); 
        for (ConsumerRecord record : records) { 
            System.out.println("topic = " + record.topic()); 
            System.out.println("partition = " + record.partition()); 
            System.out.println("key = " + record.key()); 
            System.out.println("value = " + record.value()); 
        } 
    } 
} 

消費者的測試類:

/** 
 * @ClassName MyConsumerTest 
 * @Description TODO 
 * @Author lingxiangxiang 
 * @Date 4:23 PM 
 * @Version 1.0 
 **/ 
public class MyConsumerTest { 
    public static void main(String[] args) { 
        MyKafkaConsumer consumer = new MyKafkaConsumer(); 
        consumer.start(); 
        System.out.println("=================="); 
    } 
} 

②消費者同步手動提交

前面的消費者都是以自動提交 Offset 的方式對 Broker 中的消息進行消費的,但自動提交 可能會出現消息重複消費的情況。

所以在生產環境下,很多時候需要對 Offset 進行手動提交, 以解決重複消費的問題。

手動提交又可以劃分為同步提交、異步提交,同異步聯合提交。這些提交方式僅僅是 doWork() 方法不相同,其構造器是相同的。

所以下面首先在前面消費者類的基礎上進行構造器的修改,然後再分別實現三種不同的提交方式。

同步提交方式是,消費者向 Broker 提交 Offset 後等待 Broker 成功響應。若沒有收到響應,則會重新提交,直到獲取到響應。

而在這個等待過程中,消費者是阻塞的。其嚴重影響了消費者的吞吐量。

修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:

import kafka.utils.ShutdownableThread; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
 
import java.util.Arrays; 
import java.util.Collections; 
import java.util.Properties; 
 
/** 
 * @ClassName MyKafkaConsumer 
 * @Description TODO 
 * @Author lingxiangxiang 
 * @Date 4:12 PM 
 * @Version 1.0 
 **/ 
public class MyKafkaConsumer extends ShutdownableThread { 
 
    private KafkaConsumer<Integer, String> consumer; 
 
    public MyKafkaConsumer() { 
        super("KafkaConsumerTest", false); 
        Properties properties = new Properties(); 
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); 
        properties.put("group.id", "mygroup"); 
      // 這裡要修改成手動提交 
        properties.put("enable.auto.commit", "false"); 
        // properties.put("auto.commit.interval.ms", "1000"); 
        properties.put("session.timeout.ms", "30000"); 
        properties.put("heartbeat.interval.ms", "10000"); 
        properties.put("auto.offset.reset", "earliest"); 
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        this.consumer = new KafkaConsumer<Integer, String>(properties); 
    } 
    @Override 
    public void doWork() { 
        consumer.subscribe(Arrays.asList("test2")); 
        ConsumerRecords<Integer, String>records = consumer.poll(1000); 
        for (ConsumerRecord record : records) { 
            System.out.println("topic = " + record.topic()); 
            System.out.println("partition = " + record.partition()); 
            System.out.println("key = " + record.key()); 
            System.out.println("value = " + record.value()); 
 
          //手動同步提交 
          consumer.commitSync(); 
        } 
 
    } 
} 

③消費者異步手工提交

手動同步提交方式需要等待 Broker 的成功響應,效率太低,影響消費者的吞吐量。

異步提交方式是,消費者向 Broker 提交 Offset 後不用等待成功響應,所以其增加了消費者的吞吐量。

import kafka.utils.ShutdownableThread; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
 
import java.util.Arrays; 
import java.util.Collections; 
import java.util.Properties; 
 
/** 
 * @ClassName MyKafkaConsumer 
 * @Description TODO 
 * @Author lingxiangxiang 
 * @Date 4:12 PM 
 * @Version 1.0 
 **/ 
public class MyKafkaConsumer extends ShutdownableThread { 
 
    private KafkaConsumer<Integer, String> consumer; 
 
    public MyKafkaConsumer() { 
        super("KafkaConsumerTest", false); 
        Properties properties = new Properties(); 
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); 
        properties.put("group.id", "mygroup"); 
      // 這裡要修改成手動提交 
        properties.put("enable.auto.commit", "false"); 
        // properties.put("auto.commit.interval.ms", "1000"); 
        properties.put("session.timeout.ms", "30000"); 
        properties.put("heartbeat.interval.ms", "10000"); 
        properties.put("auto.offset.reset", "earliest"); 
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        this.consumer = new KafkaConsumer<Integer, String>(properties); 
    } 
 
    @Override 
    public void doWork() { 
        consumer.subscribe(Arrays.asList("test2")); 
        ConsumerRecords<Integer, String>records = consumer.poll(1000); 
        for (ConsumerRecord record : records) { 
            System.out.println("topic = " + record.topic()); 
            System.out.println("partition = " + record.partition()); 
            System.out.println("key = " + record.key()); 
            System.out.println("value = " + record.value()); 
 
          //手動同步提交 
          // consumer.commitSync(); 
          //手動異步提交 
          // consumer.commitAsync(); 
          // 帶回調公共的手動異步提交 
          consumer.commitAsync((offsets, e) -> { 
            if(e != null) { 
              System.out.println("提交次數, offsets = " + offsets); 
              System.out.println("exception = " + e); 
            } 
          }); 
        } 
    } 
} 

Spring Boot 使用 Kafka

現在大家的開發過程中,很多都用的是 Spring Boot 的項目,直接啟動了,如果還是用原生的 API,就是有點 Low 了啊,那 Kafka 是如何和 Spring Boot 進行聯合的呢?

maven 配置:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> 
   <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>2.1.1</version> 
   </dependency> 

添加配置文件,在 application.properties 中加入如下配置信息:

Kafka 連接地址:

spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 

生產者:

spring.kafka.producer.acks = 0 
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer 
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer 
spring.kafka.producer.retries = 3 
spring.kafka.producer.batch-size = 4096 
spring.kafka.producer.buffer-memory = 33554432 
spring.kafka.producer.compression-type = gzip 

消費者:

spring.kafka.consumer.group-id = mygroup 
spring.kafka.consumer.auto-commit-interval = 5000 
spring.kafka.consumer.heartbeat-interval = 3000 
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer 
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer 
spring.kafka.consumer.auto-offset-reset = earliest 
spring.kafka.consumer.enable-auto-commit = true 
# listenner, 標識消費者監聽的個數 
spring.kafka.listener.concurrency = 8 
# topic的名字 
kafka.topic1 = topic1 

生產者:

import lombok.extern.slf4j.Slf4j; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.kafka.core.KafkaTemplate; 
 
@Service 
@Slf4j 
public class MyKafkaProducerServiceImpl implements MyKafkaProducerService { 
        @Resource 
    private KafkaTemplate<String, String> kafkaTemplate; 
        // 讀取配置文件 
    @Value("${kafka.topic1}") 
    private String topic; 
 
    @Override 
    public void sendKafka() { 
      kafkaTemplate.send(topic, "hell world"); 
    } 
} 

消費者:

@Component 
@Slf4j 
public class MyKafkaConsumer { 
  @KafkaListener(topics = "${kafka.topic1}") 
    public void listen(ConsumerRecord<?, ?> record) { 
        Optional<?> kafkaMessage = Optional.ofNullable(record.value()); 
        if (kafkaMessage.isPresent()) { 
            log.info("----------------- record =" + record); 
            log.info("------------------ message =" + kafkaMessage.get()); 
} 

【51CTO原創稿件,合作站點轉載請註明原文作者和出處為51CTO.com】

關鍵字: