F1 Query: Declarative Querying at Scale

阿里云云棲號 發佈 2020-01-14T15:18:27+00:00

F1Query 就在這個背景下誕生了,用論文中的話說就是we present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but in

距離 Google 的上一篇 F1 論文,也就是 F1: A Distributed SQL Database That Scales 已經 5 年過去了,Google 在今年的 VLDB 上終於發布了 F1 的新版本 F1 Query: Declarative Querying at Scale,我們今天就來看一下這篇論文。安利一下,在 PingCAP 的 paper party 上,黃東旭,主頁連結大神對這篇論文的講解非常精彩,文章中也部分引用了他的觀點,在此鳴謝。

2013 年的 F1 是基於 Spanner,主要提供 OLTP 服務,而新的 F1 則定位則是大一統:旨在處理 OLTP/OLAP/ETL 等多種不同的 workload。但是這篇新的 F1 論文對 OLTP 的討論則是少之又少,據八卦是 Spanner 開始原生支持之前 F1 的部分功能,導致 F1 對 OLTP 的領地被吞併了。下面看一下論文的具體內容,疏漏之處歡迎指正。

0. 摘要

F1 Query 是一個大一統的 SQL 查詢處理平台,可以處理存儲在 Google 內部不同存儲介質(Bigtable, Spanner, Google Spreadsheet)上面的不同格式文件。簡單來說,F1 Query 可以同時支持如下功能:OLTP 查詢,低延遲 OLAP 查詢,ETL 工作流。F1 Query 的特性包括:

  • 為不同數據源的數據提供統一視圖
  • 利用數據中心的資源提供高吞吐和低延遲的查詢
  • High Scalability
  • Extensibility

1. 背景

在 Google 內部的數據處理和分析的 use case 非常複雜,對很多方面都有不同的要求,比如數據大小、延遲、數據源以及業務邏輯支持。結果導致了許多數據處理系統都只 focus 在一個點上,比如事務式查詢、OLAP 查詢、ETL 工作流。這些不同的系統往往具有不同的特性,不管是使用還是開發上都會有極大的不便利。

F1 Query 就在這個背景下誕生了,用論文中的話說就是

we present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but instead because it aims to cover all corners of the requirements space for enterprise data processing and analysis.

F1 Query 旨在覆蓋數據處理和分析的所有方面。F1 Query 在內部已經應用到了多個產品線,比如 Advertising, Shopping, Analytics 和 Payment。

在 F1 Query 的系統設計過程中,下面幾點考量具有非常關鍵的作用。

  • Data Fragmentation: Google 內部的數據由於本身的特性不同,會被存儲到不同的存儲系統中。這樣會導致一個應用程式依賴的數據可能橫跨多個數據存儲系統中,甚至以不同的文件格式。對於這個問題,F1 Query 對於這些數據提供一個統一的數據視圖。
  • Datacenter Architecture: F1 Query 的目標是多數據中心,這個和傳統的 shared nothing 架構的數據處理系統不同相同。傳統的模式為了降低延遲,往往需要考慮 locality,也就是數據和計算越近越好。由於 Google 內部的網絡環境優勢,locality 的優勢顯得不是那麼重要。所以 F1 Query 更強調計算和存儲分離,這樣計算節點和存儲節點的擴展性(scalability)都會更好。畢竟 Google 內部的系統,scalability 才是第一法則。還有一點值得一提的是,由於使用了 GFS 的更強版本: Colossue File System,磁碟不會成為瓶頸。
  • Scalability: 在 F1 Query 中,short query 會在單個節點上執行,larger query 會以分布式的模式執行,largest query 以批處理 MapReduce 模式執行。對於這些模式,F1 Query 可以通過增加運算的並行度來優化。
  • Extensibility: 對於那些無法用 SQL 語義來表達的查詢需求,F1 通過提供 user-defined functions (UDF)、user-defined aggregate functions (UDAs) 和 table-valued functions (TVF) 來支持。

2. 架構

F1 的架構圖如下所示:

下面的方框裡面是每個 Datacenter 一套。關於各個組件的介紹如下:

  • 用戶通過 client libary 和 F1 Server 交互
  • F1 Master 負責 query 的狀態的運行時監控和其他組件的管理
  • F1 Server 收到用戶請求,對於 short query 直接單機執行查詢;對於 larger query 轉發到多台 worker 上並行執行查詢。最後再匯總結果返回給 client。
  • F1 Worker 負責具體查詢執行
  • F1 Server 和 Worker 都是無狀態的,方便擴展

2.1 query 執行

用戶通過 client libary 提交 query 到 F1 Server 上,F1 Server 首先解析和分析 SQL,然後將涉及到的數據源提取出來,如果某些數據源在當前 datacenter 不存在,則直接將 query 返回給 client 並告知哪些 F1 Server 距離哪些數據源更近。這裡直接將請求返回給業務層,由業務層去 retry,設計的也是非常的簡單。儘管前面說到要將存儲和計算分離,但是這個地方的設計還是考慮到了 locality,datacenter 級別的 locality,畢竟 locality 對查詢延遲的影響還是巨大的。

F1 Server 將 query 解析並優化成 DAG,然後由執行層來執行,具體執行模式(interactive 還是 batch)由用戶指定。原文是: Based on a client- specified execution mode preference, F1 Query executes queries on F1 servers and workers in an interactive mode or in a batch mode.

對於交互式查詢模式(interactive mode)有單節點集中執行模式和多節點分布式執行模式,query 優化會根據啟發式的算法來決定採用哪種模式。集中式下,F1 Server 解析分析 query,然後在當前節點上直接執行並接收查詢結果。分布式下,接收 query 的 F1 Server 充當一個 query coordinator 的角色,將 query 拆解並下發給 worker。交互式查詢在數據量不太大的情況下往往具有不錯的性能和高效的資源利用率。

除了交互式查詢還有一種模式是批處理模式(batch mode)。批處理模式使用 MapReduce 框架異步提交執行執行,相比交互式這種 long-running 方式,批處理模式的可靠性(reliabitly)更高。

2.2 數據源

數據查詢支持跨 datacenter。存儲計算分離模式使得多數據源的支持更加簡單,比如 Spanner, Bigtable, CSV, columnar file 等。為了支持多數據源,F1 Query 在他們之上抽象出了一層,讓數據看起來都是存儲在關係型表裡面。而各個數據源的元數據就存儲在 catalog service 裡面。

對於沒有存儲到 catalog service 裡面的表數據,只要提供一個DEFINE TABLE即可查詢。

DEFINE TABLE People(
      format = 『csv』,
      path = 『/path/to/peoplefile』,
      columns = 『name:STRING,
                 DateOfBirth:DATE』);
    SELECT Name, DateOfBirth FROM People
    WHERE Name = 『John Doe』;

論文中沒有提到的是單看這個 DEFINE TABLE 可以表現力不夠,所說這些信息並不足以表現出數據的行為:

  • 是否支持 partition?
  • 是否支持 邏輯下推?
  • 是否支持索引?
  • 是否支持多種 掃描模式?
  • 對於新數據源的支持可以通過 Table-Valued Function (TVF) 的方式來支持。

2.3 Data Sink

query 的結果可以直接返回給 client,也可以插入到另外一個表裡面。

2.4 SQL

SQL 2011。之所以是 2011 是因為其他老的系統使用的是 2011。

3. 交互式查詢

交互式查詢模式是默認的查詢模式。如前所述,交互式查詢有集中式和分布式,具體使用哪種由優化器分析 client 的 query 然後決定。

3.1 Single Threaded Execution Kernel

集中式的查詢如下圖所示,是一種 pull-based 的單線程執行方式。

3.2 Distributed Execution

如前所述,由優化器分析完 query 決定是否採用分布式模式。在分布式這種模式下接收到 query 的 F1 Server 充當一個 coordinator 的角色,將執行 plan 推給 worker。worker 是多線程的,可以並發執行單個 query 的無依賴的 fragment。Fragment 是執行計劃切分出來的執行計劃片段,非常像 MR 或者 Spark 中的 stage。Fragment 之間通過 Exchange Operator (數據重分布) 連接。

Fragment 的切分過程如下:優化器使用一種基於數據分布依賴的 bottom-up 策略。具體來說每個算子對於輸入數據的分布都有要求,比如 hash 或者依賴其他欄位的分布。典型的例子有 group by key 和 hash join。如果當前的數據分布滿足前後兩個算子的要求,則兩個算子就被放到一個 Fragment 裡面,否則就被分到兩個 Fragment 裡面,然後通過 Exchange Operator 來連接。

下一步就是計算每個 Fragment 的並行度,Fragment 之間並行度互相獨立。葉子節點的 Fragment 的底層 table scan 決定最初的並行度,然後上層通過 width calculator 逐步計算。比如 hash-join 的底層兩個 Fragment 分別是 100-worker 和 50-worker,則 hash-join 這個 Fragment 會使用 100-worker 的並行度。下面是一個具體的例子。

SELECT Clicks.Region, COUNT(*) ClickCount
  FROM Ads JOIN Clicks USING (AdId)
  WHERE Ads.StartDate > 『2018-05-14』 AND
        Clicks.OS = 『Chrome OS』
  GROUP BY Clicks.Region
  ORDER BY ClickCount DESC;

上面 SQL 對應的 Fragment 和一種可能 worker 並行度如下圖所示:

3.3 Partitioning Strategy

數據重分布也就是 Fragment 之間的 Exchange Operator,對於每條數據,數據發送者通過分區函數來計算數據的目的分區數,每個分區數對應一個 worker。Exchange Operator 通過 RPC 調用,擴展可以支持到每個 Fragment 千級的 partion 並發。要求再高就需要使用 batch mode。

查詢優化器將 scan 操作作為執行計劃的葉子節點和 N 個 worker 節點並發。為了並發執行 scan 操作,數據必須要被並發分布,然後由所有 worker 一起產生輸出結果。有時候數據的 partition 會超過 N,而 scan 並發度為 N,多餘的 partition 就交由空閒的 worker 去處理,這樣可以避免數據傾斜。

3.4 Performance Considerations

F1 Query 的主要性能問題在於數據傾斜和訪問模式不佳。Hash join 對於 hot key 尤為敏感。當 hot key 被 worker 載入到內存的時候可能會因為數據量太大而寫入磁碟,從而導致性能下降。

論文中舉了一個 lookup join 的例子,這裡不打算詳述了。

對於這種數據傾斜的問題,F1 Query 的解決方案是 Dynamic Key Range,但是論文中對其描述還是不夠詳細。

F1 Query 對於交互式查詢採用存內存計算,而且沒有 check point。因為是內存計算,所以速度非常的快,但是由於沒有 checkpoint 等 failover 的機制,只能依賴於業務層的重試。

4. 批處理

像 ETL,都是通過 Batch Mode 來處理的。Google 以前都是通過 MapReduce 或者 FlumeJava 來開發的,開發成本一般比較高。相比 SQL 這種方式,不能有效復用 SQL 優化,所以 F1 Query 選擇使用 SQL 來做。

如前所述,交互式查詢不適合處理 worker failure,而 batch mode,也就是批處理這種模式特別適合處理 failover(每一個 stage 結果落盤)。批處理模式復用交互式 SQL query 的一些特性,比如 query 優化,執行計劃生成。交互式模式和批處理模式的核心區別在於調度方式不同:交互式模式是同步的,而批處理模式是異步的。

4.1 Batch Execution Framework

批處理使用的框架是 MapReduce,Fragment 被抽象成 MapReduce 中的 stage,stage 的輸出結果被存儲到 Colossus file system (GFS 二代)。

