Reflector

写在前面

本文将围绕着Informer架构中的Reflector进行具体解说。需求读者掌握client-go底子运用。源码摘录部分只摘录了笔者以为比较中心的部分,感爱好的小伙伴能够阅览源代码,以加深对组件的掌握。

由于是对源码进行解析,一千个人心中有一千种哈姆雷特,假如有哪里写的不对,意思不精确,欢迎咱们纠正。至此感谢。

本文依据以下环境

go1.20

k8s.io/api v0.27.1

k8s.io/apimachinery v0.27.1

k8s.io/client-go v0.27.1

1.引子

咱们先来看一张图:

Client-go学习--Informer中的Reflector

咱们对k8s进行二次开发时,informer组件是最为中心的组件,它涵盖了从k8sApiServer获取资源,将资源经过Queue中进行缓存,在store中进行存放,经过Processor中对应的Eventhandle进行处理。持续上一篇内容,本篇咱们将对Reflector源码进行探究,研究一下在Informer中,是怎么获取k8s资源的?获取了资源又是怎么进行处理的?

2.Reflector

Reflector组件坐落client-go/tools/cache包中,该组件首要是围绕着怎么获取k8s资源,以及怎么对后续k8s资源的变更进行监听。本质上还是经过咱们上篇讲的List/Watch进行网络恳求获取资源。假如有不熟悉的朋友能够看看我上一篇文章中对k8sAPI中的知识铺垫。

2.1 一个小栗子

咱们先测验原生运用Reflector进行获取k8s资源。代码如下:

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"kube/kubecli"
)
const podResource = "pods"
func ReflectorRun() {
	// 这里是我对client-go中的客户端衔接进行了一个封装
    // 你们能够直接运用原生获取client
	cliset := kubecli.DefaultKubeClient().ClientSet()
	// 1.create store
	store := cache.NewStore(func(obj interface{}) (string, error) {
		pod := obj.(*corev1.Pod)
		return pod.Name, nil
	})
	// 2.create watch
	lwc := cache.NewListWatchFromClient(
		cliset.CoreV1().RESTClient(),
		podResource,
		corev1.NamespaceDefault,
		fields.Everything(),
	)
	// 3.create delta FIFO queue
	df := cache.NewDeltaFIFOWithOptions(
		cache.DeltaFIFOOptions{
			KeyFunction:  cache.MetaNamespaceKeyFunc,
			KnownObjects: store,
		},
	)
	// 4.create reflector
	// reflector 会将其存入df的store中
	reflector := cache.NewReflectorWithOptions(
		lwc,
		&corev1.Pod{},
		df,
		cache.ReflectorOptions{},
	)
	go func() {
        // reflector运行
		reflector.Run(wait.NeverStop)
	}()
	for {
		df.Pop(func(obj interface{}, isInInitialList bool) error {
			deltas := obj.(cache.Deltas)
			for _, delta := range deltas {
				pod := delta.Object.(*corev1.Pod)
				switch delta.Type {
				case cache.Sync:
					fmt.Printf("enventType: %+v  podName: %+v \n", delta.Type, pod.Name)
				case cache.Added:
					fmt.Printf("enventType: %+v  podName: %+v \n", delta.Type, pod.Name)
				case cache.Updated:
					fmt.Printf("enventType: %+v  podName: %+v \n", delta.Type, pod.Name)
				}
			}
			return nil
		})
	}
}

以上一共有几个很关键的组件:

  • Store: 能够对Reflector获取的资源进行存储
  • ListWatch: 对ApiServer建议List/Watch恳求的实践完成
  • DelataFIFO: 相似于缓存的FIFO队列,能够对list的全量同步资源和watch的增量同步资源进行缓存,而且能够对同一个资源的不同状态进行追踪。

这些组件后续我会经过文章具体解说其源码和完成原理细节。本章仅对Reflector进行解说

咱们运行一下上述代码能够看到输出,在代码刚启动时,能够看到enventType都是Sync/Replaced,这是由于Reflector正在进行第一次的全量更新(也能够经过emitDeltaTypeReplaced字段设置,一切有些会输出Replaced)。假如后续对一个资源进行更新(删除/新增/更新)都能够获取对应的输出。

enventType: Sync  podName: first-trigger-pipelinerunwbps6-pipeline-first-trigger-task-pod
enventType: Sync  podName: webhook-test 
enventType: Sync  podName: first-trigger-pipelinerunl675b-pipeline-first-trigger-task-pod 
enventType: Sync  podName: el-first-trigger-listener-5fdc66645c-747qt 
enventType: Sync  podName: nginx-cluster-db7d5495d-mvdgd 
enventType: Sync  podName: nginx-cluster-db7d5495d-tjrqw 
enventType: Sync  podName: first-trigger-pipelinerunppnz6-pipeline-first-trigger-task-pod 
enventType: Sync  podName: first-trigger-pipelineruncrs9c-pipeline-first-trigger-task-pod 
enventType: Sync  podName: nginx-cluster-db7d5495d-hp5gj 

2.2 Reflector Struct

这一个末节咱们看一下Reflector结构,以下首要展现最常用,重要的字段,感爱好的小伙伴能够检查源码进行深入了解学习。

type Reflector struct {
	// Name标识Reflector
	name string
	// 咱们希望放置在store中的类型的名称。假如供给,名称将是expectedGVK的字符串化,不然将是expectedType的字符串化。
	// 它仅用于显现,不应用于解析或比较。
	// gvk的字符串化
	typeDescription string
	// 咱们希望放置在存储库中的类型的示例目标。 例如: &corev1.Pod{}
	expectedType reflect.Type
	// 假如对错结构化的,咱们希望放在`store`中的目标的GVK。
	expectedGVK *schema.GroupVersionKind
	// watch的指定存储`store`
	store Store
	// 用于履行list和watch接口
    // 能够传入自定义的完成
	listerWatcher ListerWatcher
	// 从头同步的周期时刻
	resyncPeriod time.Duration
	// paginatedResult定义是否应该对列表调用强制分页。
	// 它是依据初始列表调用的成果设置的。
	paginatedResult bool
	// lastSyncResourceVersion是与底层存储同步时最终观察到的资源版别令牌,
	// 它是线程安全的,但不与底层存储同步 -> 底层存储同步必定情况下,会慢于server端资源
	// 存储的是最终一次watch的资源令牌
	// 相似etcd的revision
	lastSyncResourceVersion string
	// 假如运用lastSyncResourceVersion的前一个list或watch恳求失利并呈现“过期”或“资源版别太大”过错。
	// 则isLastSyncResourceVersionUnavailable为true。
	// 资源不行获取的标志
	isLastSyncResourceVersionUnavailable bool
	// lastSyncResourceVersionMutex维护对lastSyncResourceVersion的读写访问
	// 用于对lastSyncResourceVersion令牌的读写锁
	lastSyncResourceVersionMutex sync.RWMutex
	// ListAndWatch衔接过错时候调用
	watchErrorHandler WatchErrorHandler
    ...
    ...
}

在这些字段中,其间lastSyncResourceVersionlisterWatcher相互调配获取k8s资源。与咱们经过HTTP恳求获取资源相似。

2.3 Reflector New

咱们看一下创立Reflector的过程,在这个func中,咱们对Reflector的属性进行设置,其间比较关键几个点,ListWatch对指定资源进行全量同步和增量同步,expectedType是为了后续watch增量同步进行比对,检查watch的资源和expectedType是否匹配,expectedGVK用于判别watch资源的GVK是否匹配。

// 以下摘取部分代码
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
  	... 
    // watch反常处理器
	if options.WatchErrorHandler == nil {
		options.WatchErrorHandler = DefaultWatchErrorHandler
	}
	// 创立一个新的Reflector
	r := &Reflector{
		name:            options.Name,
		resyncPeriod:    options.ResyncPeriod,
		typeDescription: options.TypeDescription,
		// listWatch目标 (底层用于检测运用)
		listerWatcher: lw,
		// 存储目标 检测完的数据进行存储
		store: store,
		// 运用默许的WatchErrorHandle
		watchErrorHandler: options.WatchErrorHandler,
		// 依据反射获取源类型
		expectedType: reflect.TypeOf(expectedType),
        .......
	}
	// 判别 type的GVK
        // 这里是经过断语成unstructured
        // 从而获取内嵌的GVK
	if r.expectedGVK == nil {
		r.expectedGVK = getExpectedGVKFromObject(expectedType)
	}
	return r
}

2.4 Reflector Run

这末节咱们看一下Reflector的进口。咱们在2.1的例子中也看到,Run办法实践上是传入一个操控中止的chan。这个stopCh非常关键,贯穿了整个Reflector内部组件,能够用于操控内部的同步/watch/sync等。Run办法中经过一个BackoffUntil,其作用是: 假如产生过错则进行回退 等待再进行调用,依据指数回退算法,意味着每次重试时,时刻间隔会逐渐添加,以避免过度频繁地测验。直到该函数回来一个成功的成果或达到最大重试次数。避免失利无约束的重试,导致不断耗费Server的资源。

// Run重复运用reflector的listanWatch来获取一切目标和后续增量。
// 当stopCh封闭时,Run将退出。
func (r *Reflector) Run(stopCh <-chan struct{}) {
	// 假如产生过错则进行回退 等待再进行调用
	// 依据指数回退算法,意味着每次重试时,时刻间隔会逐渐添加,以避免过度频繁地测验。
	// 直到该函数回来一个成功的成果或达到最大重试次数。
	wait.BackoffUntil(func() {
		// 调用list and watch
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
}

2.5 Reflector ListAndWatch

ListAndWatch办法涵盖了Relfector的首要逻辑函数:

  • ⭐ 首要是进行全量更新,判别是经过流获取还是块获取。(本文解说块办法获取)

  • ⭐ 然后是敞开一个新协程将全量更新的资源,同步究竟层的store

  • ⭐ 最终经过loop进行监测watch指定的资源后续更新。

// listwatch首要列出一切项目,并在调用时获取资源版别`resourceVersion` ,然后运用资源版别`resourceVersion`进行监督。
// 假如listwatch甚至没有测验初始化watch,它将回来过错。
// 假定client第一次获取到 resourceVersion = 20000
// 那么后续watch将会依据这个resourceVersion版别进行watch
// 其间: stopCh 是一个很重要的操控通道,用于操控大部分组件的生命
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    ...
	var err error
	var w watch.Interface
	// 是否运用流检测
	fallbackToList := !r.UseWatchList
	// 运用`流`获取
	if r.UseWatchList {
        ...
		}
	}
	// 运用一般的`块`获取 内部敞开了协程作为第一次的全量获取(List)
	if fallbackToList {
		err = r.list(stopCh)
		if err != nil {
			return err
		}
	}
	// 从头同步过错chan
	resyncerrc := make(chan error, 1)
	// 取消任务chan
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	// ⭐: store同步办法
	go r.startResync(stopCh, cancelCh, resyncerrc)
	// ⭐: 中心二 `watch`办法
	return r.watch(w, stopCh, resyncerrc)
}

3. Reflector Core

本末节解析Reflector最为中心的办法,由于这部分称得上是Reflector最精华的部分,所以我会拆分红末节进行解说。

3.1 List

咱们先来看看List全量更新,究竟底层是怎么操作的。先来看看上半部分。

首要经过一个新协程进行处理list,经过构建一个分页器,进行对应分页的处理。然后经过pager进行调用真实的list恳求。此时会判别,假如资源超时或许410过错,则会从头建议一次获取最新资源的恳求。随后封闭listChchan。让外部的select通道监听封闭信号。持续往下履行

	// resourceVersion版别
	var resourceVersion string
	// 初始化设置版别为 "0" 获取任何版别的ListOptions
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
	var list runtime.Object
	var paginatedResult bool
	// 此函数中共用err
	var err error
	// 用于承受完毕分页的信号chan
	listCh := make(chan struct{}, 1)
	// 用于承受 产生过错的信号chan
	panicCh := make(chan interface{}, 1)
	// 独自敞开一个协程去获取list
	go func() {
		defer func() {
			// 产生过错则放入 panicChan
			if r := recover(); r != nil {
				panicCh <- r
			}
		}()
		// 测验收集列表块,假如listwatcher支撑, (实践上就是相似分页查询)
		// 假如不支撑,第一个列表恳求将回来完好的呼应。
		// 构造一个分页器
		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
			// 回来lw的listWatch的list
            // 这个list函数是进行实践建议恳求的函数
			return r.listerWatcher.List(opts)
		}))
		switch {
		// 检查是否敞开分块
		case r.WatchListPageSize != 0:
			pager.PageSize = r.WatchListPageSize
		// 检查是否强制敞开分块
		case r.paginatedResult:
		// 证明敞开了检测指定版别
		case options.ResourceVersion != "" && options.ResourceVersion != "0":
			// 用户没有显式恳求分页。
			// 运用ResourceVersion != "",咱们有或许从`watch`缓存中列出列表,但咱们只在Limit未设置时才这样做(对于ResourceVersion != "0")。
			// pager中默许设置为500
			pager.PageSize = 0
		}
        // ⭐: 真实建议恳求,调用了pager中的list函数,
		list, paginatedResult, err = pager.List(context.Background(), options)
		// 判别是否`资源过期过错` 或许 `资源过大过错`
		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
			// 设置同步资源标志为不行用
			r.setIsLastSyncResourceVersionUnavailable(true)
			// 假如用于列出的资源版别不行用请立即重试。假如分页列表调用由于延续页上的“Expired”过错而失利,
			// 或许缓存或许没有与供给的资源版别同步。
			// 因此,咱们需求回来到resourceVersion=""获取最新版别资源来康复。
			// 从头建议一次恳求
			list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
		}
		close(listCh)
	}()
	// 检测信号
	// 1.外部中止信号
	// 2.panic过错
	// 3.list获取完毕
	select {
	case <-stopCh:
		return nil
	case r := <-panicCh:
		panic(r)
	case <-listCh:
	}

然后咱们持续看下半部分。下半部分相对简单一些,中心就是将上半部分获取的list数据进行提取元数据,获取resourceVersion(供后续watch运用),提取资源list,调用Refelctor的syncWith(),将其同步到store的完成类中。


	// 上半部分的err不为空直接回来
	if err != nil {
		klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
		return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
	}
	...
	// 设置成可用可获取
	r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
	// 获取整个list的元数据
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v: %v", list, err)
	}
	// 获取list的 资源版别(resourceVersion)
	resourceVersion = listMetaInterface.GetResourceVersion()
	// 提取列表item项
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
	}
	// 这一步是进行同步功用 同步到store的完成中
	// 同步时,会将items项和resourceVersion同步传入
	// 第一次全量同步就会进行信息的同步 假如敞开了 resync 则会进行持续性同步
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("unable to sync list result: %v", err)
	}
	// 设置上一次获取之后的资源版别 `revision`/`resourceVersion`
	r.setLastSyncResourceVersion(resourceVersion)
	return nil

3.2 Watch

持续来看一下watch函数中,首要逻辑就是经过resourceVersion来调用watch函数,拿到w之后传入watchHandler,在watchHandler中具体处理后续监听到的event事情。

// watch simply starts a watch request with the server.
// Watch仅仅向服务器启动一个监督恳求。
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
	var err error
	... 
	for {
		// 给stopCh一个中止循环的时机,即便在continue句子出错的情况下也是如此
		select {
		// 该stop由Run办法履行
		// 咱们能够理解成 假如Run办法需求完毕 则 watch也同步进行完毕
		case <-stopCh:
			return nil
		default:
		}
		// w == nil 证明是运用一般的block办法获取list
		if w == nil {
			// 设置随机时刻
			timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
			// 3大特征
			// resourceVersion/TimeoutSeconds/Bookmarks
			options := metav1.ListOptions{
				// 这里获取的是上一个list恳求中的resourceVersion版别
				ResourceVersion: r.LastSyncResourceVersion(),
				// 咱们要避免挂起watch的情况。中止在`超时窗口`内未接收任何事情的任何监督程序。
				// 经过运用上述设置的随机时刻,避免在该时刻内未接遭到信息 导致watch空悬
				TimeoutSeconds: &timeoutSeconds,
				// 为了在watch重启时削减kube-apisserver上的负载,
				// 您能够启用`watch书签`。Reflector底子不假定回来书签(假如服务器不支撑watch书签,它将疏忽此字段)。
				// 默许敞开书签
				AllowWatchBookmarks: true,
			}
			// 调用watch函数 建议watch操作
			w, err = r.listerWatcher.Watch(options)
			if err != nil {
				// 判别是否是watch过错`err`重试
				if canRetry := isWatchErrorRetriable(err); canRetry {
					klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
					// 判别stop或进行回退
					select {
					case <-stopCh:
						return nil
					case <-r.initConnBackoffManager.Backoff().C():
						continue
					}
				}
				return err
			}
		}
        // ⭐: 中心watch函数 
		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK,
			r.name, r.typeDescription, r.setLastSyncResourceVersion, nil,
			r.clock, resyncerrc, stopCh)
		// Ensure that watch will not be reused across iterations.
		// 确保watch不会跨迭代重用。
		// 中止stop
		// watchHandle产生过错 或许stopCh接遭到中止的信号
		w.Stop()
		// 将w置为nil
		w = nil
		// 接下来就是进行过错判别
		.......
	}
}

3.3 watchHandler

此函数是watch的中心逻辑函数,能够看到,传入的字段都特别多。以下只对中心的流程进行摘取,如有爱好,能够去看一下具体的源码流程。该函数大体逻辑是: 首要是经过传入的w,从w的resultCh中获取资源,然后对其资源进行匹配,看看该资源与expectedType,expectedType是否相符合。然后经过判别该event的事情(Add/Modified/Delete),将调用对应的store完成办法。随后设置resourceVersion。

// watchHandler watches w and sets setLastSyncResourceVersion
// watchHandler 监督w并设置setLastSyncResourceVersion
// *中心办法3*
// 用于获取事情并传入咱们自定义的store完成中
func watchHandler(
	start time.Time,
	w watch.Interface,
	store Store,
	expectedType reflect.Type,
	expectedType *schema.GroupVersionKind,
	name string,
	expectedTypeName string,
	setLastSyncResourceVersion func(string),
	exitOnInitialEventsEndBookmark *bool,
	clock clock.Clock,
	errc chan error,
	stopCh <-chan struct{},
) error {
	// eventCount 记录事情数量
	eventCount := 0
	// 中心逻辑流程
loop:
	for {
		// 会卡在这里等待信号进入 并不会无限循环下去
		select {
		// 接遭到中止信号 回来中止err `errors.New("stop requested")`
		case <-stopCh:
			return errorStopRequested
		// 接遭到resyncerrc,中止回来`resyncerrc`
		case err := <-errc:
			return err
		// 从watch的通道中承受Result事情`event`
		case event, ok := <-w.ResultChan():
			if !ok {
				// 跳转loop 从头循环承受
				break loop
			}
			// watch的产生过错 将目标包装后回来
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			// 以下分支从几个方面进行判别
			// 判别目标类型
			if expectedType != nil {
				// 并不是希望`expectType`
				// 既 传入的type于watch的type不符
				// 而且 获取type可经过反射进行获取
				if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
					continue
				}
			}
			// 判别gvk
			if expectedGVK != nil {
				// 判别GVK是否持平
				if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
					continue
				}
			}
			// 提取metadata数据
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
				continue
			}
			// 获取该目标的 resourceVersion
			resourceVersion := meta.GetResourceVersion()
			// 开端依据eventType进行分发信息
			switch event.Type {
			// ADD 
			case watch.Added:
				err := store.Add(event.Object)
				...
			// Modified 
			case watch.Modified:
				err := store.Update(event.Object)
				...
			// Deleted
			case watch.Deleted:
				err := store.Delete(event.Object)
				...
            // Bookmark
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
				// 一个“Bookmark”意味着watch现已同步在这里,只需更新资源版别
				// 携带书签
				if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
					if exitOnInitialEventsEndBookmark != nil {
						*exitOnInitialEventsEndBookmark = true
					}
				}
			// 不属于任何类型 报错
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
			}
			// 从头设置 `resourceVersion`
			// 将新的 `resourceVersion` 从头设置进reflector中
			setLastSyncResourceVersion(resourceVersion)
			// 调用`store`UpdateResourceVersion 将新的resourceVersion设置进去
			if rvu, ok := store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(resourceVersion)
			}
			// eventCount计数器+1
			eventCount++
			// 判别书签
			if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
				watchDuration := clock.Since(start)
				klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
				return nil
			}
		}
	}
	watchDuration := clock.Since(start)
	// 假如watch耗时小于1s 和 eventCount事情计数 == 0
	// 则进行报错
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
	return nil
}

3.4 小总结

至此,咱们经过Reflector中心办法大概了解了其怎么获取k8s资源。首要经过将List()函数传入分页器中,经过分页器封装进行调用,获取到对应资源的list,随后将其同步到store中完满足量更新。然后调用watch,经过循环,从watch的resultCh中不断监听进行获取指定资源,经过资源类型判别后再依据事情类型调用对应的store办法,完成增量更新。

4.Other

4.1 Pager

在List流程中,咱们看到其运用了Pager封装list。咱们能够看一下pager的源码,看看其是怎么封装咱们外部传入的ListWatcher。

// ⭐: ListPageFunc 回来给定list options的list Obj
// 传入ctx,options 获取的runtime.Object为list obj目标
type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
// SimplePageFunc 将一个无上下文的列表函数改编为一个承受上下文的列表函数。
func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc {
	return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
		return fn(opts)
	}
}
// ListPager协助客户端代码将 `大型列表查询` 分解成多个PageSize或更小的块。
// ⭐: PageFn应该承受metav1.ListOptions。ListOptions支撑`分页`并回来一个列表。
// ⭐: 分页器不会更改初始选项列表上的字段或标签选择器。
type ListPager struct {
	// 块巨细
	PageSize int64
	// 分页Fn函数
	PageFn ListPageFunc
    // 假如资源超时,是否获取最新悉数list(注: 这里是默许敞开的)
	FullListIfExpired bool
	// Number of pages to buffer
	// 页面缓冲的巨细
	PageBufferSize int32
}

咱们经过看到其New()函数,能够看到默许现已装备了对应pageSize和FullListIfExpired。咱们只需传入SimplePageFunc封装实践list后的函数即可。

func New(fn ListPageFunc) *ListPager {
	return &ListPager{
		PageSize: defaultPageSize,
		PageFn:   fn,
		// 假如超时,默许获取悉数的资源List
		FullListIfExpired: true,
		PageBufferSize:    defaultPageBufferSize,
	}
}

再来看看pager中的List。咱们在Reflector中传入了ListWatch的list完成,在此函数中,经过一个循环持续的运用分块获取的办法进行获取资源。假如产生资源超时过错,则直接获取资源的悉数list。假如一次获取不完,则经过设置continue token从头进行循环获取。假如一次能获取完,则直接能够回来。

// List回来单个列表目标,但会测验从服务器`检索较小的块`,以削减对服务器的影响。
// 假如块测验失利,它将加载完好列表。选项中的约束字段,假如未设置,将默许为页面巨细。
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
	if options.Limit == 0 {
		// 假如没有设置则用默许的500 避免导致一次获取完好list导致耗费大量服务器性能
		options.Limit = p.PageSize
	}
	// 第一次是不会设置 resourceVersion 默许获取最新的
	requestedResourceVersion := options.ResourceVersion
	// 设置resourceVersion Match规则
	requestedResourceVersionMatch := options.ResourceVersionMatch
	// 这里的list是为了装循环屡次获取的list
	// 假如仅仅循环一次,则不需求用到
	var list *metainternalversion.List
	// 标志分页成果是否成功
	paginatedResult := false
	// 中心的loop
	for {
		// ctx超时操控
		select {
		case <-ctx.Done():
			return nil, paginatedResult, ctx.Err()
		default:
		}
        // ⭐: 履行传入的PageFn,实践的List恳求
		obj, err := p.PageFn(ctx, options)
		if err != nil {
			// ⭐: 只有在回来“Expired”(资源版别过老)过错时才回退到完好列表,FullListIfExpired为true,
			// 而且“Expired”过错产生在第2页或之后(由于完好列表旨在避免分页)。
			// 当由第一个页面恳求树立的资源版别在后续列表恳求期间退出紧缩时,列表不会失利)。
			if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
				// 假如不是资源过期过错
				return nil, paginatedResult, err
			}
			// list在咱们处理资源过期时,在恳求的ResourceVersion处退回到完好列表。
			// 这里是处理缓存资源过期过错 直接进行悉数获取
			options.Limit = 0
			options.Continue = ""
			options.ResourceVersion = requestedResourceVersion
			options.ResourceVersionMatch = requestedResourceVersionMatch
			// 再次直接建议完好的list恳求
			result, err := p.PageFn(ctx, options)
			// 获取数据后直接回来
			return result, paginatedResult, err
		}
		// 获取list元数据
		m, err := meta.ListAccessor(obj)
		if err != nil {
			return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
		}
		// 假如没有处理任何页面,则提早退出并回来得到的目标
		// ⭐: 后续没有需求获取的了 则直接进行退出回来
		if len(m.GetContinue()) == 0 && list == nil {
			// 证明第一次进来就能获取完 能够直接提早退出
			return obj, paginatedResult, nil
		}
		// ⭐: 以下逻辑代表一次获取不完
		// initialize the list and fill its contents
		// 初始化列表并填充其内容
		if list == nil {
			list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)}
			list.ResourceVersion = m.GetResourceVersion()
			list.SelfLink = m.GetSelfLink()
		}
		if err := meta.EachListItem(obj, func(obj runtime.Object) error {
			// ⭐: 将每个item追加到list中
			list.Items = append(list.Items, obj)
			return nil
		}); err != nil {
			return nil, paginatedResult, err
		}
		// if we have no more items, return the list
		// 此时再进行判别 假如没有更多的项,则回来列表 (证明或许现已走到第二次了)
		if len(m.GetContinue()) == 0 {
			return list, paginatedResult, nil
		}
		// ⭐: 这里证明还有下一个continue 还能够持续进行获取
		// 设置下一个循环
		// 将continue设置给下一次进行获取
		// set the next loop up
		options.Continue = m.GetContinue()
		// 依据continue即可 不需求再指定version和version match
		// ⭐原因: 在运用continue时不允许指定资源版别
		options.ResourceVersion = ""
		options.ResourceVersionMatch = ""
		// At this point, result is already paginated.
		// 此时,成果现已分页。
		paginatedResult = true
	}
}

4.2 小试牛刀

依据以上的学习,相信咱们对Reflector履行流程有一个大概了解。这节咱们测验本地模仿测试

这里依据NewReflector试验对应的接口

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	// 实践底层还是调用NewReflectorWithOptions 仅仅简略版...
	return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}

完成如下:

// ⭐: 完成ListerWatcher
type mockListWatch struct {
}
// ⭐: 完成watch中的watch.Interface
type mockWatchInterface struct {
}
// ⭐: 完成store
type mockStore struct {
}
func (m *mockWatchInterface) Stop() {
}
func (m *mockWatchInterface) ResultChan() <-chan watch.Event {
	return eventCh
}
func (m *mockStore) Add(obj interface{}) error {
	pod := obj.(*corev1.Pod)
	fmt.Printf("nowTime: %v Watch podName: %+v\n", time.Now().Format(time.DateTime), pod.Name)
	return nil
}
func (m *mockStore) Update(obj interface{}) error {
	return nil
}
func (m *mockStore) Delete(obj interface{}) error {
	return nil
}
func (m *mockStore) List() []interface{} {
	return nil
}
func (m *mockStore) ListKeys() []string {
	return nil
}
func (m *mockStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
	return nil, false, nil
}
func (m *mockStore) GetByKey(key string) (item interface{}, exists bool, err error) {
	return nil, false, nil
}
func (m *mockStore) Resync() error {
	return nil
}
// 咱们完成store即可
func (m *mockStore) Replace(i []interface{}, s string) error {
	println("NowTime: ", time.Now().Format(time.DateTime))
	for _, item := range i {
		pod := item.(*corev1.Pod)
		fmt.Printf("Replace PodName: %+v\n", pod.Name)
	}
	return nil
}
func (m *mockListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
	return podList, nil
}
func (m *mockListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return &mockWatchInterface{}, nil
}

咱们准备一个有3个pod的podList(模仿全量更新),1个pod(模仿watch增量更新),eventCh用于发送对应的Pod。

var (
	pod1 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Labels: map[string]string{"CI.io": "dev"}}, Spec: corev1.PodSpec{NodeName: "node-1"}}
	pod2 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Labels: map[string]string{"CI.io": "dev"}}}
	pod3 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Labels: map[string]string{"CI.io": "prod"}}}
	pod4 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "four", Annotations: map[string]string{"abc": "edf"}}}
	podList = &corev1.PodList{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ListMeta: metav1.ListMeta{
			ResourceVersion: "123456",
		},
		Items: []corev1.Pod{*pod1, *pod2, *pod3},
	}
	eventCh = make(chan watch.Event)
)

进口代码:

func TestMockReflector(t *testing.T) {
	mlw := &mockListWatch{}
	store := &mockStore{}
	reflector := NewReflector(mlw, &corev1.Pod{}, store, time.Duration(0))
	go func() {
		reflector.Run(wait.NeverStop)
	}()
    // 模仿3s后新增新的pod
	go func() {
		// after sleep 3s send new pod
		time.Sleep(time.Second * 3)
		eventCh <- watch.Event{
			Type:   watch.Added,
			Object: pod4,
		}
	}()
	time.Sleep(time.Minute)
}

履行输出:

//NowTime:  2023-05-27 16:54:06  Replace:
//PodName: one
//PodName: two
//PodName: tre
//nowTime: 2023-05-27 16:54:09 Watch podName: four

4.3 FullCode

package cache
import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"testing"
	"time"
)
var (
	pod1 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Labels: map[string]string{"CI.io": "dev"}}, Spec: corev1.PodSpec{NodeName: "node-1"}}
	pod2 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Labels: map[string]string{"CI.io": "dev"}}}
	pod3 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Labels: map[string]string{"CI.io": "prod"}}}
	pod4 = &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "four", Annotations: map[string]string{"abc": "edf"}}}
	podList = &corev1.PodList{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ListMeta: metav1.ListMeta{
			ResourceVersion: "123456",
		},
		Items: []corev1.Pod{*pod1, *pod2, *pod3},
	}
	eventCh = make(chan watch.Event)
)
type mockListWatch struct {
}
type mockWatchInterface struct {
}
func (m *mockWatchInterface) Stop() {
}
func (m *mockWatchInterface) ResultChan() <-chan watch.Event {
	return eventCh
}
type mockStore struct {
}
func (m *mockStore) Add(obj interface{}) error {
	pod := obj.(*corev1.Pod)
	fmt.Printf("nowTime: %v Watch podName: %+v\n", time.Now().Format(time.DateTime), pod.Name)
	return nil
}
func (m *mockStore) Update(obj interface{}) error {
	return nil
}
func (m *mockStore) Delete(obj interface{}) error {
	return nil
}
func (m *mockStore) List() []interface{} {
	return nil
}
func (m *mockStore) ListKeys() []string {
	return nil
}
func (m *mockStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
	return nil, false, nil
}
func (m *mockStore) GetByKey(key string) (item interface{}, exists bool, err error) {
	return nil, false, nil
}
func (m *mockStore) Resync() error {
	return nil
}
// 咱们完成store即可
func (m *mockStore) Replace(i []interface{}, s string) error {
	println("NowTime: ", time.Now().Format(time.DateTime))
	for _, item := range i {
		pod := item.(*corev1.Pod)
		fmt.Printf("Replace PodName: %+v\n", pod.Name)
	}
	return nil
}
func (m *mockListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
	return podList, nil
}
func (m *mockListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return &mockWatchInterface{}, nil
}
func TestMockReflector(t *testing.T) {
	mlw := &mockListWatch{}
	store := &mockStore{}
	reflector := NewReflector(mlw, &corev1.Pod{}, store, time.Duration(0))
	go func() {
		reflector.Run(wait.NeverStop)
	}()
	go func() {
		// after sleep 3s send new pod
		time.Sleep(time.Second * 3)
		eventCh <- watch.Event{
			Type:   watch.Added,
			Object: pod4,
		}
	}()
	time.Sleep(time.Minute)
}

写在最终

本文解说了Reflector内部逻辑以及底子功用,下一篇将对store进行剖析,探究一下store是怎么存储Reflector获取的资源?

参考

ShadowYD: [K8S] client-go 的正确打开办法

kubernetesAPI