深入了解Apache Kafka

聞數起舞 發佈 2020-04-01T13:10:19+00:00

在大數據時代,每秒(大量)的數據(量)都是從各種來源(如社交媒體,我當前正在寫的博客,電子商務等)中生成的,這些數據存儲在不同的平台上 在不同的模式(品種)。


在大數據時代,每秒(大量)的數據(量)都是從各種來源(如社交媒體,我當前正在寫的博客,電子商務等)中生成的,這些數據存儲在不同的平台上 在不同的模式(品種)。 為了執行任何ETL(提取,轉換,加載)操作,需要一個消息傳遞/流傳輸系統,該系統應該是異步且鬆散耦合的,即來自各種源/客戶端(如hdfs,Cassandra,RDBMS,應用程式日誌文件等)的數據都可以。 可以在同一時間將其轉儲到一個位置,而無需所有客戶端相互依賴。 解決該問題的方法之一是Kafka —一種由LinkedIn創建的開源分布式流平台,後來捐贈給了Apache。 它是用Scala編寫的。

術語

消息:基本上是一個鍵值對,在值部分包含有用的數據/記錄。

主題:對於多租戶,可以創建多個主題,這些主題只是將消息發布和訂閱到的源名稱。

偏移量:消息以類似於提交日誌的順序形式存儲,並且從0開始為每個消息提供順序ID。

代理:Kafka群集由代理組成,代理只是託管由Zookeeper維護的無狀態伺服器的群集中的節點。 由於這裡沒有主從概念,因此所有代理都是對等的。 在繼續進行之前,讓我們先了解一下ZooKeeper。

什麼是Zookeeper?為什麼在Kafka Cluster中需要它?

Zookeeper是用於分布式集群管理的系統。 它是一個分布式鍵值存儲。 對其進行了高度優化,但寫入速度較慢。 它由稱為集合的奇數個znode組成。 在Kafka中,它需要用於:

· 控制器選舉:針對特定主題的分區中的所有讀取和寫入均通過副本的領導者進行。 每當領導者下台時,Zookeeper都會選舉新的領導者。

· 主題配置:與某個主題相關的元數據,即某個特定主題是否位於代理中,存在多少分區等,存儲在Zookeeper端,並在產生消息時不斷保持同步。

· 主題的訪問控制列表(ACL)由Zookeeper維護。

為什麼選擇Kafka?

Kafka的一些關鍵功能(這是傳統消息傳遞系統所面臨的挑戰)使其更加流行:

· 高吞吐量:吞吐量表示每秒可以處理的消息數(消息速率)。 由於我們可以劃分可分布在不同代理之間的主題,因此每秒可以實現數千次讀寫。

· 分布式:分布式系統是指被分成多個運行中的計算機的系統,所有這些計算機在一個群集中一起工作,以顯示為最終用戶的單個節點。 當Kafka在幾個稱為"代理"的節點上存儲,讀取和寫入數據時,就進行了分布。該代理與Zookeeper共同創建了一個稱為" Kafka群集"的生態系統。

· 持久性:消息隊列完全保留在磁碟上,而不是保留在內存中,並且可以在不同節點上存儲相同數據的多個副本/副本,稱為ISR(同步副本)。 因此,由於故障轉移方案而導致數據丟失的可能性不大,並且使其具有持久性。

· 可擴展性:任何系統都可以水平或垂直擴展。 垂直可伸縮性意味著將更多資源(如CPU,內存)添加到相同的節點,並導致較高的運營成本。 通過簡單地在集群中添加更多的節點就可以實現水平可伸縮性,從而增加了容量需求。 Kafka水平擴展意味著每當我們用完容量/空間不足時,就可以在集群中添加新的節點/代理。

· 容錯:如果我們有n個主題,每個主題有m個分區,那麼如果我們將複製因子設置為q,則所有n * m個分區都將在q個代理上被複製。 因此,將其容忍為q-1的因子,即我們可以承受q-1代理節點的故障。 複製因子應始終小於或等於代理數量,因為違反此條件將最終在單個代理上具有同一副本的兩個副本,這是沒有意義的。

注意:默認情況下,Kafka保證至少一次傳送,並允許用戶通過在生產者上禁用重試並在處理一批消息之前提交其偏移量來最多實施一次傳送。

動手:

您可以從以下網址下載Kafka scala庫:

提取zip後,假設HOME_DIR = kafka_2.12–2.3.0 /

首先,我們需要通過在HOME_DIR / config / zookeeper.properties中提供dataDir名稱和埠來打開Zookeeper。

./bin/zookeeper-server-start.sh ../config/zookeeper.properties

默認情況下,Zookeeper將在2181埠上啟動並運行。 現在,我們需要啟動Kafka伺服器。 該腳本位於bin文件夾中。 與伺服器相關的配置可以在HOME_DIR / config / server.properties中進行。 讓我們做

broker.id=101
listeners=PLAINTEXT:localhost:9091
logldirs=some_log_dir/kafka-logs-1

./bin/kafka-servert-start.sh ../config/server.properties

這將在埠9091處啟動Kafka伺服器。

Creating a topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 partitions 1 --topic topic1Describing a particular topic:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1Deleting a topic:
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic1


注意:一旦設置為特定值,分區將無法減少,只能增加。

生產者如何編寫消息?

生產者首先獲取主題的元數據,以便知道需要使用消息更新哪個代理。 元數據也存儲在代理中,並且與Zookeeper保持連續同步,因為Zookeeper節點通常比經紀人節點少得多。 因此,許多生產者想連接到Zookeeper來訪問元數據,並且性能降低。 現在,一旦生產者獲取了有關主題和分區的元數據,它將消息寫入領導者經紀人節點的日誌中,而關注者(ISR)將其複製。


此寫入操作可以是同步的[即 僅當關注者還在其日誌中複製該消息時]或異步[即, 只用新消息更新領導者,狀態發送給生產者]。 保留期限:磁碟上的消息可以保留特定的持續時間,稱為保留期限,在此期限之後,將自動清除舊消息,並且不再可供使用。 默認情況下,設置為7天。

可以通過三種策略將消息寫到主題:

A send(key,value,topic,partition):專門提供需要進行寫操作的分區。 不建議使用此方法,因為它可能會導致分區大小不平衡。

B send(key,value,topic):在這裡,默認的HashPartitioner用於確定要寫入消息的分區,方法是查找key的哈希並取mod。 該主題的分區。 也可以編寫我們自己的自定義分區程序。

C send(key = null,value,topic):在這種情況下,消息以循環方式存儲在所有分區中。

動手:

Creating a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9091 --topic topic1
Changing retention period for a topic to 10sec:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name topic1 --add-config retention.ms=10000


生產者可以批量發送消息以提高效率。 批次達到特定大小限制後,將其一次轉儲到隊列中。 但是,偏移量僅對所有單個消息而言是連續的,並且在將其傳遞給使用者API之前在使用者端進行縮小。

生產者API:

Kafka生產者是可以充當Kafka集群中數據源的應用程式。 生產者可以使用Kafka jar文件/依賴項提供的API將消息發布到一個或多個Kafka主題。 在發送消息之前,需要設置包含存儲消息配置的屬性對象。 主類ProducerRecord,KafkaProducer,Callback。

示例API:

Producer<String, String> producer = new KafkaProducer <>(createProperties());

        ProducerRecord<String, String> record = new
            ProducerRecord<>(TOPIC_NAME, "SyncKey", "SyncMessage");

        try {
            RecordMetadata recordMetadata = producer.send(record).get();
            System.out.println("Message is sent to Partition no " +
                recordMetadata.partition() + " and offset " + recordMetadata.offset());

        } catch (Exception e) {
            System.out.println("Exception--> "+ e);
        } finally{
            producer.close();
        }

    }
    private static Properties createProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9091,localhost:9092");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;

    }

消費者如何檢索消息?

消費者還以生產者通過查找元數據來寫入消息的方式檢索消息,並從領導者分區讀取消息。 由於Kafka的速度非常快並且可以獲取實時消息,因此單個消費者肯定會在從稱為"消費者滯後"的主題中讀取大量消息中存在延遲。

為了克服此問題,可以創建一個"消費者"組,該"消費者"組由多個具有相同組標識的消費者組成。 每個使用者都連接有一個唯一的分區,該分區在所有使用者之間平均分配。 將分區分配給特定的使用者是組協調員的責任-群集中的一個經紀人被提名擔任此角色。 為了管理活動使用者列表,組中的所有使用者將他們的心跳發送到組協調器。 組中使用方的數量應小於或等於該特定主題中的分區的數量,違反條件將最終導致某個用戶閒置的情況。

動手:

Creating a simple Consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1
Creating consumer to read from particular offset and partition:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1 --offset 0 --partition 1
Creating a consumer group:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1 --group groupName
Multiple process/consumer of the above groupName can be created which is called Consumer Group.


一個以上的消費者可以同時閱讀一個主題。 現在,為了記住特定讀取的偏移量,提供了稱為使用者偏移量的存儲作為隱藏主題-__consumer_offsets,用於存儲特定組的使用者讀取的分區的最後偏移量。

消費者偏移的鍵為—> [組ID,主題,分區]和值—> [偏移,...]

使用者API:

與生產者API相似,Kafka提供了一些類來連接到引導伺服器並獲取消息。 傳遞標準數據類型以外的消息時,需要編寫反序列化程序。

可以在以下位置找到針對生產者和消費者的Kafka Java API:https://github.com/ercsonusharma/learnkafka

Kafka如何做到快速?

Kafka遵循一定的策略,這是其設計的一部分,以使其性能更好,更快。

· 無隨機磁碟訪問:它使用稱為不可變隊列的順序數據結構,其中讀寫操作始終為恆定時間O(1)。 它在末尾附加消息,並從頭開始或從特定偏移量讀取。

· 順序I / O:現代作業系統將其大部分可用內存分配給磁碟緩存,並且更快地存儲和檢索順序數據。

· 零複製:由於根本沒有修改磁碟數據,因此不必要地將它們加載到應用程式內存中。 因此,它不是將其加載到應用程式,而是通過套接字,NIC緩衝區和網絡從內核上下文緩衝區發送相同的數據。

· 消息的批處理:為了避免多次網絡呼叫,將多個消息分組在一起。

· 消息壓縮:在通過網絡傳輸消息之前,使用gzip,snappy等壓縮算法對消息進行壓縮,然後在使用者API層將其解壓縮。

數據如何駐留在Broker實例/物理磁碟上?

在打開Kafka伺服器之前,代理中的所有消息都存儲在配置文件中配置的日誌目錄(log-dir-1)中。 在該目錄中,可以找到包含特定主題分區的文件夾,格式為topic_name-partition_number,例如 主題1–0。 __consumer_offsets主題也存儲在同一日誌目錄中。

在特定主題的分區目錄中,可以找到Kafka分段文件0000–00.log,索引文件0000–00.index和時間索引0000–00.timeindex。 當達到舊的段大小或時間限制時,會在創建新的段文件時將屬於該分區的所有數據寫入活動段中。 索引將每個偏移量映射到其消息在日誌中的位置。 由於偏移量是順序的,因此將二進位搜索應用於特定偏移量的日誌文件中的數據索引。

記錄壓縮主題

重複鍵標記為要從段文件中刪除。 可以通過將空值傳遞給特定鍵來更新和刪除此處的值。

謝謝閱讀!

(本文翻譯自Sonu Sharma的文章《Apache Kafka in Depth》,參考:https://medium.com/@sonusharma.mnnit/apache-kafka-in-depth-49aae1e844be)

關鍵字: