异步使命处理体系中的数据剖析

数据处理、机器学习练习、数据核算剖析是最为常见的一类离线使命。这类使命往往都是经过了一系列的预处理后,由上游统一发送到使命渠道进行批量练习及剖析。在处理语言方面,Python 因为其所提供的丰厚的数据处理库,成为了数据范畴最为常用的语言之一。函数核算原生支撑 Python runtime,并支撑快捷的引进第三方库,使得运用函数核算异步使命进行处理变得极为方便。

数据剖析场景常见诉求

数据剖析场景往往具有履行时间长、并发量大的特点。在离线场景中,往往会守时触发一批大量的数据进行会集处理。因为这种触发特性,事务方往往会对资源利用率(本钱)具有较高的要求,期望能够满足功率的一起,尽量下降本钱。具体归纳如下:

  1. 程序开发便捷,关于第三方包及自定义依靠友爱;
  2. 支撑长时运转。能够检查履行过程中的使命状况,或登录机器进行操作。假如呈现数据过错支撑手动中止使命;
  3. 资源利用率高,本钱最优。

以上诉求十分适合运用函数核算异步使命。

典型案例 – 数据库自治服务

  • 事务基本情况

阿里云集团内部的数据库巡检渠道首要用于对 sql 语句的慢查询、日志等进行优化剖析。整个渠道使命分为离线练习及在线剖析两类首要使命,其间在线剖析事务的的核算规模达到了上万核,离线事务的每日履行时长也在数万核小时。因为在线剖析、离线练习时间上的不确定性,很难进步集群整体资源利用率,而且在事务高峰来时需求极大的弹性算力支撑。运用函数核算后,整个事务的架构图如下:

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

  • 事务痛点及架构演进

数据库巡检渠道负责阿里巴巴全网各 Region 的数据库 SQL 优化及剖析作业。Mysql 数据来源于各 Region 的各个集群,并统一在 Region 维度进行一次预聚合及存储。在进行剖析时,因为需求跨 region 的聚合及核算,巡检渠道首先测验在内网建立大型 Flink 集群进行核算剖析作业。但是在实践运用中,遇到了如下问题:

  1. 数据处理算法迭代繁琐。首要体现在算法的布置、测验及发布上。Flink 的 Runtime 才能极大限制了发布周期;
  2. 关于常见的及一些自定义的第三方库,Flink 支撑不是很好。算法所依靠的一些机器学习、核算的库在 Flink 官方 Python runtime 中要么没有,要么版别老旧,运用不方便,无法满足要求;
  3. 走 Flink 转发链路较长,Flink 排查问题困难;
  4. 峰值时弹性速度及资源均较难满足要求。而且整体本钱十分高。

在了解了函数核算后,针对 Flink 核算部分进行了算法使命的搬迁作业,将中心练习及统核算法搬迁至函数核算。经过运用函数核算异步使命所提供的相关才能,整个开发、运维及本钱得到了极大的提高。

  • 搬迁函数核算架构后的效果
  1. 搬迁函数核算后,体系能够完好承接峰值流量,快速完成每日剖析及练习使命;
  2. 函数核算丰厚的 Runtime 才能支撑了事务的快速迭代;
  3. 核算上相同的核数本钱变为了本来 Flink 的 1/3。

函数核算异步使命十分适用于这类数据处理使命。函数核算在下降运算资源的本钱一起,能够将您从繁杂的渠道运维作业中解放出来,专心于算法开发及优化。

函数核算异步使命最佳实践-Kafka ETL

ETL 是数据处理中较为常见的使命。原始数据或存在于 Kafka 中,或存在于 DB 中,因为事务需求对数据进行处理后转储到其他存储介质(或存回本来的使命行列)。这类事务也归于显着的使命场景。假如您采用了云上的中间件服务(如云上的 Kafka),您就能够利用函数核算强大的触发器集成生态便捷的集成 Kafka,而无需重视诸如 Kafka Connector 的布置、过错处理等与事务无关的操作。

ETL 使命场景的需求

一个 ETL 使命往往包含 Source、Sink 及处理单元三个部分,因此 ETL 使命除了对算力的要求外,还需求使命体系具有极强的上下游衔接生态。除此之外,因为数据处理的精确性要求,需求使命处理体系能够提供使命去重、Exactly Once 的操作语义。而且,关于处理失利的音讯,需求能够进行补偿(如重试、死信行列)的才能。总结如下:

  1. 使命的精确履行:
    1. 使命重复触发支撑去重;
    2. 使命支撑补偿,死信行列;
  1. 使命的上下游:

    1. 能够方便的拉取数据,并在处理后将数据传递至其他体系;
  1. 算子才能的要求:

    1. 支撑用户自定义算子的才能,能够灵活的履行各种数据处理使命。

Serverless Task 对 ETL 使命的支撑

函数核算支撑的 Destinationg 功用能够很好的支撑 ETL 使命关于便捷衔接上下游、使命精确履行的相关诉求。函数核算丰厚的 Runtime 支撑也使得关于数据处理的使命变得极为灵活。在 Kafka ETL 使命处理场景中,咱们首要用到的 Serverless Task 才能如下:

  1. 异步方针装备功用:
    1. 经过装备使命成功方针,支撑主动将使命投递至下游体系(如行列中);
    2. 经过装备使命失利方针,支撑死信行列才能,将失利的使命投递至音讯行列,等待后续的补偿处理;
  1. 灵活的算子及第三方库支撑:

    1. Python 因为其丰厚的核算、运算的第三方库的支撑,在数据处理范畴 Python 是用的最为广泛的语言之一。函数核算的 Python Runtime 支撑对第三方库打包,使您能够快速的进行原型验证及测验上线。

Kafka ETL 使命处理示例

咱们以简略的 ETL 使命处理为例,数据源来自 Kafka,经过函数核算处理后,将使命履行成果及上下游信息推送至音讯服务 MNS。见函数核算部分项目源码 [ 1]

  • Kafka 资源准备
  1. 进入 Kafka 控制台,点击购买实例,之后布置。等待实例布置完成;

  2. 进入创立好的实例中,创立一个测验用 Topic。

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

  • 方针资源准备(MNS)

进入 MNS 控制台,别离创立两个行列:

  1. dead-letter-queue:作为死信行列运用。当音讯处理失利后,履行的上下文信息将投递到这儿;
  2. fc-etl-processed-message:作为使命成功履行后的推送方针。

创立完成后,如下图所示:

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

  • 布置
  1. 下载装置 Serverless Devs:
npm install @serverless-devs/s

具体文档能够参阅 Serverless Devs 装置文档 [ 2]

  1. 装备密钥信息:
s config add

具体文档能够参阅阿里云密钥装备文档 [ 3]

  1. 进入项目,修正 s.yaml 文件中的方针 ARN 为上述创立后的 MNS 行列 ARN,并修正服务人物为已存在的人物;

  2. 布置:s deploy -t s.yaml

  • 装备 ETL 使命
  1. 进入 kafka 控制台 – connector 使命列表标签页,点击创立 Connector;

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

  1. 在装备完基本信息、源的 Topic 后,装备方针服务。在这儿面咱们挑选函数核算作为方针:

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

您能够依据事务需求装备发送批大小及重试次数。至此,咱们已完成使命的基本装备。留意:这儿面的发送形式请挑选“异步”形式。

进入到函数核算异步装备页面,咱们能够看到现在的装备如下:

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

  • 测验 ETL 使命

1.进入 kafka 控制台 – connector 使命列表标签页,点击测验;填完音讯内容后,点击发送:

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

  1. 发送多条音讯后,进入到函数控制台。咱们能够看到有多条音讯在履行中。此时咱们挑选运用中止使命的方式来模仿一次使命履行失利:

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

  1. 进入到音讯服务 MNS 控制台中,咱们能够看到两个从前创立的行列中均有一条可用音讯,别离代表一次履行和失利的使命内容:

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

  1. 进入到行列概况中,咱们能够看到两条音讯内容。以成功的音讯内容为例:
{
    "timestamp":1646826806389,
    "requestContext":{
        "requestId":"919889e7-60ff-408f-a0c7-627bbff88456",
        "functionArn":"acs:fc:::services/fc-etl-job.LATEST/functions/fc-job-function",
        "condition":"",
        "approximateInvokeCount":1
    },
    "requestPayload":"[{"key":"k1","offset":1,"overflowFlag":false,"partition":5,"timestamp":1646826803356,"topic":"connector-demo","value":"k1","valueSize":4}]",
    "responseContext":{
        "statusCode":200,
        "functionError":""
    },
    "responsePayload":"[\n    {\n        "key": "k1",\n        "offset": 1,\n        "overflowFlag": false,\n        "partition": 5,\n        "timestamp": 1646826803356,\n        "topic": "connector-demo",\n        "value": "k1",\n        "valueSize": 4\n    }\n]"
}

在这儿面,咱们能够看到 “responsePayload” 这一个 Key 中有函数回来的原始内容。一般情况下咱们会将数据处理的成果作为 response 回来,所以在后续的处理中,能够经过读取 “responsePayload” 来获取处理后的成果。

“requestPayload” 这一个 Key 中是 Kafka 触发函数核算的原始内容,经过读取这条数据中的内容,便能够获取原始数据。

函数核算异步使命最佳实践-音视频处理

随着核算机技能和网络的发展,视频点播技能因其杰出的人机交互性和流媒体传输技能倍受教育、文娱等职业的青睐。当前云核算渠道厂商的产品线不断成熟完善,假如想要建立视频点播类应用,直接上云会扫清硬件采购、技能等各种障碍。以阿里云为例,典型的解决方案如下:

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

在该解决方案中,对象存储 OSS 能够支撑海量视频存储,收集上传的视频被转码以适配各种终端、CDN 加快终端设备播放视频的速度。此外还有一些内容安全 [ 4] 审查需求,例如鉴黄、鉴恐等。

音视频是典型的长时处理场景,十分适合运用函数核算使命。

音视频处理的需求

在视频点播解决方案中,视频转码是最消耗核算力的一个子体系,尽管您能够运用云上专门的转码服务,但在某些场景下,您仍会挑选自己建立转码服务,例如:

  • 需求更弹性的视频处理服务。例如,已经在虚拟机或容器渠道上基于 FFmpeg 布置了一套视频处理服务,但想在此基础上提高资源利用率,实现具有显着波峰波谷、流量突增情况下的快弹及稳定性;

  • 需求批量快速处理多个超大的视频。例如,每周五守时产生几百个 4 GB 以上 1080P 的大视频,每个使命可能履行时长达数小时;

  • 对视频处理使命希望实时掌握进度;并在一些呈现过错的情况下需求登录实例排查问题乃至中止履行中的使命避免资源消耗。

Serverless Task 对音视频场景的支撑

上述诉求是典型的使命场景。而因为这类使命往往具有波峰波谷的特性,如何进行核算资源的运维,并尽可能的下降其本钱,这部分的作业量乃至比实践视频处理事务的作业量还要大。Serverless Task 这一产品形态就是为了解决这类场景而诞生的,经过 Serverless Task,您能够快速构建高弹性、高可用、低本钱免运维的视频处理渠道。

在这个场景中,咱们会用到的 Serverless Task 的首要才能如下:

  1. 免运维 & 低本钱:核算资源随用随弹,不运用不付费;
  2. 长时履行使命负载友爱:单个实例最长支撑 24h 的履行时长;
  3. 使命去重:支撑触发端的过错补偿。关于单一使命,Serverless Task 能够做到主动去重的才能,履行更牢靠;
  4. 使命可观测:一切履行中、履行成功、履行失利的使命可追溯,可查询;支撑使命的履行历史数据查询、使命日志查询;
  5. 使命可操作:您能够中止、重试使命;
  6. 灵敏开发 & 测验:官方支撑 S 东西进行主动化一键布置;支撑登录运转中函数实例的才能,您能够直接登录实例调试 ffmpeg 等第三方程序,所见即所得。

Serverless-FFmpeg 视频转码

项目源码 [ 5] 见文末

  • 布置
  1. 下载装置 Serverless Devs:
npm install @serverless-devs/s

具体文档能够参阅 Serverless Devs 装置文档 [ 2]

  1. 装备密钥信息:
s config add

具体文档能够参阅阿里云密钥装备文档 [ 3 ]

  1. 初始化项目:s init video-transcode -d video-transcode
  2. 进入项目并布置:cd video-transcode && s deploy
  • 调用函数
  1. 发起 5 次异步使命函数调用
$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"mov"}' --invocation-type async   --stateful-async-invocation-id my1-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: bf7d7745-886b-42fc-af21-ba87d98e1b1c
$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"mov"}' --invocation-type async   --stateful-async-invocation-id my2-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: edb06071-ca26-4580-b0af-3959344cf5c3
$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"flv"}' --invocation-type async   --stateful-async-invocation-id my3-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: 41101e41-3c0a-497a-b63c-35d510aef6fb
$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"avi"}' --invocation-type async   --stateful-async-invocation-id my4-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: ff48cc04-c61b-4cd3-ae1b-1aaaa1f6c2b2
$ s VideoTranscoder invoke -e '{"bucket":"my-bucket", "object":"480P.mp4", "output_dir":"a", "dst_format":"m3u8"}' --invocation-type async   --stateful-async-invocation-id my5-480P-mp4
VideoTranscoder/transcode async invoke success.
request id: d4b02745-420c-4c9e-bc05-75cbdd2d010f
  1. 登录FC 控制台 [ 6 ]

阿里云 Serverless 异步任务处理系统在数据分析领域的应用

能够清晰看出每一次转码使命的履行情况:

    • A 视频是什么时分开始转码的, 什么时分转码结束
    • B 视频转码使命不太契合预期, 我中途能够点击中止调用
    • 经过调用状况过滤和时间窗口过滤,我能够知道现在有多少个使命正在履行, 历史完成情况是怎么样的
    • 能够追溯每次转码使命履行日志和触发payload
    • 当您的转码函数有异常时分, 会触发 dest-fail 函数的履行,您在这个函数能够增加您自定义的逻辑, 比如报警

转码结束后, 您也能够登录 OSS 控制台到指定的输出目录检查转码后的视频。

在本地运用该项目时,不只能够布置,还能够进行更多的操作,例如检查日志,检查目标,进行多种形式的调试等,这些操作概况能够参阅函数核算组件指令文档 [ 7]

参阅链接:

[1] 函数核算部分项目源码:

github.com/awesome-fc/…

[2] Serverless Devs 装置文档:

https://github.com/Serverless-Devs/ServerlessDevs/blob/master/docs/zh/install.md

[3] 阿里云密钥装备文档:

github.com/devsapp/fc/…

[4] 内容安全:

help.aliyun.com/product/284…

[5] 项目源码:

github.com/devsapp/sta…

[6] FC 控制台:

fcnext.console.aliyun.com/overview

[7] 函数核算组件指令文档:

github.com/devsapp/fc#…