美团图灵机器学习渠道在长期的优化实践中,积累了一系列共同的优化办法。本文首要介绍了图灵机器学习渠道在内存优化方面沉积的优化技能,咱们深入到源码层面,介绍了Spark算子的原理并供给了最佳实践。期望为读者带来一些思路上的启示。

导语

图灵渠道是美团履约渠道技能部2018年开始自研的算法渠道,供给模型全生命周期的一站式服务,旨在帮助算法同学脱离繁琐的工程化开发,把有限的精力聚焦于事务和算法的迭代优化中。

随着美团图灵机器学习渠道的发展,图灵技能团队在内存优化、核算优化、磁盘IO优化三个方面沉积了一系列功能优化技能。咱们将以连载的办法为咱们揭秘这些技能。本文作为该系列的开篇之作,将要点为咱们介绍内存优化。

1. 事务布景

图灵渠道首要包括机器学习渠道、特征渠道、图灵在线服务(Online Serving)、AB试验渠道四大功能,详细可参阅《一站式机器学习渠道建造实践》以及《算法渠道在线服务体系的演进与实践》这两篇博客。其间,图灵机器学习渠道的离线练习引擎是依据Spark完结的。

随着图灵的用户增长,越来越多算法模型在图灵渠道上完结迭代,优化离线练习引擎的功能和吞吐对于节省离线核算资源显得愈发重要。通过半年持续的迭代,咱们积累了一系列共同的优化办法,使图灵机器学习渠道的离线资源耗费下降80%,生产任务均匀耗时下降63%(如下图所示),图灵全渠道的练习任务在功能层面都得到了较为明显的提升。

资源耗费下降:

图1 资源耗费

当时渠道功能:

下图是某位图灵用户的试验。运用100万数据练习深度模型,总计约29亿的数据调用深度模型,核算评价目标并保存到Hive,整个试验只需求35分钟。其间Spark敞开DynamicAllocation,maxExecutor=400 ,单个Executor为7Core16GB。

图2 试验运转图

2. 图灵练习引擎优化

那么,图灵练习引擎的功能优化是怎么做到的呢?咱们的优化分为内存优化、核算优化、磁盘IO优化三个层面。

内存优化包括列裁切、自适应Cache、算子优化。咱们学习Spark SQL原理规划了列裁切,可以主动剔除各组件中用户实践没有运用的字段,以降低内存占用。何时对Dataset Persist和Unpersist一直是Spark代码中的取舍问题,针对用户不熟悉Persist和Unpersist机遇这个问题,咱们将多年的开发经验沉积在图灵中,结合列裁切技能完结自适应Cache。在核算优化方面,咱们完结了图优化、Spark源码优化、XGB源码优化。在磁盘IO优化方面,咱们创新性的完结了主动化小文件保存优化,可以运用一个Action完结多级分区表小文件的兼并保存。

此外,咱们完结的TFRecord表明优化技能,成功将Spark生成的TFRecord体积减少50%。因图灵渠道运用的优化技巧较多,咱们将分红多篇文章为咱们逐个介绍这些优化技能。

图3 图灵练习引擎优化

而在众多优化中,收益最高、适用性最广的技能的便是算子优化,这项技能极大提升了图灵练习引擎的吞吐量。本篇文章首先将为咱们介绍内存优化中的算子优化技能。

3. Spark算子解读

相同的事务需求,不同的算子完结会有不一样的特性。咱们将多年的Spark开发技巧总结在下表中:

表1 Spark算子开发技巧总结

  1. 多行输入多行输出:多行数据一同进入内存处理。输出多行数据。
  2. 多列输出:特定场景下,咱们期望输出多个字段。

    1. SQL场景下只能输出Struct,再从Struct中SELECT各字段。
    2. map/flatMap/mapPartitions可以轻松输出任意个字段。
  3. 中间成果复用

    1. SQL场景下:SQL场景下只能先SELECT一次得到中间变量,再SELECT中间变量完结后续处理。
    2. map/flatMap/mapPartitions可将核算逻辑封装在函数内。
  4. 重量级目标复用

    1. Executor等级,例如可以通过广播变量完结,或许通过静态类成员变量的“懒汉”模式完结。
    2. Partition等级,mapPartitions时,先创立目标,后迭代数据,这个目标可在Partition内复用。

通过比照咱们发现,mapPartitions是各类算子中最为灵敏——可以灵敏完结输入M条输出N条数据,可以输出任意数量的字段,还可以完结重量级目标在Partition或Executor等级上的复用。mapPartitions因其强大的功能和灵敏可定制性,在图灵练习引擎的开发中有着举足轻重的位置(例如按Batch调用深度模型、上下采样、Partition核算等组件,都是依据该算子完结)。可是mapPartitions也有一个不足之处。

4. mapPartitions之殇

信任大部分读者都曾经写过这样的代码,创立一个重量级目标在Partition内完结复用,而不是像map算子那样每处理一行数据创立一个目标。

mapPartitions模板,重量级目标复用

dataset.mapPartitions((MapPartitionsFunction<Row, Row>) iterator -> {
  HeavyObject obj = new HeavyObject();
  List<Row> list = new ArrayList<>();
  // 遍历处理数据
  while (iterator.hasNext()) {
    Row row = iterator.next();
    // 凑集batch或逐条处理
    // ....
    obj.process(row)
    // batch add或逐条add
    list.add(...);
  }
  // 回来list的迭代器
  return list.iterator();
}, RowEncoder.apply(schema));

熟悉mapPartitions的同学都知道,这段代码完结了重量级目标的复用,相比map算子如同现已减少了很多GC,但这样仍旧十分容易溢出。那么:

  1. 为什么mapPartitions算子容易溢出呢?
  2. 当多个mapPartitions算子串联的时候又是怎么GC的呢?

5. Spark Pipeline中的mapPartitions

在进行下一部分讲解之前,咱们先扼要介绍一下Spark的懒履行机制。Spark的算子分为Action和Transformation两大类。RDD的依靠联系构成了数据处理的有向无环图DAG。只有当Action算子出现时,才会履行Action算子与前面一系列Transformation算子构成的DAG。Spark还会依据Shuffle将DAG划分红多个Stage进行核算,Shuffle进程需求跨节点交流数据,会产生很多的磁盘IO和网络IO。而每个Stage内的核算则构成了Pipeline,在内存中进行。

![图4 多列词典映射试验图](p0.meituan.net/travelcube/… =80%x)

咱们以上图为例,该同学试验中的多列词典映射组件,对很多的特征做了词典映射核算。多列词典映射组件包括两个部分,核算词典和运用词典。

::: block-1

核算词典:通过去重和collect生成了各个特征的词典,每个特征词典的核算都伴随着1次Shuffle和1次Action。
运用词典:将特征依据词典映射成唯一ID,不存在Shuffle。
:::

与Spark StringIndexer的Pipeline优化类似,当进行多个特征的词典映射核算时,图灵机器学习渠道会将核算词典的Action独自履行,而多个运用词典则一同履行。

词典生成后,所有运用词典的核算逻辑(mapPartitions Transformation)不存在Shuffle,因而被划分到同一个Stage中,所有mapPartitions算子将串联成一条十分长的Pipeline。终究由后面的Action算子触发提交Job,履行该Pipeline。Stage的划分可参阅下图:

图5 运用多个词典Stage

运用词典的完结中,每个mapPartitionsFunction中都新建了一个ArrayList充任Buffer来存储核算后的数据,终究回来ArrayList.iterator()。履行时,每次运用词典都会将整个Partition的数据拉入ArrayList傍边。上述词典映射串联构成Pipeline的时候,内存中会有多少数据呢?

带着这个疑问,让咱们走进Spark的源代码,看看mapPartitionsFunction是怎么构成Spark Pipeline的。

Spark的一个Stage中会划分为多个Task,除了union和coalesce的场景,1个Partition对应1个Task。Task的履行通过笼统办法runTask()完结,以完结类ResultTask为例,最终runTask()办法调用了rdd.iterator()。

ResultTask.scala

override def runTask(context: TaskContext): U = {
  ...... // 源码缩略不进行展现:初始化一些需求的目标
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  // 这里的func()调用了rdd.iterator()
  func(context, rdd.iterator(partition, context))
}

而RDD的iterator办法的源码如下,其调用逻辑终究都会进入computeOrReadCheckpoint办法,若没有CheckPoint则进入compute办法履行核算。以MapPartitionsRDD类为例,获取父RDD的Iterator并传入自己的核算逻辑函数f中。

RDD.scala

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context) // 内部仍然调用下面的computeOrReadCheckpoint(partition, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
// StorageLevel不为NONE时调用的办法
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  ...... // 初始化相关变量
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    readCachedBlock = false
    // 内部仍然调用iterator()中的computeOrReadCheckpoint办法
    computeOrReadCheckpoint(partition, context)
  }) match {
    ...... // 源码缩略不进行展现:按case包装为对应iterator回来
  }
}
// 默许调用该办法
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    // 有checkpoint或materialized则回来依靠联系中第一个父RDD的iterator
    firstParent[T].iterator(split, context)
  } else {
    // 调用当时RDD的compute办法核算,内部的核算逻辑包括了用户编写的代码
    compute(split, context)
  }
}

MapPartitionsRDD.scala

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  // 用户编写的代码逻辑被封装为函数‘f’,在此承受参数后履行
  f(context, split.index, firstParent[T].iterator(split, context))

为了更清晰的解释这个问题,以下述代码为例。

Example

val rddA = initRDD(); // 获取一个RDD
//funcA、funcB、funcC均为用户的代码逻辑
val rddB = rddA.mapPartitions(funcA)
val rddC = rddB.mapPartitions(funcB)
val rddD = rddC.mapPartitions(funcC)
rddD.count()

在遇到count算子时会进行RDD回溯,终究的形成核算链路为fCount(funcC(funcB(funcA(rddA.iterator=>iterator)))),由此构成了Pipeline,以多个mapPartitions + ArrayList.iterator()串联的代码展开则如下所示:

Example

iteratorA => // iteratorA:初始RDD对应Partition的输出迭代器
  var list = List[Row]()
  while (iteratorA.hasNext) {
    list = process(iteratorA.next()) +: list // funcA:每条拉至内存处理后参加resultList
  }
  val iteratorB = list.iterator
iteratorB => // iteratorB:rddA对应Partition的输出迭代器
  var list = List[Row]()
  while (iteratorB.hasNext) {
    list = process(iteratorB.next()) +: list // funcB:每条数据拉至内存处理后参加resultList
  }
  val iteratorC = list.iterator
iteratorC =>  // iteratorC:rddB对应Partition的输出迭代器
  var list = List[Row]()
  while (iteratorC.hasNext) {
    list = process(iteratorC.next()) +: list // funcC:每条数据拉至内存处理后参加resultList
  }
  val iteratorD = list.iterator
iteratorD => count()

回看mapPartitions模板,作为Buffer的ArrayList是每个mapPartitionsFunction的局部变量,ArrayList.iterator()引证了这个Buffer,结合上面的源码咱们知道,子RDD会引证父RDD的Iterator。结合该同学的试验分析,每个RDD中的核算都形成了一个Array Buffer,在RDD的function调用链路中Array Buffer2依靠Array Buffer1.iterator(),Array Buffer3依靠Array Buffer2.iterator()。

以此类推,在核算RDD-3时,RDD-1的func1现已出栈,且RDD-3不依靠Array Buffer1.iterator(),因而局部变量Array Buffer1可以被GC。由此可见在Stage-运用多个词典的核算进程中,内存占用的峰值达到了两个Array Buffer,也便是两倍partitionSize。

图6 运用多个词典内存占用

