文件

使用 MinIO 的分散式 HDP Spark 和 Hive

1. 雲原生架構

cloud-native

Kubernetes 在運算節點上彈性地管理無狀態的 Spark 和 Hive 容器。Spark 與 Kubernetes 有原生的排程器整合。由於歷史原因,Hive 在 Kubernetes 之上使用 YARN 排程器。

對 MinIO 物件儲存的所有存取均透過 S3/SQL SELECT API 進行。除了運算節點之外,MinIO 容器也由 Kubernetes 作為有狀態容器進行管理,並將本機儲存 (JBOD/JBOF) 對應為持續本機卷。此架構啟用多租戶 MinIO,允許在客戶之間隔離數據。

MinIO 也支援類似於 AWS 區域和層級的多叢集、多站點聯邦。使用 MinIO 資訊生命週期管理 (ILM),您可以將數據設定為在基於 NVMe 的熱儲存和基於 HDD 的溫儲存之間分層。所有數據都使用每個物件金鑰進行加密。租戶之間的存取控制和身分管理由 MinIO 使用 OpenID Connect 或 Kerberos/LDAP/AD 進行管理。

2. 先決條件

3. 設定 Hadoop、Spark、Hive 以使用 MinIO

成功安裝後,導覽至 Ambari UI http://<ambari-server>:8080/,並使用預設憑證登入:[使用者名稱:admin,密碼:admin]

ambari-login

3.1 設定 Hadoop

導覽至 Services -> HDFS -> CONFIGS -> ADVANCED,如下所示

hdfs-configs

導覽至 Custom core-site,以設定 _s3a_ 連接器的 MinIO 參數

s3a-config

sudo pip install yq
alias kv-pairify='yq ".configuration[]" | jq ".[]" | jq -r ".name + \"=\" + .value"'

舉例來說,假設有一組 12 個運算節點,總記憶體為 1.2TiB,我們需要進行以下設定以獲得最佳結果。為 core-site.xml 新增以下最佳項目,以使用 MinIO 設定 s3a。這裡最重要的選項是

cat ${HADOOP_CONF_DIR}/core-site.xml | kv-pairify | grep "mapred"

mapred.maxthreads.generate.mapoutput=2 # Num threads to write map outputs
mapred.maxthreads.partition.closer=0 # Asynchronous map flushers
mapreduce.fileoutputcommitter.algorithm.version=2 # Use the latest committer version
mapreduce.job.reduce.slowstart.completedmaps=0.99 # 99% map, then reduce
mapreduce.reduce.shuffle.input.buffer.percent=0.9 # Min % buffer in RAM
mapreduce.reduce.shuffle.merge.percent=0.9 # Minimum % merges in RAM
mapreduce.reduce.speculative=false # Disable speculation for reducing
mapreduce.task.io.sort.factor=999 # Threshold before writing to drive
mapreduce.task.sort.spill.percent=0.9 # Minimum % before spilling to drive

S3A 是用於連接 S3 和其他相容 S3 的物件儲存服務(如 MinIO)的連接器。MapReduce 工作負載通常以與 HDFS 相同的方式與物件儲存互動。這些工作負載依賴 HDFS 的原子性重新命名功能來完成將資料寫入資料儲存區的動作。物件儲存操作本質上是原子性的,它們不需要/實作重新命名 API。預設的 S3A 提交器透過複製和刪除 API 來模擬重新命名。這種互動模式會因為寫入放大而導致顯著的效能損失。例如,Netflix 開發了兩個新的暫存提交器 - 目錄暫存提交器和分割區暫存提交器 - 以充分利用原生物件儲存操作。這些提交器不需要重新命名操作。這兩個暫存提交器以及另一個名為 Magic 提交器的新增項目都經過評估,用於基準測試。

研究發現,在三個提交器中,目錄暫存提交器是最快的,S3A 連接器應配置以下參數以獲得最佳結果

cat ${HADOOP_CONF_DIR}/core-site.xml | kv-pairify | grep "s3a"

fs.s3a.access.key=minio
fs.s3a.secret.key=minio123
fs.s3a.path.style.access=true
fs.s3a.block.size=512M
fs.s3a.buffer.dir=${hadoop.tmp.dir}/s3a
fs.s3a.committer.magic.enabled=false
fs.s3a.committer.name=directory
fs.s3a.committer.staging.abort.pending.uploads=true
fs.s3a.committer.staging.conflict-mode=append
fs.s3a.committer.staging.tmp.path=/tmp/staging
fs.s3a.committer.staging.unique-filenames=true
fs.s3a.connection.establish.timeout=5000
fs.s3a.connection.ssl.enabled=false
fs.s3a.connection.timeout=200000
fs.s3a.endpoint=http://minio:9000
fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

fs.s3a.committer.threads=2048 # Number of threads writing to MinIO
fs.s3a.connection.maximum=8192 # Maximum number of concurrent conns
fs.s3a.fast.upload.active.blocks=2048 # Number of parallel uploads
fs.s3a.fast.upload.buffer=disk # Use drive as the buffer for uploads
fs.s3a.fast.upload=true # Turn on fast upload mode
fs.s3a.max.total.tasks=2048 # Maximum number of parallel tasks
fs.s3a.multipart.size=512M # Size of each multipart chunk
fs.s3a.multipart.threshold=512M # Size before using multipart uploads
fs.s3a.socket.recv.buffer=65536 # Read socket buffer hint
fs.s3a.socket.send.buffer=65536 # Write socket buffer hint
fs.s3a.threads.max=2048 # Maximum number of threads for S3A

其餘的優化選項將在以下連結中討論

套用組態變更後,請繼續重新啟動 Hadoop 服務。

hdfs-services

3.2 設定 Spark2

導覽至 Services -> Spark2 -> CONFIGS,如下所示

spark-config

導覽至「Custom spark-defaults」以設定 _s3a_ 連接器的 MinIO 參數

spark-config

spark-defaults.conf 新增以下最佳項目,以使用 MinIO 設定 Spark。

spark.hadoop.fs.s3a.access.key minio
spark.hadoop.fs.s3a.secret.key minio123
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.block.size 512M
spark.hadoop.fs.s3a.buffer.dir ${hadoop.tmp.dir}/s3a
spark.hadoop.fs.s3a.committer.magic.enabled false
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads true
spark.hadoop.fs.s3a.committer.staging.conflict-mode append
spark.hadoop.fs.s3a.committer.staging.tmp.path /tmp/staging
spark.hadoop.fs.s3a.committer.staging.unique-filenames true
spark.hadoop.fs.s3a.committer.threads 2048 # number of threads writing to MinIO
spark.hadoop.fs.s3a.connection.establish.timeout 5000
spark.hadoop.fs.s3a.connection.maximum 8192 # maximum number of concurrent conns
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.connection.timeout 200000
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.fast.upload.active.blocks 2048 # number of parallel uploads
spark.hadoop.fs.s3a.fast.upload.buffer disk # use disk as the buffer for uploads
spark.hadoop.fs.s3a.fast.upload true # turn on fast upload mode
spark.hadoop.fs.s3a.impl org.apache.hadoop.spark.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.max.total.tasks 2048 # maximum number of parallel tasks
spark.hadoop.fs.s3a.multipart.size 512M # size of each multipart chunk
spark.hadoop.fs.s3a.multipart.threshold 512M # size before using multipart uploads
spark.hadoop.fs.s3a.socket.recv.buffer 65536 # read socket buffer hint
spark.hadoop.fs.s3a.socket.send.buffer 65536 # write socket buffer hint
spark.hadoop.fs.s3a.threads.max 2048 # maximum number of threads for S3A

套用組態變更後,請繼續重新啟動 Spark 服務。

spark-config

3.3 設定 Hive

導覽至 Services -> Hive -> CONFIGS -> ADVANCED,如下所示

hive-config

導覽至「Custom hive-site」以設定 _s3a_ 連接器的 MinIO 參數

hive-config

hive-site.xml 新增以下最佳項目,以使用 MinIO 設定 Hive。

hive.blobstore.use.blobstore.as.scratchdir=true
hive.exec.input.listing.max.threads=50
hive.load.dynamic.partitions.thread=25
hive.metastore.fshandler.threads=50
hive.mv.files.threads=40
mapreduce.input.fileinputformat.list-status.num-threads=50

如需更多關於這些選項的資訊,請造訪 https://www.cloudera.com/documentation/enterprise/5-11-x/topics/admin_hive_on_s3_tuning.html

hive-config

套用組態變更後,請繼續重新啟動所有 Hive 服務。

hive-config

4. 執行範例應用程式

成功安裝 Hive、Hadoop 和 Spark 後,我們現在可以繼續執行一些範例應用程式,以查看它們是否配置正確。我們可以利用 Spark Pi 和 Spark WordCount 程式來驗證我們的 Spark 安裝。我們還可以探索如何從命令列和 Spark Shell 執行 Spark 工作。

4.1 Spark Pi

透過執行以下運算密集型範例來測試 Spark 安裝,該範例透過向一個圓形「投擲飛鏢」來計算 pi。該程式在單位正方形((0,0) 到 (1,1))中產生點,並計算有多少點落在正方形內的單位圓內。結果會逼近 pi。

按照以下步驟執行 Spark Pi 範例

  • 以使用者 ‘spark’ 登入。

  • 當工作執行時,函式庫現在可以在中間處理期間使用 MinIO

  • 導覽至具有 Spark 用戶端的節點,並存取 spark2-client 目錄

cd /usr/hdp/current/spark2-client
su spark
  • 使用 org.apache.spark 中的程式碼,在 yarn-client 模式下執行 Apache Spark Pi 工作

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-client \
    --num-executors 1 \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    examples/jars/spark-examples*.jar 10

該工作應產生如下所示的輸出。請注意輸出中的 pi 值。

17/03/22 23:21:10 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.302805 s
Pi is roughly 3.1445191445191445

也可以在瀏覽器中檢視工作狀態,方法是導覽至 YARN ResourceManager Web UI 並點擊工作歷史伺服器資訊。

4.2 WordCount

WordCount 是一個簡單的程式,用於計算文字檔案中單字出現的頻率。程式碼會建立一個稱為計數的 (String, Int) 對資料集,並將該資料集儲存到檔案中。

以下範例將 WordCount 程式碼提交至 Scala Shell。為 Spark WordCount 範例選擇一個輸入檔案。我們可以將任何文字檔案作為輸入。

  • 以使用者 ‘spark’ 登入。

  • 當工作執行時,函式庫現在可以在中間處理期間使用 MinIO

  • 導覽至具有 Spark 用戶端的節點,並存取 spark2-client 目錄

cd /usr/hdp/current/spark2-client
su spark

以下範例使用 log4j.properties 作為輸入檔案

4.2.1 將輸入檔案上傳到 HDFS:

hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties
          s3a://testbucket/testdata

4.2.2 執行 Spark Shell:

./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

該命令應產生如下所示的輸出。(帶有額外的狀態訊息)

Spark context Web UI available at http://172.26.236.247:4041
Spark context available as 'sc' (master = yarn, app id = application_1490217230866_0002).
Spark session available as 'spark'.
Welcome to


      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.2.6.0.0-598
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
  • scala> 提示符下,輸入以下命令來提交工作,將節點名稱、檔案名稱和檔案位置替換為您的值

scala> val file = sc.textFile("s3a://testbucket/testdata")
file: org.apache.spark.rdd.RDD[String] = s3a://testbucket/testdata MapPartitionsRDD[1] at textFile at <console>:24

scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25

scala> counts.saveAsTextFile("s3a://testbucket/wordcount")

使用以下其中一種方法來檢視工作輸出

在 Scala Shell 中檢視輸出

scala> counts.count()
364

若要從 MinIO 檢視輸出,請離開 Scala Shell。檢視 WordCount 工作狀態

hadoop fs -ls s3a://testbucket/wordcount

輸出應與以下類似

Found 3 items
-rw-rw-rw-   1 spark spark          0 2019-05-04 01:36 s3a://testbucket/wordcount/_SUCCESS
-rw-rw-rw-   1 spark spark       4956 2019-05-04 01:36 s3a://testbucket/wordcount/part-00000
-rw-rw-rw-   1 spark spark       5616 2019-05-04 01:36 s3a://testbucket/wordcount/part-00001