探究Kafka世界的核心:Kafka的主要概念

一個即將退役的碼農 發佈 2024-05-05T15:28:18.589077+00:00

Apache Kafka® 是 一個分布式流處理平台。原本開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。為什麼叫「流處理平台」?

Apache Kafka® 是 一個分布式流處理平台。原本開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。

為什麼叫「流處理平台」?

Kafka具有消息系統的能力,也有實時流式數據處理分析能力,只是我們更多的偏向於把他當做消息隊列系統來使用。

首先是一些概念

1,Kafka作為一個集群,運行在一台或者多台伺服器上.

2,Kafka 通過 topic 對存儲的流數據進行分類。

3,每條記錄中包含一個key,一個value和一個timestamp(時間戳)。

4,Kafka使用Avro作為消息序列化框架

主要設計目標

1,以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間複雜度的訪問性能;

2,高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸;

3,支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸;

4,同時支持離線數據處理和實時數據處理;

5,支持在線水平擴展。

為何使用消息系統

  1. 解耦。在項目啟動之初來預測將來項目會碰到什麼需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

2,冗餘。有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所採用的」插入-獲取-刪除」範式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

3,擴展性。因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。

4,削峰填谷。在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

5,可恢復性。系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復後被處理。

6,順序保證。在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。

7,緩衝。在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩衝層來幫助任務最高效率的執行———寫入隊列的處理會儘可能的快速。該緩衝有助於控制和優化數據流經過系統的速度。

  1. 異步通信。很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。

kafka使用場景:

1,分布式消息。更好的吞吐量,具有複製和容錯的功能,也有強大的持久性;

2,監控數據。涉及到分布式應用程式中匯總數據,然後生成可操作的集中數據源。例如:cannal模擬mysql從庫收到binlog,cannal-client接收sql後發送到kafka,業務方訂閱kafka topic即可。

3,日誌聚合,日誌收集中心。例如:分布式項目的日誌發送到kafka,kafka消費者發送到Logstash,再推送到ElasticSearch,最後使用Kibana圖形化界面訪問(ELK)。

4,流處理。許多Kafka用戶通過管道來處理數據,有多個階段:從topic中消費原始輸入數據,然後聚合,修飾或通過其他方式轉化為新的topic, 以供進一步消費或處理。在很多領域,如股市走向分析、氣象數據測控、網站用戶行為分析,由於數據產生快、實時性強且量大,您很難統一採集這些數據並將其入庫存儲後再做處理,這便導致傳統的數據處理架構不能滿足需求。與傳統架構不同,消息隊列Kafka版以及Storm、Samza、Spark等流計算引擎的出現,就是為了更好地解決這類數據在處理過程中遇到的問題,流計算模型能實現在數據流動的過程中對數據進行實時地捕捉和處理,並根據業務需求進行計算分析,最終把結果保存或者分發給需要的組件。

基本概念

Kafka拓撲結構

一個典型的Kafka集群中包含若干Producer(可以是web前端產生的Page View,或者是伺服器日誌,系統CPU、Memory等),若干Broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。Kafka通過ZooKeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息。

Kafka基本元素

Broker:Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker

Topic:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)

Partition:Parition是物理上的概念,每個Topic包含一個或多個Partition.

Producer:負責發布消息到Kafka broker

Consumer:消息消費者,向Kafka broker讀取消息的客戶端。

Consumer Group:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。

Kafka有四個核心的API:

1,The Producer API 允許一個應用程式發布一串流式的數據到一個或者多個Kafka topic。

2,The Consumer API 允許一個應用程式訂閱一個或多個 topic ,並且對發布給他們的流式數據進行處理。

3,TheStreamsAPI允許一個應用程式作為一個流處理器,消費一個或者多個topic產生的輸入流,然後生產一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。

4,The Connector API 允許構建並運行可重用的生產者或者消費者,將Kafka topics連接到已存在的應用程式或者數據系統。比如,連接到一個關係型資料庫,捕捉表(table)的所有變更內容。如:https://blog.csdn.net/qq_22473611/article/details/100160979

Topics和日誌

1,對於每一個topic, Kafka集群都會維持一個分區日誌。

2,每個分區都是有序且順序不可變的記錄集,並且不斷地追加到結構化的commit log文件。分區中的每一個記錄都會分配一個id號來表示順序,我們稱之為offset,offset用來唯一的標識分區中每一條記錄。

3,事實上,在每一個消費者中唯一保存的元數據是offset(偏移量)即消費在log中的位置.偏移量由消費者所控制:通常在讀取記錄後,消費者會以線性的方式增加偏移量,但是實際上,由於這個位置由消費者控制,所以消費者可以採用任何順序來消費記錄。例如,一個消費者可以重置到一個舊的偏移量,從而重新處理過去的數據;也可以跳過最近的記錄,從"現在"開始消費。

4,這些細節說明Kafka 消費者是非常廉價的—消費者的增加和減少,對集群或者其他消費者沒有多大的影響。比如,你可以使用命令行工具,對一些topic內容執行 tail操作,並不會影響已存在的消費者消費數據。

為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。

因為每條消息都被append到該Partition中,屬於順序寫磁碟,因此效率非常高

分區Partition

1,分區規則指的是將每個Topic劃分成多個分區(Partition),每個分區是一組有序的消息日誌,本質是一個目錄,生產者生產的每條消息只會被發送到其中一個分區。

2,分區 (Partition) 都是一個有序的、不可變的數據序列,消息數據被不斷的添加到序列的尾部。分區中的每一條消息數據都被賦予了一個連續的數字ID,即偏移量 (offset) ,用於唯一標識分區中的每條消息數據。

3,分區(Partition)的作用就是提供負載均衡的能力,單個topic的不同分區可存儲在相同或不同節點機上,為實現系統的高伸縮性(Scalability),不同的分區被放置到不同節點的機器上,各節點機獨立地執行各自分區的讀寫任務,如果性能不足,可通過添加新的節點機器來增加整體系統的吞吐量。

4,partitions 參數控制 topic 將被分片到多少個日誌里。partitions 會產生幾個影響。首先,每個分區只屬於一個台伺服器,所以如果有20個分區,那麼全部數據(包含讀寫負載)將由不超過20個伺服器(不包含副本)處理。最後 partitions 還會影響 consumer 的最大並行度。這在概念部分中有更詳細的討論。

5,分區數目一般說是broker的整數倍 可能考慮的是能夠讓broker上分配到的分區數能夠更均衡一點,一般情況下,分區數可以配置為Broker節點數的整數倍,比如:Broker節點是3,那麼可以設置分區數為3、6、9。分布式系統一般也是3-10台左右,和分區數量匹配上。

分區太多的危害

參考:https://cloud.tencent.com/developer/article/1573167

  1. 文件句柄開銷。每個分區在文件系統上會對應一個目錄,用於存儲維護kafka數據日誌。該目錄通常會有 3 個文件,.log,.index,.timeindex,對應kafka的日誌數據文件和索引文件(老版本 kafka 沒有timeindex文件)。broker會一直保持打開這 3 個文件句柄(file handler)。因此,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破單台broker的ulimit -n的上限。。
  2. 內存開銷。producer參數batch.size默認為 16KB,它會為每個分區緩存消息,一旦批次數滿了後,將消息批量發出。一般來說,這個設計是用於提升吞吐性能的。但是由於這個參數是partition級別的,如果分區數越多,這部分緩存所需的內存占用也會越多。假如有 10000 個分區,按照默認配置,這部分緩存就要占用約 157MB 的內存。而consumer端呢?拋開拉取數據所需的內存不說,單說線程的開銷。如果還是 10000 個分區,同時consumer線程數要匹配分區數的話(大部分情況下是最佳的消費吞吐量配置),那麼在consumer client就要創建 10000 個線程,也需要創建大約 10000 個Socket去獲取分區數據,這裡面的線程切換的開銷本身就已經不容小覷了。

