汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

簡介: 由汽車之家實時計算平臺負責人邸星星在 4 月 17 日上海站 Meetup 分享的,基於 Flink + Iceberg 的湖倉一體架構實踐。

內容簡要: 一、資料倉庫架構升級的背景 二、基於 Iceberg 的湖倉一體架構實踐 三、總結與收益 四、後續規劃 一、資料倉庫架構升級的背景

1. 基於 Hive 的資料倉庫的痛點

原有的資料倉庫完全基於 Hive 建造而成,主要存在三大痛點:

痛點一:不支援 ACID

1)不支援 Upsert 場景;

2)不支援 Row-level delete,資料修正成本高。

痛點二:時效性難以提升

1)資料難以做到準實時可見;

2)無法增量讀取,無法實現儲存層面的流批統一;

3)無法支援分鐘級延遲的資料分析場景。

痛點三:Table Evolution

1)寫入型 Schema,對 Schema 變更支援不好;

2)Partition Spec 變更支援不友好。

2. Iceberg 關鍵特性

Iceberg 主要有四大關鍵特性:支援 ACID 語義、增量快照機制、開放的表格式和流批介面支援。

支援 ACID 語義 不會讀到不完整的 Commit; 基於樂觀鎖支援併發 Commit; Row-level delete,支援 Upsert。 增量快照機制 Commit 後資料即可見(分鐘級); 可回溯歷史快照。 開放的表格式 資料格式:parquet、orc、avro 計算引擎:Spark、Flink、Hive、Trino/Presto 流批介面支援 支援流、批寫入; 支援流、批讀取。 二、基於 Iceberg 的湖倉一體架構實踐

湖倉一體的意義就是說我不需要看見湖和倉,資料有著打通的元資料的格式,它可以自由的流動,也可以對接上層多樣化的計算生態。

——賈揚清(阿里雲計算平臺高階研究員)

1. Append 流入湖的鏈路

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

上圖為日誌類資料入湖的鏈路,日誌類資料包含客戶端日誌、使用者端日誌以及服務端日誌。這些日誌資料會實時錄入到 Kafka,然後透過 Flink 任務寫到 Iceberg 裡面,最終儲存到 HDFS。

2. Flink SQL 入湖鏈路打通

我們的 Flink SQL 入湖鏈路打通是基於 「Flink 1.11 + Iceberg 0.11」 完成的,對接 Iceberg Catalog 我們主要做了以下內容:

1)Meta Server 增加對 Iceberg Catalog 的支援;

2)SQL SDK 增加 Iceberg Catalog 支援。

然後在這基礎上,平臺開放 Iceberg 表的管理功能,使得使用者可以自己在平臺上建 SQL 的表。

3. 入湖 – 支援代理使用者

第二步是內部的實踐,對接現有預算體系、許可權體系。

因為之前平臺做實時作業的時候,平臺都是預設為 Flink 使用者去執行的,之前儲存不涉及 HDFS 儲存,因此可能沒有什麼問題,也就沒有思考預算劃分方面的問題。

但是現在寫 Iceberg 的話,可能就會涉及一些問題。比如數倉團隊有自己的集市,資料就應該寫到他們的目錄下面,預算也是劃到他們的預算下,同時許可權和離線團隊賬號的體系打通。

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

如上所示,這塊主要是在平臺上做了代理使用者的功能,使用者可以去指定用哪個賬號去把這個資料寫到 Iceberg 裡面,實現過程主要有以下三個。

增加 Table 級別配置:”iceberg.user.proxy” = “targetUser』 1)啟用 Superuser 2)團隊賬號鑑權

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

訪問 HDFS 時啟用代理使用者:

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

訪問 Hive Metastore 時指定代理使用者 1)參考 Spark 的相關實現: org.apache.spark.deploy.security.HiveDelegationTokenProvider 2)動態代理 HiveMetaStoreClient,使用代理使用者訪問 Hive metastore

4. Flink SQL 入湖示例

DDL + DML

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

5. CDC 資料入湖鏈路

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

如上所示,我們有一個 AutoDTS 平臺,負責業務庫資料的實時接入。我們會把這些業務庫的資料接入到 Kafka 裡面,同時它還支援在平臺上配置分發任務,相當於把進 Kafka 的資料分發到不同的儲存引擎裡,在這個場景下是分發到 Iceberg 裡。

6. Flink SQL CDC 入湖鏈路打通

下面是我們基於 「Flink1.11 + Iceberg 0.11」 支援 CDC 入湖所做的改動:

改進 Iceberg Sink: Flink 1.11 版本為 AppendStreamTableSink,無法處理 CDC 流,修改並適配。 表管理 1)支援 Primary key(PR1978) 2)開啟 V2 版本:”iceberg.format.version” = “2”

7. CDC 資料入湖

1. 支援 Bucket

Upsert 場景下,需要確保同一條資料寫入到同一 Bucket 下,這又如何實現?

目前 Flink SQL 語法不支援宣告 bucket 分割槽,透過配置的方式宣告 Bucket:

“partition.bucket.source”=”id”, // 指定 bucket 欄位

“partition.bucket.num”=”10”, // 指定 bucket 數量

2. Copy-on-write sink

做 Copy-on-Write 的原因是原本社群的 Merge-on-Read 不支援合併小檔案,所以我們臨時去做了 Copy-on-write sink 的實現。目前業務一直在測試使用,效果良好。

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

上方為 Copy-on-Write 的實現,其實跟原來的 Merge-on-Read 比較類似,也是有 StreamWriter 多並行度寫入和 FileCommitter 單並行度順序提交。

在 Copy-on-Write 裡面,需要根據表的資料量合理設定 Bucket 數,無需額外做小檔案合併。

StreamWriter 在 snapshotState 階段多並行度寫入 1)增加 Buffer; 2)寫入前需要判斷上次 checkpoint 已經 commit 成功; 3)按 bucket 分組、合併,逐個 Bucket 寫入。 FileCommitter 單並行度順序提交 1)table.newOverwrite() 2)Flink.last.committed.checkpoint.id

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

8. 示例 – CDC 資料配置入湖

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

如上圖所示,在實際使用中,業務方可以在 DTS 平臺上建立或配置分發任務即可。

例項型別選擇 Iceberg 表,然後選擇目標庫,表明要把哪個表的資料同步到 Iceberg 裡,然後可以選原表和目標表的欄位的對映關係是什麼樣的,配置之後就可以啟動分發任務。啟動之後,會在實時計算平臺 Flink 裡面提交一個實時任務,接著用 Copy-on-write sink 去實時地把資料寫到 Iceberg 表裡面。

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

9. 入湖其他實踐

實踐一:減少 empty commit

問題描述: 在上游 Kafka 長期沒有資料的情況下,每次 Checkpoint 依舊會生成新的 Snapshot,導致大量的空檔案和不必要的 Snapshot。 解決方案(PR – 2042): 增加配置 Flink.max-continuousempty-commits,在連續指定次數 Checkpoint 都沒有資料後才真正觸發 Commit,生成 Snapshot。

實踐二:記錄 watermark

問題描述: 目前 Iceberg 表本身無法直接反映資料寫入的進度,離線排程難以精準觸發下游任務。 解決方案( PR – 2109 ): 在 Commit 階段將 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直觀的反映端到端的延遲情況,同時可以用來判斷分割槽資料完整性,用於排程觸發下游任務。

實踐三:刪表最佳化

問題描述: 刪除 Iceberg 可能會很慢,導致平臺介面相應超時。因為 Iceberg 是面向物件儲存來抽象 IO 層的,沒有快速清除目錄的方法。 解決方案: 擴充套件 FileIO,增加 deleteDir 方法,在 HDFS 上快速刪除表資料。

10. 小檔案合併及資料清理

定期為每個表執行批處理任務(spark 3),分為以下三個步驟:

1. 定期合併新增分割槽的小檔案:

table.expireSnapshots().expireOld erThan(timestamp).commit();

3. 清理 orphan 檔案,預設清理 3 天前,且無法觸及的檔案:

removeOrphanFilesAction.older Than(timestamp).execute();

11. 計算引擎 – Flink

Flink 是實時平臺的核心計算引擎,目前主要支援資料入湖場景,主要有以下幾個方面的特點。

資料準實時入湖: Flink 和 Iceberg 在資料入湖方面整合度最高,Flink 社群主動擁抱資料湖技術。 平臺整合: AutoStream 引入 IcebergCatalog,支援透過 SQL 建表、入湖 AutoDTS 支援將 MySQL、SQLServer、TiDB 表配置入湖。 流批一體: 在流批一體的理念下,Flink 的優勢會逐漸體現出來。

12. 計算引擎 – Hive

Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 整合度更高,主要提供以下三個方面的功能。

定期小檔案合併及 meta 資訊查詢: SELECT * FROM prod.db.table.history 還可檢視 snapshots, files, manifests。 離線資料寫入: 1)Insert into 2)Insert overwrite 3)Merge into 分析查詢: 主要支援日常的準實時分析查詢場景。

13. 計算引擎 – Trino/Presto

AutoBI 已經和 Presto 整合,用於報表、分析型查詢場景。

Trino 1)直接將 Iceberg 作為報表資料來源 2)需要增加元資料快取機制:https://github.com/trinodb/trino/issues/7551 Presto 社群整合中:https://github.com/prestodb/presto/pull/15836

14. 踩過的坑

1. 訪問 Hive Metastore 異常

問題描述:HiveConf 的構造方法的誤用,導致 Hive 客戶端中宣告的配置被覆蓋,導致訪問 Hive metastore 時異常

解決方案(PR-2075):修復 HiveConf 的構造,顯示呼叫 addResource 方法,確保配置不會被覆蓋:hiveConf.addResource(conf);

2.Hive metastore 鎖未釋放

問題描述:「CommitFailedException: Timed out after 181138 ms waiting for lock xxx.」 原因是 hiveMetastoreClient.lock 方法,在未獲得鎖的情況下,也需要顯示 unlock,否則會導致上面異常。

解決方案(PR-2263):最佳化 HiveTableOperations#acquireLock 方法,在獲取鎖失敗的情況下顯示呼叫 unlock 來釋放鎖。

3. 元資料檔案丟失

問題描述:Iceberg 表無法訪問,報 「NotFoundException Failed to open input stream for file : xxx.metadata.json」

三、收益與總結

1. 總結

透過對湖倉一體、流批融合的探索,我們分別做了總結。

湖倉一體 1)Iceberg 支援 Hive Metastore; 2)總體使用上與 Hive 表類似:相同資料格式、相同的計算引擎。 流批融合 準實時場景下實現流批統一:同源、同計算、同儲存。

2. 業務收益

資料時效性提升: 入倉延遲從 2 小時以上降低到 10 分鐘以內;演演算法核心任務 SLA 提前 2 小時完成。 準實時的分析查詢: 結合 Spark 3 和 Trino,支援準實時的多維分析查詢。 特徵工程提效: 提供準實時的樣本資料,提高模型訓練時效性。 CDC 資料準實時入倉: 可以在數倉針對業務表做準實時分析查詢。

3. 架構收益 – 準實時數倉

汽車之家:基於 Flink + Iceberg 的湖倉一體架構實踐

上方也提到了,我們支援準實時的入倉和分析,相當於是為後續的準實時數倉建設提供了基礎的架構驗證。準實時數倉的優勢是一次開發、口徑統一、統一儲存,是真正的批流一體。劣勢是實時性較差,原來可能是秒級、毫秒級的延遲,現在是分鐘級的資料可見性。

但是在架構層面上,這個意義還是很大的,後續我們能看到一些希望,可以把整個原來 「T + 1」 的數倉,做成準實時的數倉,提升數倉整體的資料時效性,然後更好地支援上下游的業務。

四、後續規劃

1. 跟進 Iceberg 版本

全面開放 V2 格式,支援 CDC 資料的 MOR 入湖。

2. 建設準實時數倉

基於 Flink 透過 Data pipeline 模式對數倉各層表全面提速。

3. 流批一體

隨著 upsert 功能的逐步完善,持續探索儲存層面流批一體。

4. 多維分析

基於 Presto/Spark3 輸出準實時多維分析。