作者:庄宇

什么是Fan-out Fan-in

工作流编列过程中,为了加速大使命处理的功率,能够运用 Fan-out Fan-in 使命编列,将大使命分解成小使命,然后并行运转小使命,最终聚合成果。

ACK One Argo工作流:完成动态 Fan-out/Fan-in 使命编列

由上图,能够运用 DAG(有向无环图)编列 Fan-out Fan-in 使命,子使命的拆分方法分为静态和动态,分别对应静态 DAG 和动态 DAG。动态 DAG Fan-out Fan-in 也能够理解为 MapReduce。每个子使命为 Map,最终聚合成果为 Reduce。

静态 DAG: 拆分的子使命分类是固定的,例如:在数据搜集场景中,一起搜集数据库 1 和数据库 2 中的数据,最终聚合成果。

动态 DAG: 拆分的子使命分类是动态的,取决于前一个使命的输出成果,例如:在数据处理场景中,使命 A 能够扫描待处理的数据集,为每个子数据集(例如:一个子目录)发动子使命 Bn 处理,当一切子使命 Bn 运转结束后,在子使命 C 中聚合成果,详细发动多少个子使命 B 取决由使命 A 的输出成果。依据实际的事务场景,能够在使命 A 中自定义子使命的拆分规则。

ACKOne分布式工作流 Argo 集群

在实际的事务场景中,为了加速大使命的履行,提升功率,往往需求将一个大使命分解成数千个子使命,为了保证数千个子使命的一起运转,需求调度数万核的 CPU 资源,叠加多使命需求竞争资源,一般 IDC 的离线使命集群难以满意需求。例如:自动驾驭仿真使命,修正算法后的回归测验,需求对一切驾驭场景仿真,每个小驾驭场景的仿真能够由一个子使命运转,开发团队为加速迭代速度,要求一切子场景测验并行履行。

如果您在数据处理,仿真核算和科学核算等场景中,需求运用动态 DAG 的方法编列使命,或许一起需求调度数万核的 CPU 资源加速使命运转,您能够运用阿里云ACK One 分布式工作流 Argo 集群 [ 1]

ACK One 分布式工作流 Argo 集群,产品化托管Argo Workflow [ 2] ,供给售后支撑,支撑动态 DAG Fan-out Fan-in 使命编列,支撑按需调度云上算力,运用云上弹性,调度数万核 CPU 资源并行运转大规模子使命,削减运转时间,运转完成后及时收回资源节约本钱。支撑数据处理,机器学习,仿真核算,科学核算,CICD 等事务场景。

Argo Workflow 是开源 CNCF 结业项目,聚集云原生范畴下的工作流编列,运用 Kubernetes CRD 编列离线使命和 DAG 工作流,并运用 Kubernetes Pod 在集群中调度运转。

本文介绍运用 Argo Workflow 编列动态 DAG Fan-out Fan-in 使命。

Argo Workflow 编列 Fan-out Fan-in 使命

咱们将构建一个动态 DAG Fan-out Fan-in 工作流,读取阿里云 OSS 目标存储中的一个大日志文件,并将其拆分为多个小文件(split),发动多个子使命分别核算每个小文件中的关键词数量(count),最终聚合成果(merge)。

  1. 创立分布式工作流 Argo 集群 [ 3]

  2. 挂载阿里云 OSS 存储卷,工作流能够像操作本地文件一样,操作阿里云 OSS 上的文件。参阅:工作流运用存储卷 [ 4]

  3. 运用以下工作流 YAML 创立一个工作流,参阅:创立工作流 [ 5] 。详细阐明拜见注释。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dynamic-dag-map-reduce-
spec:
  entrypoint: main
  # claim a OSS PVC, workflow can read/write file in OSS through PVC. 
  volumes:
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  # how many tasks to split, default is 5.
  arguments:
    parameters:
      - name: numParts
        value: "5"
  templates:
    - name: main
      # DAG definition.
      dag:
        tasks:
          # split log files to several small files, based on numParts.
          - name: split
            template: split
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
          # multiple map task to count words in each small file.
          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            # run as a loop, partId from split task json outputs.
            withParam: '{{tasks.split.outputs.result}}'
          - name: reduce
            template: reduce
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
            depends: "map"
    # The `split` task split the big log file to several small files. Each file has a unique ID (partId).
    # Finally, it dumps a list of partId to stdout as output parameters
    - name: split
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["split.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    # One `map` per partID is started. Finds its own "part file" and processes it.
    - name: map
      inputs:
        parameters:
          - name: partId
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["count.py"]
        env:
        - name: PART_ID
          value: "{{inputs.parameters.partId}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    # The `reduce` task takes the "results directory" and returns a single result.
    - name: reduce
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["merge.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
      outputs:
        artifacts:
          - name: result
            path: /mnt/vol/result.json
  1. 动态 DAG 完成

1)split 使命在拆分大文件后,会在标准输出中输出一个 json 字符串,包含:子使命要处理的 partId,例如:

["0", "1", "2", "3", "4"]

2)map 使命运用 withParam 引证 split 使命的输出,并解析 json 字符串获得一切 {{item}},并运用每个 {{item}} 作为输入参数发动多个 map 使命。

          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            withParam: '{{tasks.split.outputs.result}}'

更多定义方法,请参阅开源 Argo Workflow 文档 [ 6]

  1. 工作流运转后,经过分布式工作流 Argo 集群控制台 [ 7] 查看使命 DAG 流程与运转成果。

ACK One Argo工作流:完成动态 Fan-out/Fan-in 使命编列

  1. 阿里云 OSS 文件列表,log-count-data.txt 为输入日志文件,split-output,cout-output 中心成果目录,result.json 为最终成果文件。

ACK One Argo工作流:完成动态 Fan-out/Fan-in 使命编列

  1. 示例中的源代码能够参阅:AliyunContainerService GitHub argo-workflow-examples [ 8]

总结

Argo Workflow 是开源 CNCF 结业项目,聚集云原生范畴下的工作流编列,运用 Kubernetes CRD 编列离线使命和 DAG 工作流,并运用 Kubernetes Pod 在集群中调度运转。

阿里云 ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow,供给售后支撑,加固控制面完成数万子使命(Pod)安稳高效调度运转,数据面支撑无服务器方法调度云上大规模算力,无需运维集群或许节点,支撑按需调度云上算力,运用云上弹性,调度数万核 CPU 资源并行运转大规模子使命,削减运转时间,支撑数据处理,机器学习,仿真核算,科学核算,CICD 等事务场景。

欢迎加入 ACK One 客户交流钉钉群与咱们进行交流。(钉钉群号:35688562

相关链接:

[1]阿里云ACK One 分布式工作流 Argo 集群

help.aliyun.com/zh/ack/over…

[2]Argo Workflow

argo-workflows.readthedocs.io/en/latest/

[3]创立分布式工作流 Argo 集群

help.aliyun.com/zh/ack/crea…

[4]工作流运用存储卷

help.aliyun.com/zh/ack/use-…

[5]创立工作流

help.aliyun.com/zh/ack/crea…

[6] 开源Argo Workflow 文档

argo-workflows.readthedocs.io/en/latest/w…

[7]分布式工作流 Argo 集群控制台

account.aliyun.com/login/login…

[8]AliyunContainerService GitHub argo-workflow-examples

github.com/AliyunConta…