一起來學kafka之Kafka集群搭建

馬士兵教育cto 發佈 2024-04-03T00:58:21.277677+00:00

什麼是 KafkaKafka是一種高吞吐量、分布式、可擴展的消息中間件系統,最初由LinkedIn公司開發。隨著不斷的發展,在最新的版本中它定義為分布式的流處理平台,現在在大數據應用中也是十分廣泛。

什麼是 Kafka

Kafka是一種高吞吐量、分布式、可擴展的消息中間件系統,最初由LinkedIn公司開發。隨著不斷的發展,在最新的版本中它定義為分布式的流處理平台,現在在大數據應用中也是十分廣泛。

它可以處理大量的實時數據流,被廣泛應用於日誌收集、事件處理、流處理、消息隊列等場景。

Kafka的架構包含producer(生產者)、consumer(消費者)、Broker(代理伺服器)等組件。生產者可以將消息發送到Kafka集群,消費者可以從Kafka集群訂閱消息並進行處理,而broker則是消息的中轉伺服器,負責存儲和轉發消息。

Kafka的特點包括:

  • 高吞吐量:Kafka可以處理海量的數據流,支持每秒百萬級別的消息處理。
  • 可擴展性:Kafka的集群可以根據需要進行水平擴展,從而提高系統的性能和容量。
  • 可靠性:Kafka支持多副本機制,可以保證數據的可靠性和高可用性。
  • 靈活性:Kafka支持多種消息格式和協議,可以與各種系統和工具進行集成。

Kafka是一個開源的項目,已經成為了Apache軟體基金會的頂級項目.

Kafka & 核心概念

接著,我們看下它的核心概念,這些概念都很重要,在後邊的學習中都會遇到,概念一定要搞明白,對於理解Kafka的工作原理和使用方法非常重要。不然學習起來比較懵, 下面一起看一下核心概念:

Topic

Topic是消息的邏輯容器,用於對消息進行分類和存儲。在Kafka中,消息會被發布到指定的topic中,並且可以被一個或多個消費者訂閱。Topic是Kafka的核心概念之一,是實現消息傳遞的基礎。

Producer

Producer是消息的生產者,用於向指定的topic中發送消息。Producer負責將消息發送到Kafka集群中的broker節點,並且可以在發送消息時指定消息的key,以便Kafka將消息分配到指定的partition中。

Consumer

Consumer是消息的消費者,用於從指定的topic中接收消息。Consumer負責從Kafka集群中的broker節點獲取消息,並且可以指定從哪個partition中獲取消息。消費者可以以不同的方式進行消息消費,例如批量消費、輪詢消費等。

Broker

Broker是Kafka集群中的一個節點,用於存儲和管理消息。Broker是Kafka的核心組件之一,負責接收和處理生產者發送的消息,並將其存儲到磁碟中,同時還負責將消息轉發給消費者。

Partition

Partition是Kafka中實現數據分片的機制,一個topic可以被分成多個partition,每個partition都是一個有序的消息隊列。消息在被發送到一個topic時,會被根據指定的key進行hash計算,然後被分配到對應的partition中。

Offset

Offset是Kafka中的一個重要概念,用於標識每個消息在一個partition中的位置。每個partition都有一個唯一的offset值,消費者可以根據offset來獲取指定位置的消息。Kafka還提供了一種特殊的topic,稱為__consumer_offsets,用於存儲消費者消費的位置信息。

Kafka & 主要架構

如圖:

                    +---------+        +---------+
                    |Producer |        |Consumer |
                    +---------+        +---------+
                         |                   |
                         |                   |
                    +---------+        +---------+
                    |  Broker |        |  Broker |
                    +---------+        +---------+
                     /          \       /         \
                    /            \     /           \
          +---------+   +---------+ +---------+   +---------+
          |Partition|   |Partition| |Partition|   |Partition|
          +---------+   +---------+ +---------+   +---------+
            /     \        /     \       /     \       /     \
           /       \      /       \     /       \     /       \
   +----------+  +----------+ +----------+ +----------+ +----------+
   |Replica   |  |Replica   | |Replica   | |Replica   | |Replica   |
   +----------+  +----------+ +----------+ +----------+ +----------+
        Leader          Follower      Follower      Follower      Follower
            |                   |            |           |
            |                   |            |           |
          Write               Read         Read       Read
            |                   |            |           |
            +--------+----------+------------+-----------+
                     |                       |
                     |                       |
                 +---------+            +---------+
                 |  Disk   |            |  Memory |
                 +---------+            +---------+


複製代碼

在這個流程圖中,主要有以下幾個流程:

  • Producer將消息發送到Broker節點,Broker將消息存儲到對應的Partition中。
  • 每個Partition可以有多個Replica,其中一個Replica被選為Leader,其餘Replica為Follower。
  • Leader負責處理消息的寫操作,將消息追加到Partition中。
  • Follower負責與Leader保持同步,定期從Leader中拉取消息並複製到本地副本中,以保證數據的一致性。
  • Consumer從Broker中讀取消息,可以指定消費某個Topic中的指定Partition中的消息,也可以進行批量消費或實時消費。
  • Broker將消息存儲在磁碟中,同時也會緩存部分消息到內存中,以提高讀寫性能。

為了提高集群的可用性和穩定性, 架構中還會引入ZooKeeper, ZooKeeper用於維護Kafka集群中的Broker節點信息、Partition信息、Topic信息等。

Leader & Follower

上述提到了leader和Follower,有的小夥伴可能不知道是啥,這裡講下為啥會有這個?

在Kafka的分布式架構中,每個Partition可以有多個Replica,其中一個Replica被選為Leader,其餘Replica為Follower。

Leader是指在一個Partition中,負責處理該Partition所有消息的讀寫操作的Replica。當Producer發送消息到該Partition時,消息會首先被發送到Leader所在的Replica,Leader再將消息追加到Partition中,然後將消息複製到所有Follower的Replica中。在讀取數據時,Consumer也只能從Leader所在的Replica中讀取消息,而Follower只負責與Leader保持同步,不參與讀寫操作。

Leader的選舉方式與ZooKeeper密切相關,當Leader所在的Replica出現故障時,ZooKeeper會自動選舉新的Leader,以保證Partition中的數據一致性和可用性。由於只有Leader負責讀寫操作,因此可以有效避免數據的衝突和重複`。

ZooKeeper

如果有小夥伴不知道ZooKeeper是啥,給大家簡要介紹一下:

ZooKeeper是一個分布式協調服務,常用於分布式系統中的協調與通知。Kafka使用ZooKeeper來進行集群管理、Leader選舉、存儲Metadata信息等。ZooKeeper是Kafka的重要組成部分,沒有ZooKeeper的支持,Kafka集群無法正常運行,往後的發展趨勢可能會不用強依賴ZooKeeper`。

在Kafka中,每個Broker都會向ZooKeeper註冊自己的節點信息,包括Broker ID、IP位址和埠號等。同時,每個Partition的Metadata信息也會存儲在ZooKeeper中,包括該Partition的Replica信息、Leader信息、ISR信息等。當Broker加入或退出集群時,ZooKeeper會自動通知其他Broker更新集群的狀態信息。在Leader選舉時,ZooKeeper會根據預設的算法選舉出新的Leader,並通知其他Broker更新Partition`的狀態信息。

除了Kafka之外,ZooKeeper還被廣泛應用於Hadoop、HBase、Solr等其他分布式系統中,是一個非常成熟和穩定的分布式協調服務。再舉一個,Dubbo RPC服務開發框架,如果有用過Dubbo的小夥伴,ZooKeeper一定不會陌生。

這塊知識點,大家一定要搞懂,也是面試的熱點問題~

Kafka & 集群搭建

這裡教大家如何使用docker部署Kafka集群,需要大家安裝好docker, 如果不會安裝的可以參考之前的文章es集群搭建。

給大家提前準備好了docker-compose.yml文件,配置有點多,如果沒有一些docker基礎,可能會看不懂,不過沒關係,不影響我們的部署,

直接執行docker-compose up -d就完了。因為安裝的東西比較多,包含zookeeper集群,kafka集群,kafka-ui管理後台,這個ui後台是一個開源的系統,界面比較整潔,推薦給大家,命令執行後稍稍等待一會~

version: '3.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    volumes:
      - ./data/zookeeper/zoo1/data:/data
      - ./data/zookeeper/zoo1/datalog:/datalog  

  zoo2:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo2
    container_name: zoo2
    ports:
      - "2182:2182"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    volumes:
      - ./data/zookeeper/zoo2/data:/data
      - ./data/zookeeper/zoo2/datalog:/datalog    

  zoo3:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo3
    container_name: zoo3
    ports:
      - "2183:2183"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2183
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    volumes:
      - ./data/zookeeper/zoo3/data:/data
      - ./data/zookeeper/zoo3/datalog:/datalog    



  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - ./data/kafka_data1:/kafka/data  
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - ./data/kafka_data2:/kafka/data    
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    volumes:
      - ./data/kafka_data3:/kafka/data    
    depends_on:
      - zoo1
      - zoo2
      - zoo3
      
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 9999:8080
    depends_on:
      - kafka1
      - kafka2
      - kafka3
  
    environment:
      KAFKA_CLUSTERS_0_NAME: k1
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092
    
      KAFKA_CLUSTERS_1_NAME: k2
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:29093
      
      KAFKA_CLUSTERS_2_NAME: k3
      KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: kafka3:29094
複製代碼

然後瀏覽器打開localhost:9999可以訪問UI後台, 我們可以通過後台新建topic來驗證集群是否工作。

這裡再給大家推薦一個GUI工具,Kafka Assistant這個工具方便我們日常開發測試使用,界面也很簡潔,直接桌面端安裝,利用它發送幾條消息。

然後我們到後台頁面,觀察三個節點,發現topic里都有test,並且消息都是存在的

結束語

本節到這裡就結束, 概念有點多,需要好好理理,後邊會結合實際案例給大家繼續講它的概念和工作原理。下節帶大家看下Springboot整合Kafka實戰

關鍵字: