作者:丛国庆

在 Kubernetes(简称 K8s,一个可移植容器的编排管理工具)体系中,etcd 存储集群的数据信息,kube-apiserver 作为一致入口,任何对数据的操作都有必要经过 kube-apiserver。因而 Dubbo 想要以 Kubernetes 作为注册中心,有必要调用 kube-apiserver 获取服务地址列表,那是以什么样的机制保持信息的可靠性、实时性、次序性、高功能呢?答案就是依据 List/Watch 的 Informer 组件

List/Watch 机制介绍

List / Watch 机制是 Kubernetes 中完成集群控制模块最核心的规划之一,它选用一致的异步音讯处理机制,保证了音讯的实时性、可靠性、次序性和功能等,为声明式风格的API奠定了良好的基础。

List 是向 kube-apiserver 调用 list API 获取资源列表,依据 HTTP 短链接完成。

Watch则是向 kube-apiserver 调用 watch API 监听资源改动事情,依据 HTTP 长链接,经过Chunked transfer encoding(分块传输编码)来完成音讯告诉。

当客户端调用 watch API 时,kube-apiserver 在 response 的 HTTP Header 中设置 Transfer-Encoding 的值为 chunked,表示选用分块传输编码,客户端收到该信息后,便和服务端衔接,并等待下一个数据块,即资源的事情信息。例如:

$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/endpoints?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 14 Seo 2022 20:22:59 GMT
Transfer-Encoding: chunked
{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}

Dubbo-kubernetes 基于 Informer 服务发现优化之路

Dubbo 依据 Watch 的服务发现

Dubbo-kubernetes 基于 Informer 服务发现优化之路

以上图为例,dubbo-kubernetes 服务发现以 Kubernetes 为 Registry ,provider 注册地址到注册中心,consumer 从注册中心读取和订阅 provider 地址列表。在 Dubbo3.1 版别之前,consumer 订阅是经过 Fabric8 Kubernetes Java Client 供给的 watch API 完成,监听 kube-apiserver 中资源的 create、update 和 delete 事情,如下:

private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
    Watch watch = kubernetesClient.endpoints()
        .inNamespace(namespace).withName(serviceName).watch(new Watcher<Endpoints>() {
            // 资源更改的事情回调
            @Override
            public void eventReceived(Action action, Endpoints resource) {
                notifyServiceChanged(serviceName, listener);
                ...
            }
        });
    ...
}
private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {
    ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
    ...
    listener.onEvent(event);
    ...
}

监听到资源改动后,调用 notifyServiceChanged 方法从 kube-apiserver 全量拉取资源 list 数据,保持 Dubbo 本地侧服务列表。

@Override
public List<ServiceInstance> getInstances(String serviceName){
    // 直接调用kube-apiserver
    Endpoints endpoints = kubernetesClient.endpoints().inNamespace(namespace)
            .withName(serviceName).get();
    return toServiceInstance(endpoints, serviceName);
}

这样的操作存在很严重的问题,因为 watch 对应的回调函数会将更新的资源返回,Dubbo 社区考虑到保护成本较高,之前并没有在本地保护关于 CRD 资源的缓存,这样每次监听到改动后调用 list 从 kube-apiserver 获取对应 serviceName 的 endpoints 信息,无疑增加了一次对 kube-apiserver 的直接拜访。

kubernetes-client 为处理客户端需求自行保护缓存的问题,推出了 informer 机制。

Informer 机制介绍

Informer 模块是 Kubernetes 中的基础组件,以 List/Watch 为基础,担任各组件与 kube-apiserver 的资源与事情同步。Kubernetes 中的组件,假如要拜访 Kubernetes 中的 Object,绝大部分状况下会运用 Informer 中的 Lister()方法,而非直接调用 kube-apiserver。

Dubbo-kubernetes 基于 Informer 服务发现优化之路

以 Pod 资源为例,介绍下 informer 的要害逻辑(与下图过程一一对应):

  1. Informer 在初始化时,Reflector 会先调用 List 取得所有的 Pod,同时调用 Watch 长衔接监听 kube-apiserver。
  2. Reflector 拿到悉数 Pod 后,将 Add Pod 这个事情发送到 DeltaFIFO。
  3. DeltaFIFO 随后 pop 这个事情到 Informer 处理。
  4. Informer 向 Indexer 发布 Add Pod 事情。
  5. Indexer 接到告诉后,直接操作 Store 中的数据(key->value 格式)。
  6. Informer 触发 EventHandler 回调。
  7. 将 key 推到 Workqueue 行列中。
  8. 从 WorkQueue 中 pop 一个 key。
  9. 然后依据 key 去 Indexer 取到 val。依据当前的 EventHandler 进行 Add Pod 操作(用户自界说的回调函数)。
  10. 随后当 Watch 到 kube-apiserver 资源有改动的时分,再重复 2-9 过程。

Dubbo-kubernetes 基于 Informer 服务发现优化之路

(来源于kubernetes/sample-controller)

Informer 要害规划

  • 本地缓存:Informer 只会调用 K8s List 和 Watch 两种类型的 API。Informer 在初始化的时,先调用 List 取得某种 resource 的悉数 Object,缓存在内存中; 然后,调用 Watch API 去 watch 这种 resource,去保护这份缓存; 最终,Informer 就不再调用 kube-apiserver。Informer 笼统了 cache 这个组件,并且完成了 store 接口,后续获取资源直接经过本地的缓存来进行获取。
  • 无界行列:为了协调数据生产与消费的不一致状况,在客户端中经过完成了一个无界行列 DeltaFIFO 来进行数据的缓冲,当 reflector 获取到数据之后,只需求将数据推到到 DeltaFIFO 中,则就可以持续 watch 后续事情,从而减少阻塞时刻,如上图 2-3 过程所示。
  • 事情去重:在 DeltaFIFO 中,假如针对某个资源的事情重复被触发,则就只会保存相同事情最终一个事情作为后续处理,有 resourceVersion 唯一键保证,不会重复消费。
  • 复用衔接:每一种资源都完成了 Informer 机制,答应监控不同的资源事情。为了防止同一个资源树立多个 Informer,每个 Informer 运用一个 Reflector 与 apiserver 树立链接,导致 kube-apiserver 负载过高的状况,K8s 中笼统了 sharedInformer 的概念,即同享的 Informer, 可以使同一类资源 Informer 同享一个 Reflector。内部界说了一个 map 字段,用于寄存所有 Infromer 的字段。针对同一资源只树立一个衔接,减小 kube-apiserver 的负载。

Dubbo 引进 Informer 机制后的服务发现

Dubbo 3.1.1 后引进 Informer 机制,Informer 组件会利用其特性在 consumer 侧内存中保护 Kubernetes 环境中的所有地址列表。

资源监听由 Watch API 更换为 Informer API

以 Endpoints 为例,将本来的 watch 替换为 Informer,回调函数分别为 onAdd、onUpdate、onDelete,回调参数传的都是 Informer store 中的资源全量值。

/**
 * 监听Endpoints
 */
private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
    SharedIndexInformer<Endpoints> endInformer = kubernetesClient
            .endpoints().inNamespace(namespace)
            .withName(serviceName).inform(new ResourceEventHandler<Endpoints>() {
                @Override
                public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
                    notifyServiceChanged(serviceName, listener, toServiceInstance(newEndpoints, serviceName));
                }
                // 省掉掉onAdd和onDelete
                ...
            });
    ...
}
/**
 * 告诉订阅者Service改动
 */
private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> serviceInstanceList) {
    ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent(serviceName, serviceInstanceList);
    // 发布事情
    listener.onEvent(event);
}

getInstances() 优化

引进 Informer 后,无需直接调用 List 接口,而是直接从 Informer 的 store 中获取,减少对 kube-apiserver 的直接调用。

public List<ServiceInstance> getInstances(String serviceName) {
    Endpoints endpoints = null;
    SharedIndexInformer<Endpoints> endInformer = ENDPOINTS_INFORMER.get(serviceName);
    if (endInformer != null) {
        // 直接从informer的store中获取Endpoints信息
        List<Endpoints> endpointsList = endInformer.getStore().list();
        if (endpointsList.size() > 0) {
            endpoints = endpointsList.get(0);
        }
    }
    // 假如endpoints经过上面处理仍为空,属于异常状况,那就从kube-apiserver拉取
    if (endpoints == null) {
        endpoints = kubernetesClient.endpoints()
                .inNamespace(namespace).withName(serviceName).get();
    }
    return toServiceInstance(endpoints, serviceName);
}

定论

优化为 Informer 后,Dubbo 的服务发现不必每次直接调用 kube-apiserver,减小了 kube-apiserver 的压力,也大大减少了响应时刻,助力 Dubbo 从传统架构迁移到 Kubernetes 中。