Spark Redis MongoDB大數據平台數據服務框架scala源碼推薦

大數據java架構師 發佈 2019-12-28T05:58:53+00:00

先轉發,加關注,然後私信「大數據平台」獲取下載地址大數據平台數據服務框架。實現了Kafka實時數據過濾、清洗、轉換、消費,實現了Spark SQL對Redis、MongoDB等非關係型資料庫的數據的讀寫;集成了規則引擎,可基於規則引擎實現客戶標籤、畫像等相關功能。

先轉發,加關注,然後私信「大數據平台」獲取下載地址


大數據平台數據服務框架。實現了Kafka實時數據過濾、清洗、轉換、消費,實現了Spark SQL對Redis、MongoDB等非關係型資料庫的數據的讀寫;集成了規則引擎,可基於規則引擎實現客戶標籤、畫像等相關功能。

DataService-Framework

項目介紹

基於大數據平台的數據處理服務框架。
結合大數據項目實際使用場景,提取出的一些通用的功能,形成大數據平台數據處理框架。
目前主要實現的功能有:
1、參數信息配置模塊,可實現採用資料庫進行配置和Properties文件進行配置
2、集成Kafka,實現了Kafka的生產者和消費者相關的功能
3、集成MongoDB,實現了MongoDB的數據讀取、寫入等,實現了SparkSQL通過DataFrame與MongoDB的數據進行交互,並且實現了分頁讀取、流式讀取等特殊讀取方式
4、集成Redis,實現了Redis的讀取、寫入等,實現了SparkSQL通過DataFrame與Redis的數據進行交互

5、SparkStreaming流式處理Kafka、MongoDB的數據
6、手動記錄Kafka的偏移量,實現了基於資料庫進行記錄和基於Zookeeper進行記錄
7、集成了規則引擎,客戶標籤、客戶畫像等功能可基於規則引擎進行實現

軟體架構

軟體結構如下:

DataService-Framework      項目根目錄
├── commons             公共功能模塊,提供配置文件讀取、資料庫連接、日誌列印、工具類等公共功能,以供其他模塊調用。  
├── examples            樣例模塊,提供各個功能點的樣例代碼。  
├── kafka-clients       KafkaClients相關功能,比如生產者、消費者等。
├── kafka-streams       主題數據過濾模塊,Kafka自帶的流處理功能,業務系統記錄的日誌如果包含了大量的:程序異常日誌、資料庫操作日誌、調試日誌等日誌信息,而採集的數據只需要日誌文件中的特定數據的日誌記錄,那麼對於我們採集到的日誌來說,可能會有90%以上的日誌都是垃圾數據,但是Flume組件沒有提供日誌過濾功能,而Spark程序又不應該消費這些數據。這時就需要提供一個中間層,將Flume採集到的Topic1的日誌中滿足條件的數據篩選出來放到Topic2中,Spark程序只需要消費Topic2的數據即可,過濾條件按照正則表達式進行配置。這樣Spark消費Topic2的數據都是我們需要的數據,並且我們可以及時的清理掉Topic1的數據以釋放空間。
├── rule-engine         規則引擎功能。
├── spark-sql           SparkSQL相關功能,擴展了Dataset/DataFrame的方法,集成Redis數據的讀寫、MongoDB數據的讀寫。  
├── spark-streaming     SparkStreaming實時數據處理模塊,通過SparkStreaming程序,准實時消費Kafka中的數據,流式方式處理MongoDB中的數據。
└── third-party         第三方源碼
      ├── hammurabi     Scala規則引擎
      ├── mongodb       Spark操作MongoDB
      └── redislabs     Spark操作Redis

功能擴展

目前,軟體實現了Flume數據採集、Kafka主題數據過濾、SparkStreaming實時數據處理。但是SparkStreaming的數據處理只實現了代碼值標準化等基礎功能。並且,目前默認支持的採集日誌格式只有兩種:分隔符分隔欄位的數據、JSON格式的數據。
功能擴展可以從兩個方面進行:

  1. SparkStreaming程序擴展,可以繼續增加程序處理功能,完成更複雜的數據處理,比如:指標加工、客戶行為分析、客戶畫像等
  2. 日誌格式擴展,目前只開發了支持兩種類型的日誌格式,可以自定義類實現com.service.data.spark.streaming.process.TopicValueProcess接口,以實現其他格式的日誌內容的解析。自定義實現類後,需要在spark-kafka模塊的resources/META-INF/services/com.service.data.spark.streaming.process.TopicValueProcess文件中添加一行記錄類名稱,並且在使用過程中將其配置到資料庫中即可。

使用說明

數據端配置工具:數據端配置工具.xlsx
環境搭建部署文檔:環境搭建部署文檔.docx
軟體開發打包文檔:軟體開發打包文檔.docx

先轉發,加關注,然後私信「大數據平台」獲取下載地址

關鍵字: