發布事件到 Kafka
MinIO 支援將 Bucket 通知 事件發布到 Kafka 服務端點。
MinIO 依賴 https://github.com/Shopify/sarama 專案進行 Kafka 連線,並共用該專案的 Kafka 支援。請參閱 sarama
的 相容性和 API 穩定性 章節以了解更多詳細資訊。
將 Kafka 端點新增到 MinIO 部署
以下程序會新增一個新的 Kafka 服務端點,以支援 MinIO 部署中的 Bucket 通知。
先決條件
Kafka 最低版本和支援版本
MinIO 依賴 https://github.com/Shopify/sarama 專案進行 Kafka 連線,並共用該專案的 Kafka 支援。請參閱 sarama
的 相容性和 API 穩定性 章節以了解更多詳細資訊。
MinIO mc
命令列工具
1) 將 Kafka 端點新增到 MinIO
您可以使用環境變數或設定執行階段組態設定來組態新的 Kafka 服務端點。
MinIO 支援使用 環境變數 指定 Kafka 服務端點和相關的組態設定。minio server
程序會在下次啟動時套用指定的設定。
以下範例程式碼會設定與組態 Kafka 服務端點相關的所有環境變數。最小必要變數為 MINIO_NOTIFY_KAFKA_ENABLE
和 MINIO_NOTIFY_KAFKA_BROKERS
set MINIO_NOTIFY_KAFKA_ENABLE_<IDENTIFIER>="on"
set MINIO_NOTIFY_KAFKA_BROKERS_<IDENTIFIER>="<ENDPOINT>"
set MINIO_NOTIFY_KAFKA_TOPIC_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_USERNAME_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_PASSWORD_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_MECHANISM_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_SASL_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_QUEUE_DIR_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_QUEUE_LIMIT_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_VERSION_<IDENTIFIER>="<string>"
set MINIO_NOTIFY_KAFKA_COMMENT_<IDENTIFIER>="<string>"
將
<IDENTIFIER>
取代為 Kafka 服務端點的唯一描述性字串。對於與新目標服務端點相關的所有環境變數,使用相同的<IDENTIFIER>
值。以下範例假設識別符為PRIMARY
。如果指定的
<IDENTIFIER>
與 MinIO 部署上現有的 Kafka 服務端點相符,則新設定會覆寫該端點的任何現有設定。使用mc admin config get notify_kafka
來檢閱 MinIO 部署上目前已設定的 Kafka 端點。將
<ENDPOINT>
替換為以逗號分隔的 Kafka brokers 清單。例如:"kafka1.example.com:2021,kafka2.example.com:2021"
如需每個環境變數的完整文件,請參閱 用於 Bucket 通知的 Kafka 服務。
MinIO 支援使用 mc admin config set
命令和 notify_kafka
組態金鑰,在執行中的 minio server
程序上新增或更新 Kafka 端點。您必須重新啟動 minio server
程序,才能套用任何新的或更新的組態設定。
以下範例程式碼會設定與設定 Kafka 服務端點相關的所有設定。最少的必要設定是 notify_kafka brokers
mc admin config set ALIAS/ notify_kafka:IDENTIFIER \
brokers="<ENDPOINT>" \
topic="<string>" \
sasl_username="<string>" \
sasl_password="<string>" \
sasl_mechanism="<string>" \
tls_client_auth="<string>" \
tls="<string>" \
tls_skip_verify="<string>" \
client_tls_cert="<string>" \
client_tls_key="<string>" \
version="<string>" \
queue_dir="<string>" \
queue_limit="<string>" \
comment="<string>"
將
IDENTIFIER
替換為 Kafka 服務端點的唯一描述性字串。此程序中的以下範例假設識別碼為PRIMARY
。如果指定的
IDENTIFIER
與 MinIO 部署上現有的 Kafka 服務端點相符,則新設定會覆寫該端點的任何現有設定。使用mc admin config get notify_kafka
來檢閱 MinIO 部署上目前已設定的 Kafka 端點。將
ENDPOINT
替換為以逗號分隔的 Kafka brokers 清單。例如:"kafka1.example.com:2021,kafka2.example.com:2021"
如需每個設定的完整文件,請參閱 Kafka Bucket 通知組態設定。
2) 重新啟動 MinIO 部署
您必須重新啟動 MinIO 部署,才能套用組態變更。使用 mc admin service restart
命令來重新啟動部署。
mc admin service restart ALIAS
將 ALIAS
替換為要重新啟動的部署的 別名。
minio server
程序會在啟動時針對每個已設定的 Kafka 目標印出一行,類似於以下內容:
SQS ARNs: arn:minio:sqs::primary:kafka
使用相關的 Kafka 部署作為目標來設定 bucket 通知時,您必須指定 ARN 資源。
識別 bucket 通知的 ARN
您在先前建立端點時,已定義 <IDENTIFIER>
,以指派給 bucket 通知的目標 ARN。以下步驟會傳回部署上已設定的 ARN。尋找您指定的 <IDENTIFIER>
,以識別先前建立的 ARN。
檢閱 JSON 輸出
複製並執行以下命令,將
ALIAS
替換為部署的 別名。mc admin info --json ALIAS
在 JSON 輸出中,尋找金鑰
info.sqsARN
。您需要的 ARN 是符合您指定的
<IDENTIFIER>
的金鑰值。例如,
arn:minio:sqs::primary:kafka
。
使用 jq 來剖析 JSON 以取得值
3) 使用 Kafka 端點作為目標來設定 Bucket 通知
使用 mc event add
命令新增一個新的 bucket 通知事件,並將已設定的 Kafka 服務作為目標
mc event add ALIAS/BUCKET arn:minio:sqs::primary:kafka \
--event EVENTS
將
ALIAS
替換為 MinIO 部署的 別名。將
BUCKET
替換為要在其中設定事件的 bucket 名稱。將
EVENTS
替換為以逗號分隔的 事件 清單,MinIO 會針對這些事件觸發通知。
使用 mc event ls
來檢視指定通知目標的所有已設定 bucket 事件
mc event ls ALIAS/BUCKET arn:minio:sqs::primary:kafka
4) 驗證已設定的事件
對您設定新事件的 bucket 執行動作,並檢查 Kafka 服務的通知資料。所需的動作取決於設定 bucket 通知時指定的 events
。
例如,如果 bucket 通知組態包含 s3:ObjectCreated:Put
事件,您可以使用 mc cp
命令在 bucket 中建立新物件並觸發通知。
mc cp ~/data/new-object.txt ALIAS/BUCKET
更新 MinIO 部署中的 Kafka 端點
以下程序會更新現有的 Kafka 服務端點,以支援 MinIO 部署中的 bucket 通知。
先決條件
Kafka 最低版本和支援版本
MinIO 依賴 https://github.com/Shopify/sarama 專案進行 Kafka 連線,並共用該專案的 Kafka 支援。請參閱 sarama
的 相容性和 API 穩定性 章節以了解更多詳細資訊。
MinIO mc
命令列工具
1) 列出部署中已設定的 Kafka 端點
使用 mc admin config get
命令來列出部署中目前已設定的 Kafka 服務端點
mc admin config get ALIAS/ notify_kafka
將 ALIAS
替換為 MinIO 部署的 別名。
命令輸出類似以下內容
notify_kafka:primary tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""
notify_kafka:secondary tls_skip_verify="off" queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""
notify_kafka
金鑰是 Kafka 通知設定 的最上層組態金鑰。brokers
金鑰指定給定 notify_kafka 金鑰的 Kafka 服務端點。notify_kafka:<IDENTIFIER>
後綴描述該 Kafka 服務端點的唯一識別碼。
記下您想要更新的 Kafka 服務端點的識別碼,以便進行下一個步驟。
2) 更新 Kafka 端點
使用 mc admin config set
命令,設定 Kafka 服務端點的新組態
mc admin config set ALIAS/ notify_kafka:<IDENTIFIER> \
brokers="https://kafka1.example.net:9200, https://kafka2.example.net:9200" \
topic="<string>" \
sasl_username="<string>" \
sasl_password="<string>" \
sasl_mechanism="<string>" \
tls_client_auth="<string>" \
tls="<string>" \
tls_skip_verify="<string>" \
client_tls_cert="<string>" \
client_tls_key="<string>" \
version="<string>" \
queue_dir="<string>" \
queue_limit="<string>" \
comment="<string>"
notify_kafka brokers
組態設定是 Kafka 服務端點的最少必要設定。所有其他組態設定都是選用的。如需 Kafka 組態設定的完整清單,請參閱 Kafka 通知設定。
3) 重新啟動 MinIO 部署
您必須重新啟動 MinIO 部署,才能套用組態變更。使用 mc admin service restart
命令來重新啟動部署。
mc admin service restart ALIAS
將 ALIAS
替換為要重新啟動的部署的 別名。
minio server
程序會在啟動時針對每個已設定的 Kafka 目標印出一行,類似於以下內容:
SQS ARNs: arn:minio:sqs::primary:kafka
4) 驗證變更
對具有事件組態的儲存桶執行操作,該組態使用更新後的 Kafka 服務端點,並檢查 Kafka 服務以獲取通知資料。所需的操作取決於配置儲存桶通知時指定的 events
。
例如,如果 bucket 通知組態包含 s3:ObjectCreated:Put
事件,您可以使用 mc cp
命令在 bucket 中建立新物件並觸發通知。
mc cp ~/data/new-object.txt ALIAS/BUCKET