伺服器端的開銷也不小,如果閱讀kafka源碼的話就會發現,伺服器端的很多組件在內存中維護了partition級別的緩存,比如controller,FetcherManager等,因此分區數越多,這種緩存的成本就越大。

位移offset

老版本 Consumer 的位移管理是依託於 Apache ZooKeeper 的,它會自動或手動地將位移數據提交到 ZooKeeper 中保存。當 Consumer 重啟後,它能自動從 ZooKeeper 中讀取位移數據,從而在上次消費截止的地方繼續消費。這種設計使得 Kafka Broker 不需要保存位移數據,減少了 Broker 端需要持有的狀態空間,因而有利於實現高伸縮性。

但是,ZooKeeper 其實並不適用於這種高頻的寫操作,因此,Kafka 社區自 0.8.2.x 版本開始,就在醞釀修改這種設計,並最終在新版本 Consumer 中正式推出了全新的位移管理機制,自然也包括這個新的位移主題。

新版本 Consumer 的位移管理機制其實也很簡單,就是將 Consumer 的位移數據作為一條條普通的 Kafka 消息,提交到 __consumer_offsets 中。可以這麼說,__consumer_offsets 的主要作用是保存 Kafka 消費者的位移信息。它要求這個提交過程不僅要實現高持久性,還要支持高頻的寫操作。顯然,Kafka 的主題設計天然就滿足這兩個條件,因此,使用 Kafka 主題來保存位移這件事情,實際上就是一個水到渠成的想法了。

複製因子Replication factor

1,複製因子「replication-factor」控制有多少伺服器將複製每個寫入的消息。如果您設置了3個複製因子,那麼只能最多2個相關的伺服器能出問題,否則您將無法訪問數據。我們建議您使用2或3個複製因子,以便在不中斷數據消費的情況下透明的調整集群。默認為1(1份正本,1份副本),建議設2(1份正本,2份副本);

2,Replication factor 是Kafka持久化保證的核心,它定義了Kafka集群上保存的topic副本數。Replication factor = N 表示我們最多能夠容忍N-1台broker宕機而不必數據丟失。N=1能夠令端到端延時最小化,但卻是最低的持久化保證。

增加副本數會增加備份開銷並給broker額外增加負載。如果clients端帶寬在broker端均勻分布,那麼每個broker都會使用N * w寫帶寬和r + (N - 1) * w讀帶寬,其中w是clients端在broker上的寫入帶寬占用,r是讀帶寬占用。

由此,降低N 對端到端延時影響的最佳方法就是確保每個broker上的負載是均勻的。這會降低commit time,因為commit time是由最慢的那個follower副本決定的。

如果你的Kafka broker使用了過多的磁碟帶寬或CPU,follower就會開始出現追不上leader的情況從而推高了commit time。(其實還需要注意的是,當最小的ISR默認為副本的數量個數時,在出現follower和leader不同步時恰巧leader節點宕機,會導致topic本身不可用)

我們建議為副本同步消息流量設置成使用不同的listener來減少與正常clients流量的干擾。你也可以在follower broker上增加I/O並行度,並增加副本拉取線程數量number.replica.fetchers來改善備份性能。

每個broker上都有分區leader,也會是另一個分區的副本

第一,在 Kafka 中,副本分成兩類:領導者副本(Leader Replica)和追隨者副本(Follower Replica)。每個分區在創建時都要選舉一個副本,稱為領導者副本,其餘的副本自動稱為追隨者副本。

第二,Kafka 的副本機制比其他分布式系統要更嚴格一些。在 Kafka 中,追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產者的讀寫請求。所有的請求都必須由領導者副本來處理,或者說,所有的讀寫請求都必須發往領導者副本所在的 Broker,由該 Broker 負責處理。追隨者副本不處理客戶端請求,它唯一的任務就是從領導者副本異步拉取消息,並寫入到自己的提交日誌中,從而實現與領導者副本的同步。

第三,當領導者副本掛掉了,或者說領導者副本所在的 Broker 宕機時,Kafka 依託於 ZooKeeper 提供的監控功能能夠實時感知到,並立即開啟新一輪的領導者選舉,從追隨者副本中選一個作為新的領導者。老 Leader 副本重啟回來後,只能作為追隨者副本加入到集群中。

你一定要特別注意上面的第二點,即追隨者副本是不對外提供服務的。還記得剛剛我們談到副本機制的好處時,說過 Kafka 沒能提供讀操作橫向擴展以及改善局部性嗎?具體的原因就在於此。

對於客戶端用戶而言,Kafka 的追隨者副本沒有任何作用,它既不能像 MySQL 那樣幫助領導者副本「抗讀」,也不能實現將某些副本放到離客戶端近的地方來改善數據局部性。

既然如此,Kafka 為什麼要這樣設計呢?其實這種副本機制有兩個方面的好處。

1.方便實現「Read-your-writes」。

所謂 Read-your-writes,顧名思義就是,當你使用生產者 API 向 Kafka 成功寫入消息後,馬上使用消費者 API 去讀取剛才生產的消息。

舉個例子,比如你平時發微博時,你發完一條微博,肯定是希望能立即看到的,這就是典型的 Read-your-writes 場景。如果允許追隨者副本對外提供服務,由於副本同步是異步的,因此有可能出現追隨者副本還沒有從領導者副本那裡拉取到最新的消息,從而使得客戶端看不到最新寫入的消息。

2.方便實現單調讀(Monotonic Reads)。

什麼是單調讀呢?就是對於一個消費者用戶而言,在多次消費消息時,它不會看到某條消息一會兒存在一會兒不存在。

如果允許追隨者副本提供讀服務,那麼假設當前有 2 個追隨者副本 F1 和 F2,它們異步地拉取領導者副本數據。倘若 F1 拉取了 Leader 的最新消息而 F2 還未及時拉取,那麼,此時如果有一個消費者先從 F1 讀取消息之後又從 F2 拉取消息,它可能會看到這樣的現象:第一次消費時看到的最新消息在第二次消費時不見了,這就不是單調讀一致性。但是,如果所有的讀請求都是由 Leader 來處理,那麼 Kafka 就很容易實現單調讀一致性。

3,每個副本所在的broker也是其他副本的leader,壓力都一樣,也就沒有抗讀的必要了。

備份日誌:Quorums, ISRs, 和狀態機

1,Kafka 採取了一種稍微不同的方法來選擇它的投票集。 Kafka 不是用大多數投票選擇 leader 。Kafka 動態維護了一個同步狀態的備份的集合 (a set of in-sync replicas), 簡稱 ISR ,在這個集合中的節點都是和 leader 保持高度一致的,只有這個集合的成員才 有資格被選舉為 leader,一條消息必須被這個集合 所有 節點讀取並追加到日誌中了,這條消息才能視為提交。這個 ISR 集合發生變化會在 ZooKeeper 持久化,正因為如此,這個集合中的任何一個節點都有資格被選為 leader 。這對於 Kafka 使用模型中, 有很多分區和並確保主從關係是很重要的。因為 ISR 模型和 f+1 副本,一個 Kafka topic 冗餘 f 個節點故障而不會丟失任何已經提交的消息。

2,另一個重要的設計區別是,Kafka 不要求崩潰的節點恢復所有的數據,在這種空間中的複製算法經常依賴於存在 「穩定存儲」,在沒有違反潛在的一致性的情況下,出現任何故障再恢復情況下都不會丟失。 這個假設有兩個主要的問題。首先,我們在持久性數據系統的實際操作中觀察到的最常見的問題是磁碟錯誤,並且它們通常不能保證數據的完整性。其次,即使磁碟錯誤不是問題,我們也不希望在每次寫入時都要求使用 fsync 來保證一致性, 因為這會使性能降低兩到三個數量級。我們的協議能確保備份節點重新加入ISR 之前,即使它掛時沒有新的數據, 它也必須完整再一次同步數據。

ISRs

afka 引入了 In-sync Replicas,也就是所謂的 ISR 副本集合。ISR 中的副本都是與 Leader 同步的副本,相反,不在 ISR 中的追隨者副本就被認為是與 Leader 不同步的。那麼,到底什麼副本能夠進入到 ISR 中呢?

我們首先要明確的是,Leader 副本天然就在 ISR 中。也就是說,ISR 不只是追隨者副本集合,它必然包括 Leader 副本。甚至在某些情況下,ISR 只有 Leader 這一個副本。

另外,能夠進入到 ISR 的追隨者副本要滿足一定的條件。至於是什麼條件?

這個標準就是 Broker 端參數 replica.lag.time.max.ms 參數值。這個參數的含義是 Follower 副本能夠落後 Leader 副本的最長時間間隔,當前默認值是 10 秒。這就是說,只要一個 Follower 副本落後 Leader 副本的時間不連續超過 10 秒,那麼 Kafka 就認為該 Follower 副本與 Leader 是同步的,即使此時 Follower 副本中保存的消息明顯少於 Leader 副本中的消息。

我們在前面說過,Follower 副本唯一的工作就是不斷地從 Leader 副本拉取消息,然後寫入到自己的提交日誌中。如果這個同步過程的速度持續慢於 Leader 副本的消息寫入速度,那麼在 replica.lag.time.max.ms 時間後,此 Follower 副本就會被認為是與 Leader 副本不同步的,因此不能再放入 ISR 中。此時,Kafka 會自動收縮 ISR 集合,將該副本「踢出」ISR。

值得注意的是,倘若該副本後面慢慢地追上了 Leader 的進度,那麼它是能夠重新被加回 ISR 的。這也表明,ISR 是一個動態調整的集合,而非靜態不變的。

核心總控制器Controller

在Kafka集群中會有一個或者多個broker,其中有一個broker會被選舉為控制器(Kafka Controller),它負責管理整個集群中所有分區和副本的狀態。

1,當某個分區的leader副本出現故障時,由控制器負責為該分區選舉新的leader副本。

2,當檢測到某個分區的ISR集合發生變化時,由控制器負責通知所有broker更新其元數據信息。

3,當使用kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控制器負責分區的重新分配。

Controller選舉機制

在kafka集群啟動的時候,會自動選舉一台broker作為controller來管理整個集群,選舉的過程是集群中每個broker都會嘗試在zookeeper上創建一個/controller臨時節點,zookeeper會保證有且僅有一個broker能創建成功,這個broker就會成為集群的總控器controller。

當這個controller角色的broker宕機了,此時zookeeper臨時節點會消失,集群里其他broker會一直監聽這個臨時節點,發現臨時節點消失了,就競爭再次創建臨時節點,zookeeper又會保證有一個broker成為新的controller。

Controller職責

具備控制器身份的broker需要比其他普通的broker多一份職責,具體細節如下:

1,監聽broker相關的變化。為Zookeeper中的/brokers/ids/節點添加BrokerChangeListener,用來處理broker增減的變化。

2,監聽topic相關的變化。為Zookeeper中的/brokers/topics節點添加TopicChangeListener,用來處理topic增減的變化;為Zookeeper中的/admin/delete_topics節點添加TopicDeletionListener,用來處理刪除topic的動作。

3,從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關的信息並進行相應的管理。對於所有topic所對應的Zookeeper中的/brokers/topics/[topic]節點添加PartitionModificationsListener,用來監聽topic中的分區分配變化。

4,更新集群的元數據信息,同步到其他普通的broker節點中。

Partition Replicates副本選舉機制

controller感知到分區leader所在的broker掛了(controller監聽了很多zk節點可以感知到broker存活),controller會從每個parititon的replicas副本列表中取出第一個broker作為leader,當然這個broker需要也同時在ISR列表里。

關鍵字: