Kafka 入門一篇文章就夠了

程序員cxuan 發佈 2020-06-03T05:00:09+00:00

broker 集群:broker 是集群 的組成部分,broker 集群由一個或多個 broker 組成,每個集群都有一個 broker 同時充當了集群控制器的角色。

初識 Kafka

什麼是 Kafka

Kafka 是由 Linkedin 公司開發的,它是一個分布式的,支持多分區、多副本,基於 Zookeeper 的分布式消息流平台,它同時也是一款開源的基於發布訂閱模式的消息引擎系統

Kafka 的基本術語

消息:Kafka 中的數據單元被稱為消息,也被稱為記錄,可以把它看作資料庫表中某一行的記錄。

批次:為了提高效率, 消息會分批次寫入 Kafka,批次就代指的是一組消息。

主題:消息的種類稱為 主題(Topic),可以說一個主題代表了一類消息。相當於是對消息進行分類。主題就像是資料庫中的表。

分區:主題可以被分為若干個分區(partition),同一個主題中的分區可以不在一個機器上,有可能會部署在多個機器上,由此來實現 kafka 的伸縮性,單一主題中的分區有序,但是無法保證主題中所有的分區有序

生產者: 向主題發布消息的客戶端應用程式稱為生產者(Producer),生產者用於持續不斷的向某個主題發送消息。

消費者:訂閱主題消息的客戶端程序稱為消費者(Consumer),消費者用於處理生產者產生的消息。

消費者群組:生產者與消費者的關係就如同餐廳中的廚師和顧客之間的關係一樣,一個廚師對應多個顧客,也就是一個生產者對應多個消費者,消費者群組(Consumer Group)指的就是由一個或多個消費者組成的群體。

偏移量:偏移量(Consumer Offset)是一種元數據,它是一個不斷遞增的整數值,用來記錄消費者發生重平衡時的位置,以便用來恢複數據。

broker: 一個獨立的 Kafka 伺服器就被稱為 broker,broker 接收來自生產者的消息,為消息設置偏移量,並提交消息到磁碟保存。

broker 集群:broker 是集群 的組成部分,broker 集群由一個或多個 broker 組成,每個集群都有一個 broker 同時充當了集群控制器的角色(自動從集群的活躍成員中選舉出來)。

副本:Kafka 中消息的備份又叫做 副本(Replica),副本的數量是可以配置的,Kafka 定義了兩類副本:領導者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對外提供服務,後者只是被動跟隨。

重平衡:Rebalance。消費者組內某個消費者實例掛掉後,其他消費者實例自動重新分配訂閱主題分區的過程。Rebalance 是 Kafka 消費者端實現高可用的重要手段。

Kafka 的特性(設計原則)

  • 高吞吐、低延遲:kakfa 最大的特點就是收發消息非常快,kafka 每秒可以處理幾十萬條消息,它的最低延遲只有幾毫秒。
  • 高伸縮性: 每個主題(topic) 包含多個分區(partition),主題中的分區可以分布在不同的主機(broker)中。
  • 持久性、可靠性: Kafka 能夠允許數據的持久化存儲,消息被持久化到磁碟,並支持數據備份防止數據丟失,Kafka 底層的數據存儲是基於 Zookeeper 存儲的,Zookeeper 我們知道它的數據能夠持久存儲。
  • 容錯性: 允許集群中的節點失敗,某個節點宕機,Kafka 集群能夠正常工作
  • 高並發: 支持數千個客戶端同時讀寫

Kafka 的使用場景

  • 活動跟蹤:Kafka 可以用來跟蹤用戶行為,比如我們經常回去淘寶購物,你打開淘寶的那一刻,你的登陸信息,登陸次數都會作為消息傳輸到 Kafka ,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會作為一個個消息傳遞給 Kafka ,這樣就可以生成報告,可以做智能推薦,購買喜好等。
  • 傳遞消息:Kafka 另外一個基本用途是傳遞消息,應用程式向用戶發送通知就是通過傳遞消息來實現的,這些應用組件可以生成消息,而不需要關心消息的格式,也不需要關心消息是如何發送的。
  • 度量指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
  • 日誌記錄:Kafka 的基本概念來源於提交日誌,比如我們可以把資料庫的更新發送到 Kafka 上,用來記錄資料庫的更新時間,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  • 流式處理:流式處理是有一個能夠提供多種應用程式的領域。
  • 限流削峰:Kafka 多用於網際網路領域某一時刻請求特別多的情況下,可以把請求寫入Kafka 中,避免直接請求後端程序導致服務崩潰。

Kafka 的消息隊列

Kafka 的消息隊列一般分為兩種模式:點對點模式和發布訂閱模式

Kafka 是支持消費者群組的,也就是說 Kafka 中會有一個或者多個消費者,如果一個生產者生產的消息由一個消費者進行消費的話,那麼這種模式就是點對點模式

如果一個生產者或者多個生產者產生的消息能夠被多個消費者同時消費的情況,這樣的消息隊列成為發布訂閱模式的消息隊列

Kafka 系統架構

如上圖所示,一個典型的 Kafka 集群中包含若干Producer(可以是web前端產生的Page View,或者是伺服器日誌,系統CPU、Memory等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。

核心 API

Kafka 有四個核心API,它們分別是

  • Producer API,它允許應用程式向一個或多個 topics 上發送消息記錄
  • Consumer API,允許應用程式訂閱一個或多個 topics 並處理為其生成的記錄流
  • Streams API,它允許應用程式作為流處理器,從一個或多個主題中消費輸入流並為其生成輸出流,有效的將輸入流轉換為輸出流。
  • Connector API,它允許構建和運行將 Kafka 主題連接到現有應用程式或數據系統的可用生產者和消費者。例如,關係資料庫的連接器可能會捕獲對表的所有更改

Kafka 為何如此之快

Kafka 實現了零拷貝原理來快速移動數據,避免了內核之間的切換。Kafka 可以將數據記錄分批發送,從生產者到文件系統(Kafka 主題日誌)到消費者,可以端到端的查看這些批次的數據。

批處理能夠進行更有效的數據壓縮並減少 I/O 延遲,Kafka 採取順序寫入磁碟的方式,避免了隨機磁碟尋址的浪費,更多關於磁碟尋址的了解,請參閱 程式設計師需要了解的硬核知識之磁碟 。

總結一下其實就是四個要點

  • 順序讀寫
  • 零拷貝
  • 消息壓縮
  • 分批發送

Kafka 安裝和重要配置

Kafka 安裝我在 Kafka 系列第一篇應該比較詳細了,詳情見帶你漲姿勢的認識一下kafka 這篇文章。

那我們還是主要來說一下 Kafka 中的重要參數配置吧,這些參數對 Kafka 來說是非常重要的。

broker 端配置

  • broker.id

每個 kafka broker 都有一個唯一的標識來表示,這個唯一的標識符即是 broker.id,它的默認值是 0。這個值在 kafka 集群中必須是唯一的,這個值可以任意設定,

  • port

如果使用配置樣本來啟動 kafka,它會監聽 9092 埠。修改 port 配置參數可以把它設置成任意的埠。要注意,如果使用 1024 以下的埠,需要使用 root 權限啟動 kakfa。

  • zookeeper.connect

用於保存 broker 元數據的 Zookeeper 地址是通過 zookeeper.connect 來指定的。比如我可以這麼指定 localhost:2181 表示這個 Zookeeper 是運行在本地 2181 埠上的。我們也可以通過 比如我們可以通過 zk1:2181,zk2:2181,zk3:2181 來指定 zookeeper.connect 的多個參數值。該配置參數是用冒號分割的一組 hostname:port/path 列表,其含義如下

hostname 是 Zookeeper 伺服器的機器名或者 ip 地址。

port 是 Zookeeper 客戶端的埠號

/path 是可選擇的 Zookeeper 路徑,Kafka 路徑是使用了 chroot 環境,如果不指定默認使用跟路徑。

如果你有兩套 Kafka 集群,假設分別叫它們 kafka1 和 kafka2,那麼兩套集群的zookeeper.connect參數可以這樣指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2

  • log.dirs

Kafka 把所有的消息都保存到磁碟上,存放這些日誌片段的目錄是通過 log.dirs 來制定的,它是用一組逗號來分割的本地系統路徑,log.dirs 是沒有默認值的,你必須手動指定他的默認值。其實還有一個參數是 log.dir,如你所知,這個配置是沒有 s 的,默認情況下只用配置 log.dirs 就好了,比如你可以通過 /home/kafka1,/home/kafka2,/home/kafka3 這樣來配置這個參數的值。

  • num.recovery.threads.per.data.dir

對於如下3種情況,Kafka 會使用可配置的線程池來處理日誌片段。

伺服器正常啟動,用於打開每個分區的日誌片段;

伺服器崩潰後重啟,用於檢查和截斷每個分區的日誌片段;

伺服器正常關閉,用於關閉日誌片段。

默認情況下,每個日誌目錄只使用一個線程。因為這些線程只是在伺服器啟動和關閉時會用到,所以完全可以設置大量的線程來達到井行操作的目的。特別是對於包含大量分區的伺服器來說,一旦發生崩憤,在進行恢復時使用井行操作可能會省下數小時的時間。設置此參數時需要注意,所配置的數字對應的是 log.dirs 指定的單個日誌目錄。也就是說,如果 num.recovery.threads.per.data.dir 被設為 8,並且 log.dir 指定了 3 個路徑,那麼總共需要 24 個線程。

  • auto.create.topics.enable

默認情況下,kafka 會使用三種方式來自動創建主題,下面是三種情況:

當一個生產者開始往主題寫入消息時

當一個消費者開始從主題讀取消息時

當任意一個客戶端向主題發送元數據請求時

auto.create.topics.enable參數我建議最好設置成 false,即不允許自動創建 Topic。在我們的線上環境裡面有很多名字稀奇古怪的 Topic,我想大概都是因為該參數被設置成了 true 的緣故。

主題默認配置

Kafka 為新創建的主題提供了很多默認配置參數,下面就來一起認識一下這些參數

  • num.partitions

num.partitions 參數指定了新創建的主題需要包含多少個分區。如果啟用了主題自動創建功能(該功能是默認啟用的),主題分區的個數就是該參數指定的值。該參數的默認值是 1。要注意,我們可以增加主題分區的個數,但不能減少分區的個數。

  • default.replication.factor

這個參數比較簡單,它表示 kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務default.replication.factor 的默認值為1,這個參數在你啟用了主題自動創建功能後有效。

  • log.retention.ms

Kafka 通常根據時間來決定數據可以保留多久。默認使用 log.retention.hours 參數來配置時間,默認是 168 個小時,也就是一周。除此之外,還有兩個參數 log.retention.minutes 和 log.retentiion.ms 。這三個參數作用是一樣的,都是決定消息多久以後被刪除,推薦使用 log.retention.ms。

  • log.retention.bytes

另一種保留消息的方式是判斷消息是否過期。它的值通過參數 log.retention.bytes 來指定,作用在每一個分區上。也就是說,如果有一個包含 8 個分區的主題,並且 log.retention.bytes 被設置為 1GB,那麼這個主題最多可以保留 8GB 數據。所以,當主題的分區個數增加時,整個主題可以保留的數據也隨之增加。

  • log.segment.bytes

上述的日誌都是作用在日誌片段上,而不是作用在單個消息上。當消息到達 broker 時,它們被追加到分區的當前日誌片段上,當日誌片段大小到達 log.segment.bytes 指定上限(默認為 1GB)時,當前日誌片段就會被關閉,一個新的日誌片段被打開。如果一個日誌片段被關閉,就開始等待過期。這個參數的值越小,就越會頻繁的關閉和分配新文件,從而降低磁碟寫入的整體效率。

  • log.segment.ms

上面提到日誌片段經關閉後需等待過期,那麼 log.segment.ms 這個參數就是指定日誌多長時間被關閉的參數和,log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日誌片段會在大小或時間到達上限時被關閉,就看哪個條件先得到滿足。

  • message.max.bytes

broker 通過設置 message.max.bytes 參數來限制單個消息的大小,默認是 1000 000, 也就是 1MB,如果生產者嘗試發送的消息超過這個大小,不僅消息不會被接收,還會收到 broker 返回的錯誤消息。跟其他與字節相關的配置參數一樣,該參數指的是壓縮後的消息大小,也就是說,只要壓縮後的消息小於 mesage.max.bytes,那麼消息的實際大小可以大於這個值

這個值對性能有顯著的影響。值越大,那麼負責處理網絡連接和請求的線程就需要花越多的時間來處理這些請求。它還會增加磁碟寫入塊的大小,從而影響 IO 吞吐量。

  • retention.ms

規定了該主題消息被保存的時常,默認是7天,即該主題只能保存7天的消息,一旦設置了這個值,它會覆蓋掉 Broker 端的全局參數值。

  • retention.bytes

retention.bytes:規定了要為該 Topic 預留多大的磁碟空間。和全局參數作用相似,這個值通常在多租戶的 Kafka 集群中會有用武之地。當前默認值是 -1,表示可以無限使用磁碟空間。

JVM 參數配置

JDK 版本一般推薦直接使用 JDK1.8,這個版本也是現在中國大部分程式設計師的首選版本。

說到 JVM 端設置,就繞不開堆這個話題,業界最推崇的一種設置方式就是直接將 JVM 堆大小設置為 6GB,這樣會避免很多 Bug 出現。

JVM 端配置的另一個重要參數就是垃圾回收器的設置,也就是平時常說的 GC 設置。如果你依然在使用 Java 7,那麼可以根據以下法則選擇合適的垃圾回收器:

  • 如果 Broker 所在機器的 CPU 資源非常充裕,建議使用 CMS 收集器。啟用方法是指定-XX:+UseCurrentMarkSweepGC。
  • 否則,使用吞吐量收集器。開啟方法是指定-XX:+UseParallelGC。

當然了,如果你已經在使用 Java 8 了,那麼就用默認的 G1 收集器就好了。在沒有任何調優的情況下,G1 表現得要比 CMS 出色,主要體現在更少的 Full GC,需要調整的參數更少等,所以使用 G1 就好了。

一般 G1 的調整隻需要這兩個參數即可

  • MaxGCPauseMillis

該參數指定每次垃圾回收默認的停頓時間。該值不是固定的,G1可以根據需要使用更長的時間。它的默認值是 200ms,也就是說,每一輪垃圾回收大概需要200 ms 的時間。

  • InitiatingHeapOccupancyPercent

該參數指定了 G1 啟動新一輪垃圾回收之前可以使用的堆內存百分比,默認值是45,這就表明G1在堆使用率到達45之前不會啟用垃圾回收。這個百分比包括新生代和老年代。

Kafka Producer

在 Kafka 中,我們把產生消息的那一方稱為生產者,比如我們經常回去淘寶購物,你打開淘寶的那一刻,你的登陸信息,登陸次數都會作為消息傳輸到 Kafka 後台,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會作為一個個消息傳遞給 Kafka 後台,然後淘寶會根據你的愛好做智能推薦,致使你的錢包從來都禁不住誘惑,那麼這些生產者產生的消息是怎麼傳到 Kafka 應用程式的呢?發送過程是怎麼樣的呢?

儘管消息的產生非常簡單,但是消息的發送過程還是比較複雜的,如圖



我們從創建一個ProducerRecord 對象開始,ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),可選的分區號(Partition Number)以及可選的鍵值對構成。

在發送 ProducerRecord 時,我們需要將鍵值對對象由序列化器轉換為字節數組,這樣它們才能夠在網絡上傳輸。然後消息到達了分區器。

如果發送過程中指定了有效的分區號,那麼在發送記錄時將使用該分區。如果發送過程中未指定分區,則將使用key 的 hash 函數映射指定一個分區。如果發送的過程中既沒有分區號也沒有,則將以循環的方式分配一個分區。選好分區後,生產者就知道向哪個主題和分區發送數據了。

ProducerRecord 還有關聯的時間戳,如果用戶沒有提供時間戳,那麼生產者將會在記錄中使用當前的時間作為時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳類型。

  • 如果將主題配置為使用 CreateTime,則生產者記錄中的時間戳將由 broker 使用。
  • 如果將主題配置為使用LogAppendTime,則生產者記錄中的時間戳在將消息添加到其日誌中時,將由 broker 重寫。

然後,這條消息被存放在一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。由一個獨立的線程負責把它們發到 Kafka Broker 上。

Kafka Broker 在收到消息時會返回一個響應,如果寫入成功,會返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量,上面兩種的時間戳類型也會返回給用戶。如果寫入失敗,會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還是失敗的話,就返回錯誤消息。

創建 Kafka 生產者

要向 Kafka 寫入消息,首先需要創建一個生產者對象,並設置一些屬性。Kafka 生產者有3個必選的屬性

  • bootstrap.servers

該屬性指定 broker 的地址清單,地址的格式為 host:port。清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找到其他的 broker 信息。不過建議至少要提供兩個 broker 信息,一旦其中一個宕機,生產者仍然能夠連接到集群上。

  • key.serializer

broker 需要接收到序列化之後的 key/value 值,所以生產者發送的消息需要經過序列化之後才傳遞給 Kafka Broker。生產者需要知道採用何種方式把 Java 對象轉換為字節數組。key.serializer 必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵對象序列化為字節數組。這裡拓展一下 Serializer 類

Serializer 是一個接口,它表示類將會採用何種方式序列化,它的作用是把對象轉換為字節,實現了 Serializer 接口的類主要有 ByteArraySerializer、StringSerializer、IntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默認使用的序列化器,其他的序列化器還有很多,你可以通過 這裡 查看其他序列化器。要注意的一點:key.serializer 是必須要設置的,即使你打算只發送值的內容

  • value.serializer

與 key.serializer 一樣,value.serializer 指定的類會將值序列化。

下面代碼演示了如何創建一個 Kafka 生產者,這裡只指定了必要的屬性,其他使用默認的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

來解釋一下這段代碼

  • 首先創建了一個 Properties 對象
  • 使用 StringSerializer 序列化器序列化 key / value 鍵值對
  • 在這裡我們創建了一個新的生產者對象,並為鍵值設置了恰當的類型,然後把 Properties 對象傳遞給他。

Kafka 消息發送

實例化生產者對象後,接下來就可以開始發送消息了,發送消息主要由下面幾種方式

簡單消息發送

Kafka 最簡單的消息發送如下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

代碼中生產者(producer)的 send() 方法需要把 ProducerRecord 的對象作為參數進行發送,ProducerRecord 有很多構造函數,這個我們下面討論,這裡調用的是

public ProducerRecord(String topic, K key, V value) {}

這個構造函數,需要傳遞的是 topic主題,key 和 value。

把對應的參數傳遞完成後,生產者調用 send() 方法發送消息(ProducerRecord對象)。我們可以從生產者的架構圖中看出,消息是先被寫入分區中的緩衝區中,然後分批次發送給 Kafka Broker。



發送成功後,send() 方法會返回一個 Future(java.util.concurrent) 對象,Future 對象的類型是 RecordMetadata 類型,我們上面這段代碼沒有考慮返回值,所以沒有生成對應的 Future 對象,所以沒有辦法知道消息是否發送成功。如果不是很重要的信息或者對結果不會產生影響的信息,可以使用這種方式進行發送。

我們可以忽略發送消息時可能發生的錯誤或者在伺服器端可能發生的錯誤,但在消息發送之前,生產者還可能發生其他的異常。這些異常有可能是 SerializationException(序列化失敗),BufferedExhaustedException 或 TimeoutException(說明緩衝區已滿),又或是 InterruptedException(說明發送線程被中斷)

同步發送消息

第二種消息發送機制如下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}


這種發送消息的方式較上面的發送方式有了改進,首先調用 send() 方法,然後再調用 get() 方法等待 Kafka 響應。如果伺服器返回錯誤,get() 方法會拋出異常,如果沒有發生錯誤,我們會得到 RecordMetadata 對象,可以用它來查看消息記錄。

生產者(KafkaProducer)在發送的過程中會出現兩類錯誤:其中一類是重試錯誤,這類錯誤可以通過重發消息來解決。比如連接的錯誤,可以通過再次建立連接來解決;無主錯誤則可以通過重新為分區選舉首領來解決。KafkaProducer 被配置為自動重試,如果多次重試後仍無法解決問題,則會拋出重試異常。另一類錯誤是無法通過重試來解決的,比如消息過大對於這類錯誤,KafkaProducer 不會進行重試,直接拋出異常。

異步發送消息

同步發送消息都有個問題,那就是同一時間只能有一個消息在發送,這會造成許多消息無法直接發送,造成消息滯後,無法發揮效益最大化。

比如消息在應用程式和 Kafka 集群之間一個來回需要 10ms。如果發送完每個消息後都等待響應的話,那麼發送100個消息需要 1 秒,但是如果是異步方式的話,發送 100 條消息所需要的時間就會少很多很多。大多數時候,雖然Kafka 會返回 RecordMetadata 消息,但是我們並不需要等待響應。

為了在異步發送消息的同時能夠對異常情況進行處理,生產者提供了回掉支持。下面是回調的一個例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

首先實現回調需要定義一個實現了org.apache.kafka.clients.producer.Callback的類,這個接口只有一個 onCompletion方法。如果 kafka 返回一個錯誤,onCompletion 方法會拋出一個非空(non null)異常,這裡我們只是簡單的把它列印出來,如果是生產環境需要更詳細的處理,然後在 send() 方法發送的時候傳遞一個 Callback 回調的對象。

生產者分區機制

Kafka 對於數據的讀寫是以分區為粒度的,分區可以分布在多個主機(Broker)中,這樣每個節點能夠實現獨立的數據寫入和讀取,並且能夠通過增加新的節點來增加 Kafka 集群的吞吐量,通過分區部署在多個 Broker 來實現負載均衡的效果。

上面我們介紹了生產者的發送方式有三種:不管結果如何直接發送、發送並返回結果、發送並回調。由於消息是存在主題(topic)的分區(partition)中的,所以當 Producer 生產者發送產生一條消息發給 topic 的時候,你如何判斷這條消息會存在哪個分區中呢?

這其實就設計到 Kafka 的分區機制了。

分區策略

Kafka 的分區策略指的就是將生產者發送到哪個分區的算法。Kafka 為我們提供了默認的分區策略,同時它也支持你自定義分區策略。

如果要自定義分區策略的話,你需要顯示配置生產者端的參數 Partitioner.class,我們可以看一下這個類它位於 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 類有三個方法,分別來解釋一下

  • partition(): 這個類有幾個參數: topic,表示需要傳遞的主題;key 表示消息中的鍵值;keyBytes表示分區中序列化過後的key,byte數組的形式傳遞;value 表示消息的 value 值;valueBytes 表示分區中序列化後的值數組;cluster表示當前集群的原數據。Kafka 給你這麼多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區,計算出它要被發送到哪個分區中。
  • close() : 繼承了 Closeable 接口能夠實現 close() 方法,在分區關閉時調用。
  • onNewBatch(): 表示通知分區程序用來創建新的批次

其中與分區策略息息相關的就是 partition() 方法了,分區策略有下面這幾種

順序輪詢

順序分配,消息是均勻的分配給每個 partition,即每個分區存儲一次消息。就像下面這樣

上圖表示的就是輪詢策略,輪訓策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。

隨機輪詢

隨機輪詢簡而言之就是隨機的向 partition 中保存消息,如下圖所示



實現隨機分配的代碼只需要兩行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分區數,然後隨機地返回一個小於它的正整數。

本質上看隨機策略也是力求將數據均勻地打散到各個分區,但從實際表現來看,它要遜於輪詢策略,所以如果追求數據的均勻分布,還是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分區策略,在新版本中已經改為輪詢了。

按照 key 進行消息保存

這個策略也叫做 key-ordering 策略,Kafka 中每條消息都會有自己的key,一旦消息被定義了 Key,那麼你就可以保證同一個 Key 的所有消息都進入到相同的分區裡面,由於每個分區下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略,如下圖所示


實現這個策略的 partition 方法同樣簡單,只需要下面兩行代碼即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面這幾種分區策略都是比較基礎的策略,除此之外,你還可以自定義分區策略。

生產者壓縮機制

壓縮一詞簡單來講就是一種互換思想,它是一種經典的用 CPU 時間去換磁碟空間或者 I/O 傳輸量的思想,希望以較小的 CPU 開銷帶來更少的磁碟占用或更少的網絡 I/O 傳輸。如果你還不了解的話我希望你先讀完這篇文章 程式設計師需要了解的硬核知識之壓縮算法,然後你就明白壓縮是怎麼回事了。

Kafka 壓縮是什麼

Kafka 的消息分為兩層:消息集合 和 消息。一個消息集合中包含若干條日誌項,而日誌項才是真正封裝消息的地方。Kafka 底層的消息日誌由一系列消息集合日誌項組成。Kafka 通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。

在 Kafka 中,壓縮會發生在兩個地方:Kafka Producer 和 Kafka Consumer,為什麼啟用壓縮?說白了就是消息太大,需要變小一點 來使消息發的更快一些。

Kafka Producer 中使用 compression.type 來開啟壓縮

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代碼表明該 Producer 的壓縮算法使用的是 GZIP

有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息後並發送給伺服器後,由 Consumer 消費者進行解壓縮,因為採用的何種壓縮算法是隨著 key、value 一起發送過去的,所以消費者知道採用何種壓縮算法。

Kafka 重要參數配置

在上一篇文章 帶你漲姿勢的認識一下kafka中,我們主要介紹了一下 kafka 集群搭建的參數,本篇文章我們來介紹一下 Kafka 生產者重要的配置,生產者有很多可配置的參數,在文檔里(http://kafka.apache.org/documentation/#producerconfigs)都有說明,我們介紹幾個在內存使用、性能和可靠性方面對生產者影響比較大的參數進行說明

key.serializer

用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 參數指定了要有多少個分區副本接收消息,生產者才認為消息是寫入成功的。此參數對消息丟失的影響較大

  • 如果 acks = 0,就表示生產者也不知道自己產生的消息是否被伺服器接收了,它才知道它寫成功了。如果發送的途中產生了錯誤,生產者也不知道,它也比較懵逼,因為沒有返回任何消息。這就類似於 UDP 的運輸層協議,只管發,伺服器接受不接受它也不關心。
  • 如果 acks = 1,只要集群的 Leader 接收到消息,就會給生產者返回一條消息,告訴它寫入成功。如果發送途中造成了網絡異常或者 Leader 還沒選舉出來等其他情況導致消息寫入失敗,生產者會受到錯誤消息,這時候生產者往往會再次重發數據。因為消息的發送也分為 同步 和 異步,Kafka 為了保證消息的高效傳輸會決定是同步發送還是異步發送。如果讓客戶端等待伺服器的響應(通過調用 Future 中的 get() 方法),顯然會增加延遲,如果客戶端使用回調,就會解決這個問題。
  • 如果 acks = all,這種情況下是只有當所有參與複製的節點都收到消息時,生產者才會接收到一個來自伺服器的消息。不過,它的延遲比 acks =1 時更高,因為我們要等待不只一個伺服器節點接收消息。

buffer.memory

此參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到伺服器的消息。如果應用程式發送消息的速度超過發送到伺服器的速度,會導致生產者空間不足。這個時候,send() 方法調用要麼被阻塞,要麼拋出異常,具體取決於 block.on.buffer.null 參數的設置。

compression.type

此參數來表示生產者啟用何種壓縮算法,默認情況下,消息發送時不會被壓縮。該參數可以設置為 snappy、gzip 和 lz4,它指定了消息發送給 broker 之前使用哪一種壓縮算法進行壓縮。下面是各壓縮算法的對比



retries

生產者從伺服器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領),在這種情況下,reteis 參數的值決定了生產者可以重發的消息次數,如果達到這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者在每次重試之間等待 100ms,這個等待參數可以通過 retry.backoff.ms 進行修改。

batch.size

當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次被填滿,批次里的所有消息會被發送出去。不過生產者井不一定都會等到批次被填滿才發送,任意條數的消息都可能被發送。

client.id

此參數可以是任意的字符串,伺服器會用它來識別消息的來源,一般配置在日誌里

max.in.flight.requests.per.connection

此參數指定了生產者在收到伺服器響應之前可以發送多少消息,它的值越高,就會占用越多的內存,不過也會提高吞吐量。把它設為1 可以保證消息是按照發送的順序寫入伺服器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生產者在發送數據時等待伺服器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如目標分區的首領是誰)時等待伺服器返迴響應的時間。如果等待時間超時,生產者要麼重試發送數據,要麼返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配----如果在指定時間內沒有收到同步副本的確認,那麼 broker 就會返回一個錯誤。

max.block.ms

此參數指定了在調用 send() 方法或使用 partitionFor() 方法獲取元數據時生產者的阻塞時間當生產者的發送緩衝區已捕,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求里所有消息的總大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基於 TCP 實現的,為了保證可靠的消息傳輸,這兩個參數分別指定了 TCP Socket 接收和發送數據包的緩衝區的大小。如果它們被設置為 -1,就使用作業系統的默認值。如果生產者或消費者與 broker 處於不同的數據中心,那麼可以適當增大這些值。

Kafka Consumer

應用程式使用 KafkaConsumer 從 Kafka 中訂閱主題並接收來自這些主題的消息,然後再把他們保存起來。應用程式首先需要創建一個 KafkaConsumer 對象,訂閱主題並開始接受消息,驗證消息並保存結果。一段時間後,生產者往主題寫入的速度超過了應用程式驗證數據的速度,這時候該如何處理?如果只使用單個消費者的話,應用程式會跟不上消息生成的速度,就像多個生產者像相同的主題寫入消息一樣,這時候就需要多個消費者共同參與消費主題中的消息,對消息進行分流處理。

Kafka 消費者從屬於消費者群組。一個群組中的消費者訂閱的都是相同的主題,每個消費者接收主題一部分分區的消息。下面是一個 Kafka 分區消費示意圖

上圖中的主題 T1 有四個分區,分別是分區0、分區1、分區2、分區3,我們創建一個消費者群組1,消費者群組中只有一個消費者,它訂閱主題T1,接收到 T1 中的全部消息。由於一個消費者處理四個生產者發送到分區的消息,壓力有些大,需要幫手來幫忙分擔任務,於是就演變為下圖

這樣一來,消費者的消費能力就大大提高了,但是在某些環境下比如用戶產生消息特別多的時候,生產者產生的消息仍舊讓消費者吃不消,那就繼續增加消費者。

如上圖所示,每個分區所產生的消息能夠被每個消費者群組中的消費者消費,如果向消費者群組中增加更多的消費者,那麼多餘的消費者將會閒置,如下圖所示

向群組中增加消費者是橫向伸縮消費能力的主要方式。總而言之,我們可以通過增加消費組的消費者來進行水平擴展提升消費能力。這也是為什麼建議創建主題時使用比較多的分區數,這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數量不應該比分區數多,因為多出來的消費者是空閒的,沒有任何幫助。

Kafka 一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對於上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那麼就演變為下圖這樣

在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬於不同的應用。

總結起來就是如果應用需要讀取全量消息,那麼請為該應用設置一個消費組;如果該應用消費能力不足,那麼可以考慮在這個消費組裡增加消費者

消費者組和分區重平衡

消費者組是什麼

消費者組(Consumer Group)是由一個或多個消費者實例(Consumer Instance)組成的群組,具有可擴展性和可容錯性的一種機制。消費者組內的消費者共享一個消費者組ID,這個ID 也叫做 Group ID,組內的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分區的消息,多餘的消費者會閒置,派不上用場。

我們在上面提到了兩種消費方式

  • 一個消費者群組消費一個主題中的消息,這種消費模式又稱為點對點的消費方式,點對點的消費方式又被稱為消息隊列
  • 一個主題中的消息被多個消費者群組共同消費,這種消費模式又稱為發布-訂閱模式

消費者重平衡

我們從上面的消費者演變圖中可以知道這麼一個過程:最初是一個消費者訂閱一個主題並消費其全部分區的消息,後來有一個消費者加入群組,隨後又有更多的消費者加入群組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區的所有權通過一個消費者轉到其他消費者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示

重平衡非常重要,它為消費者群組帶來了高可用性 和 伸縮性,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們並不希望發生這樣的行為。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當分區被重新分配給另一個消費者時,消息當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程式。

消費者通過向組織協調者(Kafka Broker)發送心跳來維護自己是消費者組的一員並確認其擁有的分區。對於不同不的消費群體來說,其組織協調者可以是不同的。只要消費者定期發送心跳,就會認為消費者是存活的並處理其分區中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發送心跳。

如果過了一段時間 Kafka 停止發送心跳了,會話(Session)就會過期,組織協調者就會認為這個 Consumer 已經死亡,就會觸發一次重平衡。如果消費者宕機並且停止發送消息,組織協調者會等待幾秒鐘,確認它死亡了才會觸發重平衡。在這段時間裡,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協調者它要離開群組,組織協調者會觸發一次重平衡,儘量降低處理停頓。

重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現在社區還無法修改。

重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關於 Serial 收集器的描述):

更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結束。Stop The World 這個名字聽起來很帥,但這項工作實際上是由虛擬機在後台自動發起並完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應用來說都是難以接受的。

也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢......

創建消費者

上面的理論說的有點多,下面就通過代碼來講解一下消費者是如何消費的

在讀取消息之前,需要先創建一個 KafkaConsumer 對象。創建 KafkaConsumer 對象與創建 KafkaProducer 對象十分相似 --- 把需要傳遞給消費者的屬性放在 properties 對象中,後面我們會著重討論 Kafka 的一些配置,這裡我們先簡單的創建一下,使用3個屬性就足矣,分別是 bootstrap.server,key.deserializer,value.deserializer 。

這三個屬性我們已經用過很多次了,如果你還不是很清楚的話,可以參考 帶你漲姿勢是認識一下Kafka Producer

還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬於哪個消費者群組。創建不屬於任何一個群組的消費者也是可以的

Properties properties = new Properties();
        properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主題訂閱

創建好消費者之後,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表作為參數,使用起來比較簡單

consumer.subscribe(Collections.singletonList("customerTopic"));

為了簡單我們只訂閱了一個主題 customerTopic,參數傳入的是一個正則表達式,正則表達式可以匹配多個主題,如果有人創建了新的主題,並且主題的名字與正則表達式相匹配,那麼會立即觸發一次重平衡,消費者就可以讀取新的主題。

要訂閱所有與 test 相關的主題,可以這樣做

consumer.subscribe("test.*");

輪詢

我們知道,Kafka 是支持訂閱/發布模式的,生產者發送數據給 Kafka Broker,那麼消費者是如何知道生產者發送了數據呢?其實生產者產生的數據消費者是不知道的,KafkaConsumer 採用輪詢的方式定期去 Kafka Broker 中進行數據的檢索,如果有數據就用來消費,如果沒有就再繼續輪詢等待,下面是輪詢等待的具體實現

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}

  • 這是一個無限循環。消費者實際上是一個長期運行的應用程式,它通過輪詢的方式向 Kafka 請求數據。
  • 第三行代碼非常重要,Kafka 必須定期循環請求數據,否則就會認為該 Consumer 已經掛了,會觸發重平衡,它的分區會移交給群組中的其它消費者。傳給 poll() 方法的是一個超市時間,用 java.time.Duration 類來表示,如果該參數被設置為 0 ,poll() 方法會立刻返回,否則就會在指定的毫秒數內一直等待 broker 返回數據。
  • poll() 方法會返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區的信息、記錄在分區中的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐條處理每條記錄。
  • 在退出應用程式之前使用 close() 方法關閉消費者。網絡連接和 socket 也會隨之關閉,並立即觸發一次重平衡,而不是等待群組協調器發現它不再發送心跳並認定它已經死亡。

線程安全性

在同一個群組中,我們無法讓一個線程運行多個消費者,也無法讓多個線程安全的共享一個消費者。按照規則,一個消費者使用一個線程,如果一個消費者群組中多個消費者都想要運行的話,那麼必須讓每個消費者在自己的線程中運行,可以使用 Java 中的 ExecutorService 啟動多個消費者進行進行處理。

消費者配置

到目前為止,我們學習了如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了所有與消費者相關的配置說明。大部分參數都有合理的默認值,一般不需要修改它們,下面我們就來介紹一下這些參數。

  • fetch.min.bytes

該屬性指定了消費者從伺服器獲取記錄的最小字節數。broker 在收到消費者的數據請求時,如果可用的數據量小於 fetch.min.bytes 指定的大小,那麼它會等到有足夠的可用數據時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題使用頻率不是很高的時候就不用來回處理消息。如果沒有很多可用數據,但消費者的 CPU 使用率很高,那麼就需要把該屬性的值設得比默認值大。如果消費者的數量比較多,把該屬性的值調大可以降低 broker 的工作負載。

  • fetch.max.wait.ms

我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數據時才會把它返回給消費者。而 fetch.max.wait.ms 則用於指定 broker 的等待時間,默認是 500 毫秒。如果沒有足夠的數據流入 kafka 的話,消費者獲取的最小數據量要求就得不到滿足,最終導致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數值設置的小一些。如果 fetch.max.wait.ms 被設置為 100 毫秒的延遲,而 fetch.min.bytes 的值設置為 1MB,那麼 Kafka 在收到消費者請求後,要麼返回 1MB 的數據,要麼在 100 ms 後返回所有可用的數據。就看哪個條件首先被滿足。

  • max.partition.fetch.bytes

該屬性指定了伺服器從每個分區里返回給消費者的最大字節數。它的默認值時 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節。如果一個主題有20個分區和5個消費者,那麼每個消費者需要至少4 MB的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組裡有消費者發生崩潰,剩下的消費者需要處理更多的分區。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節數(通過 max.message.size 屬性配置大),否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。 在設置該屬性時,另外一個考量的因素是消費者處理數據的時間。消費者需要頻繁的調用 poll() 方法來避免會話過期和發生分區再平衡,如果單次調用poll() 返回的數據太多,消費者需要更多的時間進行處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

  • session.timeout.ms

這個屬性指定了消費者在被認為死亡之前可以與伺服器斷開連接的時間,默認是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內發送心跳給群組協調器,就會被認定為死亡,協調器就會觸發重平衡。把它的分區分配給消費者群組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向群組協調器發送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設置的比默認值小,可以更快地檢測和恢復崩憤的節點,不過長時間的輪詢或垃圾收集可能導致非預期的重平衡。把該屬性的值設置得大一些,可以減少意外的重平衡,不過檢測節點崩潰需要更長的時間。

  • auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下的該如何處理。它的默認值是 latest,意思指的是,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據。另一個值是 earliest,意思指的是在偏移量無效的情況下,消費者將從起始位置處開始讀取分區的記錄。

  • enable.auto.commit

我們稍後將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true,為了儘量避免出現重複數據和數據丟失,可以把它設置為 false,由自己控制何時提交偏移量。如果把它設置為 true,還可以通過 auto.commit.interval.ms屬性來控制提交的頻率

  • partition.assignment.strategy

我們知道,分區會分配給群組中的消費者。PartitionAssignor 會根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者,Kafka 有兩個默認的分配策略Range 和 RoundRobin

  • client.id

該屬性可以是任意字符串,broker 用他來標識從客戶端發送過來的消息,通常被用在日誌、度量指標和配額中

  • max.poll.records

該屬性用於控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢中需要處理的數據量。

  • receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數據時用到的 TCP 緩衝區也可以設置大小。如果它們被設置為 -1,就使用作業系統默認值。如果生產者或消費者與 broker 處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

提交和偏移量的概念

特殊偏移

我們上面提到,消費者在每次調用poll() 方法進行定時輪詢的時候,會返回由生產者寫入 Kafka 但是還沒有被消費者消費的記錄,因此我們可以追蹤到哪些記錄是被群組裡的哪個消費者讀取的。消費者可以使用 Kafka 來追蹤消息在分區中的位置(偏移量)

消費者會向一個叫做 _consumer_offset 的特殊主題中發送消息,這個主題會保存每次所發送消息中的分區偏移量,這個主題的主要作用就是消費者觸發重平衡後記錄偏移使用的,消費者每次向這個主題發送消息,正常情況下不觸發重平衡,這個主題是不起作用的,當觸發重平衡後,消費者停止工作,每個消費者可能會分到對應的分區,這個主題就是讓消費者能夠繼續處理消息所設置的。

如果提交的偏移量小於客戶端最後一次處理的偏移量,那麼位於兩個偏移量之間的消息就會被重複處理

如果提交的偏移量大於最後一次消費時的偏移量,那麼處於兩個偏移量中間的消息將會丟失

既然_consumer_offset 如此重要,那麼它的提交方式是怎樣的呢?下面我們就來說一下####提交方式

KafkaConsumer API 提供了多種方式來提交偏移量

自動提交

最簡單的方式就是讓消費者自動提交偏移量。如果 enable.auto.commit 被設置為true,那麼每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是 5s。與消費者里的其他東西一樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,如果是,那麼就會提交從上一次輪詢中返回的偏移量。

提交當前偏移量

把 auto.commit.offset 設置為 false,可以讓應用程式決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功後馬上返回,如果提交失敗就拋出異常。

commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄後要確保調用了 commitSync(),否則還是會有丟失消息的風險,如果發生了在均衡,從最近一批消息到發生在均衡之間的所有消息都將被重複處理。

異步提交

異步提交 commitAsync() 與同步提交 commitSync() 最大的區別在於異步提交不會進行重試,同步提交會一致進行重試。

同步和異步組合提交

一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。但是如果在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。

因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量

提交特定的偏移量

消費者API允許調用 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

關鍵字: