撰文|郑建华、赵露阳

1

Op在虚拟机里的履行

1.1 PhysicalRun和InstructionsBuilder

上一篇文章《OneFlow源码解析:Op、Kernel与解说器》 中说到:

PhysicalRun承受一个lambda函数作为参数,这儿即InstructionsBuilder->Call办法,该办法承受kernel、input/output的eager blob object、kernel履行的上下文作为参数。Call办法实践会完结OpCall指令的构建,并终究将其派发至vm指令列表中,等候VM实践调度履行。

这个PhysicalRun函数里包裹着一个lambda函数:

JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {
    return builder->Call(xxx);
}));

其间,lambda函数承受一个InstructionsBuilder指针(builder),并调用builder->Call办法,用于实践完结Op指令在VM中的构建。而PhysicalRun(github.com/Oneflow-Inc… )在oneflow/core/framework/instructions_builder.h中界说,其承受lambda函数作为模版参数(CallbackT):

// Make VM instructions with instruction builder and run instructions with physical/local view.
template<typename CallbackT>
Maybe<void> PhysicalRun(const CallbackT& Build) {
  vm::InstructionList instruction_list;
  InstructionsBuilder instructions_builder(&instruction_list);
  JUST(Build(&instructions_builder));
  JUST(vm::Run(instructions_builder.mut_instruction_list()));
  return Maybe<void>::Ok();
}

可见,PhysicalRun函数中,首要初始化一个InstructionsBuilder,然后将InstructionsBuilder指针作为参数传给lambda函数,完结实践指令的构建;最后经过vm::Run()办法将该指令发送至VM,等候VM实践调度和履行。Run办法如下:

Maybe<void> Run(vm::InstructionList* instruction_list) {
  auto* virtual_machine = JUST(SingletonMaybe<VirtualMachine>());
  JUST(virtual_machine->Receive(instruction_list));
  return Maybe<void>::Ok();
}

能够看见,Run()办法获取了全局单例的VM目标指针,然后经过vm的Receive()办法,将该条指令发送给虚拟机(所以这儿Run其实有点歧义,更恰当的意思,其实是指令发送或传送)。

这个VirtualMachine->Receive办法很重要,会在后边的第2.章节中详细打开。

1.2 InstructionsBuilder

上面PhysicalRun函数中的InstructionsBuilder,类似一个指令构建的helper,InstructionsBuilder的系列办法合作指令战略(InstructionPolicy),能够协助构建不同类型的vm指令。

从InstructionsBuilder(github.com/Oneflow-Inc… )的界说中,咱们能够看到指令的构建办法,其间常用办法如下:

// 用于lazy mode(nn.Graph)
// Build VM execution instructions with NNGraph's inputs/outputs/parameters for NNGraph execution.
Maybe<void> LaunchLazyJob(const vm::EagerBlobObjectListPtr& inputs,
                          const vm::EagerBlobObjectListPtr& outputs,
                          const vm::EagerBlobObjectListPtr& parameters,
                          const std::shared_ptr<NNGraphIf>& nn_graph);
// 用于全局同步,同步等候一切指令调用完结
Maybe<void> GlobalSync();
// 用于Tensor内存开释(归还allocator)
Maybe<void> ReleaseTensor(const std::shared_ptr<vm::EagerBlobObject>& eager_blob_object);
// 操作Tensor实践内存(blob)
template<typename T>
Maybe<void> AccessBlobByCallback(
    const T tensor,
    const std::function<void(ep::Stream*, const std::shared_ptr<vm::EagerBlobObject>&)>& callback,
    const std::string& modifier);
// 最常用的指令构建办法,用于结构op履行所需的OpCall指令
Maybe<void> Call(const std::shared_ptr<one::StatefulOpKernel>& opkernel,
                   vm::EagerBlobObjectList&& input_eager_blob_objects,
                   vm::EagerBlobObjectList&& output_eager_blob_objects,
                   const one::OpExprInterpContext& ctx, Symbol<Stream> stream);

1.3 InstructionPolicy

InstructionPolicy(github.com/Oneflow-Inc… )——指令战略,一般用于合作InstructionsBuilder实践构建出不同的vm指令。InstructionPolicy的子类完成如下:

OneFlow源码解析:算子指令在虚拟机中的执行

这些子类的InstructionPolicy可近似以为是指令类型。如,用于Op履行的OpCallInstructionPolicy、用于Tensor内存开释的 ReleaseTensorInstructionPolicy、用于屏障堵塞的BarrierInstructionPolicy等。

以Op履行为例:

JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {
    return builder->Call(xxx);
}));

实践上是经过InstructionsBuilder的Call办法(github.com/Oneflow-Inc… ),合作OpCall的指令战略(OpCallInstructionPolicy),结构了OpCall指令:

Maybe<void> InstructionsBuilder::Call(
    const std::shared_ptr<one::StatefulOpKernel>& opkernel,
    vm::EagerBlobObjectList&& input_eager_blob_objects,
    vm::EagerBlobObjectList&& output_eager_blob_objects,
    const std::shared_ptr<const one::GlobalTensorInferResult>& global_tensor_infer_result,
    const one::OpExprInterpContext& ctx, Symbol<Stream> stream) {
  ...
  ...
  // 获取当前vm stream
  auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));
  // 经过OpCallInstructionPolicy初始化OpCall指令
  auto instruction = intrusive::make_shared<vm::Instruction>(
      vm_stream, std::make_shared<vm::OpCallInstructionPolicy>(
                     vm_stream, opkernel, std::move(input_eager_blob_objects),
                     std::move(output_eager_blob_objects), global_tensor_infer_result, ctx,
                     *one::CurrentDevVmDepObjectConsumeMode()));
  // 指令入列表
  instruction_list_->EmplaceBack(std::move(instruction));
  return Maybe<void>::Ok();
}

并将构建好的指令塞入指令列表,待后续VM调度并实践履行。

2

虚拟机的运转原理

2.1 VM初始化

OneFlow环境初始化时,会触发VirtualMachineScope(github.com/Oneflow-Inc… )的初始化:

VirtualMachineScope::VirtualMachineScope(const Resource& resource) {
  Singleton<VirtualMachine>::New();
}

进而触发VM目标——VirtualMachine(github.com/Oneflow-Inc… )的初始化。VM作为一个Singleton目标,全局唯一。

VirtualMachine::VirtualMachine() : disable_vm_threads_(false), scheduler_stopped_(false) {
  // Class VirtualMachineEngine only cares the basic logical of vm, while class VirtualMachine
  // manages threads and condition variables.
  // In order to notify threads in VirtualMachineEngine, a notify callback lambda should be take as
  // an argument for VirtualMachineEngine's constructor.
  engine_ = intrusive::make_shared<vm::VirtualMachineEngine>();
  OF_PROFILER_NAME_THIS_HOST_THREAD("_Main");
  std::function<void()> SchedulerInitializer;
  GetSchedulerThreadInitializer(&SchedulerInitializer);
  schedule_thread_ = std::thread(&VirtualMachine::ScheduleLoop, this, SchedulerInitializer);
  transport_local_dep_object_.Reset();
}

VM初始化中最重要的内容,便是:

1.初始化了一个VM的履行引擎——VirtualMachineEngine

2.经过VirtualMachine::ScheduleLoop启动了VM的调度线程

VirtualMachine::ScheduleLoop

VM目标只担任条件变量和线程管理;而首要事务逻辑处理(包含指令构建、调度、派发和履行等),则由VirtualMachineEngine(github.com/Oneflow-Inc… )目标担任;VM初始化时还开辟了单独的schedule线程用于VM引擎处理调度逻辑,在VirtualMachine::ScheduleLoop(github.com/Oneflow-Inc… )中:

void VirtualMachine::ScheduleLoop(const std::function<void()>& Initializer) {
  SyncVmModeGuard guard(SyncVmMode::kEnable);
  Initializer();
  MultiThreadScheduleCtx schedule_ctx{};
  while (pending_notifier_.WaitAndClearNotifiedCnt() == kNotifierStatusSuccess) {
    OF_PROFILER_RANGE_GUARD("VirtualMachine::ScheduleLoop");
    auto start = std::chrono::steady_clock::now();
    static constexpr int kWorkingMicroseconds = 1000;
    // Every time this thread wakes up, engine_ is scheduled for about `kWorkingMicroseconds`.
    // The cost of os thread switching is about 5-10 microseconds. Doing more scheduling in
    // a single waiting up can reach higher performance.
    do {
      do {
        const size_t total_inserted = engine_->total_inserted_instruction_cnt();
        const size_t total_erased = engine_->total_erased_instruction_cnt();
        engine_->Schedule(schedule_ctx);
        if (ThreadLocalEnvBool<ONEFLOW_VM_ENABLE_SCHEDULE_YIELD>()
            && total_inserted == engine_->total_inserted_instruction_cnt()
            && total_erased == engine_->total_erased_instruction_cnt()) {  // nothing handled.
          std::this_thread::yield();
        }
      } while (!engine_->SchedulerThreadUnsafeEmpty());
    } while (MicrosecondsFrom(start) < kWorkingMicroseconds);
  }
  ScheduleUntilVMEmpty(engine_.Mutable(), schedule_ctx);
  CHECK_JUST(ForEachThreadCtx(engine_.Mutable(), [&](vm::ThreadCtx* thread_ctx) -> Maybe<void> {
    thread_ctx->mut_notifier()->Close();
    return Maybe<void>::Ok();
  }));
  {
    std::unique_lock<std::mutex> lock(worker_threads_mutex_);
    for (const auto& worker_thread : worker_threads_) { worker_thread->join(); }
  }
  scheduler_stopped_ = true;
}

ScheduleLoop是一个近似于busy loop的while循环,pending_notifier_是VM内部保护的成员,实践上是ScheduleLoop线程的告诉/唤醒者,其界说坐落
oneflow/oneflow/core/common/notifier.h

class Notifier final {
 public:
  OF_DISALLOW_COPY_AND_MOVE(Notifier);
  Notifier() : notified_cnt_(0), is_closed_(false) {}
  ~Notifier() = default;
  NotifierStatus Notify();
  NotifierStatus WaitAndClearNotifiedCnt();
  void Close();
 private:
  size_t notified_cnt_;
  std::mutex mutex_;
  bool is_closed_;
  std::condition_variable cond_;
};

其首要保护了互斥锁mutex_、线程是否封闭的flag is_closed_、条件变量cond_。忽略线程唤醒、超时相关逻辑,ScheduleLoop中最重要的工作是engine_->Schedule(schedule_ctx);

while (pending_notifier_.WaitAndClearNotifiedCnt() == kNotifierStatusSuccess) {
    auto start = std::chrono::steady_clock::now();
    ...
    do {
      do {
        ...
        engine_->Schedule(schedule_ctx);
        ...
      } while (!engine_->SchedulerThreadUnsafeEmpty());
    } while (MicrosecondsFrom(start) < kWorkingMicroseconds);
  }

当VM保护的指令队列不为空时,便不断唤醒VM引擎履行指令调度逻辑——engine->Schedule()

2.2 VM指令调度

void VirtualMachineEngine::Schedule(const ScheduleCtx& schedule_ctx) {
  // Release finished instructions and try to schedule out instructions in DAG onto ready list.
  if (unlikely(mut_active_stream_list()->size())) { ReleaseFinishedInstructions(schedule_ctx); }
  // Try run the first barrier instruction.
  if (unlikely(mut_barrier_instruction_list()->size())) { TryRunBarrierInstruction(schedule_ctx); }
  // Handle pending instructions, and try schedule them to ready list.
  // Use thread_unsafe_size to avoid acquiring mutex lock.
  // The inconsistency between pending_instruction_list.list_head_.list_head_.container_ and
  // pending_instruction_list.list_head_.list_head_.size_ is not a fatal error because
  // VirtualMachineEngine::Schedule is always in a buzy loop. All instructions will get handled
  // eventually.
  //  VirtualMachineEngine::Receive may be less effiencient if the thread safe version
  //  `pending_instruction_list().size()` used here, because VirtualMachineEngine::Schedule is more
  //  likely to get the mutex lock.
  if (unlikely(local_pending_instruction_list().size())) {
    HandleLocalPending();
  } else if (unlikely(pending_instruction_list().thread_unsafe_size())) {
    // MoveTo is under a lock.
    mut_pending_instruction_list()->MoveTo(mut_local_pending_instruction_list());
    if (local_pending_instruction_list().size()) { HandleLocalPending(); }
  }
  // dispatch ready instructions and try to schedule out instructions in DAG onto ready list.
  if (unlikely(mut_ready_instruction_list()->size())) {
    DispatchAndPrescheduleInstructions(schedule_ctx);
  }
  // handle scheduler probes
  if (unlikely(local_probe_list_.size())) {
    HandleLocalProbe();
  } else if (unlikely(probe_list_.thread_unsafe_size())) {
    probe_list_.MoveTo(&local_probe_list_);
    if (local_probe_list_.size()) { HandleLocalProbe(); }
  }
}

VM引擎保护了一系列指令列表的成员:

InstructionMutexedList pending_instruction_list_;
// local_pending_instruction_list_ should be consider as the cache of pending_instruction_list_.
InstructionList local_pending_instruction_list_;
ReadyInstructionList ready_instruction_list_;
LivelyInstructionList lively_instruction_list_;
BarrierInstructionList barrier_instruction_list_;
  • pending相关的instruction_list是悬挂/待处理的指令列表;
  • lively相关的instruction_list是活泼的正在履行中的指令列表;
  • ready相关的instruction_list则是已完结准备工作(指令融合、指令DAG构建等)待履行的指令列表;

VM引擎Schedule时,会对指令队列做相应处理,包含:

  • 将已完结准备工作的指令放入ready_instruction_list_中保护;
  • 测验运转barrier指令列表(barrier_instruction_list_)中的第一条指令;
  • 如果本地pending指令列表(local_pending_instruction_list_)非空,则经过HandleLocalPending办法处理这些悬挂指令(指令融合、指令履行DAG图构建、插入ready列表)
  • 如果ready指令列表非空,则经过DispatchAndPrescheduleInstructions办法进行指令派发和预调度处理。

这儿重点介绍指令派发相关的 DispatchAndPrescheduleInstructions办法,其间 DispatchAndPrescheduleInstructions中最首要的是便是DispatchInstruction指令派发办法,这儿的指令派发能够以为实践上便是指令履行

2.3 VM指令派发

VirtualMachineEngine::DispatchInstruction(github.com/Oneflow-Inc… )办法是vm引擎中的中心,其实践完结了指令的派发和实践履行,代码如下:

template<void (VirtualMachineEngine::*OOMHandler)(vm::Stream*, const ScheduleCtx&)>
void VirtualMachineEngine::DispatchInstruction(Instruction* instruction,
                                               const ScheduleCtx& schedule_ctx) {
  auto* stream = instruction->mut_stream();
  // Prepare
  {
    // 指令的Prepare
    const auto& ret = TRY(instruction->Prepare());
    if (unlikely(!ret.IsOk())) {
      // 处理指令Prepare进程中的OOM的逻辑
      if (ret.error()->has_out_of_memory_error()) {
        // 让allocator开释不必要的cacahe,再从头履行指令的Prepare
        (this->*OOMHandler)(stream, schedule_ctx);
        ...
      }
    }
  }
  // 将当前指令放入running_instruction_list
  stream->mut_running_instruction_list()->PushBack(instruction);
  if (stream->active_stream_hook().empty()) { mut_active_stream_list()->PushBack(stream); }
  // Compute
  if (OnSchedulerThread(*stream)) {
    // StreamPolicy的Run办法触发指令的实践履行——Compute
    stream->stream_policy().Run(instruction);
  } else {
    stream->mut_thread_ctx()->mut_worker_pending_instruction_list()->PushBack(instruction);
    schedule_ctx.OnWorkerLoadPending(stream->mut_thread_ctx());
  }
}

DispatchInstruction的中心首要有2块:

  • 履行指令的Prepare
  • 履行指令的Compute

Prepare担任一些指令履行前的准备;Compute则是实践的指令履行,指令履行并不是直接经过instruction->Run而是在StreamPolicy的Run办法中完结的,这儿又触及到一个StreamPolicy目标。

StreamPolicy::Run

StreamPolicy(github.com/Oneflow-Inc… )是个虚基类:

class StreamPolicy {
 public:
  virtual ~StreamPolicy() = default;
  virtual ep::Stream* stream() = 0;
  virtual vm::Allocator* mut_allocator() = 0;
  virtual DeviceType device_type() const = 0;
  virtual void InitInstructionStatus(const Stream& stream,
                                     InstructionStatusBuffer* status_buffer) const = 0;
  virtual void DeleteInstructionStatus(const Stream& stream,
                                       InstructionStatusBuffer* status_buffer) const = 0;
  virtual bool QueryInstructionStatusDone(const Stream& stream,
                                          const InstructionStatusBuffer& status_buffer) const = 0;
  virtual void Run(Instruction* instruction) const = 0;
  virtual bool OnSchedulerThread(StreamType stream_type) const;
  virtual bool SupportingTransportInstructions() const = 0;
 protected:
  StreamPolicy() = default;
};
  • stream()办法回来ep::Stream指针,指向的是针对不同渠道的ep::stream目标。
  • mut_allocator()办法回来一个vm的Allocator指针,用于内存分配/开释。 InitInstructionStatus/QueryInstructionStatusDone/DeleteInstructionStatus用于创立/查询/销毁指令履行状态
  • Run办法则是中心,界说了该Stream具体运转时的逻辑。

这儿的ep在oneflow中是execution provider的缩写,ep从本质上来讲便是一个针对不同硬件渠道的executor笼统。

StreamPolicy相关的承继和子类如下:

OneFlow源码解析:算子指令在虚拟机中的执行

看一下EpStreamPolicyBase的Run办法(github.com/Oneflow-Inc… ):

void EpStreamPolicyBase::Run(Instruction* instruction) const {
  ...
  auto* stream = instruction->mut_stream();
  EpStreamPolicyBase* ep_stream_policy_base =
      dynamic_cast<EpStreamPolicyBase*>(stream->mut_stream_policy());
  ...
  auto* ep_device = ep_stream_policy_base->GetOrCreateEpDevice();
  ep_device->SetAsActiveDevice();
  instruction->Compute();
  ...
}

首要获取了该stream对应的ep device,然后履行了instruction的Compute办法,即指令的实践履行

2.4 VM履行履行

以OpCall指令为例,看一下op指令的Compute(github.com/Oneflow-Inc… ):

void OpCallInstructionPolicy::Compute(vm::Instruction* instruction) {
  OpCallInstructionUtil::Compute(this, instruction);
}

OpCallInstructionPolicy办法调用了OpCallInstructionUtil的Compute办法:

OneFlow源码解析:算子指令在虚拟机中的执行

上面咱们能够看到,在指令Prepare时,做了output tensor内存分配;而指令Compute中最重要的办法是:

  • TryInitOpKernelStateAndCache——初始化一些kernel核算需要的状态或缓存
  • OpKernelCompute——履行该op对应的kernel,kernel内首要是实践的op核算逻辑

OneFlow源码解析:算子指令在虚拟机中的执行

user kernel一致坐落:oneflow/user/kernels目录下,.cpp一般对应cpu kernel逻辑;.cu为cuda kernel逻辑。到这儿,就会触发user_kernel的Compute办法,不同op的kernel核算逻辑不同,以rele op为例,实践Compute进程可参考文章《算子在深度学习结构中的履行及interpreter》的第5末节。

2.5 VM指令发送

这儿的VM指令发送,指的是VM外部的指令发送进程(不是VM内部的指令派发)。上面2.1~2.3末节介绍了VM以及VM引擎的初始化、VM内部指令的调度、派发和实践履行的进程,那么这些指令是如何发送到VM的呢?答案是:在1.1末节中说到的PhysicalRun

PhysicalRun终究会触发VirtualMachine->Receive办法,并经过VirtualMachineEngine的Receive办法完结外部指令 -> VM内部的发送。

VirtualMachineEngine的Receive办法(github.com/Oneflow-Inc… )首要将该指令经过MoveFrom办法push back到指令悬挂列表(pending_instruction_list_)的结尾,从而完结指令的发送。

// Returns true if old scheduler_pending_instruction_list is empty
Maybe<bool> VirtualMachineEngine::Receive(InstructionList* compute_instruction_list) {
  OF_PROFILER_RANGE_GUARD("vm:Receive");
#ifdef OF_ENABLE_PROFILER
  INTRUSIVE_UNSAFE_FOR_EACH_PTR(compute_instruction, compute_instruction_list) {
    OF_PROFILER_RANGE_GUARD(compute_instruction->DebugName());
  }
#endif
  bool old_list_empty = mut_pending_instruction_list()->MoveFrom(compute_instruction_list);
  return old_list_empty;
}

3

小结

至此,Op履行相关的流程算是大体串了一遍。一句flow.relu()后边会触及这么多内容。但这儿其实也只重视了主干逻辑,忽略了中间大量的细节。

流程的梳理仅仅第一步,还需要从中归纳总结一些概念和概念之间的联系,再结合公开资料反推印证规划理念的落地完成。

不过目前对代码和规划的了解还很浅薄,下面的内容纯属大胆猜测。

3.1 UserOpExpr

UserOpExpr表明UserOp履行时所需的上下文,其实UserOp仅仅Op中的一种。下图展现了不同Op的承继联系。能够看到tensor从local/global之间的转化等也都触及不同的OpExpr。

OneFlow源码解析:算子指令在虚拟机中的执行

3.2 Op履行的宏观头绪

从上面的类联系图出发,以中心类为节点,也能看出Op履行流程的宏观头绪。整个流程大体在下面这些角色之间流通:

  • ReluFunctor
  • UserOpExpr
  • Interpreter
  • PhysicalRun
  • VirtualMachine->Receive
  • VirtualMachine->ScheduleLoop …

3.3 虚拟机运转和调度总结

VM -> ScheduleLoop

VM引擎Schedule

处理悬挂指令(HandleLocalPending)

指令派发(DispatchInstruction)

准备(instruction->Prepare)

履行(StreamPolicy.Run -> instruction->Compute)

指令预调度

VM -> Receive

VM引擎 -> Receive

指令入悬挂列表

一般,咱们习气在动态图形式下训练深度学习网络,运用Python建立网络,并经过各种op进行前向、反向、loss核算、调试debug等进程,这些Python代码能够看作是动态的op的履行序列。

OneFlow虚拟机将op履行序列笼统成了各种VM指令序列。 OneFlow的虚拟机会对这些op履行序列进行动态翻译并生成VM指令序列,经过PhysicalRun结构结束后,动态地将指令发送至VM的悬挂列表中保护。这些指令或在时间上存在先后顺序,或在数据上存在依靠联系,所以悬挂列表中的指令后续会被虚拟机进行一些指令融合、指令连边、动态构建指令DAG图的进程,然后移入就绪列表中保护,等候虚拟机调度并实践履行。虚拟机担任保护若干个指令队列,以及指令在这些队列之间的状态转化。

OneFlow虚拟机还一致了动态图形式(Eager Mode)和静态图形式(Lazy Mode)。 静态图形式下,经过nn.Graph编译出深度学习网络的Job,这个Job同样被虚拟机笼统成了VM指令并承受虚拟机的调度和履行。大胆猜测一下,这也为日后动静转化、更极致的性能优化埋下了伏笔。

参考资料

  • OneFlow学习笔记:从OpExprInterpreter到OpKernel

  • 动态调度的“诅咒”| 原有深度学习结构的缺点③

  • 算子在深度学习结构中的履行及interpreter

  • OneFlow源码:(github.com/Oneflow-Inc…

欢迎下载体验 OneFlow v0.8.0 最新版本: github.com/Oneflow-Inc…