作者:应健健,新华智云核算中心

OpenYurt 是业界首个非侵入的边际核算云原生开源项目,经过边际自治,云边协同,边际单元化,边际流量闭环等才能为用户提供云边一体化的运用体验。在 Openyurt 里边际网络能够运用数据过滤结构在不同节点池里完结边际流量闭环才能。

Yurthub 数据过滤结构解析

Yurthub 本质上是一层 kube-apiserver 的署理,在署理的基础上加了一层 cache,一来确保边际节点离线的情况下能够运用本地 cache 确保业务稳定性,有效的处理了边际自治的问题。二来能够下降许多的 list & watch 操作对云上 api 产生一定的负载。

Yurthub 的数据过滤经过节点上的 pod 以及 kubelet 的恳求经过 Load Balancer 发送给 kube-apiserver,署理接收到呼应音讯进行数据过滤处理,之后再将过滤后的数据回来给恳求方。假如节点是边际节点会依据恳求类型对呼应恳求体中的资源进行本地缓存,假如是云端节点考虑到网络状态杰出不进行本地缓存。

Yurthub 的过滤结构完结原理图:

OpenYurt 之 Yurthub 数据过滤框架解析

Yurthub 现在包含四种过滤规矩,经过 addons 恳求的 user-agent,resource,verb 判别经过那个过滤器进行相应的数据过滤。

四种过滤规矩功能及完结

ServiceTopologyFilter

首要针对 EndpointSlice 资源进行数据过滤, 但 Endpoint Slice 特性需求在 Kubernetes v1.18 或以上版别才干支持,假如在 1.18 版别以下主张运用 endpointsFilter 过滤器。当经过该过滤器首要经过 kubernetes.io/service-name 找到 endpointSlice 资源所对应的 services 资源,之后判别 servces 资源是否存在 openyurt.io/topologyKeys 这个 Annotations,假如存在那么经过这个 Annotations 的值判别数据过滤规矩,最终更新 response data 回来给 addons。

Annotations 的值分为两大类:

1、kubernetes.io/hostname:只过滤出相同节点的 endpoint ip

2、openyurt.io/nodepool 或许 kubernetes.io/zone: 经过这个 Annotations 获取对应节点池,最终遍历 endpointSlice 资源,经过endpointSlice 里的 topology 字段中的 kubernetes.io/hostname 字段在 endpointSlice 目标里找到对应的 Endpoints,之后重组 endpointSlice 里的 Endpoints 后回来给 addons。

代码完结:

func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice {
   var serviceTopologyType string
   // get the service Topology type
   if svcName, ok := endpointSlice.Labels[discovery.LabelServiceName]; ok {
      svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName)
      if err != nil {
         klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err)
         return endpointSlice
      }
      if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok {
         klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey)
         return endpointSlice
      }
   }
   var newEps []discovery.Endpoint
   // if type of service Topology is 'kubernetes.io/hostname'
   // filter the endpoint just on the local host
   if serviceTopologyType == AnnotationServiceTopologyValueNode {
      for i := range endpointSlice.Endpoints {
         if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {
            newEps = append(newEps, endpointSlice.Endpoints[i])
         }
      }
      endpointSlice.Endpoints = newEps
   } else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {
      // if type of service Topology is openyurt.io/nodepool
      // filter the endpoint just on the node which is in the same nodepool with current node
      currentNode, err := fh.nodeGetter(fh.nodeName)
      if err != nil {
         klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)
         return endpointSlice
      }
      if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
         nodePool, err := fh.nodePoolLister.Get(nodePoolName)
         if err != nil {
            klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err)
            return endpointSlice
         }
         for i := range endpointSlice.Endpoints {
            if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {
               newEps = append(newEps, endpointSlice.Endpoints[i])
            }
         }
         endpointSlice.Endpoints = newEps
      }
   }
   return endpointSlice
}

EndpointsFilter

针对 endpoints 资源进行相应的数据过滤,首要判别 endpoint 是否存在对应的 service,经过 node 的 label: apps.openyurt.io/nodepool 获取节点池,之后获取节点池下的一切节点,遍历 endpoints.Subsets 下的资源找出同一个节点池的 Ready pod address 以及 NotReady pod address 重组成新的 endpoints 之后回来给 addons。

func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
   svcName := endpoints.Name
   _, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
   if err != nil {
      klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
      return endpoints
   }
   // filter the endpoints on the node which is in the same nodepool with current node
   currentNode, err := fh.nodeGetter(fh.nodeName)
   if err != nil {
      klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
      return endpoints
   }
   if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
      nodePool, err := fh.nodePoolLister.Get(nodePoolName)
      if err != nil {
         klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
         return endpoints
      }
      var newEpSubsets []v1.EndpointSubset
      for i := range endpoints.Subsets {
         endpoints.Subsets[i].Addresses = filterValidEndpointsAddr(endpoints.Subsets[i].Addresses, nodePool)
         endpoints.Subsets[i].NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i].NotReadyAddresses, nodePool)
         if endpoints.Subsets[i].Addresses != nil || endpoints.Subsets[i].NotReadyAddresses != nil {
            newEpSubsets = append(newEpSubsets, endpoints.Subsets[i])
         }
      }
      endpoints.Subsets = newEpSubsets
      if len(endpoints.Subsets) == 0 {
         // this endpoints has no nodepool valid addresses for ingress controller, return nil to ignore it
         return nil
      }
   }
   return endpoints
}

MasterServiceFilter

针对 services 下的域名进行 ip 以及端口替换,这个过滤器的场景首要在于边际端的 pod 无缝运用 InClusterConfig 拜访集群资源。

func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
   list, err := fh.serializer.Decode(b)
   if err != nil || list == nil {
      klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err)
      return b, nil
   }
   // return data un-mutated if not ServiceList
   serviceList, ok := list.(*v1.ServiceList)
   if !ok {
      return b, nil
   }
   // mutate master service
   for i := range serviceList.Items {
      if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName {
         serviceList.Items[i].Spec.ClusterIP = fh.host
         for j := range serviceList.Items[i].Spec.Ports {
            if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName {
               serviceList.Items[i].Spec.Ports[j].Port = fh.port
               break
            }
         }
         klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req))
         break
      }
   }
   // return the mutated serviceList
   return fh.serializer.Encode(serviceList)
}

DiscardCloudService

该过滤器针对两种 service 其中的一种类型是 LoadBalancer,因为边际端无法拜访 LoadBalancer 类型的资源,所以该过滤器会将这种类型的资源直接过滤掉。另外一种是针对 kube-system 名称空间下的 x-tunnel-server-internal-svc,这个 services 首要存在 cloud 节点用于拜访 yurt-tunnel-server,对于 edge 节点会直接过滤掉该 service。

func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
   list, err := fh.serializer.Decode(b)
   if err != nil || list == nil {
      klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err)
      return b, nil
   }
   serviceList, ok := list.(*v1.ServiceList)
   if ok {
      var svcNew []v1.Service
      for i := range serviceList.Items {
         nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name)
         // remove lb service
         if serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer {
            if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" {
               klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
               continue
            }
         }
         // remove cloud clusterIP service
         if _, ok := cloudClusterIPService[nsName]; ok {
            klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
            continue
         }
         svcNew = append(svcNew, serviceList.Items[i])
      }
      serviceList.Items = svcNew
      return fh.serializer.Encode(serviceList)
   }
   return b, nil
}

过滤结构现状

现在的过滤结构比较僵硬,将资源过滤硬编码至代码中,只能是已注册的资源才干进行相应的过滤,为了处理这个问题,需求对过滤结构进行相应的改造。

处理计划

计划一:

运用参数或许环境变量的形式自定义过滤装备,但是这种方式有以下弊端:

1、装备杂乱需求将所以需求自定义的装备写入到发动参数或许读取环境变量 例如下格局:

--filter_serviceTopology=coredns/endpointslices#list,kube-proxy/services#list;watch --filter_endpointsFilter=nginx-ingress-controller/endpoints#list;watch

2、无法热更新,每次修正装备都需求重启 Yurthub 收效。

计划二:

1、运用 configmap 的形式自定义过滤装备下降装备杂乱度装备格局(user-agent/resource#list,watch) 多个资源经过逗号隔开。如下所示:

filter_endpoints: coredns/endpoints#list;watch,test/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""

2、运用 Informer 机制确保装备实时收效

综合以上两点在 OpenYurt 中咱们挑选了处理计划二。

开发过程中遇到的问题

在边际端 Informer watch 的 api 地址是 Yurthub 的署理地址,那么 Yurthub 在发动署理端口之前都是无法确保 configmap 的数据是正常的。假如在发动完结之后 addons 的恳求先于 configmap 数据更新 这个时候会导致数据在没有过滤的情况下就回来给了 addons,这样会导致许多预期以外的问题。

为了处理这个问题 咱们需求在 apporve 中参加 WaitForCacheSync 确保数据同步完结之后才干回来相应的过滤数据,但是在 apporve 中参加 WaitForCacheSync 也直接导致 configmap进行 watch 的时候也会被堵塞,所以需求在 WaitForCacheSync 之前参加一个白名单机制,当 Yurthub 运用 list & watch 拜访 configmap 的时候咱们直接不进行数据过滤,相应的代码逻辑如下:

func (a *approver) Approve(comp, resource, verb string) bool {
   if a.isWhitelistReq(comp, resource, verb) {
      return false
   }
   if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
      panic("wait for configMap cache sync timeout")
   }
   a.Lock()
   defer a.Unlock()
   for _, requests := range a.nameToRequests {
      for _, request := range requests {
         if request.Equal(comp, resource, verb) {
            return true
         }
      }
   }
   return false
}

总结

1、经过上述的扩展才能能够看出,YurtHub 不仅仅是边际节点上的带有数据缓存才能的反向署理。而是对 Kubernetes 节点应用生命周期管理加了一层新的封装,提供边际核算所需求的核心管控才能。

2、YurtHub 不仅仅适用于边际核算场景,其实能够作为节点侧的一个常备组件,适用于运用 Kubernetes 的任意场景。信任这也会驱动 YurtHub 向更高功能,更高稳定性开展。

点击​​此处​​​,当即了解 OpenYurt 项目!​