戰「疫」期,阿里云云效團隊在家高效開發實錄

阿里云云棲號 發佈 2020-04-16T01:19:33+00:00

ratings.groupby.agg通過一行簡單的pandas.read_csv 就可以讀取 CSV 數據,接著按 userId 做分組聚合,求 rating 這列在每組的總和、平均、最大、最小值。

背景

在數據科學世界,Python 是一個不可忽視的存在,且有愈演愈烈之勢。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。

Numpy

Numpy 是數值計算的基礎包,內部提供了多維數組(ndarray)這樣一個數據結構,用戶可以很方便地在任意維度上進行數值計算。

我們舉一個蒙特卡洛方法求解 Pi 的例子。這背後的原理非常簡單,現在我們有個半徑為1的圓和邊長為2的正方形,他們的中心都在原點。現在我們生成大量的均勻分布的點,讓這些點落在正方形內,通過簡單的推導,我們就可以知道,Pi 的值 = 落在圓內的點的個數 / 點的總數 * 4。

這裡要注意,就是隨機生成的點的個數越多,結果越精確。

用 Numpy 實現如下:

import numpy as np

N = 10 ** 7  # 1千萬個點

data = np.random.uniform(-1, 1, size=(N, 2))  # 生成1千萬個x軸和y軸都介於-1和1間的點
inside = (np.sqrt((data ** 2).sum(axis=1)) < 1).sum()  # 計算到原點的距離小於1的點的個數
pi = 4 * inside / N
print('pi: %.5f' % pi)

可以看到,用 Numpy 來進行數值計算非常簡單,只要寥寥數行代碼,而如果讀者習慣了 Numpy 這種面相數組的思維方式之後,無論是代碼的可讀性還是執行效率都會有巨大提升。

pandas

pandas 是一個強大的數據分析和處理的工具,它其中包含了海量的 API 來幫助用戶在二維數據(DataFrame)上進行分析和處理。

pandas 中的一個核心數據結構就是 DataFrame,它可以簡單理解成表數據,但不同的是,它在行和列上都包含索引(Index),要注意這裡不同於資料庫的索引的概念,它的索引可以這麼理解:當從行看 DataFrame 時,我們可以把 DataFrame 看成行索引到行數據的這麼一個字典,通過行索引,可以很方便地選中一行數據;列也同理。

我們拿 movielens 的數據集 作為簡單的例子,來看 pandas 是如何使用的。這裡我們用的是 Movielens 20M Dataset.

import pandas as pd

ratings = pd.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})

通過一行簡單的 pandas.read_csv 就可以讀取 CSV 數據,接著按 userId 做分組聚合,求 rating 這列在每組的總和、平均、最大、最小值。

「食用「 pandas 的最佳方式,還是在 Jupyter notebook 里,以交互式的方式來分析數據,這種體驗會讓你不由感嘆:人生苦短,我用 xx()

scikit-learn

scikit-learn 是一個 Python 機器學習包,提供了大量機器學習算法,用戶不需要知道算法的細節,只要通過幾個簡單的 high-level 接口就可以完成機器學習任務。當然現在很多算法都使用深度學習,但 scikit-learn 依然能作為基礎機器學習庫來串聯整個流程。

我們以 K-最鄰近算法為例,來看看用 scikit-learn 如何完成這個任務。

import pandas as pd
from sklearn.neighbors import NearestNeighbors

df = pd.read_csv('data.csv')  # 輸入是 CSV 文件,包含 20萬個向量,每個向量10個元素
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors = nn.kneighbors(df)

fit接口就是 scikit-learn 里最常用的用來學習的接口。可以看到整個過程非常簡單易懂。

Mars——Numpy、pandas 和 scikit-learn 的並行和分布式加速器

Python 數據科學棧非常強大,但它們有如下幾個問題:

  1. 現在是多核時代,這幾個庫里鮮有操作能利用得上多核的能力。
  2. 隨著深度學習的流行,用來加速數據科學的新的硬體層出不窮,這其中最常見的就是 GPU,在深度學習前序流程中進行數據處理,我們是不是也能用上 GPU 來加速呢?
  3. 這幾個庫的操作都是命令式的(imperative),和命令式相對應的就是聲明式(declarative)。命令式的更關心 how to do,每一個操作都會立即得到結果,方便對結果進行探索,優點是很靈活;缺點則是中間過程可能占用大量內存,不能及時釋放,而且每個操作之間就被割裂了,沒有辦法做算子融合來提升性能;那相對應的聲明式就剛好相反,它更關心 what to do,它只關心結果是什麼,中間怎麼做並沒有這麼關心,典型的聲明式像 SQL、TensorFlow 1.x,聲明式可以等用戶真正需要結果的時候才去執行,也就是 lazy evaluation,這中間過程就可以做大量的優化,因此性能上也會有更好的表現,缺點自然也就是命令式的優點,它不夠靈活,調試起來比較困難。

為了解決這幾個問題,Mars 被我們開發出來,Mars 在 MaxCompute 團隊內部誕生,它的主要目標就是讓 Numpy、pandas 和 scikit-learn 等數據科學的庫能夠並行和分布式執行,充分利用多核和新的硬體。

Mars 的開發過程中,我們核心關注的幾點包括:

  1. 我們希望 Mars 足夠簡單,只要會用 Numpy、pandas 或 scikit-learn 就會用 Mars。
  2. 避免重複造輪子,我們希望能利用到這些庫已有的成果,只需要能讓他們被調度到多核/多機上即可。
  3. 聲明式和命令式兼得,用戶可以在這兩者之間自由選擇,靈活度和性能兼而有之。
  4. 足夠健壯,生產可用,能應付各種 failover 的情況。

當然這些是我們的目標,也是我們一直努力的方向。

Mars tensor:Numpy 的並行和分布式加速器

上面說過,我們的目標之一是,只要會用 Numpy 等數據科學包,就會用 Mars。我們直接來看代碼,還是以蒙特卡洛為例。變成 Mars 的代碼是什麼樣子呢?

import mars.tensor as mt

N = 10 ** 10

data = mt.random.uniform(-1, 1, size=(N, 2))
inside = (mt.sqrt((data ** 2).sum(axis=1)) < 1).sum()
pi = (4 * inside / N).execute()
print('pi: %.5f' % pi)

可以看到,區別就只有兩處:import numpy as np 變成 import mars.tensor as mt ,後續的 np. 都變成 mt. ;pi 在列印之前調用了一下 .execute() 方法。

也就是默認情況下,Mars 會按照聲明式的方式,代碼本身移植的代價極低,而在真正需要一個數據的時候,通過 .execute() 去觸發執行。這樣能最大限度得優化性能,以及減少中間過程內存消耗。

這裡,我們還將數據的規模擴大了 1000 倍,來到了 100 億個點。之前 1/1000 的數據量的時候,在我的筆記本上需要 757ms;而現在數據擴大一千倍,光 data 就需要 150G 的內存,這用 Numpy 本身根本無法完成。而使用 Mars,計算時間只需要 3min 44s,而峰值內存只需要 1G 左右。假設我們認為內存無限大,Numpy 需要的時間也就是之前的 1000 倍,大概是 12min 多,可以看到 Mars 充分利用了多核的能力,並且通過聲明式的方式,極大減少了中間內存占用。

前面說到,我們試圖讓聲明式和命令式兼得,而使用命令式的風格,只需要在代碼的開始配置一個選項即可。

import mars.tensor as mt
from mars.config import options

options.eager_mode = True  # 打開 eager mode 後,每一次調用都會立即執行,行為和 Numpy 就完全一致

N = 10 ** 7

data = mt.random.uniform(-1, 1, size=(N, 2))
inside = (mt.linalg.norm(data, axis=1) < 1).sum()
pi = 4 * inside / N  # 不需要調用 .execute() 了
print('pi: %.5f' % pi.fetch())  # 目前需要 fetch() 來轉成 float 類型,後續我們會加入自動轉換

Mars DataFrame:pandas 的並行和分布式加速器

看過怎麼樣輕鬆把 Numpy 代碼遷移到 Mars tensor ,想必讀者也知道怎麼遷移 pandas 代碼了,同樣也只有兩個區別。我們還是以 movielens 的代碼為例。

import mars.dataframe as md

ratings = md.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}).execute()

Mars Learn:scikit-learn 的並行和分布式加速器

Mars Learn 也同理,這裡就不做過多闡述了。但目前 Mars learn 支持的 scikit-learn 算法還不多,我們也在努力移植的過程中,這需要大量的人力和時間,歡迎感興趣的同學一起參與。

import mars.dataframe as md
from mars.learn.neighbors import NearestNeighbors

df = md.read_csv('data.csv')  # 輸入是 CSV 文件,包含 20萬個向量,每個向量10個元素
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)  # 這裡 fit 的時候也會整體觸發執行,因此機器學習的高層接口都是立即執行的
neighbors = nn.kneighbors(df).fetch()  # kneighbors 也已經觸發執行,只需要 fetch 數據

這裡要注意的是,對於機器學習的 fit、predict 等高層接口,Mars Learn 也會立即觸發執行,以保證語義的正確性。

RAPIDS:GPU 上的數據科學

相信細心的觀眾已經發現,GPU 好像沒有被提到。不要著急,這就要說到 RAPIDS。

在之前,雖然 CUDA 已經將 GPU 編程的門檻降到相當低的一個程度了,但對於數據科學家們來說,在 GPU 上處理 Numpy、pandas 等能處理的數據無異於天方夜譚。幸運的是,NVIDIA 開源了 RAPIDS 數據科學平台,它和 Mars 的部分思想高度一致,即使用簡單的 import 替換,就可以將 Numpy、pandas 和 scikit-learn 的代碼移植到 GPU 上。

其中,RAPIDS cuDF 用來加速 pandas,而 RAPIDS cuML 用來加速 scikit-learn。

對於 Numpy 來說,CuPy 已經很好地支持用 GPU 來加速了,這樣 RAPIDS 也得以把重心放在數據科學的其他部分。

CuPy:用 GPU 加速 Numpy

還是蒙特卡洛求解 Pi。

import cupy as cp
 
N = 10 ** 7

data = cp.random.uniform(-1, 1, size=(N, 2))
inside = (cp.sqrt((data ** 2).sum(axis=1)) < 1).sum()
pi = 4 * inside / N
print('pi: %.5f' % pi)

在我的測試中,它將 CPU 的 757ms,降到只有 36ms,提升超過 20 倍,可以說效果非常顯著。這正是得益於 GPU 非常適合計算密集型的任務。

RAPIDS cuDF:用 GPU 加速 pandas

將 import pandas as pd 替換成 import cudf,GPU 內部如何並行,CUDA 編程這些概念,用戶都不再需要關心。

import cudf

ratings = cudf.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})

運行時間從 CPU 上的 18s 提升到 GPU 上的 1.66s,提升超過 10 倍。

RAPIDS cuML:用 GPU 加速 scikit-learn

同樣是 k-最鄰近問題。

import cudf
from cuml.neighbors import NearestNeighbors

df = cudf.read_csv('data.csv')
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors = nn.kneighbors(df)

運行時間從 CPU 上 1min52s,提升到 GPU 上 17.8s。

Mars 和 RAPIDS 結合能帶來什麼?

RAPIDS 將 Python 數據科學帶到了 GPU,極大地提升了數據科學的運行效率。它們和 Numpy 等一樣,是命令式的。通過和 Mars 結合,中間過程將會使用更少的內存,這使得數據處理量更大;Mars 也可以將計算分散到多機多卡,以提升數據規模和計算效率。

在 Mars 里使用 GPU 也很簡單,只需要在對應函數上指定 gpu=True。例如創建 tensor、讀取 CSV 文件等都適用。

import mars.tensor as mt
import mars.dataframe as md

a = mt.random.uniform(-1, 1, size=(1000, 1000), gpu=True)
df = md.read_csv('ml-20m/ratings.csv', gpu=True)

下圖是用 Mars 分別在 Scale up 和 Scale out 兩個維度上加速蒙特卡洛計算 Pi 這個任務。一般來說,我們要加速一個數據科學任務,可以有這兩種方式,Scale up 是指可以使用更好的硬體,比如用更好的 CPU、更大的內存、使用 GPU 替代 CPU等;Scale out 就是指用更多的機器,用分布式的方式提升效率。

可以看到在一台 24 核的機器上,Mars 計算需要 25.8s,而通過分布式的方式,使用 4 台 24 核的機器的機器幾乎以線性的時間提升。而通過使用一個 NVIDIA TESLA V100 顯卡,我們就能將單機的運行時間提升到 3.98s,這已經超越了4台 CPU 機器的性能。通過再將單卡拓展到多卡,時間進一步降低,但這裡也可以看到,時間上很難再線性擴展了,這是因為 GPU 的運行速度提升巨大,這個時候網絡、數據拷貝等的開銷就變得明顯。

性能測試

我們使用了 https://github.com/h2oai/db-benchmark 的數據集,測試了三個數據規模的 groupby 和 一個數據規模的 join。而我們主要對比了 pandas 和 DASK。DASK 和 Mars 的初衷很類似,也是試圖並行和分布式化 Python 數據科學,但它們的設計、實現、分布式都存在較多差異,這個後續我們再撰文進行詳細對比。

測試機器配置是 500G 內存、96 核、NVIDIA V100 顯卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 執行計算。

Groupby

數據有三個規模,分別是 500M、5G 和 20G。

查詢也有三組。

查詢一

df = read_csv('data.csv')
df.groupby('id1').agg({'v1': 'sum'})

查詢二

df = read_csv('data.csv')
df.groupby(['id1', 'id2']).agg({'v1': 'sum'})

查詢三

df = read_csv('data.csv')
df.gropuby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'})

數據大小 500M,性能結果

數據大小 5G,性能結果

數據大小 20G,性能結果

數據大小到 20G 時,pandas 在查詢2會內存溢出,得不出結果。

可以看到,隨著數據增加,Mars 的性能優勢會愈發明顯。

得益於 GPU 的計算能力,GPU 運算性能相比於 CPU 都有數倍的提升。如果單純使用 RAPIDS cuDF,由於顯存大小的限制,數據來到 5G 都難以完成,而由於 Mars 的聲明式的特點,中間過程對顯存的使用大幅得到優化,所以整組測試來到 20G 都能輕鬆完成。這正是 Mars + RAPIDS 所能發揮的威力。

Join

測試查詢:

x = read_csv('x.csv')
y = read_csv('y.csv')
x.merge(y, on='id1')

測試數據 x 為500M,y 包含10行數據。

總結

RAPIDS 將 Python 數據科學帶到了 GPU,極大提升了數據分析和處理的效率。Mars 的注意力更多放在並行和分布式。相信這兩者的結合,在未來會有更多的想像空間。

關鍵字: