簡化資料獲取!Uber開源深度學習分佈訓練庫Petastorm
公眾號/AI前線
策劃編輯 | Natalie
作者 | Uber ATG
翻譯 | 無明
編輯 | Natalie
AI 前線導讀: 近年來,深度學習在解決模式識別問題方面發揮了關鍵作用。Uber Advanced Technologies Group(ATG)使用深度學習來解決自動駕駛領域的各種問題,他們的很多模型需要來自感測器數十 TB 的訓練資料。Uber ATG 的研究人員和工程師正在積極推動跨多個問題領域的自動駕駛技術,如感知、預測和規劃。為了支援這些工作,他們致力於開發資料集儲存解決方案,讓研究人員更容易獲得資料,從而可以專注於模型實驗。本文將介紹 Petastorm,一個由 Uber ATG 開發的開源資料訪問庫。這個庫可以直接基於數 TB Parquet 格式的資料集進行單機或分散式訓練和深度學習模型評估。Petastorm 支援基於 Python 的機器學習框架,如 Tensorflow、Pytorch 和 PySpark,也可以直接用在 Python 程式碼中。
深度學習叢集的搭建
即使是在現代硬體上訓練深度模型也很耗時,而且在很多情況下,很有必要在多臺機器上分配訓練負載。典型的深度學習叢集需要執行以下幾個步驟:
一臺或多臺機器讀取集中式或本地資料集。
每臺機器計算損失函式的值,並根據模型引數計算梯度。在這一步通常會使用 GPU。
通過組合估計的梯度(通常由多臺機器以分散式的方式計算得出)來更新模型係數。
圖 1:在這個深度學習叢集架構中,有三個計算節點使用中央資料集。
考慮到 GPU 的成本,很有必要提高 GPU 叢集的利用率。經過調優的資料訪問層可以確保用於訓練的資料對 GPU 總是可用的,這樣 GPU 就不會處於空閒狀態。
簡化模型架構研究
準備數 TB 來自多個數據源的同步資料通常很容易出錯。我們希望為研究人員提供單個數據集,讓他們可以處理各種任務,無需為每種任務建立新的資料集。
為此,需要遵守以下原則:
資料集需要包含研究人員可能用到的資料的超集,這樣他們就可以為特定實驗選擇列和行的子集。
對資料集中的感測器資料的預處理應該保持在最低限度。我們鼓勵研究人員進行實時的預處理,並
將其作為訓練 / 評估程式的一部分。在很多情況下,這可以通過其他未被充分利用的 CPU 來完成。
在業界,深度學習應用程式的資料集儲存通常分為兩類:多檔案和記錄流式資料集。
多檔案資料集
在這種情況下,每個張量 / 影象 / 標籤集被儲存在單獨的檔案(例如,PNG、JPEG、NPZ 和 CSV)中。整個資料集被儲存為一個或多個檔案系統目錄,每個目錄包含大量的檔案。檔案數量可能達到數百萬個(例如,ImageNet 有 120 萬個檔案)。如果以這種格式儲存,Uber ATG 的資料集將超過 1 億個檔案。
這種方法讓使用者可以隨機訪問資料集中任何行的任何列。但是,多次往返檔案系統的成本很高,所以很難大規模實現,特別是在使用現代分散式檔案系統時,如 HDFS 和 S3(這些系統通常針對大塊資料的快速讀取進行了優化)。
記錄流式資料集
另外一種方式是將資料行的集合組合在一起,儲存成一個或多個檔案。例如,Tensorflow 使用 protobuf 檔案(TFRecord)。其他流行的格式還包括 HDF5 和 Python pickle 檔案。
這種方法適用於 HDFS 和 S3 檔案系統。但是,查詢特定列需要通過網路傳輸所有欄位,然後丟棄未使用的資料。如果要查詢單行,還需要自定義索引。
在評估了多個方案後,我們決定使用 Apache Parquet 儲存格式,它在一定程度規避了上述兩種方法的一些缺點:
便於進行大量連續讀取(對 HDFS/S3 友好);
支援快速訪問單個列;
在某些情況下允許更快的行查詢;
與 Apache Spark 完美整合,可作為現成的查詢 / 操作框架。
列式儲存和 Apache Parquet
列式資料儲存按照列(而不是行)的順序來組織資料。例如,從自動駕駛車輛感測器記錄的資料可能看起來像這樣:
行和列儲存之間的差異如下所示:
以列式順序儲存資料允許使用者只加載列的子集,從而減少通過網路傳輸的資料量。對於收集來自自動駕駛車輛感測器的資料來說,這種好處是顯而易見的:試想一下,如果你的實驗只需要來自某個攝像頭的影象,那麼就可以從同一行的 10 張高解析度影象中載入其中的一張。
Apache Parquet 是一種列式儲存格式,近年來越來越流行。它得到了 Apache Spark、Apache Arrow 和其他開源專案的支援,並且非常適合用於進行簡化模型架構研究。
Tensorflow 和 Pytorch 是深度學習社群常用的框架。這些框架本身並不支援 Parquet 儲存訪問,因此我們構建了 Petastorm 來填補這一空白。
Petastorm 簡介
通常,一個數據集是通過連線多個數據源的記錄而生成的。這個由 Apache Spark 的 Python 介面 PySpark 生成的資料集稍後將被用在機器學習訓練中。Petastorm 提供了一個簡單的功能,使用 Petastorm 特定的元資料對標準的 Parquet 進行了擴充套件,從而讓它可以與 Petastorm 相容。
有了 Petastorm,消費資料就像在 HDFS 或檔案系統中建立和迭代讀取物件一樣簡單。Petastorm 使用 PyArrow 來讀取 Parquet 檔案。
圖 2:將多個數據源組合到單個表格結構中,從而生成資料集。可以多次使用相同的資料集進行模型訓練和評估。
生成資料集
要使用 Petastorm 生成資料集,使用者首先需要定義資料模式,也就是 Unischema。這是使用者唯一需要定義模式的地方,Petastorm 會將它轉換為其他框架所需的格式,例如 PySpark、Tensorflow 和 Python。
Unischema 的例項被序列化為 Parquet 儲存元資料中的自定義欄位,可以使用資料集的路徑來讀取它。
下面的示例演示瞭如何建立 Unischema 例項。必需的欄位屬性包括:欄位名稱、資料型別(使用 NumPy 資料型別來表示)、多維陣列、用於資料編碼 / 解碼的 codec,以及一個表示欄位是否可為空的布林值。
HelloWorldSchema = Unischema(‘HelloWorldSchema’, [
UnischemaField(‘id’, np.int32, (), ScalarCodec(IntegerType()), False),
UnischemaField(‘image1’, np.uint8, (128, 256, 3) CompressedImageCodec(‘png’), False),
UnischemaField(‘array_4d’, np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])
我們使用 PySpark 來寫入 Petastorm 資料集。下面的示例演示瞭如何使用 Petastorm 建立 1000 行資料。
rows_count = 10
with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
rows_rdd = sc.parallelize(range(rows_count))\
.map(row_generator)\
.map(lambda x: dict_to_spark_row(HelloWorldSchema, x))
spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema())\
.write \
.parquet(‘file:///tmp/hello_world_dataset’)
materialize_dataset 上下文管理器在開始時執行必要的配置,並在最後寫入 Petastorm 元資料。輸出的 URL 可以指向 HDFS 或檔案系統的位置。
rowgroup_size_mb 定義了 Parquet 行組的大小(以兆位元組為單位)。
row_generator 是一個返回與 HelloWorldSchema 匹配的 Python 字典的函式。
dict_to_spark_row 根據 HelloWorldSchema 來驗證資料型別,並將字典轉換為 pyspark.Row 物件。
讀取資料集
接下來,我們將概述如何使用 Python 程式碼以及在兩個常用的機器學習框架(Tensorflow 和 Pytorch)中讀取資料集。
Python
在 Python 程式碼中,可以直接使用 Reader 例項訪問 Petastorm 資料集。Reader 實現了迭代器介面,所以讀取資料很簡單:
with Reader(‘file:///tmp/hello_world_dataset’) as reader:
# Pure python
for sample in reader:
print(sample.id)
plt.imshow(sample.image1)
Tensorflow
下面的示例顯示瞭如何將資料集流式傳輸到 Tensorflow。examples 是一個元組,它的鍵來自 Unischema,而值為 tf.tensor 物件:
with Reader(‘file:///tmp/hello_world_dataset’) as reader:
tensor = tf_tensors(reader)
with tf.Session() as sess:
sample = sess.run(tensor)
print(sample.id)
plt.imshow(sample.image1)
在不久的將來,使用者可以使用 tf.data.Dataset 介面來訪問資料。
Pytorch
Petastorm 資料集可以通過介面卡類 petastorm.pytorch.DataLoader 整合到 Pytorch 中,如下所示:
with DataLoader(Reader(‘file:///tmp/hello_world_dataset’)) as train_loader:
sample = next(iter(train_loader))
print(sample[‘id’])
plt.plot(sample[‘image1’])
使用 Spark 分析資料集
Spark 本身支援 Parquet 資料格式,因此可以使用各種 Spark 工具來分析和操作資料集。下面的示例演示瞭如何將 Petastorm 資料集讀取為 Spark RDD 物件:
rdd = dataset_as_rdd(‘file:///tmp/hello_world_dataset’, spark,
[HelloWorldSchema.id, HelloWorldSchema.image1])
print(rdd.first().id)
標準的 PySpark 工具可用於處理 Petastorm 資料集。請注意,資料並不會被解碼,而且只有在 Parquet 格式中具有相應原生表示的欄位的值(例如標量)才有意義:
# Create a dataframe object from a parquet file
dataframe = spark.read.parquet(dataset_url)
# Show a schema
dataframe.printSchema()
# Count all
dataframe.count()
# Show a single column
dataframe.select(‘id’).show()
可以使用 SQL 查詢 Petastorm 資料集:
number_of_rows = spark.sql(
‘SELECT count(id) ‘
‘from parquet.`file:///tmp/hello_world_dataset`’).collect()
Petastorm 的特性
Petastorm 提供了各種特性來支援自動駕駛演算法的訓練,包括行過濾、資料分片、shuffle、對欄位子集的訪問,以及對時間序列資料(n-gram)的支援。
典型資料集的結構包括:
在自動駕駛汽車測試執行期間收集的感測器資料的多個列,包括攝像頭、鐳射定位器和雷達。
手動生成的標籤,作為行的欄位進行儲存。
行資料按照時間順序排序,並按照汽車的測試執行進行分組,行組大小通常在 30 到 100 範圍內。
並行執行策略
Petastorm 提供了兩種並行化資料載入和解碼操作的策略:一種基於執行緒池,另一種基於程序池。策略的選擇取決於所讀取的資料型別。
通常,當行中包含編碼的高解析度影象時,應使用執行緒池策略。在這種情況下,大部分處理時間用於通過 C++ 程式碼來解碼影象。這個時候不會持有 Python 全域性直譯器鎖(GIL)。
當行很小時,使用程序池策略更合適。在這種情況下,大部分處理都通過 Python 程式碼來完成。這個時候必須並行執行多個程序,這樣才能克服 GIL 導致的執行序列化。
n-gram
有些模型需要時間上下文,以便更好地解釋環境或預測環境中參與者的未來行為。
如果底層的資料是按時間排列,Petastorm 就可以提供這樣的時間上下文。如果向 Petastorm Reader 物件請求 n-gram,那麼後續的行將被分組到單個訓練樣本中。
下圖顯示了長度為 3 的 n-gram 的分組。AV Log#0 和 AV Log#1 表示兩種不同的車載記錄:
圖 3:在讀取資料集時構造 n-gram。n-gram 不能跨 Parquet 行組。
請注意,n-gram 分組不能跨 Parquet 行組。在圖 3 中,row-group 0 生成了三個 n-gram,而 row-group 1 只生成一個,另外三個來自 row-group 2。n-gram 節省了 IO 和 CPU 頻寬,因為不需要進行磁碟資料複製,也不需要進行重複載入和解碼。
n-gram 按照它們在資料集中出現的順序來生成,因此使用者需要讓資料集中的順序與訪問模式保持一致。
shuffle
如果資料集支援 n-gram 訪問模式,那麼它的行資料時按時間戳排序的。Parquet 支援載入行組中的全部行。因此,資料將被載入到高度相關的樣本組中(例如,從一輛自動駕駛車輛的攝像頭獲取的連續兩張影象將非常相似)。連續樣本之間的高相關性不是我們所期望的,它們會降低訓練演算法的效能。為了減少相關性,Petastorm 提供了 shuffle 的功能。
圖 4:通過隨機選擇要載入的行組,然後將個體樣本放入記憶體 shuffle 緩衝區來實現 shuffle。
Petastorm 從資料集中的隨機選擇一組行組。解碼過的行被放入行 shuffle 緩衝區,然後從緩衝區中選擇一個隨機行返回給使用者。
行謂詞(過濾器)
對於在多個實驗室中重用的資料集例項,能夠有效地選擇行子集是非常重要的。Petastorm 支援行謂詞。Petastorm 行謂詞利用了 Parquet 儲存分割槽,只加載符合條件的列。
在未來,我們計劃將 Parquet 的謂詞下推(pushdown)功能整合到 Petastorm 中,以進一步加快查詢。
行組索引
Petastorm 支援儲存一個鍵與一組行組的對映。這種對映有助於快速查詢符合特定條件的行組。在使用“行謂詞”的地方,需要進行額外的過濾。
為分散式訓練進行分片
在分散式訓練環境中,每個程序通常負責訓練資料的一個子集。一個程序的資料子集與其他程序的資料子集正交。Petastorm 支援將資料集的讀時分片轉換為正交的樣本集。
圖 5:Petastorm 將資料集的非重疊子集提供給參與分散式訓練的不同機器。
本地快取
Petastorm 支援在本地儲存中快取資料。當網路連線速度較慢或頻寬很昂貴時,這會派上用場。
圖 6:如果啟用了本地快取,每個會話僅下載一次資料。
在第一個時間段,從遠端儲存讀取一組樣本,並儲存到本地快取中。在隨後的時間段,將從本地快取中讀取所有資料。
Petastorm 架構
Petastorm 的設計目標包括:
通過單資料模式定義進行資料的編碼和解碼。
為 ML 框架和純 Python 程式碼提供可用的高資料載入頻寬。
將 Apache Spark 作為分散式叢集計算框架來生成資料集。
與 Python、ML 平臺無關的 Petastorm 核心元件的實現。
呈現給 Tensorflow 和 PyTorch 框架的原生介面。
etl 包實現了生成資料集的功能。
Reader 是訓練和計算程式碼使用的主要資料載入引擎。Reader 使用 Python 實現,不依賴任何 ML 框架(Tensorflow、Pytorch),並且可以通過 Python 代來例項化和使用。
為 Tensorflow 和 PyTorch 提供介面卡。
Unischema 可以被資料集生成和資料載入程式碼引用。
圖 7:Petastorm 提供了支援資料集生成和讀取的元件。Unischema 定義了可供兩者使用的公共資料模式。
對 Parquet 的修復
當我們開始使用 Spark 寫入 Parquet 資料集時遇到了一些麻煩。原因主要是資料的行大小,它包含了幾個數兆位元組的欄位。我們的第一個問題是資料集中的行組比預期的要大得多,導致記憶體不足等問題。深入研究程式碼後發現,parquet-mr 在檢查行組是否達到使用者設定的目標大小之前,強制限制行組最少為 100 行。針對這個問題,Parquet 已經有一個相關的拉取請求,於是我們 fork 了這個程式碼庫,並做了一些修改,讓行組的大小符合我們的要求。
在解決了行組大小的問題後,我們發現,當我們嘗試生成較小的行組或使用更大的欄位時,Spark 作業會耗盡記憶體。通過深入研究生成的資料集,我們發現,在新增新欄位或減少行組大小時,儲存檔案元資料的 Parquet 頁尾大小顯著增加了。
原來 Parquet 會為代表影象的巨大二進位制欄位或其他多維陣列生成統計資料。由於 Parquet 在會頁尾中儲存這些欄位的最小值和最大值,因此,如果行組大小足夠小,那麼頁尾就會變得很大,直至無法全部放到記憶體中。這個問題在 parquet-mr 程式碼庫中已經得到了解決,但是我們使用的是 Spark 2.1.0(依賴了 Parquet 1.8.1)。為了解決這個問題,我們升級了 Spark(Parquet 1.8.3 中已經修復了這個問題)。
下一步
下面我們重點介紹一下計劃在不久的將來推出的一些改進:
減少 shuffle 的記憶體佔用
大行組有助於提高 IO 利用率和資料載入速度。不過,它們也會增加連續樣本之間的相關性。我們正在積極改進 shuffle 機制。
謂詞下推支援
Pyarrow 將很快提供謂詞下推支援。我們希望用它來實現更快的行過濾。
改進與 Spark 的整合
在 Spark 中訪問 Petastorm 資料集時,某些操作似乎比預期花費更多的時間或記憶體。我們需要進一步調查 Parquet 庫程式碼,以瞭解有效處理大型欄位的其他細微差別。
額外的儲存格式
Petastorm 抽象了底層儲存格式。我們可以將 Parquet 以外的儲存格式整合到 Petastorm 中,從而為實驗和資料載入效能調整提供更大的自由。
GitHub 開源專案傳送門:
Petastorm:
http://www.github.com/uber/petastorm
parquet-mr:
https://github.com/apache/parquet-mr
英文原文:
https://eng.uber.com/petastorm/