为了完全证明这个主意,又进行了实践的测验验证:初始化1个单Partition的RDD,并且该Partition的数据量为300万,占用内存大约为180M。接着将这些数据利用多个mapPartitions + ArrayList.iterator()串联,每输入1个目标,生成1个新目标放入Buffer中,最终用rdd.count()触发Action,整个履行流程中只包括一个Stage。运转的JVM堆内存设置为512M,以此来调查堆内存中的实例目标及其GC活动是否符合只有两个Buffer的预期。

调查成果如下,每一行数据以一个GenericRowWithSchema实例存在并参加ArrayList中,其核算进程中最大的峰值正好为600万即两倍的分区数据量。GC以周期性的活动去销毁上上个mapPartitions中的无用Buffer,并且堆内存保持在了最大约两倍的数据占用量(约360M),因而验证了推断。以下是测验中的GenericRowWithSchema目标实例计数图、内存实时占用以及GC活动核算图。

图7 目标核算

图8 内存核算

通过测验验证,mapPartitions + ArrayList.iterator()导致了两倍partitionSize的内存占用。

运用mapPartitions + ArrayList.iterator()仅仅只是造成OOM或GC压力大吗?偏偏不巧,在Spark的内存办理中还有一番六合,会牵扯到更多的功能问题。

Spark内存办理机制

Spark从2.0开始运用的是统一内存办理机制,首要分为四大区域,System Reserved、User Memory、Storage Memory和Execution Memory。System Reserved是为系统预留运用的内存,User Memory是用户界说的数据结构和Spark的元数据。存储内存Storage Memory与履行内存Execution Memory在运转期间会共享一块内存区域,默许有由spark.storage.storageFraction参数控制。Spark运用动态占用机制来办理这两块内存。

图9 Spark内存逻辑模型

Storage和Execution的动态占用机制

  1. 当Storage或Execution的内存不足、而对方的内存空余时,可以占用对方的内存空间。
  2. Storage占用Execution时,假如Execution需求更多内存,则会将Storage占用的内存筛选(依据RDD的StorageLevel决定是溢写到磁盘仍是直接删去),偿还借用的内存空间。
  3. Execution占用Storage时,假如Storage需求更多内存,则直接发生筛选(Execution的逻辑复杂,偿还内存的难度十分高)。
  4. 从Storage中筛选掉的RDD Cache会在RDD重新运用时再次Cache。

在涉及到mapPartitions + ArrayList.iterator()的履行进程中,因为很多的内存占用,导致Execution Memory不足,借用Storage Memory,并且借用后仍存在内存不足状况时,Storage Memory中的已缓存的Block会进行筛选机制,依据其存储等级进行落盘或直接删去,这会导致缓存数据屡次的IO操作与重复核算,极大的降低了数据处理的功率。

图10 筛选机制

让咱们小结一下mapPartitions + ArrayList.iterator()的完结办法:

  1. Spark通过mapPartitionsFunction嵌套完结Pipeline,例如fCount(funcC(funcB(funcA))),func中的Buffer是办法中的局部变量。
  2. 在mapPartitionsFunction中运用不约束长度的Buffer,会导致partitionSize两倍的数据拉入内存。
  3. 或许触发Spark内存办理的筛选机制,导致缓存数据屡次的IO操作与重复核算。

6. 最佳实践

以多输入多输出为例,假定咱们需求处理一批单个分区数据量达到千万等级的数据集,以单个分区中每5行数据为一批次,每批次随机输出2行数据,那么在mapPartitions基础上,可以这样写:

BatchIteratorDemo:mapPartitions处理多输入->多输出——以单分区每5行数据为一批次,每批次随机输出2行数据的Demo

Dataset<Row> dataset = initDataset();// 初始化数据集
// mapPartitions中调用BatchIterator完结核算逻辑
Dataset<Row> result = dataset.mapPartitions((MapPartitionsFunction<Row, Row>) inputIterator -> new Iterator<Row>() {
  // 一批处理的数据行数
  private static final int INPUT_BATCH_PROCESS_SIZE = 5;
  // 当时批次处理的数据集
  private final List<Row> batchRows = new ArrayList<>(INPUT_BATCH_PROCESS_SIZE);
  // 当时批次输出iterator
  private Iterator<Row> batchResult = Collections.emptyIterator();
  @Override
  public boolean hasNext() {
    // 本轮成果已悉数消费,进入下一批次batch
    if (!batchResult.hasNext()) {
      batchRows.clear();
      int count = 0;
      // 按一个 batch 5条数据参加集合
      while (count++ < INPUT_BATCH_PROCESS_SIZE && inputIterator.hasNext()) {
        batchRows.add(inputIterator.next());
      }
      // 上游数据悉数消费
      if (batchRows.size() == 0) {
        return false;
      }
      // 随机获取2条数据
      batchResult = processBatch(batchRows);// 随机抽取2条数据创立新目标回来
    }
    return true;
  }
  @Override
    public Row next() {
      return batchResult.next();// 消费当时批次的成果
    }
}, RowEncoder.apply(dataset.schema()));

当该办法运用到fCount(funcC(funcB(funcA(rddA.iterator=>iterator))))构成的Pipeline时,以多个mapPartitions + ArrayList.iterator()串联的代码展开则如下所示:

Example

iteratorA => iteratorB =  // iteratorA:初始RDD对应Partition的输出迭代器
  new Iterator[Row] {
    override def hasNext: Boolean = {
      processBatch(iteratorA) // 只处理一个batch的数据
    }
    override def next(): Row = nextInBatch() // 获取当时batch的下个输出
  }
iteratorB => iteratorC =  // iteratorB:rddA对应Partition的成果迭代器
  new Iterator[Row] {
    override def hasNext: Boolean = {
      processBatch(iteratorB) // 只处理一个batch的数据
    }
    override def next(): Row = nextInBatch() // 获取当时batch的下个输出
  }
iteratorC => iteratorD =  // iteratorC:rddB对应Partition的成果迭代器
  new Iterator[Row] {
    override def hasNext: Boolean = {
      processBatch(iteratorC) // 只处理一个batch的数据
    }
    override def next(): Row = nextInBatch() // 获取当时batch的下个输出
  }
iteratorD => count()

咱们可以看到,多输入多输出Demo以inputBatch=5、outputBatch=2作为消费单位,内存占用只有Batch=7(inputBatch + outputBatch),每次处理完一个批次,直到当时批次产生的2条数据悉数被下一个RDD Iterator消费完之后,才会继续测验从上一个RDD Iterator读取下一个批次进入内存核算,不需求为了回来分区Iterator而直接消费整个分区数据。将随机抽取数据的逻辑串联处理,其Stage将如下图所示,每个Buffer仅为一个Batch,内存耗费简直可以忽略不计。

图11 Demo Stage

终究的数据处理作用比照如下图:

图12 数据处理作用比照

7. 总结

本文作为《图灵机器学习渠道功能起飞的秘密》系列的第一篇,首要讲述了内存优化中的算子优化技巧,深入分析了mapPartitions算子的原理,并供给了mapPartitions算子的最佳实践。图灵机器学习渠道依据此计划进一步开发了BufferIterator框架,可以灵敏应对输入M条数据输出N条数据的场景,极大提升了图灵的吞吐量。后续咱们将继续为咱们介绍更多的优化技巧,敬请期待。

8. 作者简介

  • 琦帆、立煌、兆军等,均来自美团到家事业群/履约渠道技能部。

阅览美团技能团队更多技能文章合集

前端 | 算法 | 后端 | 数据 | 安全 | 运维 | iOS | Android | 测验

| 在公众号菜单栏对话框回复【2021年货】、【2020年货】、【2019年货】、【2018年货】、【2017年货】等关键词,可检查美团技能团队历年技能文章合集。

| 本文系美团技能团队出品,著作权归属美团。欢迎出于共享和交流等非商业意图转载或运用本文内容,敬请注明“内容转载自美团技能团队”。本文未经许可,不得进行商业性转载或许运用。任何商用行为,请发送邮件至tech@meituan.com请求授权。