关于 ParameterServerStrategy V2,咱们将从几个方面来研讨:怎么与集群树立衔接,怎么生成变量,怎么获取数据,怎么运转。其中,变量和作用域咱们在前文现已研讨过,运转在 MirroredStrategy 里面也介绍,所以本文主要看看怎么运用,怎么初始化。鄙人一篇之中会要点看看怎么分发核算

[翻译] TensorFlow 散布式之论文篇 Large-Scale Machine Learning on Heterogeneous Distribute

[翻译] TensorFlow 散布式之论文篇 “Implementation of Control Flow in TensorFlow”

[源码解析] TensorFlow 散布式环境(1) — 整体架构

[源码解析] TensorFlow 散布式环境(2)—Master 静态逻辑

[源码解析] TensorFlow 散布式环境(3)— Worker 静态逻辑

[源码解析] TensorFlow 散布式环境(4) — WorkerCache

[源码解析] TensorFlow 散布式环境(5) — Session

[源码解析] TensorFlow 散布式环境(6) — Master 动态逻辑

[源码解析] TensorFlow 散布式环境(7) — Worker 动态逻辑

[源码解析] TensorFlow 散布式环境(8) — 通信机制

[翻译] 运用 TensorFlow 进行散布式练习

[源码解析] TensorFlow 散布式 DistributedStrategy 之基础篇

[源码解析] TensorFlow 之 散布式变量

[源码解析] TensorFlow 散布式之 MirroredStrategy

[源码解析] TensorFlow 散布式之 ParameterServerStrategy V1

1. 怎么运用

在 TensorFlow 2 中,参数服务器练习由 tf.distribution.experimental.ParameterServerStrategy 类供给支撑,该类将练习过程散布到一个可扩展到数千个作业者(伴随着参数服务器)的集群。

1.1 练习办法

支撑练习有两种主要办法:

  • Keras Model.fit API。假如用户喜爱用高层次笼统来练习,则主张运用这种办法。
  • 自界说练习循环(custom training loop)。假如用户需求自己完结或者界说练习细节,则能够考虑这种办法。

1.2 集群

无论挑选何种API( Model.fit 或自界说练习循环),TensorFlow 2中的散布式练习都会触及如下概念:一个”集群” 有若干个”作业(job)”,每个作业或许包括一个或多个”使命”。而当运用参数服务器练习时,主张运用如下装备:

  • 一个和谐者(coordinator ) job(job称号为 chief)。
  • 多个作业者 jobs(job称号为 worker)。
  • 多个参数服务器 jobs(job称号为 ps)。

和谐者负责创立资源、分配练习使命、写检查点和处理使命失利,作业者参数服务器则运转 tf.distribution.Server 来听取和谐者的恳求。

1.3 运用 Model.fit API 进行练习

假如运用 “Model.fit” API,则参数服务器练习需求和谐者运用 tf.distribution.experimental.ParameterServerStrategy 目标和 tf.keras.utils.experimental.DatasetCreator 作为输入。与其他战略相似,其作业流程包括:创立和编译模型,预备回调,调用 Model.fit。

1.4 运用自界说循环进行练习

TensorFlow 2 引荐运用一种根据中央和谐的架构来进行参数服务器练习。每个作业者和参数服务器都运转一个 tf.distribution.Server,在此基础上,一个和谐者使命负责在作业者和参数服务器上创立资源,调度功用,并和谐练习。和谐器运用 tf.distribution.experimental.coordinator.ClusterCoordinator 来和谐集群,运用 tf.distribution.experimental.ParameterServerStrategy 来界说参数服务器上的变量和作业者的核算。在自界说练习循环中, tf.distribution.experimental.coordinator.ClusterCoordinator 类是用于和谐器的要害组件。

  • ClusterCoordinator 类需求与 tf.distribution.Strategy 目标一同作业。
  • 关于参数服务器练习, ClusterCoordinator 需求与 tf.distribution.experimental.ParameterServerStrategy 一同作业。
  • 这个 tf.distribution.Strategy 目标需求运用者供给集群的信息,并运用这些信息来界说练习过程。然后, ClusterCoordinator 目标将这些练习过程的履行分派给长途作业者。

ClusterCoordinator 供给的最重要的 API 是 schedule 。

  • Schedule API 把一个 tf.function 刺进行列,并当即回来一个相似 future 的 RemoteValue 。
  • 在行列之中排队的函数被派发给后台线程中的长途作业者,他们的 RemoteValue 将被异步赋值。
  • 由于 schedule 不需求履行分配使命,因而传递进来的 tf.function 能够在任何可用的作业者上履行。
  • 假如被履行的作业者在结束之前变得不可用,该 tf.function 将在另一个可用的作业者上重试。
  • 由于函数的履行不是原子性的,所以一个函数或许被履行屡次。

除了调度长途函数这个功用之外,ClusterCoordinator 还协助在一切作业者上创立数据集,以及当一个作业者从失利中恢复时重建这些数据集。

1.5 树立集群

如上所述,一个参数服务器练习集群需求一个和谐者使命来运转你的练习程序,程序包括一个或几个运转TensorFlow 服务器( tf.distribution.Server )的作业者和参数服务器,或许还有一个运转 side-car 评价的评价使命。设置它们的要求是。

  • 和谐者(coordinator)使命需求知道一切其他 TensorFlow 服务器(评价者在外)的地址和端口。
  • 作业者和参数服务器需求知道他们应该监听哪个端口。为了简略起见,用户一般能够在这些使命上创立 TensorFlow 服务器时传入完整的集群信息。
  • 评价器(evaluator)使命不需求知道练习集群的设置,它也不应该企图衔接到练习集群。
  • 作业者和参数服务器的使命类型应该分为 “worker” 和 “ps” 两种。出于历史原因,和谐器应运用 “chief” 作为使命类型。

2. 初始化

2.1 用例

以下是怎么初始化 ParameterServerStrategy 的样例,无论是运用 Model.fit 还是自界说循环,都需求这步作业。为了运用 GPU 进行练习,需求为每个作业者分配可见的 GPU。 ParameterServerStrategy 将运用每个作业者上一切可用的 GPU,但有个约束是:一切作业者都应该有相同数量的 GPU 可用。

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)

关于 variable_partitioner,这是一个 distribute.experimental.partitioners.Partitioner,其指定怎么对变量进行分区。假如是 None,变量将不被切割,其特点如下:

  • 此参数取值是 tf.distribute.experimental.partitioners 中预界说的分区器。一个常用的分区器是 MinSizePartitioner(min_shard_bytes = 256 << 10, max_shards = num_ps),它为每个分片分配至少 256K,每个 ps 最多得到一个分片。
  • 在战略 scope 下创立的每个变量都会调用 variable_partitioner,以指示该变量应怎么分区。沿着分区轴只要一个分区的变量(即不需求分区)将被创立为一个一般的 tf.Variable 。
  • 只支撑第一个/最外层轴的分区。
  • Div 分区战略被用来对变量进行分区。假定咱们沿着变量的第一轴分配接连的整数 id,那么 id 会以接连的办法分配给分片,一同企图保持每个分片的大小相同。假如 id 不能平均分配给分片的数量,那么前几个分片中的每一个将被多分配一个 id。例如,一个变量的第一个维度是 13,它有 13 个 id,它们被分红 5 个分片。 [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10], [11, 12]] .
  • 在 strategy.extended.colocate_vars_with 下创立的变量将不会被切割。

2.2 集群设置

在实在的生产环境中,用户需求在不同机器上的一切不同进程中运转练习使命。在每个使命上装备集群信息的最简略办法是设置”TF_CONFIG” 环境变量,并运用 tf.distribution.cluster_resolver.TFConfigClusterResolver 来解析”TF_CONFIG” 。假如用户运用 Kubernetes 或其他装备模板开端练习使命,很或许这些模板现已设置了”TF_CONFIG”

2.2.1 设置 “TF_CONFIG” 环境变量

假定你有 3 个作业者,3 个参数服务器,那么 worker 1 的 “TF_CONFIG” 能够如下:

os.environ["TF_CONFIG"] = json.dumps({
   "cluster": {
       "worker": ["host1:port","host2:port","host3:port"],
       "ps": ["host4:port","host5:port"],
       "chief": ["host6:port"]
    },
   "task": {"type":"worker","index": 1}
})

2.2.2 运用二进制文件

假如你喜爱用一个二进制文件来运转一切这些使命,你将需求在程序开端就指明不同分支负责处理不同的角色。

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker","ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type =="evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

如下代码发动一个 TensorFlow server 然后等候完结。

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] ="use_caller"
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or"grpc",
    start=True)
server.join()

2.3 初始化办法

初始化办法如下,主要作业是衔接到集群,然后调用 _extended 进行继续初始化。

  def __init__(self, cluster_resolver, variable_partitioner=None):
   """Initializes the TF2 parameter server strategy.
    This initializes the  tf.distribute.experimental.ParameterServerStrategy 
    object to be ready for use with
     tf.distribute.experimental.coordinator.ClusterCoordinator .
   """
    # pyformat: enable
    self._cluster_resolver = cluster_resolver
    self._verify_args_and_config(cluster_resolver)
    self._cluster_coordinator = None
    self._connect_to_cluster(coordinator_name="chief") # 衔接到集群
    self._extended = ParameterServerStrategyV2Extended(self, cluster_resolver,
                                                       variable_partitioner)
    super(ParameterServerStrategyV2, self).__init__(self._extended)
    distribute_lib.distribution_strategy_gauge.get_cell("V2").set(
       "ParameterServerStrategy")
    self._should_use_with_coordinator = True
    # Used while constructing distributed iterators.
    self._canonicalize_devices = False

2.4 衔接到集群

_connect_to_cluster 起到了衔接到集群的作用,其主要逻辑是设置了 filter,然后调用 remote.connect_to_cluster 去衔接集群。

  def _connect_to_cluster(self, coordinator_name):
    if coordinator_name in ["worker","ps"]:
      raise ValueError("coordinator name should not be 'worker' or 'ps'.")
    cluster_spec = self._cluster_resolver.cluster_spec()
    self._num_workers = len(cluster_spec.as_dict().get("worker", ()))
    self._num_ps = len(cluster_spec.as_dict().get("ps", ()))
    device_filters = server_lib.ClusterDeviceFilters()
    # For any worker, only the devices on ps and coordinator nodes are visible
    for i in range(self._num_workers):
      device_filters.set_device_filters(
         "worker", i, ["/job:ps","/job:%s" % coordinator_name])
    # Similarly for any ps, only the devices on workers and coordinator are
    # visible
    for i in range(self._num_ps):
      device_filters.set_device_filters(
         "ps", i, ["/job:worker","/job:%s" % coordinator_name])
    # Allow at most one outstanding RPC for each worker at a certain time. This
    # is to simplify worker failure handling in the runtime
    os.environ["TF_ENABLE_EAGER_CLIENT_STREAMING_ENQUEUE"] ="False"
    remote.connect_to_cluster(
        cluster_spec,
        job_name=coordinator_name,
        protocol=self._cluster_resolver.rpc_layer,
        cluster_device_filters=device_filters)
    distribute_lib.distribution_strategy_replica_gauge.get_cell(
       "ps_strategy_num_workers").set(self._num_workers)
    distribute_lib.distribution_strategy_replica_gauge.get_cell(
       "ps_strategy_num_ps").set(self._num_ps)

connect_to_cluster 办法会衔接到给定的集群,使集群上的设备可用。假如给定的本地 job 称号没有呈现在集群标准中,它将被自动添加,而且运用本地主机上一个未运用的端口。

作业者假如在被过滤的长途设备上访问资源或发动程序/功用,将导致一个未知设备错误。关于任何长途使命,假如没有设备过滤器,一切的集群设备都是可见的;假如指定了设备过滤器,使命则只能看到与至少一个过滤器匹配的设备。使命自身的设备始终是可见的。

以下是运用样例。

cdf = tf.config.experimental.ClusterDeviceFilters()
# For any worker, only the devices on PS nodes and itself are visible
for i in range(num_workers):
  cdf.set_device_filters('worker', i, ['/job:ps'])
# Similarly for any ps, only the devices on workers and itself are visible
for i in range(num_ps):
  cdf.set_device_filters('ps', i, ['/job:worker'])
tf.config.experimental_connect_to_cluster(cluster_def,
                                          cluster_device_filters=cdf)

详细 connect_to_cluster 的代码如下。

@tf_export("config.experimental_connect_to_cluster")
def connect_to_cluster(cluster_spec_or_resolver,
                       job_name="localhost",
                       task_index=0,
                       protocol=None,
                       make_master_device_default=True,
                       cluster_device_filters=None):
 """Connects to the given cluster.
  Will make devices on the cluster available to use. Note that calling this more
  than once will work, but will invalidate any tensor handles on the old remote
  devices.
  If the given local job name is not present in the cluster specification, it
  will be automatically added, using an unused port on the localhost.
  Device filters can be specified to isolate groups of remote tasks to avoid
  undesired accesses between workers. Workers accessing resources or launching
  ops / functions on filtered remote devices will result in errors (unknown
  devices). For any remote task, if no device filter is present, all cluster
  devices will be visible; if any device filter is specified, it can only
  see devices matching at least one filter. Devices on the task itself are
  always visible. Device filters can be particially specified.
  Args:
    cluster_spec_or_resolver: A  ClusterSpec  or  ClusterResolver  describing
      the cluster.
    job_name: The name of the local job.
    task_index: The local task index.
    protocol: The communication protocol, such as "grpc" . If unspecified, will
      use the default from  python/platform/remote_utils.py .
    make_master_device_default: If True and a cluster resolver is passed, will
      automatically enter the master task device scope, which indicates the
      master becomes the default device to run ops. It won't do anything if
      a cluster spec is passed. Will throw an error if the caller is currently
      already in some device scope.
    cluster_device_filters: an instance of
       tf.train.experimental/ClusterDeviceFilters  that specify device filters
      to the remote tasks in cluster.
 """
  if not context.executing_eagerly():
    raise ValueError(
       " tf.config.experimental_connect_to_cluster  can only be called in"
       "eager mode."
    )
  protocol = protocol or remote_utils.get_default_communication_protocol()
  if isinstance(cluster_spec_or_resolver, server_lib.ClusterSpec):
    cluster_spec = cluster_spec_or_resolver
  elif isinstance(cluster_spec_or_resolver, cluster_resolver.ClusterResolver):
    if cluster_spec_or_resolver.master() in _LOCAL_MASTERS:
      # Do nothing if the master is local.
      return
    cluster_spec = cluster_spec_or_resolver.cluster_spec()
  else:
    raise ValueError(
       " cluster_spec_or_resolver  must be a  ClusterSpec  or a"
       " ClusterResolver .")
  cluster_def = copy.deepcopy(cluster_spec.as_cluster_def())
  if cluster_device_filters:
    if isinstance(cluster_device_filters, server_lib.ClusterDeviceFilters):
      cluster_device_filters = copy.deepcopy(
          cluster_device_filters._as_cluster_device_filters())  
    else:
      raise ValueError(" cluster_device_filters  must be an instance of"
                      " tf.train.experimental.ClusterDeviceFilters .")
  # Automatically add local job, if not part of the cluster spec.
  if job_name not in cluster_spec.jobs:
    local_port = pywrap_tfe.TF_PickUnusedPortOrDie()
    job_def = cluster_def.job.add()
    job_def.name = job_name
    job_def.tasks[0] ="localhost:{}".format(local_port)
  server_def = ServerDef(
      cluster=cluster_def,
      job_name=job_name,
      task_index=task_index,
      protocol=protocol,
      default_session_config=context.context().config,
      cluster_device_filters=cluster_device_filters)
  if context.get_server_def() is None:
    context.set_server_def(server_def) # 这儿会做处理设备
  else:
    context.update_server_def(server_def)
  # 装备 master Device  
  if make_master_device_default and isinstance(
      cluster_spec_or_resolver,
      cluster_resolver.ClusterResolver) and cluster_spec_or_resolver.master():
    master = cluster_spec_or_resolver.master()
    master_job_name = None
    master_task_id = None
    for job_name in cluster_spec.jobs:
      for task_id in cluster_spec.task_indices(job_name):
        task_address = cluster_spec.task_address(job_name, task_id)
        if master in task_address or task_address in master:
          master_job_name = job_name
          master_task_id = task_id
          break
    if not master_job_name:
      raise ValueError(
         " make_master_device_default  is set to True but cannot find"
         "master %s in the cluster" % master)
    master_device ="/job:{}/replica:0/task:{}".format(master_job_name,
                                                       master_task_id)
    master_device = device_util.canonicalize(master_device)
    current_device = device_util.current()
    if current_device:
      current_device = device_util.canonicalize(current_device)
    if current_device and current_device != master_device:
      raise ValueError(" connect_to_cluster  is called inside existing device"
                      "scope %s, which is different from the master device"
                      "scope %s to enter. This is not allowed." %
                       (current_device, master_device))
    if not current_device:
      logging.info("Entering into master device scope: %s", master_device)
      ops.device(master_device).__enter__()

2.5 初始化设备

set_server_def 会调用 _initialize_logical_devices 来初始化逻辑设备。

  def set_server_def(self, server_def, keep_alive_secs=_KEEP_ALIVE_SECS):
   """Allow setting a server_def on the context.
    When a server def is replaced, it effectively clears a bunch of caches
    within the context. If you attempt to use a tensor object that was pointing
    to a tensor on the remote device, it will raise an error.
    Args:
      server_def: A tensorflow::ServerDef proto. Enables execution on remote
        devices.
      keep_alive_secs: Num. seconds after which the remote end will hang up. As
        long as the client is still alive, the server state for the context will
        be kept alive. If the client is killed (or there is some failure), the
        server will clean up its context keep_alive_secs after the final RPC it
        receives.
    Raises:
      ValueError: if server_def is None.
   """
    if not server_def:
      raise ValueError("server_def is None.")
    self._server_def = server_def
    if self._context_handle:
      server_def_str = server_def.SerializeToString()
      pywrap_tfe.TFE_ContextSetServerDef(self._context_handle, keep_alive_secs,
                                         server_def_str)
      self._initialize_logical_devices()
    # Clear all the caches in case there are remote tensors in them.
    self._clear_caches()

_initialize_logical_devices 则会调用上下文目标的办法和一些其他办法来完结功用。


  def _initialize_logical_devices(self):
   """Helper to initialize devices."""
    # Store list of devices
    logical_devices = []
    context_devices = []
    device_list = pywrap_tfe.TFE_ContextListDevices(self._context_handle)
    try:
      self._num_gpus = 0
      for i in range(pywrap_tfe.TF_DeviceListCount(device_list)):
        dev_name = pywrap_tfe.TF_DeviceListName(device_list, i)
        context_devices.append(pydev.canonical_name(dev_name))
        spec = pydev.DeviceSpec.from_string(dev_name)
        # If the job is localhost, we assume that the cluster has not yet been
        # configured and thus clear the job, replica & task.
        if spec.job =="localhost":
          spec = spec.replace(job=None, replica=None, task=None)
        logical_devices.append(
            LogicalDevice(name=spec.to_string(), device_type=spec.device_type))
        dev_type = pywrap_tfe.TF_DeviceListType(device_list, i)
        if dev_type =="GPU":
          self._num_gpus += 1
    finally:
      self._logical_devices = logical_devices
      self._context_devices = context_devices
      pywrap_tfe.TF_DeleteDeviceList(device_list)

咱们以 TFE_ContextListDevices 为例来看,其调用到了 Context 的 ListDevices 办法。

TF_DeviceList* TFE_ContextListDevices(TFE_Context* ctx, TF_Status* status) {
  TF_DeviceList* l = new TF_DeviceList;
  tensorflow::unwrap(ctx)->ListDevices(&l->response);
  return l;
}

上下文怎么完结,就需求详细状况详细剖析了,比方下面的生成上下文的代码。

TFE_Context* TFE_NewContext(const TFE_ContextOptions* opts, TF_Status* status) {
  if (opts->use_tfrt) {
#if defined(PLATFORM_GOOGLE) && !defined(LIBTPU_ON_GCE)
    tfrt::tf::ContextInterface* tfrt_context = new tfrt::tf::ContextInterface(
        opts->session_options.options,
        static_cast<tensorflow::ContextDevicePlacementPolicy>(
            opts->device_placement_policy),
        opts->async, opts->use_tfrt_distributed_runtime);
#if !defined(IS_MOBILE_PLATFORM)
    tfrt_context->SetDistributedManager(
        tfrt::tf::CreateDistributedManagerContext(
            tfrt_context->GetCoreRuntime()->GetHostContext()));
#endif  // !IS_MOBILE_PLATFORM
    return tensorflow::wrap(tfrt_context);
#else
    status->status = tensorflow::errors::Unimplemented("TFRT is not supported");
    return nullptr;
#endif  // PLATFORM_GOOGLE && !LIBTPU_ON_GCE
  }
  std::vector<std::unique_ptr<tensorflow::Device>> devices;
  status->status = tensorflow::DeviceFactory::AddDevices(
      opts->session_options.options,"/job:localhost/replica:0/task:0",
      &devices);
  if (!status->status.ok()) return nullptr;
  std::unique_ptr<tensorflow::DeviceMgr> device_mgr(
      new tensorflow::DynamicDeviceMgr(std::move(devices)));
  tensorflow::Rendezvous* r =
      new tensorflow::IntraProcessRendezvous(device_mgr.get());
  tensorflow::EagerContext* eager_context = new tensorflow::EagerContext(
      opts->session_options.options,
      static_cast<tensorflow::ContextDevicePlacementPolicy>(
          opts->device_placement_policy),
      opts->async, device_mgr.release(),
      /*device_mgr_owned*/ true, r,
      /*cluster_flr=*/nullptr,
      /*collective_executor_mgr=*/nullptr,
      /*run_eager_op_as_function=*/opts->run_eager_op_as_function);
#if !defined(IS_MOBILE_PLATFORM)
  eager_context->SetDistributedManager(
      std::make_unique<tensorflow::EagerContextDistributedManager>(
          eager_context));
#endif  // !IS_MOBILE_PLATFORM
  return tensorflow::wrap(eager_context);
}

2.6 Master 设备

在 connect_to_cluster 之中,会调用 ops.device(master_device).enter() 来设置 master Device。代码坐落 tensorflow/python/framework/ops.py。 device_name_or_function 参数能够是一个设备称号字符串,一个设备函数,或者是None:

  • 假如它是一个设备称号字符串,在这个上下文中构建的一切操作将被分配给具有该称号的设备,除非被嵌套的 device() 上下文掩盖。
  • 假如它是一个函数,它将被视为一个从操作目标到设备称号字符串的函数,而且在每次创立一个新操作时被调用。该操作将被分配给具有回来称号的设备。
  • 假如它是 None,一切来自包围上下文(enclosing context)的 device() 调用将被疏忽。
@tf_export(v1=["device"])
def device(device_name_or_function):
 """Wrapper for  Graph.device()  using the default graph.
  See  tf.Graph.device  for more details.
  Args:
    device_name_or_function: The device name or function to use in the context.
  Returns:
    A context manager that specifies the default device to use for newly
    created ops.
  Raises:
    RuntimeError: If eager execution is enabled and a function is passed in.
 """
  if context.executing_eagerly():
    if callable(device_name_or_function):
      raise RuntimeError(
         "tf.device does not support functions when eager execution"
         "is enabled.")
    return context.device(device_name_or_function)
  elif executing_eagerly_outside_functions():
    @tf_contextlib.contextmanager
    def combined(device_name_or_function):
      with get_default_graph().device(device_name_or_function):
        if not callable(device_name_or_function):
          with context.device(device_name_or_function):
            yield
        else:
          yield
    return combined(device_name_or_function)
  else:
    return get_default_graph().device(device_name_or_function)

3. 运用 Model.fit 练习

Keras 经过 Model.fit 供给了一个易于运用的练习 API,它在幕后处理练习循环,而且经过可重写的 train_step 和回调办法供给了灵活性,也供给了检查点保存或 TensorBoard 摘要保存等功用。经过 Model.fit,相同的练习代码只需经过简略地交流战略目标即可被用于其他战略。

3.1 输入数据

运用参数服务器练习的 Model.fit 需求在一个 callable 中供给输入数据,该 callable 接纳一个 tf.distribution.InputContext 类型的参数,并回来一个 tf.data.Dataset 。然后,体系将创立一个 tf.keras.utils.experimental.DatasetCreator 目标,它接受上述的 callable,并经过 input_options 参数创立一个可选的 tf.distribution.InputOptions 目标。

留意,主张用参数服务器练习来 shuffle 和 repeat 数据,并在 fit 调用中指定 steps_per_epoch,这样库就会知道 epoch 的界限。

关于 InputContext 参数的更多信息,请拜见官方 Distributed input 教程。

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

dataset_fn 中的代码将在每个作业者的输入设备上被调用,这个设备一般是CPU。

3.2 模型构建和编译

处理好数据之后,用户需求创立一个 tf.keras.Model,然后是一个 Model.compile 调用,以纳入组件,如优化器、度量或参数(如 steps_per_execution)。

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

3.3 回谐和练习

在你调用 model.fit 进行实践练习之前,还需求为常见的作业预备所需的回调,例如。

  • ModelCheckpoint :保存模型的权重。
  • BackupAndRestore :保证练习进展被自动备份,并在集群呈现不可用状况(如中止或抢占)时恢复;
  • TensorBoard :将进展陈述保存为摘要文件,在 TensorBoard 东西中进行可视化。

留意:由于功用方面的考虑,自界说回调在与 ParameterServerStrategy 一同运用时不能掩盖批级(batch level)回调。请修改你的自界说回调成为 epoch 等级的调用,并将 steps_per_epoch 调整到一个合适的值。此外,当与 ParameterServerStrategy 一同运用时, steps_per_epoch 是 Model.fit 的一个必要参数。

working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)

3.4 直接运用 ClusterCoordinator (optional)

即便你挑选了 Model.fit 练习途径,你也能够挑选实例化一个 tf.distribution.experimental.coordinator.ClusterCoordinator 目标来组织你期望在作业者上履行的其他功用。

0x04 自界说练习

运用 tf.distribution.Strategy 的自界说练习循环为界说练习循环供给了极大的灵活性。经过上面界说的 ParameterServerStrategy (作为 strategy ),用户能够运用 tf.distribution.experimental.coordinator.ClusterCoordinator 将练习过程调度给长途作业者来履行。

和其他 tf.distribution.Strategy 的练习循环一样,用户需求创立一个模型,界说一个数据集和一个步进函数(step function)。为了保证高效的数据集预取,主张运用下面会提到的散布式数据集创立 API。此外,保证在 worker_fn 内调用 Strategy.run,这样能够充分利用分配给作业者的 GPU。

咱们接下来看看怎么创立这些组件。

4.1 装备数据

首要,编写一个函数来创立一个数据集,其中包括由 Keras preprocessing layers 所完结的预处理逻辑。咱们在 dataset_fn 之外创立这些层,但在 dataset_fn 内运用转换,由于咱们将把 dataset_fn 包裹到 tf.function 中,它不答应在其内部创立变量。

feature_vocab = [
   "avenger","ironman","batman","hulk","spiderman","kingkong","wonder_woman"
]
label_vocab = ["yes","no"]
with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)
  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)
  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

以下是构建数据的代码。

def feature_and_label_gen(num_examples=200):
  examples = {"features": [],"label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if"avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples
examples = feature_and_label_gen()

然后,运用 dataset_fn 把练习数据集包装起来。

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

4.2 树立模型

接下来,咱们来树立模型和其他目标,要保证在 strategy.scope 之下创立这些变量。

# These variables created under the  strategy.scope  will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")
  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

然后需求保证运用 FixedShardsPartitioner 将一切变量分红两个分片,每个分片被分配给不同的参数服务器。

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device =="/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device =="/job:ps/replica:0/task:1/device:CPU:0"

4.3 界说练习过程

第三步则是运用 tf.function 来创立练习 step。

@tf.function
def step_fn(iterator):
  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss
  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

在上面的练习步进函数中,在 step_fn 中调用 Strategy.run 和 Strategy.reduce 就能够支撑每个作业者的多个GPU。作业者被分配 GPU 之后, Strategy.run 将在多个模型副本上分配数据集。

4.4 分配核算到远端

在运用 ParameterServerStrategy 界说一切的核算后,你将运用 tf.distribution.experimental.coordinator.ClusterCoordinator 类来创立资源并将练习过程分配给长途作业者。

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

然后,为每个作业者(per-worker)创立一个数据集和一个迭代器。鄙人面的 per_worker_dataset_fn 中,主张将 dataset_fn 包裹到 strategy.distribution_datasets_from_function 中,以答应无缝高效的把数据预取到 GPU。

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)

最后一步是运用 ClusterCoordinator.schedule 将核算分配给长途作业者。

  • schedule 办法把一个 tf.function 刺进行列,并当即回来一个 future-like 的 RemoteValue 。行列之中的函数将被派发给后台线程中的长途作业者,RemoteValue 将被异步填充。
  • 能够运用 join 办法( ClusterCoordinator.join )来等候一切被规划(scheduled)的函数履行完毕。
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

下面是怎么得到 RemoteValue 的结果。

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())

或者,你能够发动一切的过程,并在等候完结时做一些工作。

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

4.5 树立数据集

上述代码中的数据集是运用 ClusterCoordinator.create_per_worker_dataset API 创立的。它为每个作业者创立一个数据集,并回来一个容器目标。你能够调用 iter 办法来创立一个属于每个作业者(per-worker)的迭代器。在作业者履行函数之前, ClusterCoordinator.schedule 办法的输入参数将被设置成作业者的相应切片(slice)。

现在, ClusterCoordinator.schedule 办法假定worker都是相同的,因而假定不同worker上的数据集是相同的,假如数据集包括 Dataset.shuffle 操作,则数据集或许会被shuffle。正由于如此,主张用户组织运转有限的过程,而不是依靠数据集的 OutOfRangeError 。

另一个重要的留意事项是, tf.data 数据集不支撑跨使命鸿沟的隐式序列化和反序列化。所以在传递给 ClusterCoordinator.create_per_worker_dataset 的函数内创立整个数据集是很重要的。

5. 运转

5.1 直接运转

假如直接调用 run 来运转,则 ParameterServerStrategy 和其他战略套路相似,比方在 parameter_server_strategy_v2 之中调用了 mirrored_run,所以咱们不在赘述。

  def _call_for_each_replica(self, fn, args, kwargs):
    self._assert_being_scheduled_by_cluster_coordinator()
    return mirrored_run.call_for_each_replica(self._container_strategy(), fn,
                                              args, kwargs)

5.2 ClusterCoordinator

另一种办法是运用 ClusterCoordinator 来运转,咱们将鄙人一章节结合自界说练习循环来进行剖析。

6. 功用改进

假如你在运用 ParameterServerStrategy 和 ClusterResolver 练习时发现功用问题,或许有几个原因。

一个常见的原因是参数服务器的负载不平衡,一些重载的参数服务器现已到达容量。也或许有多种根本原因。缓解这个问题的一些简略办法是:

  1. 在构建 ParameterServerStrategy 时,经过指定一个 variable_partitioner 来切割你的大型模型变量。
  2. 假如或许的话,避免创立一个一切参数服务器都需求的热点(hotspot)变量。例如,在优化器中运用一个恒定的学习率或子类 tf.keras.optimizers.schedules.LearningRateSchedule,由于默认行为是:学习率将成为一个放在特定参数服务器上的变量,但是此变量在每一步中被一切其他参数服务器运用。
  3. 在将你的大词汇表传递给 Keras 预处理层之前,对它们进行 shuffle。

功用问题的另一个或许原因是和谐器。你的第一个 schedule / join 的完结是根据Python的,因而或许有线程开支。别的,和谐器和作业者之间的延迟也或许很大。假如是这种状况,那么主张:

  • 关于 Model.fit,你能够将 Model.compile 供给的 steps_per_execution 参数设置为大于1的值。
  • 关于一个自界说的练习循环,你能够将多个过程打包到一个 tf.function 中。
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...
    strategy.run(replica_fn, args=(features, labels))

随着库的进一步优化,期望能够让大多数用户在未来不用手动打包过程。此外,进步功用的一个小窍门是组织没有回来值的函数。

7. 已知约束

在上述章节中现已触及了大部分已知的约束。本节供给一个总结。

7.1 ParameterServerStrategy

  • os.environment[“grpc_fail_fast”]=”use_caller” 在包括和谐器在内的每个使命上都需求,以使容错正常作业。
  • 不支撑同步的参数服务器练习。
  • 一般需求将多个过程打包到一个函数中,以完结最佳功用。
  • 不支撑经过 tf.saved_model.load 加载含有分片变量的保存模型。留意运用 TensorFlow Serving 加载这样的 saved_model 是能够的。
  • 不支撑将包括分片优化器插槽(slot)变量的检查点加载到不同数量的分片中。
  • 不支撑在不重启和谐者使命的状况下从参数服务器毛病中恢复。
  • 运用 tf.lookup.StaticHashTable(它一般被一些 Keras 预处理层选用,如 tf.keras.layer.IntegerLookup 、 tf.keras.layer.StringLookup 和 tf.keras.layer.TextVectorization )将导致在这一步之中参数服务器练习所运用的资源被放在和谐器上。这会影响从作业者到和谐器的查找RPC的功用。这是现在需求处理的一个高度优先事项。

7.2 Model.fit

  • steps_per_epoch 参数在 Model.fit 中是必需的。你能够挑选一个值来保证epoch之内被切割恰当。

  • 由于功用原因, ParameterServerStrategy 不支撑批量级自界说回调。你应该将这些调用转换为epoch级的调用,并适当挑选 steps_per_epoch,以便每隔 steps_per_epoch 步数调用这些回调。内置回调不受影响:它们的批处理级调用现已被修改为可履行的。官方正在计划为”ParameterServerStrategy”支撑批量调用。

  • 出于相同的原因,与其他战略不同,进展条和目标只在epoch鸿沟被记录。

  • 不支撑 run_eagerly 。

7.3 自界说循环

  • ClusterCoordinator.schedule 不支撑数据集的访问量保证(visitation guarantees)。

0xEE 个人信息

★★★★★★关于日子和技能的考虑★★★★★★

微信大众账号:罗西的考虑

0xFF 参阅

www.youtube.com/watch?v=B2T…

[中字] TFRT: 新的 TensorFlow 运转库 – TF Dev Summit ’20

深化理解 TFRT

Inside TensorFlow: Eager execution runtime

【 深度学习结构tensorflow: Inside TensorFlow 】Inside TensorFlow(合辑)

github.com/tensorflow/…