大數據Flume技術解析

it智能化專欄 發佈 2022-12-27T01:13:43.429480+00:00

Flume的事務機制: Flume 使用兩個獨立的事務分別負責從 Soucrce 到 Channel ,以及從 Channel 到 Sink 的事件傳遞。

(一)Flume概述、Flume快速入門

1 Flume 概述

1.1 Flume 定義

Flume 是Cloudera 提供的一個高可用的,高可靠的,分布式的海量日誌採集、聚合和傳輸的系統。Flume 基於流式架構,靈活簡單。


Flume 最主要的作用就是,實時讀取伺服器本地磁碟的數據,將數據寫入到 hdfs。

1.2 Flume 基礎架構

1.2.1 Agent

Agent 是一個 JVM 進程,它以事件的形式將數據從源頭送至目的。
Agent 主要由 3 個部分組成, SourceChannelSink

1.2.2 Source

Source 是負責接收數據到 Flume Agent 的組件。 Source 組件可以處理各種類型、各種格式的日誌數據,包括avrothriftexecjmsspooling directorynetcatsequence generatorsysloghttplegacy

1.2.3 Sink

Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,並將這些事件批量寫入到存儲或索引系統、或者被發送到另一個 Flume Agent 。
Sink組件目的地包括
hdfsloggeravrothriftipcFileHBasesolr 、自定義。

1.2.4 Channel

Channel 是位於 Source 和 Sink 之間的緩衝區。因此, Channel 允許 Source 和 Sink 運作在不同的速率上。 Channel 是線程安全的,可以同時處理幾個 Source 的寫入操作和幾個Sink 的讀取操作。

Flume 自帶三種 Channel:Memory ChannelFile Channel 以及 Kafka Channel

Memory Channel是內存中的隊列。 Memory Channel 在不需要關心數據丟失的情景下適用。如果需要關心數據丟失,那麼 Memory Channel 就不應該使用,因為程序死亡、機器宕


File Channel 將所有事件寫到磁碟。因此在程序關閉或機器宕機的情況下不會丟失數據。

1.2.5 Event

傳輸單元,Flume 數據傳輸的基本單元,以 Event 的形式將數據從源頭送至目的地。Event 由HeaderBody 兩部分組成, Header 用來存放該 event 的一些屬性,為 K-V 結構,Body 用來存放該條數據,形式為字節數組。

2 Flume 快速入門

2.1 Flume 安裝部署

2.1.1 安裝地址

(1)Flume 官網地址
http://flume.apache.org/

(2)文檔查看地址
http://flume.apache.org/FlumeUserGuide.html

(3)下載地址
http://arcHive.apache.org/dist/flume/

2.1.2 安裝部署

(1)將 apache-flume-1.7.0-bin.tar.gz 上傳到 linux 的/opt/software 目錄下

(2)解壓 apache-flume-1.7.0 bin.tar.gz 到/opt/module/目錄下

[Tom@Hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
1

(3)修改 apache-flume-1.7.0-bin 的名稱為 flume-1.7.0

[Tom@hadoop102 module]$ mv apache-flume-1.7.0-bin flume-1.7.0
1

(4)將 flume/conf 下的 flume-env.sh.template 文件修改為 flume-env.sh,並配置 flume-env.sh 文件

[Tom@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[Tom@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
123

2.2 Flume 入門案例

2.2.1 監控埠數據官方案例

1. 案例需求
使用 Flume 監聽一個埠, 收集該埠數據 ,並列印到控制台。

2. 需求分析


3. 實現步驟
(1)安裝 netcat 工具

[Tom@hadoop102 software]$ sudo yum install -y nc
1

(2)判斷44444 埠是否被占用

[Tom@hadoop102 flume-1.7.0]$ sudo netstat -tunlp | grep 44444
1

(3)創建Flume Agent 配置文件flume-netcat-logger.conf
在 flume-1.7.0 目錄下創建 job 文件夾並進入 job 文件夾。

[Tom@hadoop102 flume]$ mkdir job
[Tom@hadoop102 flume]$ cd job/
12

在 job 文件夾下創建 Flume Agent 配置文件netcat-flume-logger.conf

[Tom@hadoop102 job]$ vim netcat-flume-logger.conf
1

netcat-flume-logger.conf 文件中添加如下內容:

# Name the components on this agent    a1:表示agent的名稱
a1.sources = r1    # r1:表示a1的Source的名稱
a1.sinks = k1    # k1:表示a1的Sink的名稱
a1.channels = c1    # c1: 表示a1的Channel的名稱

# Describe/configure the source
a1.sources.r1.type = netcat    # 表示a1的輸入源類型為netcat埠類型
a1.sources.r1.bind = localhost    # 表示a1的監聽的主機
a1.sources.r1.port = 44444    # 表示a1的監聽的埠號

# Describe the sink
a1.sinks.k1.type = logger    # 表示a1的輸出目的地是控制台logger類型

# Use a channel which buffers events in memory
a1.channels.c1.type = memory    # 表示a1的channel類型是memory內存型
a1.channels.c1.capacity = 1000    # 表示a1的channel總容量為1000個event
a1.channels.c1.transactionCapacity = 100    # 表示a1的channel傳輸時收集到了100條event以後再去提交事務

# Bind the source and sink to the channel
a1.sources.r1.channels = c1    # 表示將r1和c1連接起來
a1.sinks.k1.channel = c1    # 表示將k1和c1連接起來
123456789101112131415161718192021

註:配置文件來源於官方手冊 http://flume.apache.org/FlumeUserGuide.html

(4)先開啟 flume 監聽埠
第一種寫法:

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf  -Dflume.root.logger=INFO,console
1

第二種寫法

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
1

參數說明:
--conf/-c:表示配置文件存儲在conf/目錄
--name/-n:表示給agent 起名為a1
--conf-file/-f:flume 本次啟動讀取的配置文件是在job 文件夾下的flume-telnet.conf文件。
-Dflume.root.logger=INFO,console :-D 表示 flume 運行時動態修改flume.root.logger 參數屬性值,並將控制台日誌列印級別設置為INFO 級別。日誌級別包括:log、info、warn、error。

(5)使用 netcat 工具向本機的 44444 埠發送內容

[Tom@hadoop102 job]$ nc localhost 44444
hello
OK
HUST
OK
12345

(6)在Flume 監聽頁面觀察接收數據情況

2.2.2 實時監控單個追加文件

1. 案例需求:實時監控 Hive 日誌,並上傳到 HDFS 中

2. 需求分析


3. 實現步驟
(1)Flume 要想將數據輸出到HDFS,須持有Hadoop 相關jar 包。將以下 jar 包
拷貝到
opt/module/flume-1.7.0/lib文件夾下。

(2)創建file-flume-hdfs.conf文件

[Tom@hadoop102 job]$ vim file-flume-hdfs.conf
1

註:要想讀取 Linux 系統中的文件,就得按照 Linux 命令的規則執行命令。由於Hive 日誌在 Linux 系統中,所以讀取文件的類型選擇:exec 即 execute 執行的意思。表示執行Linux 命令來讀取文件。
添加如下內容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive-3.1.2/logs/hive.log

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創建一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimestamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k2.hdfs.rollInterval = 30
#設置每個文件的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
1234567891011121314151617181920212223242526272829303132333435363738394041

注意:對於所有與時間相關的轉義序列,Event Header 中必須存在以』 timestamp』的 key (除非hdfs.useLocalTimeStamp 設置為 true ,此方法會使用 TimestampInterceptor 自動添加 timestamp)。a3.sinks.k3.hdfs.useLocalTimeStamp = true

(3)運行 Flume

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/file-flume-hdfs.conf
1

(4)開啟 Hadoop 和 Hive 並操作 Hive 產生日誌

[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh
[Tom@hadoop102 hive-3.1.2]$ bin/hive
123

(5)在HDFS上查看文件

2.3.3 實時監控目錄下多個新文件

1. 案例需求:使用 Flume 監聽整個目錄的文件,並上傳至HDFS

2. 需求分析


3. 實現步驟

(1)創建配置文件dir-flume-hdfs.conf
創建一個文件

[Tom@hadoop102 job]$ vim dir-flume-hdfs.conf
1

添加如下內容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume-1.7.0/upload
#忽略所有以.tmp結尾的文件,不上傳
a2.sources.r2.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = upload-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創建一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k2.hdfs.rollInterval = 30
#設置每個文件的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
12345678910111213141516171819202122232425262728293031323334353637383940414243

(2)啟動監控文件夾命令

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/dir-flume-hdfs.conf
1

說明:在使用 Spooling Directory Source 時不要在監控目錄中創建並持續修改文件,上傳完成的文件會以.COMPLETED結尾,被監控文件夾每 500 毫秒掃描一次文件變動。

(3)向upload 文件夾中添加文件
/opt/module/flume-1.7.0目錄下創建 upload 文件夾

[Tom@hadoop102 flume]$ mkdir upload
1

向 upload 文件夾中添加文件

[huxili@hadoop102 upload]$ touch hust.txt
[huxili@hadoop102 upload]$ touch hust.tmp
[huxili@hadoop102 upload]$ touch hust.log
123

(4) 查看 HDFS 上的數據


(5)等待 1s,再次查詢 upload 文件夾

[Tom@hadoop102 upload]$ ll
-rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.log.COMPLETED
-rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.tmp
-rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.txt.COMPLETED
1234

2.2.4 實時監控目錄下的多個追加文件

Exec source適用於監控一個實時追加的文件,但不能保證數據不丟失;Spooldir Source能夠保證數據不丟失,且能夠實現斷點續傳,但延遲較高,不能實時監控;而Taildir Source既能夠實現斷點續傳,又可以保證數據不丟失,還能夠進行實時監控。
1. 案例需求:使用 Flume 監聽整個目錄的實時追加文件,並上傳至HDFS。(在實際操作中我們直接列印到控制台,這樣更直觀)

2. 需求分析:


3. 實現步驟
(1)創建配置文件flume-taildir-hdfs.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/files/file1.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.7.0/files/file2.txt
a1.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position.json

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
12345678910111213141516171819202122

(2)啟動監控文件夾命令

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-taildir-hdfs.conf
1

(3)向 files 文件夾中追加內容

[Tom@hadoop102 flume]$ mkdir files
1

向 upload 文件夾中添加文件

[Tom@hadoop102 files]$ echo hello >> file1.txt 
[Tom@hadoop102 files]$ echo hust >> file2.txt 
12

(4)查看數據


Taildir 說明:
Taildir Source 維護了一個json 格式的 position File,其會定期的往 position File 中更新每個文件讀取到的最新的位置,因此能夠實現斷點續傳。Position File 的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
12

註:Linux 中儲存文件元數據的區域就叫做 inode,每個 inode 都有一個號碼,作業系統用 inode 號碼來識別不同的文件,Unix/Linux 系統內部不使用文件名,而使用 inode 號碼來識別文件。

(二)Flume進階、常見問題

1Flume進階

1.1 Flume 事務

1.2 Flume Agent 內部原理


重要組件
(1)ChannelSelector
ChannelSelector的作用就是選出 Event 將要被發往哪個 Channel。其共有兩種類型,分別是
Replicating(複製)和Multiplexing(多路復用)。
ReplicatingSelector 會將同一個 Event 發往所有的 Channel,Multiplexing 會根據相應的原則,將不同的 Event 發往不同的 Channel 。

(2)SinkProcessor
SinkProcessor 共有三種類型,分別是
DefaultSinkProcessorLoadBalancingSinkProcessorFailoverSinkProcessor
DefaultSinkProcessor 對應的是單個的 Sink,LoadBalancingSinkProcessor 和 FailoverSinkProcessor 對應的是 Sink Group,LoadBalancingSinkProcessor 可以實現負載均衡的功能,FailoverSinkProcessor 可以實現故障轉移的功能。

1.3 Flume 拓撲結構

1.3.1 簡單串聯


這種模式是將多個 flume 順序連接起來了,從最初的 source 開始到最終 sink 傳送的目的存儲系統。此模式不建議橋接過多的 flume 數量,flume 數量過多不僅會影響傳輸速率,而且一旦傳輸過程中某個節點 flume 宕機,會影響整個傳輸系統。

1.3.2 複製和多路復用


Flume 支持將事件流向一個或者多個目的地。這種模式可以將相同數據複製到多個 channel 中,或者將不同數據分發到不同的 channel 中, sink 可以選擇傳送到不同的目的地。

1.3.3 負載均衡和故障轉移


Flume 支持使用將多個 sink 邏輯上分到一個 sink 組, sink 組配合不同的 SinkProcessor 可以實現負載均衡和錯誤恢復的功能。

1.3.4 聚合


這種模式是我們最常見的,也非常實用,日常 web 應用通常分布在上百個伺服器,大者甚至上千個、上萬個伺服器。產生的日誌,處理起來也非常麻煩。用flume 的這種組合方式能很好的解決這一問題,每台伺服器部署一個flume 採集日誌,傳送到一個集中收集日誌的 flume,再由此flume 上傳到hdfs、hive、hbase 等,進行日誌分析。

1.4 Flume 企業開發案例

1.4.1 複製和多路復用

1. 案例需求
使用 Flume-1 監控文件變動,Flume-1 將變動內容傳遞給Flume-2,Flume-2 負責存儲到 HDFS。同時 Flume-1 將變動內容傳遞給Flume-3,Flume-3 負責輸出到 Local FileSystem。

2. 需求分析


3. 實現步驟
(1)準備工作
/opt/module/flume-1.7.0/job目錄下創建 group1 文件夾

[Tom@hadoop102 job]$ cd group1/
1

/opt/module/flume-1.7.0/datas/目錄下創建 flume3 文件夾

[Tom@hadoop102 datas]$ mkdir flume3
1

(2)創建flume-file-flume.conf
配置1 個接收日誌文件的source 和兩個channel、兩個sink,分別輸送給 flume-flume-hdfs 和 flume-flume-dir 。
編輯配置文件

[Tom@hadoop102 group1]$ vim flume-file-flume.conf
1

添加如下內容

#name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/data/hive.log
a1.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position1.json

# 將數據流複製給所有channel
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Sink
# sink 端的 avro 是一個數據發送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

#Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
12345678910111213141516171819202122232425262728293031323334353637

(3)創建 flume-flume-hdfs .conf
配置上級 Flume 輸出的 Source,輸出是到 HDFS 的 Sink 。
編輯配置文件

[Tom@hadoop102 group1]$ vim flume-flume-hdfs.conf
1

添加如下內容

#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/group1/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = logs-
#是否按照時間滾動文件夾
a2.sinks.k1.hdfs.round = true
#多少時間單位創建一個新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 1000
#設置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#設置每個文件的滾動大小
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.k1.hdfs.rollCount = 0

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
123456789101112131415161718192021222324252627282930313233343536373839404142

(4)創建 flume-flume-dir .conf
配置上級 Flume 輸出的 Source,輸出是到本地目錄的 Sink 。
編輯配置文件

[Tom@hadoop102 group1]$ vim flume-flume-dir.conf
1

添加如下內容

#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.7.0/data/group1

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
12345678910111213141516171819202122

提示:輸出的本地目錄必須是已經存在的目錄,如果該目錄不存在,並不會創建新的目錄。

(5)執行配置文件
分別啟動對應的 flume 進程:flume-flume-dir,flume-flume-hdfs,flume-file-flume 。

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
12345

(6)啟動 Hadoop 並向 hive.log 添加數據

[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh

[Tom@hadoop102 data]$ echo hello >> hive.log 
[Tom@hadoop102 data]$ echo hust >> hive.log 
12345

(7)檢查 HDFS 上數據

(8)檢查 /opt/module/flume-1.7.0/datas/flume3目錄中數據

總用量 16
-rw-rw-r--. 1 Tom Tom  6 9月  12 22:02 1631453983368-46
-rw-rw-r--. 1 Tom Tom  5 9月  12 22:02 1631453983368-47
-rw-rw-r--. 1 Tom Tom  0 9月  12 22:03 1631453983368-48
1234

1.4.2 負載均衡和故障轉移

1. 案例需求
使用 Flume1 監控一個埠,其 sink 組中的 sink 分別對接 Flume2 和 Flume3 ,採用 FailoverSinkProcessor ,實現故障轉移的功能。

2. 需求分析


3. 實現步驟
(1)準備工作
/opt/module/flume-1.7.0/job目錄下創建 group2 文件夾

[Tom@hadoop102 job]$ cd group2/
1

(2)創建 flume-netcat-flume.conf
配置1 個 netcat source 和1 個channel、1 個sink group(2 個sink),分別輸送給 flume-flume-console1 和 flume-flume-console2。
編輯配置文件

[Tom@hadoop102 group2]$ vim flume-netcat-flume.conf
1

添加如下內容

#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sinkgroups = g1

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

#SinkGroup
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
123456789101112131415161718192021222324252627282930313233343536

(3)創建 flume-flume-console1 .conf
配置上級 Flume 輸出的 Source,輸出是到本地控制台。
編輯配置文件

[Tom@hadoop102 group2]$ vim flume-flume-console1.conf
1

添加如下內容

#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
123456789101112131415161718192021

(4)創建 flume-flume-console2 .conf
配置上級 Flume 輸出的 Source,輸出是到本地控制台。
編輯配置文件

[Tom@hadoop102 group2]$ vim flume-flume-console2.conf
1

添加如下內容

#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
123456789101112131415161718192021

<font color= size=3 >(5)執行配置文件
分別開啟對應配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume 。

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-consosole1.conf -Dflume.root.logger=INFO,console

[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
12345

(6)使用 netcat 工具向本機的 44444 埠發送內容

[Tom@hadoop102 flume-1.7.0]$ nc localhost 44444
1

(7)查看flume-flume-console2 及 flume-flume-console1 的控制台列印日誌

(8)將 flume-flume-console2 kill ,觀察 flume-flume-console1 的控制台列印情況。

使用jps -ml查看 Flume 進程

[Tom@hadoop102 job]$ jps -ml
5696 org.apache.flume.node.Application -n a3 -f job/group2/flume-flume-console2.conf
5430 org.apache.flume.node.Application -n a1 -f job/group2/flume-netcat-flume.conf
5275 org.apache.flume.node.Application -n a2 -f job/group2/flume-flume-console1.conf
3581 org.apache.hadoop.hdfs.server.datanode.DataNode
5821 sun.tools.jps.Jps -ml
3438 org.apache.hadoop.hdfs.server.namenode.NameNode
1234567

1.4.3 聚合

1. 案例需求
hadoop102 上的 Flume1 監控文件
opt/module/data/group.log
hadoop103 上的 Flume2 監控某一個埠的數據流,
Flume1 與 Flume2 將數據發送給 hadoop104 上的 Flume3,Flume3 將最終數據列印到控制台。

2. 需求分析


3. 實現步驟
(1)準備工作
分發 Flume

[Tom@hadoop102 module]$ xsync flume
1

在hadoop102、hadoop103 以及hadoop104 的/opt/module/flume-1.7.0/job目錄下創建一個group3 文件夾。

[Tom@hadoop102 job]$ mkdir group3
[Tom@hadoop103 job]$ mkdir group3
[Tom@hadoop104 job]$ mkdir group3
123

(2)創建 flume1-logger-flume.conf
配置 Source 用於監控 hive.log 文件,配置 Sink 輸出數據到下一級 Flume。
在 hadoop102 上編輯配置文件

[Tom@hadoop102 group3]$ vim flume1-logger-flume.conf
1

添加如下內容

#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = TAILDIR
a2.sources.r1.filegroups = f1
a2.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/data/flume.log
a2.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position2.json

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
123456789101112131415161718192021222324

(3)創建 flume2-netcat-flume.conf
配置 Source 監控埠 44444 數據流,配置 Sink 數據到下一級 Flume:在 hadoop103 上編輯配置文件

[Tom@hadoop103 group3 ]$ vim flume2-netcat-flume.conf
1

添加如下內容

#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = netcat
a3.sources.r1.bind = localhost
a3.sources.r1.port = 44444

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = avro
a3.sinks.k1.hostname = hadoop104
a3.sinks.k1.port = 4142

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
1234567891011121314151617181920212223

(4)創建 flume3-flume-logger.conf
配置 source 用於接收 flume1 與 flume2 發送過來的數據流,最終合併後 sink 到控制台。
在 hadoop104 上編輯配置文件

[Tom@hadoop104 group3 ]$ touch flume3-flume-logger.conf
[Tom@hadoop104 group3 ]$ vim flume3-flume-logger.conf
12

添加如下內容

#name
a4.sources = r1 r2
a4.channels = c1
a4.sinks = k1

#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop104
a4.sources.r1.port = 4141

a4.sources.r2.type = avro
a4.sources.r2.bind = hadoop104
a4.sources.r2.port = 4142

#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

#Sink
a4.sinks.k1.type = logger

#Bind
a4.sources.r1.channels = c1
a4.sources.r2.channels = c1
a4.sinks.k1.channel = c1
1234567891011121314151617181920212223242526

(5) 執行配置文件
分別開啟對應配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf 。

[Tom@hadoop104 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a4 -f job/group4/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

[Tom@hadoop103 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group4/flume2-netcat-flume.conf

[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group4/flume1-logger-flume.conf  
12345

(6)在 hadoop102 上向 /opt/module/flume-1.7.0/data/目錄下的 group .log 追加內容

[Tom@hadoop102 data]$ echo "hello" >> flume.log 
1

(7)在 hadoop103 上向 44444 埠發送數據

[Tom@hadoop103 flume-1.7.0]$ nc localhost 44444
hust
OK
123

(8)檢查 hadoop104 上數據

1.5 自定義 Interceptor

1. 案例需求
使用 Flume 採集伺服器本地日誌,需要按照日誌類型的不同,將不同種類的日誌發往不同的分析系統。

2. 需求分析
在實際的開發中,一台伺服器產生的日誌類型可能有很多種,不同類型的日誌可能需要發送到不同的分析系統。此時會用到 Flume 拓撲結構中的 Multiplexing 結構, Multiplexing 的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 發送到不同的 Channel 中,所以我們需要自定義一個Interceptor,為不同類型的event 的Header 中的key 賦予不同的值。
在該案例中,我們以埠數據模擬日誌,以數字(單個)和字母(單個)模擬不同類型的日誌,我們需要自定義 interceptor 區分數字和字母,將其分別發往不同的分析系統(Channel)(實際測試時,我們測試字符串是否包含』『hello』』)。


3. 實現步驟
(1)創建一個 maven 項目,並引入以下依賴。

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
</dependencies>
1234567

(2)定義 CustomInterceptor 類並實現 Interceptor 接口。

package com.Tom.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {
    // 聲明一個存放事件的集合
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // 初始化
        addHeaderEvents = new ArrayList<Event>();
    }

    @Override
    // 單個事件攔截
    public Event intercept(Event event) {
        // 1. 獲取事件的頭信息
        Map<String, String> headers = event.getHeaders();

        // 2. 獲取事件中的body信息
        String body = new String(event.getBody());

        // 3. 根據body中是否有"hello"來決定添加怎樣的頭信息
        if(body.contains("hello")){
            // 4. 添加頭信息
            headers.put("topic", "first");
        } else {
            headers.put("topic", "second");
        }
        return event;
    }

    @Override
    // 批量事件攔截
    public List<Event> intercept(List<Event> events) {
        // 1. 清空集合
        addHeaderEvents.clear();
        // 2. 遍歷events
        for (Event event : events){
            // 3. 給每一個事件添加頭信息
            addHeaderEvents.add(intercept(event));
        }
        // 4. 返回結果
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970

(3)編輯 flume 配置文件
為 hadoop102 上的 Flume1 配置 1 個 netcat source,1 個 sink group(2 個 avro sink),並配置相應的 ChannelSelector 和 interceptor 。

#Name
a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2

#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

#Interceptor
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.Tom.interceptor.TypeInterceptor$Builder

#Channel Selector
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.first = c1
a2.sources.r1.selector.mapping.second = c2

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop103
a2.sinks.k1.port = 4141

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop104
a2.sinks.k2.port = 4142

#Bind
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
123456789101112131415161718192021222324252627282930313233343536373839404142

為 hadoop103 上的 Flume 2 配置一個 avro source 和一個 logger sink 。

#Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4141

#Sink
a3.sinks.k1.type = logger

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Bind
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
123456789101112131415161718192021

為hadoop104 上的 Flume3 配置一個 avro source 和一個 logger sink 。

#Name
a4.sources = r1
a4.sinks = k1
a4.channels = c1

#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop104
a4.sources.r1.port = 4142

#Sink
a4.sinks.k1.type = logger
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

# Channel
a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1
12345678910111213141516171819

(4)分別在 hadoop103,hadoop104,hadoop102 上啟動 flume 進程 (注意啟動順序)。

[Tom@hadoop103 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/interceptor/flume3.conf -Dflume.root.logger=INFO,console

[Tom@hadoop104 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a4 -f job/interceptor/flume4.conf -Dflume.root.logger=INFO,console

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/interceptor/flume2.conf 
12345

(5)在 hadoop102 使用 netcat 向 localhost:44444 發送字符串。

[Tom@hadoop102 flume-1.7.0]$ nc localhost 44444
helloworld
OK
world   
OK
thanks
OK
hello hust
OK
123456789

(6)觀察 hadoop103 和 hadoop104 列印的日誌 。


1.6 自定義 Source

1. 介紹
Source 是負責接收數據到 Flume Agent 的 組件。 Source 組件可以處理各種類型、各種格式的日誌數據 包括 avro 、 thrift 、 exec 、 jms 、 spooling directory 、 netcat 、 sequence、generator 、 syslog 、 http 、 legacy 。官方提供的 source 類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 source 。
官方也提供了自定義 source 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#source
根據官方說明自定義 MySource 需要繼承 AbstractSource 類並實現 Configurable 和 PollableSource 接口。
實現相應方法:
getBackOffSleepIncrement() //暫不用
getMaxBackOffSleepInterval() //暫不用
configure(Context context) //初始化 context (讀取配置文件內容)
process() //獲取數據封裝成 event 並寫入 channel ,這個方法將被循環調用。
使用場景:讀取 MySQL 數據或者其他文件系統。

2. 需求
使用 flume 接收數據,並給每條數據添加前綴,輸出到控制台。前綴可從 flume 配置文件中配置。


3. 分析


4. 編碼
導入 pom 依賴

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>
1234567

編寫代碼

package source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    // 定義全局的前綴和後綴
    private String prefix;
    private String subfix;

    /**
     * 1. 接受數據(for循環造數據)
     * 2. 封裝為事件
     * 3. 將事件傳給channel
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // 1. 接收數據
        try {
            for (int i = 0; i < 5; ++i){
                // 2. 構建事件對象
                SimpleEvent evevt = new SimpleEvent();

                //3. 給事件設置值
                evevt.setBody((prefix + "--" + i + "--" + subfix).getBytes());

                //4. 將事件傳給channel
                getChannelProcessor().processEvent(evevt);

                status = Status.READY;
            }
        } catch (Exception e) {
            e.printStackTrace();
            status = Status.BACKOFF;
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 返回結果
        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    @Override
    public void configure(Context context) {
        // 讀取配置文件, 給前後綴賦值
        prefix = context.getString("prefix");
        subfix = context.getString("subfix", "Tom");

    }
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869

5. 測試
(1)打包。將寫好的代碼打包,並放到 flume 的 lib 目錄(opt/module/flume)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = source.MySource
a1.sources.r1.prefix = online
a1.sources.r1.subfix = offline

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
123456789101112131415161718192021

(3)開啟任務

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
1

(4)結果展示

1.7 自定義 Sink

1. 介紹
Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,並將這些事件批量寫入到存儲或索引系統、或者被發送到另一個 Flume Agent 。
Sink 是完全事務性的。在從 Channel 批量刪除數據之前,每個 Sink 用 Channel 啟動一個事務。批量事件一旦成功寫出到存儲系統或下一個 Flume Agent,Sink 就利用 Channel 提交事務。
事務一旦被提交,該 Channel 從自己的內部緩衝區刪除事件。
Sink 組件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。官方提供的Sink 類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些Sink。
官方也提供了自定義sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink
根據官方說明自定義 MySink 需要繼承 AbstractSink 類並實現 Configurable 接口。
實現相應方法:
configure(Context context) //初始化context(讀取配置文件內容)
process() //從 Channel 讀取獲取數據(event),這個方法將被循環調用。
使用場景:讀取 Channel 數據寫入 MySQL 或者其他文件系統。

2. 需求
使用 flume 接收數據,並在 Sink 端給每條數據添加前綴和後綴,輸出到控制台。前後綴可在 flume 任務配置文件中配置。
流程分析:


3. 編碼

package sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    // 獲取Logger對象
    private Logger logger = LoggerFactory.getLogger(MySink.class);

    // 定義兩個屬性,前後綴
    private String prefix;
    private String subfix;

    /**
     * 1 獲取Channel
     * 2 從Channel獲取事務及數據
     * 3 發送數據
     */
    @Override
    public Status process() throws EventDeliveryException {
        // 1 定義返回值
        Status status = null;

        // 2 獲取Channel
        Channel channel = getChannel();

        // 3 從Channel獲取事務
        Transaction transaction = channel.getTransaction();

        // 4 開啟事務
        transaction.begin();

        try {
            // 5 從Channel獲取數據
            Event event = channel.take();

            // 6 處理事件
            if (event != null){
                String body = new String(event.getBody());
                logger.info(prefix + body  + subfix);
                // logger.error(prefix + body  + subfix);
            }

            // 7 提交事務
            transaction.commit();

            // 8 成功提交, 修改狀態信息
            status = Status.READY;
        } catch (ChannelException e) {
            e.printStackTrace();

            // 9 提交事務失敗
            transaction.rollback();

            // 10 修改狀態
            status = Status.BACKOFF;

        } finally {
            // 11 最終, 關閉事務
            transaction.close();
        }

        // 12 返回狀態信息
        return status;
    }

    @Override
    public void configure(Context context) {

        // 讀取配置文件, 為前後綴賦值
        prefix = context.getString("prefix");
        subfix = context.getString("subfix", "Tom");
    }
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677

4. 測試
(1)打包。將寫好的代碼打包,並放到 flume 的 lib 目錄(/opt/module/flume)下。
(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = sink.MySink
a1.sinks.k1.prefix = online--
a1.sinks.k1.subfix = --offline

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1234567891011121314151617181920212223

(3)開啟任務

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

[Tom@hadoop102 flume-1.7.0]$ nc localhost 44444
hello
OK
HUST
OK
1234567

(4)結果展示

2 常見問題

2.1 你是如何實現 Flume 數據傳輸的監控的?

使用第三方框架 Ganglia 實時監控Flume。

2.2 Flume 的 Source,Sink,Channel 的作用?你們 Source 是什麼類型?

1. 作用
(1)Source 組件是專門用來收集數據的,可以處理各種類型、各種格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
(2)Channel 組件對採集到的數據進行緩存,可以存放在 Memory 或File 中。
(3)Sink 組件是用於把數據發送到目的地的組件,目的地包括 HDFS、Logger、avro、thrift、ipc、file、Hbase、solr、自定義。

2. 我公司採用的Source 類型為
(1)監控後台日誌:exec
(2)監控後台產生日誌的埠:netcat、exec、spooldir

2.3 Flume 的 Channel Selectors


Channel Selectors,可以讓不同的項目日誌通過不同的 Channel 到不同的 Sink 中去。
官方文檔上 Channel Selectors 有兩種類型:Replicating Channel Selector (default)和 Multiplexing Channel Selector
這兩種Selector的區別是:Replicating 會將source過來的events發往所有channel,而Multiplexing可以選擇該發往哪些Channel。

2.4 Flume 參數調優

(1)Source
增加 Source 個數(使用 Tair Dir Source 時可增加 FileGroups 個數)可以增大 Source 的讀取數據的能力。例如:當某一個目錄產生的文件過多時需要將這個文件目錄拆分成多個文件目錄,同時配置好多個 Source 以保證 Source 有足夠的能力獲取到新產生的數據。
batchSize 參數決定 Source 一次批量運輸到 Channel 的 event 條數,適當調大這個參數可以提高Source 搬運 Event 到 Channel 時的性能。
(2)Channel
type 選擇 memory 時 Channel 的性能最好,但是如果 Flume 進程意外掛掉可能會丟失數據。 type 選擇 file 時 Channel 的容錯性更好,但是性能上會比 memory channel 差。
使用 file Channel 時 dataDirs 配置多個不同盤下的目錄可以提高性能。
Capacity 參數決定 Channel 可容納最大的 event 條數。 transactionCapacity 參數決定每
次 Source 往 channel 裡面寫的最大 event 條數和每次 Sink 從 channel 裡面讀的最大 event
條數。
transactionCapacity 需要大於 Source 和 Sink 的 batchSize 參數。
(3)Sink
增加 Sink 的個數可以增加 Sink 消費 event 的能力。 Sink 也不是越多越好夠用就行,過
多的 Sink 會占用系統資源,造成系統資源不必要的浪費。
batchSize 參數決定 Sink 一次批量從 Channel 讀取的 event 條數,適當調大這個參數可
以提高 Sink 從 Channel 搬出 event 的性能。

2.5 Flume 的事務機制

Flume的事務機制(類似資料庫的事務機制): Flume 使用兩個獨立的事務分別負責從 Soucrce 到 Channel ,以及從 Channel 到 Sink 的事件傳遞。比如 spooling directory source 為文件的每一行創建一個事件,一旦事務中所有的事件全部傳遞到 Channel 且提交成功,那麼 Soucrce 就將該文件標記為完成。同理,事務以類似的方式處理從 Channel 到 Sink 的傳遞過程,如果因為某種原因使得事件無法記錄,那麼事務將會回滾。且所有的事件都會保持到 Channel 中,等待重新傳遞。

2.6 Flume 採集數據會丟失嗎?

根據 Flume 的架構原理, Flume 是不可能丟失數據的,其內部有完善的事務機制,Source 到 Channel 是事務性的, Channel 到 Sink 是事務性的,因此這兩個環節不會出現數據的丟失,唯一可能丟失數據的情況是 Channel 採用 memory Channel agent 宕機導致數據丟失,或者 Channel 存儲數據已滿,導致 Source 不再寫入,未寫入的數據丟失。
Flume 不會丟失數據,但是有可能造成數據的重複,例如數據已經成功由 Sink 發出,但是沒有接收到響應, Sink 會再次發送數據,此時可能會導致數據的重複 。

關鍵字: