1. 简介

当运用 Kubernetes 进行应用程序的开发和部署时,client-go 是一个非常重要的东西。它是 Kubernetes 的官方客户端库,供给了与 Kubernetes ApiServer 进行通讯的接口和完成。

client-go 首要供给以下几个功用:

  1. 与 Kubernetes ApiServer 进行通讯:client-go 供给了与 Kubernetes ApiServer 进行通讯的接口和完成,包括基本的 http 恳求和更深层次的封装。开发人员能够运用 client-go 创立、更新和删去 Kubernetes 中的资源。
  2. 拜访 Kubernetes ApiServer 中的资源:client-go 供给了拜访 Kubernetes ApiServer 中资源的办法,包括运用 ClientSet 进行根据目标的拜访和运用 DynamicClient 进行根据无类型的拜访。
  3. 处理 Kubernetes 资源的事情:client-go 供给了一种称为 Informer 的机制,它能够监听 Kubernetes ApiServer 中的资源改变事情。开发人员能够运用 Informer 完成资源的快速检索和本地缓存,然后减轻对 ApiServer 的拜访压力。
  4. 发现 Kubernetes ApiServer 中的资源:client-go 还供给了 DiscoveryClient 接口,该接口能够用于在 Kubernetes ApiServer 中查询当时集群的资源及其版别.

总的来说,client-go 是 Kubernetes 开发人员不可或缺的东西之一。它供给了丰富的功用和灵活的接口,使开发人员能够更轻松地构建和办理 Kubernetes 应用程序。

上述的关键下文都会逐个的酌情展开, 由于我需求开发多集群办理渠道和一些k8s组件所以在 client-go 上有深度的运用, 在 client-go 上的一些小坑和处理技巧会在下一篇文章中列出, 本文更多重视 client-go 关于 Informer 的具体用法.

2. Client

这儿只简略介绍其封装好的几个 client, 调用起来都比较便利所以就不展开了.

2.1 加载 kubeconfig 装备

  • 加载kubeconfig 及各客户端初始化的办法
    package config
    import (
            "k8s.io/client-go/discovery"
            "k8s.io/client-go/dynamic"
            "k8s.io/client-go/kubernetes"
            "k8s.io/client-go/rest"
            "k8s.io/client-go/tools/clientcmd"
            "log"
    )
    const kubeConfigFilePath = "/Users/ShadowYD/.kube/config"
    type K8sConfig struct {
    }
    func NewK8sConfig() *K8sConfig {
            return &K8sConfig{}
    }
    // 读取kubeconfig 装备文件
    func (this *K8sConfig) K8sRestConfig() *rest.Config {
            config, err := clientcmd.BuildConfigFromFlags("", kubeConfigFilePath)
            if err != nil {
                    log.Fatal(err)
            }
            return config
    }
    // 初始化 clientSet
    func (this *K8sConfig) InitClient() *kubernetes.Clientset {
            c, err := kubernetes.NewForConfig(this.K8sRestConfig())
            if err != nil {
                    log.Fatal(err)
            }
            return c
    }
    // 初始化 dynamicClient
    func (this *K8sConfig) InitDynamicClient() dynamic.Interface {
            c, err := dynamic.NewForConfig(this.K8sRestConfig())
            if err != nil {
                    log.Fatal(err)
            }
            return c
    }
    // 初始化 DiscoveryClient
    func (this *K8sConfig) InitDiscoveryClient() *discovery.DiscoveryClient {
            return discovery.NewDiscoveryClient(this.InitClient().RESTClient())
    }
    

2.2 ClientSet

  • ClientSet 是比较常用的一个client, 常用于对k8s内部资源做CRUD, 或查询当时集群拥有什么资源;
    
    func main () {
        // 运用的是上文说到的装备加载目标
        cliset := NewK8sConfig().InitClient()
        configMaps, err := cliset.CoreV1().ConfigMaps(ns).List(metav1.ListOptions{})
        if err != nil {
           panic(err)
        }
        for _, cm := range configMaps.Items {
           fmt.Printf("configName: %v, configData: %v \n", cm.Name, cm.Data)
        }
        return nil
    }
    

2.3 DynamicClient

  • DynamicClient 也是比较常用的 client 之一, 但频繁度不及 ClientSet, 它首要作用是用于 CRD (自界说资源)当然它也能够用于 k8s 的内部资源, 咱们在项目内就用它来开发出能够对恣意资源做 CRUD 的接口;

  • 下面将演示运用 dynamicClient 创立资源, 先在 tpls/deployment.yaml 测验装备

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: myngx
      namespace: default
    spec:
      selector:
        matchLabels:
          app: myngx
      replicas: 1
      template:
        metadata:
          labels:
            app: myngx
        spec:
          containers:
            - name: myngx-container
              image: nginx:1.18-alpine
              imagePullPolicy: IfNotPresent
              ports:
                - containerPort: 80
    
  • 运用 DynamicClient 创立测验装备

    package main
    import (
       "context"
       _ "embed"
       "k8s-clientset/config"
       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
       "k8s.io/apimachinery/pkg/runtime/schema"
       "k8s.io/apimachinery/pkg/util/yaml"
       "log"
    )
    // 这个是新特性运用注释加载装备
    //go:embed tpls/deployment.yaml
    var deployTpl string
    // dynamic client 创立 Deploy
    func main()  {
       // 动态客户端
       dynamicCli := config.NewK8sConfig().InitDynamicClient()
       // 能够随意指定集群拥有的资源, 进行创立
       deployGVR := schema.GroupVersionResource{
          Group: "apps",
          Version: "v1",
          Resource: "deployments",
       }
       deployObj := &unstructured.Unstructured{}
       if err := yaml.Unmarshal([]byte(deployTpl), deployObj); err != nil {
           log.Fatalln(err)
       }
       if _, err = dynamicCli.
          Resource(deployGVR).
          Namespace("default").
          Create(context.Background(), deployObj, metav1.CreateOptions{}); 
          err != nil {
          log.Fatalln(err)
       }
       log.Println("Create deploy succeed")
    }
    

2.4 DiscoveryClient

  • DiscoveryClient 望文生义便是用于发现k8s资源的, 当咱们不知道当时集群有什么资源时就会用该客户端封装好的办法进行查询; kubectl api-resources 命令便是它完成的.

    package main
    import (
            "fmt"
            "k8s-clientset/config"
    )
    func main() {
            client := config.NewK8sConfig().InitDiscoveryClient()
            // 能够看到当时集群的 gvr
            preferredResources, _ := client.ServerPreferredResources()
            for _, pr := range preferredResources {
                    fmt.Println(pr.String())
            }
            // _, _, _ = client.ServerGroupsAndResources()
    }
    

3. Informer

3.1 前语

本文重点便是放在 Informer 的源码的调试及怎样去运用 Informer 到达对多集群查询目的之余也不会对集群的API Server 造成压力; 下面将沿着 Informer 架构图一步一步的剖析每个环节, 你将知道informer每一步的运作办法, 全网或许独一份, 是不是该 点赞 以示支撑一下;

3.2 Informer 架构图

该图其实还有下半部分是关于 Custom Controller, 想了解请跳转 Controller源码解析

[K8S] client-go 的正确打开方式

上图的流程解析:

  1. Reflector(反射器) 经过 http trunk 协议监听k8s apiserver 服务的资源改变事情, 事情首要分为三个动作 ADDUPDATEDELETE;
  2. Reflector(反射器) 将事情增加到 Delta 行列中等候;
  3. Informer 从行列获取新的事情;
  4. Informer 调用 Indexer (索引器, 该索引器内包含Store目标), 默认索引器是以namespace 和 name 作为每种资源的索引名;
  5. Indexer 经过调用 Store 存储目标按资源分类存储;

3.3 源码调试与剖析

下面部分示例需求把部分源码 copy 到一个可导入的目录下, 由于有些源码是私有化不允许经过包 import;

3.3.1 从头说起 List & Watch

  • 在 Reflector 包中,存在着 ListWatch 客户端,其间包含了 list 和 watch 两个目标。list 目标首要用于列出指定资源(如 pods)的当时列表版别,而 watch 目标则用于追寻指定资源的当时版别并监听其后续的一切改变事情。

  • 在 watch 的进程中,API Server 不或许长时刻保留咱们 watch 的某个资源版别。因而,每个资源版别都会有一个过期时刻。一旦版别过期,watch 就会中止并回来 expired 相关的过错。此时,假如咱们想继续监听并防止遗失改变事情,就需求继续记载资源版别号(或记载 API Server 传递的符号版别号)。一旦之前咱们监听的版别号过期,咱们就能够从记载的版别号开始从头监听。

  • watch 目标运用的是 http 的 chunk 协议(数据分块协议)。在制造浏览器进度条时,咱们也会运用该协议进行长连接。

  • 用代码调试一下怎样 watch Pod 资源, 下面仅仅是代码片段需求自行补全;;

    package main
    import (
            "fmt"
            "k8s-clientset/deep_client_go/reflector/helper"
            v1 "k8s.io/api/core/v1"
            metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
            "k8s.io/apimachinery/pkg/fields"
            "k8s.io/client-go/tools/cache"
            "log"
    )
    // create pods list & watch
    func main() {
            // helper 只是一个相似上文演示的 config, 只要用于初始化各种客户端
            cliset := helper.InitK8SClient()
            lwc := cache.NewListWatchFromClient(cliset.CoreV1().RESTClient(), "pods", "kube-system", fields.Everything())
            watcher, err := lwc.Watch(metav1.ListOptions{})
            if err != nil {
                    log.Fatalln(err)
            }
            for {
                    select {
                    case v, ok := <-watcher.ResultChan():
                            if ok {
                                    fmt.Println(v.Type, ":", v.Object.(*v1.Pod).Name, "-", v.Object.(*v1.Pod).Status.Phase)
                            }
                    }
            }
    }
    // 输出成果
    // ADDED : kube-apiserver-k8s-01 - Running
    // ADDED : kube-scheduler-k8s-01 - Running
    // ADDED : coredns-65c54cc984-26zx9 - Running
    // ADDED : metrics-server-7fd564dc66-sm29c - Running
    // ADDED : kube-proxy-6jl96 - Running
    // ADDED : coredns-65c54cc984-bgmpm - Running
    // ADDED : etcd-k8s-01 - Running
    // ADDED : kube-controller-manager-k8s-01 - Running
    
  • 当你做 Pod 资源改变时便能够接纳到改变事情,

    // 履行 kubectl apply -f  deploy.yaml
    //ADDED : mygott-7565765f4d-2t4z8 - Pending
    //MODIFIED : mygott-7565765f4d-2t4z8 - Pending
    //MODIFIED : mygott-7565765f4d-2t4z8 - Pending
    //MODIFIED : mygott-7565765f4d-2t4z8 - Running
    // 履行 kubectl delete deploy mygott
    //MODIFIED : mygott-7565765f4d-2t4z8 - Running
    //MODIFIED : mygott-7565765f4d-2t4z8 - Running
    //MODIFIED : mygott-7565765f4d-2t4z8 - Running
    //DELETED : mygott-7565765f4d-2t4z8 - Running
    

3.3.2 入列 DeltaFifo

  • 从 reflector 中获取到资源事情然后放入先进先出行列,事情目标包含了2个特点如下所示:

    type Event struct {
            // 事情类型
            Type EventType
            // 资源目标
            Object runtime.Object
    }
    // 事情类型如下: 
    // 资源增加事情
    Added    EventType = "ADDED"  
    // 资源修改事情
    Modified EventType = "MODIFIED"
    // 资源删去事情
    Deleted  EventType = "DELETED"
    // 符号资源版别号事情, 这个便是用于可从头watch的版别号
    Bookmark EventType = "BOOKMARK"
    // 过错事情
    Error    EventType = "ERROR" 
    
  • DeltaFifo 行列源码调试, 增加 Pod 资源入行列

    package main
    import (
            "fmt"
            "k8s.io/client-go/tools/cache"
    )
    type Pod struct {
            Name  string
            Value int
    }
    func NewPod(name string, v int) Pod {
            return Pod{Name: name, Value: v}
    }
    // 需求供给一个资源的仅有标识的字符串给到 DeltaFifo, 这样它就能追寻某个资源的改变
    func PodKeyFunc(obj interface{}) (string, error) {
            return obj.(Pod).Name, nil
    }
    func main() {
            df := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{KeyFunction: PodKeyFunc})
            // ADD3个object 进入 fifo
            pod1 := NewPod("pod-1", 1)
            pod2 := NewPod("pod-2", 2)
            pod3 := NewPod("pod-3", 3)
            df.Add(pod1)
            df.Add(pod2)
            df.Add(pod3)
            // Update pod-1
            pod1.Value = 11
            df.Update(pod1)
            df.Delete(pod1)
            // 当时df 的列表
            fmt.Println(df.List())
            // 循环抛出事情
            for {
                df.Pop(func(i interface{}) error {
                    for _, delta := range i.(cache.Deltas) {
                        switch delta.Type {
                        case cache.Added:
                                fmt.Printf("Add Event: %v \n", delta.Object)
                                break
                        case cache.Updated:
                                fmt.Printf("Update Event: %v \n", delta.Object)
                                break
                        case cache.Deleted:
                                fmt.Printf("Delete Event: %v \n", delta.Object)
                                break
                        case cache.Sync:
                                fmt.Printf("Sync Event: %v \n", delta.Object)
                                break
                        case cache.Replaced:
                                fmt.Printf("Replaced Event: %v \n", delta.Object)
                                break
                        }
                    }
                    return nil
                })
            }
    }
    // 输出成果, 能够看到先入列的资源事情会被先抛出
    // 这是由于底层是是用 map 来记载资源的仅有标识起到快速索引和去重复的作用;
    //[{pod-1 11} {pod-2 2} {pod-3 3}]
    //Add Event: {pod-1 1}
    //Update Event: {pod-1 11}
    //Delete Event: {pod-1 11}
    //Add Event: {pod-2 2}
    //Add Event: {pod-3 3}
    

3.3.3 Reflector 的结构

  • 上述2个末节现已把 listWatch 客户端和 DeltaFifo 怎样作业的办法说明晰一下, 本末节演示Reflector目标整合 listWatch 和 DeltaFifo.

    package main
    import (
            "fmt"
            "k8s-clientset/deep_client_go/reflector/helper"
            v1 "k8s.io/api/core/v1"
            "k8s.io/apimachinery/pkg/fields"
            "k8s.io/client-go/tools/cache"
            "time"
    )
    // simulate  k8s simple reflector creation process
    func main() {
            cliset := helper.InitK8SClient()
            // 运用 store 进行存储,这样本地才有一份数据;
            // 假如本地没有存储到被删去的资源, 则不需求 Pop 该资源的 Delete 事情;
            // 所以咱们为了精确接纳到delete时接纳到 Delete 事情, 所以预先创立一下 store
            // cache.MetaNamespaceKeyFunc 是用于回来资源的仅有标识, {namespace}/{name} 或 {name}
            store := cache.NewStore(cache.MetaNamespaceKeyFunc)
            // create list & watch Client
            lwc := cache.NewListWatchFromClient(cliset.CoreV1().RESTClient(),
                    helper.Resource,
                    helper.Namespace,
                    fields.Everything(),
            )
            // create deltafifo
            df := cache.NewDeltaFIFOWithOptions(
                    cache.DeltaFIFOOptions{
                            KeyFunction:  cache.MetaNamespaceKeyFunc,
                            KnownObjects: store,
                    })
            // crete reflector
            rf := cache.NewReflector(lwc, &v1.Pod{}, df, time.Second*0)
            rsCH := make(chan struct{})
            go func() {
                    rf.Run(rsCH)
            }()
            // fetch delta event
            for {
                df.Pop(func(i interface{}) error {
                    // deltas
                    for _, d := range i.(cache.Deltas) {
                        fmt.Println(d.Type, ":", d.Object.(*v1.Pod).Name,
                                "-", d.Object.(*v1.Pod).Status.Phase)
                        switch d.Type {
                        case cache.Sync, cache.Added:
                                // 向store中增加目标
                                store.Add(d.Object)
                        case cache.Updated:
                                store.Update(d.Object)
                        case cache.Deleted:
                                store.Delete(d.Object)
                        }
                    }
                    return nil
                })
            }
    }
    // 输出成果
    //Sync : pod-1 - Running
    //Sync : web-sts-1 - Running
    //Sync : web-sts-0 - Running
    //Sync : ngx-8669b5c9d-xwljg - Running
    // 履行 kubectl apply -f  deploy.yaml
    //Added : mygott-7565765f4d-x6znf - Pending
    //Updated : mygott-7565765f4d-x6znf - Pending
    //Updated : mygott-7565765f4d-x6znf - Pending
    //Updated : mygott-7565765f4d-x6znf - Running
    // 履行 kubectl delete deploy mygott
    //Updated : mygott-7565765f4d-x6znf - Running
    //Updated : mygott-7565765f4d-x6znf - Running
    //Updated : mygott-7565765f4d-x6znf - Running
    //Deleted : mygott-7565765f4d-wcml6 - Running
    

3.3.4 Indexer 与 Store

>> Store

  • Store 是怎样存储资源目标的? 其实经过 NewStore 办法就能立刻找到的答案, 底层则是一个 ThreadSafeStore 的目标来存储资源的, 而它的核心数据结构是一个 map 并且配合互斥锁确保并发安全, 下面的源码的 item 字段便是其存储的核心 ;

    func NewStore(keyFunc KeyFunc) Store {
        return &cache{
                cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
                keyFunc:      keyFunc,
            }
    }
    // NewThreadSafeStore creates a new instance of ThreadSafeStore.
    func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
        return &threadSafeMap{
            items:    map[string]interface{}{},
            indexers: indexers,
            indices:  indices,
        }
    }
    // threadSafeMap implements ThreadSafeStore
    type threadSafeMap struct {
        lock  sync.RWMutex
        items map[string]interface{}
        // indexers maps a name to an IndexFunc
        indexers Indexers
        // indices maps a name to an Index
        indices Indices
    }
    
  • 咱们能够一起看看 ThreadSafeStore 所含有的的一些动作, 便很容易理解其作业的办法:

    type ThreadSafeStore interface {
            Add(key string, obj interface{})
            Update(key string, obj interface{})
            Delete(key string)
            Get(key string) (item interface{}, exists bool)
            List() []interface{}
            ListKeys() []string
            Replace(map[string]interface{}, string)
            Index(indexName string, obj interface{}) ([]interface{}, error)
            IndexKeys(indexName, indexKey string) ([]string, error)
            ListIndexFuncValues(name string) []string
            ByIndex(indexName, indexKey string) ([]interface{}, error)
            GetIndexers() Indexers
            AddIndexers(newIndexers Indexers) error
            Resync() error
    }
    
  • threadSafeMap 上还有一层用于 Store 的标准接口, 用于存储k8s资源即 runtime.Object 的专用完成; (runtime.Object 在k8s二开中是一个很重要的概念)

    type Store interface {
            Add(obj interface{}) error
            Update(obj interface{}) error
            Delete(obj interface{}) error
            List() []interface{}
            ListKeys() []string
            Get(obj interface{}) (item interface{}, exists bool, err error)
            GetByKey(key string) (item interface{}, exists bool, err error)
            Replace([]interface{}, string) error
            Resync() error
    }
    
  • 到此咱们大约知道 Store 是怎样作业的了, Store 的调用演示能够检查 [3.3.3 章节]

>> Indexer

  • Indexer 用于对资源进行快速检索, 它也是经过几个map做相互映射完成, 而咱们外部是经过 IndexFunc的界说进行操控回转, IndexFunc 是界说了该资源需求用什么字段作为索引值, 如默认供给的索引办法回来的便是 {namespace} 这个字符串;

    • Indexer 运用的几种数据结构
      // Index maps the indexed value to a set of keys in the store that match on that value
      type Index map[string]sets.String
      // Indexers maps a name to an IndexFunc
      type Indexers map[string]IndexFunc
      // Indices maps a name to an Index
      type Indices map[string]Index
      
    • 默认供给的 IndexFunc, 构建经过 namespace 进行索引资源的索引器, 当咱们检索namespace 下的资源时便能够运用该索引器树立索引与资源的存储联系, ;
      func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
              meta, err := meta.Accessor(obj)
              if err != nil {
                      return []string{""}, fmt.Errorf("object has no meta: %v", err)
              }
              return []string{meta.GetNamespace()}, nil
      }
      
  • 咱们能够手动调用下带 Indexer 的 Store 是怎样运用的, 由于我是在源码内调试的所以我的包名是 cache;

    package cache
    import (
            "fmt"
            v1 "k8s.io/api/core/v1"
            "k8s.io/apimachinery/pkg/api/meta"
            metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
            "testing"
    )
    // LabelsIndexFunc 用作给出可检索一切的索引值
    func LabelsIndexFunc(obj interface{}) ([]string, error) {
            metaD, err := meta.Accessor(obj)
            if err != nil {
                    return []string{""}, fmt.Errorf("object has no meta: %v", err)
            }
            return []string{metaD.GetLabels()["app"]}, nil
    }
    func TestIndexer(t *testing.T) {
            // 树立一个名为 app 的 Indexer, 并运用咱们自己编写的 索引办法
            idxs := Indexers{"app": LabelsIndexFunc}
            // 伪造2个pod资源
            pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
    	Name:      "pod1",
    	Namespace: "ns1",
    	Labels: map[string]string{
    		"app": "l1",
    	}}}
            pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
    	Name:      "pod2",
    	Namespace: "ns2",
    	Labels: map[string]string{
    		"app": "l2",
    	}}}
            // 初始化 Indexer
            myIdx := NewIndexer(MetaNamespaceKeyFunc, idxs)
            // 增加pod
            myIdx.Add(pod1)
            myIdx.Add(pod2)
            // 打印经过索引检索的资源
            fmt.Println(myIdx.IndexKeys("app", "l1"))
    }
    // Output
    // 成果只回来 app=l1 的 pod
    // [ns1/pod1] <nil>
    
  • 咱们现已了解了 Informer 怎样存储和检索资源。在调用 Informer 时,一般咱们会看到许多不同的选项,例如NewInformerNewIndexInfomerNewShareInformerNewShareIndexInformer等等。此外,还有其他几种选项没有列举出来。假如咱们了解了上述内容,就会发现当咱们看到 “Index” 这个词时,就知道咱们能够传入自己结构的 Indexer 。至于怎样选择初始化办法,则取决于具体情况。 见 [3.4 章节] .

3.3.5 EventHandler 事情处理

  • 从前面几末节的内容能够看出,咱们一直在接纳改变事情并将其存储起来,以完成本地存储和长途存储的共同,然后减少对 API Server 的恳求压力。不过,咱们还需求考虑怎样处理这些事情。接下来,咱们将经过一个简略的比如来解释这一进程,并对源代码进行一些剖析。

    package main
    import (
            "fmt"
            "k8s-clientset/config"
            "k8s.io/api/core/v1"
            "k8s.io/apimachinery/pkg/fields"
            "k8s.io/apimachinery/pkg/util/wait"
            "k8s.io/client-go/tools/cache"
    )
    type CmdHandler struct {
    }
    // 当接纳到增加事情便会履行该回调, 后面的办法以此类推
    func (this *CmdHandler) OnAdd(obj interface{}) {
            fmt.Println("Add: ", obj.(*v1.ConfigMap).Name)
    }
    func (this *CmdHandler) OnUpdate(obj interface{}, newObj interface{}) {
            fmt.Println("Update: ", newObj.(*v1.ConfigMap).Name)
    }
    func (this *CmdHandler) OnDelete(obj interface{}) {
            fmt.Println("Delete: ", obj.(*v1.ConfigMap).Name)
    }
    func main() {
            cliset := config.NewK8sConfig().InitClient()
            // 经过 clientset 回来一个 listwatcher, 仅支撑 default/configmaps 资源
            listWatcher := cache.NewListWatchFromClient(
                    cliset.CoreV1().RESTClient(),
                    "configmaps",
                    "default",
                    fields.Everything(),
            )
            // 初始化一个informer, 传入了监听器, 资源名, 间隔同步时刻
            // 最后一个是咱们界说的 Handler 用于接纳咱们监听的资源改变事情;
            _, c := cache.NewInformer(listWatcher, &v1.ConfigMap{}, 0, &CmdHandler{})
            // 发动循环监听
            c.Run(wait.NeverStop)
    }
    
  • 经过上面的比如,咱们能够监听集群中 default/configmaps 资源的改变。它实际上接纳改变的办法与前面的一些调试比如相似,但为了愈加直观,咱们能够直接看一下源代码是怎样完成的。我删去了一些不必要的代码,只保留了重要的部分。完整的代码途径为client-go/tools/cache/controller.go。在processDeltas的外层,有一个processLoop循环,它会不断地从行列中抛出事情,使得handler能够继续地流式处理事情。

    func processDeltas(
            handler ResourceEventHandler,
            clientState Store,
            transformer TransformFunc,
            deltas Deltas,
    ) error {
            // from oldest to newest
            for _, d := range deltas {
                    ...
                    switch d.Type {
                    case Sync, Replaced, Added, Updated:
                            if old, exists, err := clientState.Get(obj); err == nil && exists {
                                    if err := clientState.Update(obj); err != nil {
                                            return err
                                    }
                                    handler.OnUpdate(old, obj)
                            } else {
                                    if err := clientState.Add(obj); err != nil {
                                            return err
                                    }
                                    handler.OnAdd(obj)
                            }
                    case Deleted:
                            if err := clientState.Delete(obj); err != nil {
                                    return err
                            }
                            handler.OnDelete(obj)
                    }
            }
            return nil
    }
    

3.4 熟能生巧

3.4.1 入门技巧

上文说到 Informer 有非常多的初始化办法, 本末节首要介绍 NewInformer, NewShareInformerNewIndexInformer;

>> NewInformer
  • [3.3.5章节] 中,咱们介绍了EventHandler并演示了怎样运用NewInformer办法创立Informer。实际上,Informer会向咱们回来两个目标:StoreController。其间,Controller首要用于操控监听事情的循环进程,而Store目标实际上与之前所讲的内容相同,咱们能够直接从本地缓存中获取咱们所监听的资源。在这个进程中,咱们不需求忧虑数据的缺失或过错,由于Informer的监听机制能够确保数据的共同性。

  • 参阅示例

    ...
    ...
    func main () {
            cliset := config.NewK8sConfig().InitClient()
            // 获取configmap
            listWatcher := cache.NewListWatchFromClient(
                    cliset.CoreV1().RESTClient(),
                    "configmaps",
                    "default",
                    fields.Everything(),
            )
            // CmdHandler 和上述的 EventHandler (参阅 3.3.5)
            store, controller := cache.NewInformer(listWatcher, &v1.ConfigMap{}, 0, &CmdHandler{})
            // 开启一个goroutine 防止主线程阻塞
            go controller.Run(wait.NeverStop)
            // 等候3秒 同步缓存
            time.Sleep(3 * time.Second)
            // 从缓存中获取监听到的 configmap 资源
            fmt.Println(store.List())
    }
    // Output:
    // Add:  kube-root-ca.crt
    // Add:  istio-ca-root-cert
    // [... configmap 目标]
    
>> NewIndexInformer
  • 在 NewInformer 基础上接纳 Indexer, 注意这次咱们比如中把资源改变 Pod, 在 EventHandler 中的类型转换也要进行变成 Pod.
    import (
        "fmt"
        "k8s-clientset/config"
        "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/meta"
        "k8s.io/apimachinery/pkg/fields"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/tools/cache"
        "time"
    )
    ...
    // LabelsIndexFunc 用作给出可检索的索引值
    func LabelsIndexFunc(obj interface{}) ([]string, error) {
            metaD, err := meta.Accessor(obj)
            if err != nil {
                    return []string{""}, fmt.Errorf("object has no meta: %v", err)
            }
            return []string{metaD.GetLabels()["app"]}, nil
    }
    func main () {
            cliset := config.NewK8sConfig().InitClient()
            // 获取configmap
            listWatcher := cache.NewListWatchFromClient(
                    cliset.CoreV1().RESTClient(),
                    "configmaps",
                    "default",
                    fields.Everything(),
            )
            // 创立索引其并指定姓名
            myIndexer := cache.Indexers{"app": LabelsIndexFunc}
            // CmdHandler 和上述的 EventHandler (参阅 3.3.5)
            i, c := cache.NewIndexerInformer(listWatcher, &v1.Pod{}, 0, &CmdHandler{}, myIndexer)
            // 开启一个goroutine 防止主线程阻塞
            go controller.Run(wait.NeverStop)
            // 等候3秒 同步缓存
            time.Sleep(3 * time.Second)
            // 经过 IndexStore 指定索引器获取咱们需求的索引值
            // busy-box 索引值是由于 我在某个 pod 上打了一个 label 为 app: busy-box
            objList, err := i.ByIndex("app", "busy-box")
            if err != nil {
                    panic(err)
            }
            fmt.Println(objList[0].(*v1.Pod).Name)
    }
    // Output:
    // Add:  cloud-enterprise-7f84df95bc-7vwxb
    // Add:  busy-box-6698d6dff6-jmwfs
    // busy-box-6698d6dff6-jmwfs
    // 
    
>> NewSharedInformer
  • Share Informer 和 Informer 的首要区别便是能够增加多个 EventHandler, 代码比较相似我就只展示重要的部分
    ...
    ...
    func main() {
            cliset := config.NewK8sConfig().InitClient()
            listWarcher := cache.NewListWatchFromClient(
                    cliset.CoreV1().RESTClient(),
                    "configmaps",
                    "default",
                    fields.Everything(),
            )
            // 全量同步时刻
            shareInformer := cache.NewSharedInformer(listWarcher, &v1.ConfigMap{}, 0)
            // 能够增加多个Event handler
            shareInformer.AddEventHandler(&handlers.CmdHandler{})
            shareInformer.AddEventHandler(&handlers.CmdHandler2{})
            shareInformer.Run(wait.NeverStop)
    }
    
  • 最后 NewSharedIndexInformerNewSharedInformer 的区别便是能够增加Indexer.

3.4.2 大集合才是硬道理

  • 在开发云原生应用或者进行多集群办理时,咱们一般需求监听更多的资源,甚至是一切可操作的资源。因而,咱们需求介绍一种愈加灵活的Informer创立办法——NewSharedInformerFactoryWithOptions。运用该办法能够创立一个Informer工厂目标,在该工厂目标发动前,咱们能够向其间增加恣意Kubernetes内置的资源以及恣意Indexer。 看代码演示:

    package main
    import (
            "fmt"
            "k8s-clientset/config"
            "k8s-clientset/dc/handlers"
            "k8s.io/apimachinery/pkg/labels"
            "k8s.io/apimachinery/pkg/runtime/schema"
            "k8s.io/apimachinery/pkg/util/wait"
            "k8s.io/client-go/informers"
    )
    func main() {
            cliset := config.NewK8sConfig().InitClient()
            informerFactory := informers.NewSharedInformerFactoryWithOptions(
                    cliset,
                    0,
                    // 指定的namespace 空间,假如需求一切空间,则不指定该参数
                    informers.WithNamespace("default"),
            )
            // 增加 ConfigMap 资源
            cmGVR := schema.GroupVersionResource{
                    Group:    "",
                    Version:  "v1",
                    Resource: "configmaps",
            }
            cmInformer, _ := informerFactory.ForResource(cmGVR)
            // 增加对 ConfigMap 事情的处理
            cmInformer.Informer().AddEventHandler(&handlers.CmdHandler{})
            // 增加 Pod 资源
            podGVR := schema.GroupVersionResource{
                    Group:    "",
                    Version:  "v1",
                    Resource: "pods",
            }
            _, _ = informerFactory.ForResource(podGVR)
            // 发动 informerFactory
            informerFactory.Start(wait.NeverStop)
            // 等候一切资源完成本地同步
            informerFactory.WaitForCacheSync(wait.NeverStop)
            // 打印资源信息
            listConfigMap, _ := informerFactory.Core().V1().ConfigMaps().Lister().List(labels.Everything())
            fmt.Println("Configmap:")
            for _, obj := range listConfigMap {
                    fmt.Printf("%s/%s \n", obj.Namespace, obj.Name)
            }
            fmt.Println("Pod:")
            listPod, _ := informerFactory.Core().V1().Pods().Lister().List(labels.Everything())
            for _, obj := range listPod {
                    fmt.Printf("%s/%s \n", obj.Namespace, obj.Name)
            }
            select {}
    }
    // Ouput:
    // Configmap:
    // default/istio-ca-root-cert 
    // default/kube-root-ca.crt 
    // default/my-config 
    // Pod:
    // default/cloud-enterprise-7f84df95bc-csdqp 
    // default/busy-box-6698d6dff6-42trb 
    
  • 假如想监听一切可操作的内部资源, 能够运用 DiscoveryClient 去获取当时集群的资源版别再调用 InformerFactory 进行资源缓存;

3.5 埋点坑

  • Informer 获取的资源目标会丢失的 Kind 和 Version, 该怎样处理?

  • Informer在经过信号停止后, 它却没有整理已占用的缓存, 该怎样在不重启的情况下整理膨胀的缓存 ?

  • 点赞、重视➕、留言. 下篇文章见

4.0 写在最后

  • 关于 ChatGPT 的一丢丢不成熟的感触;
  • 技术服务于产品, 产品服务于人道, ChatGPT 让全国际都 wow 了一声, 不是它写文章多凶猛, 不是它会写代码多凶猛, 而是它通“人道”了;
  • 维特根斯坦: “语言的边界,便是我的国际的边界”; 而 ChatGPT 正用它的 “语言” 来描绘国际;