思科依据 Amoro + Apache Iceberg 构建云原生湖仓实践

Amoro 是一个构建在 Apache Iceberg 等敞开数据湖表格之上的湖仓办理体系,供给了一套可插拔的数据自优化机制和办理服务,旨在为用户带来开箱即用的湖仓运用体会。

01 作者简介

白旭, 来自思科 WebEx 数据平台的 Software Engineer,首要担任数据湖仓一体的研发与优化。

02 Amoro 在 Webex

CiscoWebexproductsprovidecapabilitiesincludingonlinemeetings,teammessagingandfilesharing.ThesuiteisconsideredaleadingcollaborationplatformintheunifiedcommunicationsareaandisgearedtowardbothsmallgroupcollaborationforSMBsaswellaslargegroupmeetingsforenterprise-widedeployments.

为什么挑选 Amoro

Webex 大数据团队开始选用 Hive 存储格局作为首要规范。可是,由于 Hive 存储格局的限制,纠正特定客户数据和进行数据回溯都变得十分低效。此外,运用 Hive 也增加了保护开支。鉴于这些挑战,咱们着手寻找替代的存储处理方案。这时,咱们发现了 Apache Iceberg,它的设计不仅能够下降咱们的运营和保护本钱,还能提高核心事务运营的功率。因而,咱们着手构建依据 Apache Iceberg 的数据湖。

上一年开始 Webex 逐渐运用 Iceberg V2 format 来当作默许的表格局。V2 的重要特性之一便是 row-level 的更新才能,可是引入了要害的难题:读取 V2 表时Merge-on-Read(MOR)带来的功能问题。假如累计了大量的 Delete 文件,依靠 Iceberg 的报表查询的时效性将大打折扣,乃至达到了不可查的状况。

咱们最先开始测验运用 Spark comapction procedures 来兼并小文件,经过调度体系守时顺次履行。

思科依据 Amoro + Apache Iceberg 构建云原生湖仓实践

可是这样相对原始的保护手段有许多弊端:

  • 高资源占用:每个Spark job或许会超过40core和300GB的内存
  • 履行时间长:每个环境有许多Iceberg表需求兼并,而且当兼并数据量大的表时会阻塞其他表,这使得每个Pocedure履行时间都过长
  • 低容错:一个兼并任务或许包括多个Iceberg表,当某个表呈现过错时会导致整个Job的失利
  • 保护困难:兼并失利的表需求不断重启,手动修复,乃至或许会被遗漏

正是有上述多个痛点,咱们很快引入了 Amoro 来处理 Iceberg 表的许多保护问题。咱们选用 External Flink Optimizer 注册的方法,拉取并处理 Amoro Management Service(AMS)生产的优化任务。而且敞开了快照过期,数据过期等服务来削减存储体系的压力。

思科依据 Amoro + Apache Iceberg 构建云原生湖仓实践

Amoro 处理了传统守时调度的 Spark job 的许多痛点并带来了多个优势:

  • 更高的资源运用率:单从资源占用最多的环境来看,运用 Flink Optimizer 节省了 70% 左右的资源运用
  • 高容错:失利的 Optimization process/task 能够在下次扫描后重新测验优化
  • 及时性:继续的兼并优化能够确保 Iceberg 表的查询功率维持在可控范围,确保报表的查询功率
  • 自办理:Amoro 仅依据表的 properties 来确定是否敞开了优化,能够操控表级的优化敞开或关闭和相关策略
  • 可视化:Amoro 供给 WebUI,将优化状况,表的基本信息等状况呈现给开发者

运用状况

Cisco Webex 在 Amoro(前Arctic)项目开源后不久就测验用其来处理 Iceberg 的小文件兼并,快照过期清理等数据湖办理的各类问题。直至今日,Amoro 已在 Webex 上有了必定的实践规划:

  • 多数据中心,多集群布置:最多7个不同数据中心,Hadoop 集群环境
  • 多个环境:咱们不仅在 Hadoop 环境中运用 Amoro 办理 Iceberg 表,近期将其布置在 AWS 环境并优化 AWS 上 Iceberg 表
  • 1000 Iceberg 表

不过依据公司的实践状况,咱们会逐渐将 Hadoop 环境迁移至 AWS 环境,在此期间遇到的实践问题和处理方案能够在此篇文章做些讨论。

03 AmoroonAWS

AWS 上需求处理多个问题,首先的是 Iceberg AWS 集成如 Catalog 和 FileSystem 的切换等,别的便是 AMS 端的适配。

IcebergAWSIntegrations

咱们将 Iceberg 上线到 AWS 后,从 HiveCatalog 切换至 GlueCatalog,而且也将文件体系由 HDFS 切换为 S3。相关于 HDFS,S3 有着更完善的权限操控,能够给不同 IAM 账号区分 bucket,乃至文件的不同权限,来完结细粒度的权限操控。这是确保数据湖的数据安全性和隐私的要害要素。其次,HDFS 一般需求自行办理硬件和保护,因而或许需求更多的初始本钱和运营本钱,S3 是云服务,其本钱模型是按运用量计费,没有硬件保护本钱。

Hive Metastore 作为服务在不同的环境需求各自布置,一起也需求保护 MySQL 来存储元数据信息,比较于 Glue 增加了自身保护本钱。比如咱们在实践的生产环境中就遇到因为 MySQL 连接数达到上限而不得重启 HMS 的状况。而 Glue 相对 HMS 更简略,不需求手动保护,也削减多个环境分开布置导致的数据孤岛问题。

LockManager

关于 Iceberg 来说,有些文件体系如 S3 不供给文件的写入排斥来确保 metadata 原子性,HMS 依靠 MySQL 做了排他锁,而 Glue 需求自界说锁的完结,在 AWS 上 Iceberg 供给 DynamoDBLockManager 确保表的并发修正。

思科依据 Amoro + Apache Iceberg 构建云原生湖仓实践

  1. Iceberg 修正新 metadata.json 文件前向 LockManager 服务测验获取锁
  2. 假如有其他进程拿到锁时会重新测验,这个测验的次数和距离均有相关参数能够装备
  3. 拿到锁后将当时 metadata 文件的方位替换为新写入的 metadata-v2.json 地址
  4. 假如在多次测验后仍无法获取锁,那么此次提交或许会失利,然后再次重试
  5. 成功提交 metadata.json 文件后最终 Iceberg 开释掉锁

DynamoDB lock table 其数据模型如下:

PrimaryKey Attributes
LockEntityID LeaseDuration(ms) Version LockOwnerID
pda.orders 15000 d3b9b4ec-6c02-4e7e-9570-927ba1bafa67 s3://wap-bucket/orders/metadata/d3b9b4ec-6c02-4e7e-9570-927ba1bafa67-metadata.json
pda.customers 15000 0f50e24d-e7da-4c8b-aa4b-1b95a50c7f38 s3://wap-bucket/customers/metadata/0f50e24d-e7da-4c8b-aa4b-1b95a50c7f38-metadata.json
pda.products 15000 2dab53a2-7c63-4b95-8fe1-567f73e58d6c s3://wap-bucket/products/metadata/2dab53a2-7c63-4b95-8fe1-567f73e58d6c-metadata.json
  • entityId 是 DynamoDB 表中的 Key,由 Iceberg 数据库名称和表名组成
  • leaseduration是锁心跳超时时间,假如锁超时会主动开释
  • version是一串随机由 DynomoDBHeartbeat 更新的 UUID,确保锁被当时线程持有
  • ownerId 是待写入的新 metadata.json 文件

运用 DynamoDB 办理 Iceberg 表的并发修正,能够避免呈现 Hive Metastore Service 中锁未开释的导致 job block,因为 DynamoDB 中假如拿锁的进程反常退出,其锁也会因为 lease 到期后主动开释,无需手动解锁。

权限操控

AWS IAM 账户能够对 S3,Glue,DynamoDB 等相关服务做细粒度的权限操控。当时咱们给每个团队区分一个 IAM 账号,IAM account 授予相关 S3 bucket 的读写权限,一起也能够办理 Glue 的权限。咱们的 AMS 服务和读写 Job 都是运行在 Kubernetes 上,因而能够天然运用 Namespace 作为单位来分配 IAM 账号,办理好每个人的相关 namespace 的权限,进而能够简略完结 Iceberg 表的权限操控,后边也会对 S3 和 Glue 权限做更具体的区分。

思科依据 Amoro + Apache Iceberg 构建云原生湖仓实践

S3Intelligent-Tiering

咱们 Webex team 在调研 S3 读写 Iceberg 本钱的过程中发现 AWS S3 中的一项特点比较有价值,可是 Iceberg 中暂未适配:storage-class。将该装备设置为 S3 Intelligent-Tiering 能够依据不同拜访频率优化拜访本钱:

Amazon S3 Intelligent-Tiering 存储类旨在经过当拜访形式改动时主动将数据移动到最具本钱效益的拜访层来优化存储本钱。S3 Intelligent-Tiering 存储类主动将目标存储在三个拜访层中:一个针对频频拜访进行了优化的层,一个针对不频频拜访进行了优化的更低本钱的层,以及一个针对很少拜访的数据优化的极低本钱层。每月只需付出少量的目标监控和主动化费用,S3 Intelligent-Tiering 即可将接连 30天未拜访的目标移动到不频频拜访层,完结 40%的节省,并在 90天未拜访之后,将其移动到归档即时拜访层,完结 68%的节省。

该修正已贡献给 Iceberg 社区,并在1.4.0中发布。

AMSAWSAdaptions

前期 AMS 为了运行在 AWS 环境,经过 Custom Catalog 的方法创立并适配 Iceberg 的 GlueCatalog,而且重构了 Arctic 的 FileIO 相关接口以支撑目标文件体系。估计 0.6.0 版别将 GlueCatalog 作为单独的 Catalog 类型,例如要求填写 IAM 等相关特点,会对 AWS 环境会有更好的适配。

无论是 AMS 还是 Optimizer 都需求拜访 Iceberg 及其文件,直接的方法是给上述组件赋予能够拜访所需兼并表读写权限的 IAM account,咱们是将 IAM 相关的认证放在环境变量中,默许的读取链DefaultAWSCredentialsProviderChain会正确拿到相关认证信息完结鉴权。在 k8s 环境中放在 env 变量中即可,例如:

apiVersion: apps/v1kind: Deploymentmetadata: labels: app.kubernetes.io/name: ams name: amsspec: ... template: metadata: labels: app.kubernetes.io/name: ams spec: ... containers: - env: - name: AWS_ACCESS_KEY_ID value: AKIXXXXXXXXXXXXXXXX - name: AWS_SECRET_ACCESS_KEYvalue:fjHyrM1wTJ8CLP13 GU1bCGG1RGlL1xT1lXyBb11

04 布置实践

此篇文章咱们依据 Amoro 0.6.0 的版别作为示例,大致整理以 Helm charts 为模版在 Kubernetes 环境上布置 Amoro 和敞开 Iceberg 优化的流程。运用 Helm Chart 布置相对简略,更加具体的上云布置方法能够参阅之前的文章:ApacheIceberg Arctic构建云原生湖仓实战

值得注意的是,咱们对 Amoro 及 Helm 做了少许改动,因而会与 Amoro 社区的 Helm charts 略有不同,但大致流程是一致的。

打包镜像

手动打包:

mvncleaninstall-DskipTests-am-e-pldist

之后将打包好的 zip 包放入带有 Dockerfile 的目录中并打包 dockerimage:

docker build docker/ams/ --platform amd64 -t xxx/amoro && docker push xxx/amoro

打包好的 docker image 会在下述 Deployment 资源中引证并布置。

编写 Helmchart

除了基本的装备信息会放入Values中以外,带有保密特点的 IAM 认证信息, Database 的账户暗码都会存入 Secrets。将 AMS 的装备信息如bin/config.sh,conf/config.yaml抽出作为 ConfigMap 别离挂载至 env 和 volumeMounts。限于篇幅原因不会在此悉数列出装备文件。

  • _helpers.tpl 模版,与预界说了镜像,label 等根底信息
{{- define "udp.amoro.image.fullname" -}}
{{ .Values.image.repository }}/{{ .Values.image.component }}:{{ .Values.image.tag | default .Chart.AppVersion }}
{{- end -}}
{{- define "udp.amoro.common.labels" -}}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end -}}
{{- define "amoro.home" -}}
{{ .Values.amoroHome | default "/usr/local/amoro" }}
{{-end-}}
  • _pod.tpl 模版,界说 pod 挂载的装备文件等
{{- define "amoro.pod.container.mounts" }}
- name: logs
  mountPath: {{ include "amoro.home" . }}/logs
- name: conf
  mountPath: {{ include "amoro.home" . }}/conf/config.yaml
  readOnly: true
  subPath: "config.yaml"
{{- if or .Values.amoroConf.log4j2 }}
{{- /* log4j2.yaml from config-map*/ -}}
- name: conf
  mountPath: {{ include "amoro.home" . }}/conf/log4j2.xml
  readOnly: true
  subPath: "log4j2.xml"
{{- end }}
{{- if or .Values.jvmOptions }}
- name: conf
  mountPath: {{ include "amoro.home" . }}/conf/jvm.properties
  readOnly: true
  subPath: "jvm.properties"
{{- end -}}
{{- end -}}
{{- /* define amoro.pod.container.mounts end */ -}}
{{/* defined volumes for pod */}}
{{- define "amoro.pod.volumes" -}}
- name: conf
  configMap:
    name: config.yaml
- name: logs
  emptyDir: {}
{{- end -}}
{{-/*define"amoro.pod.volumes"end*/-}}
  • Deployment

与社区不同的是,咱们经过IRSA或IAM认证放入环境变量中的方法进行进行统一办理和更新;别的Optimizer选用的是external形式,更重视AMS和Optimizer之间的连通性,所以把livenessProbe探针改为optimizing的TCP端口。

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app.kubernetes.io/name: ams
  name: ams
spec:
  replicas: {{ .Values.replicas }}
  selector:
    matchLabels:  
      app.kubernetes.io/name: ams
  strategy:
    type: {{ .Values.strategy.type | quote }}
    rollingUpdate:
      maxSurge: {{ .Values.strategy.rollingUpdate.maxSurge | quote }}
      maxUnavailable: {{ .Values.strategy.rollingUpdate.maxUnavailable | quote }}
  template:
    metadata:
      labels:
        app.kubernetes.io/name: ams
    spec:
      {{- if .Values.affinity }}
      affinity:
        {{- toYaml .Values.affinity | nindent 8 }}
      {{- end }}
      {{- if .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml .Values.nodeSelector | nindent 8 }}
      {{- end }}
      {{- if .Values.tolerations }}
      tolerations:
        {{- toYaml .Values.tolerations | nindent 8 }}
      {{- end }}
      {{- if .Values.image.pullSecret }}
      imagePullSecrets:
        - name: {{ .Values.image.pullSecret }}
      {{- end }}
      serviceAccountName: {{ .Values.serviceAccount.name }}
      containers:
        - env:
          - name: AMS_DATABASE_PASSWORD
            valueFrom:
              secretKeyRef:
                name: udp-amoro-externaldb-secret
                key: database-password-udp
        image: {{ include "udp.amoro.image.fullname" .}}
        imagePullPolicy: {{ .Values.image.pullPolicy }}
        name: ams
        ports:
          - containerPort: {{ .Values.ports.amoroServer }}
            name: "amoro-server"
          - containerPort: {{ .Values.ports.optimizing }}
            name: "optimizing"
          - containerPort: {{ .Values.ports.jmxExporter }}
            name: "jmx-exporter"
        {{- if .Values.livenessProbe.enabled }}
        livenessProbe:
          tcpSocket:
            port: {{ .Values.ports.optimizing }}
          initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds }}
          periodSeconds: {{ .Values.livenessProbe.periodSeconds }}                     
          timeoutSeconds: {{ .Values.livenessProbe.timeoutSeconds }}                   
          failureThreshold: {{ .Values.livenessProbe.failureThreshold }}               
          successThreshold: {{ .Values.livenessProbe.successThreshold }}
        {{- end }}
        {{- if .Values.readinessProbe.enabled }}
        readinessProbe:
          httpGet:
            path: /versionInfo
            port: amoro-server
          initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds }}        
          periodSeconds: {{ .Values.readinessProbe.periodSeconds }} 
          timeoutSeconds: {{ .Values.readinessProbe.timeoutSeconds }}                 
          failureThreshold: {{ .Values.readinessProbe.failureThreshold }}             
          successThreshold: {{ .Values.readinessProbe.successThreshold }}
        {{- end }}
        resources:
          {{- toYaml .Values.resources | nindent 12 }}
        volumeMounts:
          {{- include "amoro.pod.container.mounts" . | nindent 12}}
        securityContext:
          {{- toYaml .Values.podSecurityContext | nindent 12 }}
    dnsPolicy: ClusterFirst
    restartPolicy: {{ .Values.image.restartPolicy }}
    schedulerName: default-scheduler
    terminationGracePeriodSeconds: 30
    volumes:
      {{-include"amoro.pod.volumes".|nindent8}}
  • Service

界说AmoroPod拜访的端口

apiVersion: v1
kind: Service
metadata:
  labels:
    {{- include "udp.amoro.labels" . | nindent 4 }}
  name: ams
spec:
  ports:
    - name: amoro-server
      port: {{ .Values.ports.amoroServer }}
      protocol: TCP
    - name: optimizing
      port: {{ .Values.ports.optimizing }}
      protocol: TCP
    - name: jmx-exporter
    port: {{ .Values.ports.jmxExporter }}
    protocol: TCP  
selector:
  {{-include"udp.amoro.labels".|nindent4}}
  • ServiceAccount
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ .Values.serviceAccount.name }}
  labels:
    {{- include "udp.amoro.labels" . | nindent 4 }}
  annotations:
    eks.amazonaws.com/role-arn: {{ .Values.serviceAccount.iamRoleARN }}
{{-end-}}
  • Secert

咱们结合 SecertStore,ExternalSecret 办理敏感信息,将其存储在外部体系(如 Vault,Azure Key Vault 等)中,并经过引证这些外部密钥和凭证将其注入 Kubernetes 的 Secert 目标中。

  • Ingress

Ingress 办理 Kubernetes 集群内部的 HTTP,HTTPS 路由,将外部流量路由到集群内部的服务,所以装备 Ingress 后咱们能够经过 host 的方法拜访其 WebUI

{{- if .Values.ingress.enabled -}}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: amoro
  labels:
    {{- include "udp.amoro.labels" . | nindent 4 }}
  {{- with .Values.ingress.annotations }}
  annotations:
    {{- toYaml . | nindent 4 }}
  {{- end }}
spec:
  rules:
    - host: {{ .Values.ingress.host }} 
      http:
        paths:
          - path: {{ .Values.ingress.path }}
            pathType: {{ .Values.ingress.pathType }} 
            backend: 
              service:
                name: ams                
                port:
                  number: {{ .Values.ports.amoroServer }}
{{-end}}
  • Podmonitor

为了便利监控 AMS 的状况,咱们在 Docker 镜像中装备 Prometheus 并暴露 jmx exporter 端口,以便搜集和存储与 Amoro Pod 相关的衡量目标。

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  labels:
    app.kubernetes.io/name: amoro-monitor
  name: amoro-monitor
spec:
  namespaceSelector:
    any: true 
  podMetricsEndpoints:
    - interval: 60s
      port: jmx-exporter
  selector:
    matchLabels:
      app.kubernetes.io/name:ams

编写完Helm 模版后经过如下命令装置布置

-- 装置/晋级amoro helm
helmupgrade-installamoro./--namespaceamoro

注册 GlueCatalog

思科依据 Amoro + Apache Iceberg 构建云原生湖仓实践

  • warehouse为必填项,指定数据仓库根目录,当然你也能够运用简易的 warehouse 地址作为占位

  • 在生产环境中装备主张装备lock-impllock.table用于确保 metadata.json 文件的修正原子性

  • 关于运用 IRSA 认证的 AMS,需求设置参数client.credentials-provider为 software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider 来获取正确的认证信息

05 未来规划

1.增量 SORT/ZORDERSORT:Dataskipping 关于 Iceberg 的查询功率提升是明显的,尤其是关于查询条件相对固定的报表。此外更高效的文件跳过也能够削减读取表时对文件的 IO,下降外部拜访 S3 时造成的流量开支。Clustering 的优化方法也让针对不同表的智能优化成为或许。

2.完善监控告警:当时会依据 Amoro 的相关记录和目标做监控,例如 optimizing/pending/committing 超时告警,pod 状况监控等。除了这些还需求依据表本身的状况做健康状况的监控,提前预报表的读/写扩大。

3.KubernetesOptimizer:在生产环境中咱们一直以来都运用 ExternalFlinkOptimizer 来做 Iceberg 的 compaction 任务,这样的方法其实并不便利灵敏的调整 optimizer 的资源。假如像 LocalOptimizer 一样弹性的办理 Pod,那能够必定程度上削减保护外部 Optimizer 的本钱,也节省了一部分 JobManger 的资源。


End~

假如你对数据湖,湖仓一体、table format 或 Amoro 社区感兴趣,欢迎联系咱们深入交流。

关于 Amoro 的更多资讯可检查:

官网amoro.netease.com/

源码github.com/NetEase/amo…

作者:Amoro Community

修改:Viridian