在 Fragment 映射有一點值得注意的是嚴格來說,Fragment 的 DAG 映射到 mr 是 map-reduce-reduce,對這種模式做一個簡單的變通變成:map-reduce-map<identity>-reduce,如下圖:

關於 MapReduce 的更詳細信息可以參考 Google 03 年那篇論文。

4.2 Batch Service Framework

Framework 會對 batch mode query 的執行進行編排。具體包括:query 註冊,query 分發,調度已經監控 mr 作業的執行。當 F1 Server 接收到一個 batch mode query,它會先生成執行計劃並將 query 註冊到 Query Registry,全局唯一的 Spanner db,用來 track batch mode query。Query Distributor 然後將 query 分發給 datacenter。Query Scheduler 會定期從 Registry 拿到 query,然後生成執行計劃並交給 Query Executor 來處理。

Service Framework 的健壯性非常好:Query Distributor 是選主(master-elect)模式;Query Scheduler 在每個 datacenter 有多個。query 的所有執行狀態都是保存在 Query Registry,這就保證其他的組件是無狀態的。容錯處理:MapReduce 的 stage 會被重試,如果 datacenter 出問題,query 會被分配到新的 datacenter 上重新執行。

5. 優化器

SQL 優化器類似 Spark Catalyst,架構如下圖,不細說了。

6. EXTENSIBILITY

對於很多複雜業務邏輯無法用 SQL 來描述,F1 針對這種情況提供了一種用戶自定義函數的方法,包括 UDF (user-define functions),UDA (aggrega- tion functions) 和 TVF (table-valued functions)。對於簡單的UDF需求,通常直接以SQL或者LUA的形式作為query的一部分;對於更複雜或者性能要求高的UDF需求,則可以用其它高級語言以UDF Server的形式實現。

UDF Server 和 F1 Query 是 RPC 調用關係,有 client 單獨部署在同一個 datacenter。udf server 完全有 client 來控制,無狀態,基本可以無限擴展。

6.1 Scalar Functions

UDF 並不是新的概念,UDF Server 這種部署方式看上去還算新穎一點。但是 UDF Server 這種單獨部署模式一個可能的問題是延遲問題,這裡通過批量流水線的方式來減少延遲。下面是 UDF 的一個例子。

local function string2unixtime(value)
  local y,m,d = match("(%d+)%-(%d+)%-(%d+)")
  return os.time({year=y, month=m, day=d})
end

6.2 Aggregate Functions

UDA 是對多行輸入產生一個單一的輸出,要實現 UDA,用戶需要實現算子 Initialize, Accumulate, and Finalize。另外如要要對多個 UDA 的子聚合結果進行再聚合,用戶可以實現 Reaccumulate。

6.3 Table-Valued Functions

TVF 的輸入是一個 table,輸出是另外一個 table。這種在機器學習的模型訓練場景下比較有用。下面是論文中的具體的一個例子:EventsFromPastDays 就是一個 TVF。

SELECT * FROM EventsFromPastDays(
     3, TABLE Clicks);

當然 TVF 也支持用 SQL 來描述,如下。

CREATE TABLE FUNCTION EventsFromPastDays(
     num_days INT64, events ANY TABLE) AS
     SELECT * FROM events
     WHERE date >= DATE_SUB(
         CURRENT_DATE(),
         INTERVAL num_days DAY);

7. Production Metric

下面是 F1 Query 在 Production 環境下的幾個 metrics。

8. 總結

回過頭來看 F1 Query 最新的這篇論文給人最大的啟發就是大一統的思想,這個很有可能是行業發展趨勢。回想一下 MapReduce 論文由 Google 於 2003 年發表,開源實現 Hadoop 於 2005 問世。不妨期待了一下未來的 3 到 5 年的 F1 Query 的開源產品。

作者:陶克路

本文為阿里雲原創內容,未經允許不得轉載。

關鍵字: