k8s源码阅读:Informer源码解析

news/2024/7/27 11:06:53/文章来源:https://blog.csdn.net/weixin_38757398/article/details/135616831

写在之前

Kubernetes的Informer机制是一种用于监控资源对象变化的机制。它提供了一种简化开发者编写控制器的方式,允许控制器能够及时感知并响应 Kubernetes 集群中资源对象的变化。Informer通过与Kubernetes API服务器进行交互,通过监听API服务器上资源对象的修改事件来实现实时的资源对象状态更新。当一个资源对象被创建、更新或删除时,Informer会收到相应的通知,并在内部维护一个缓存用于存储最新的资源对象状态。同时,Informer还能够为特定类型的资源对象设置过滤器以进行更精细的事件监听。使用Informer机制,开发者可以通过注册事件处理函数来定义对资源对象变化的响应逻辑,从而实现自定义的控制器逻辑。在运行时,Informer会定期向API服务器发起请求以获取最新的资源对象的状态,从而保证缓存中的数据与实际集群的状态保持同步。话不多说,直接进入到源码的解析环节。

1.源码阅读环境搭建

informer的源码是在client-go工程中的,需要从仓库clone完整的代码到本地。首先在GoPath路径下创建文件夹k8s.io,进入到该文件夹下,执行以下步骤。

git clone https://github.com/kubernetes/client-go.git

源码的阅读不依赖环境和操作系统,但是需要提供一个k8s集群的环境,提前将集群的kubeconfig保存在本地,以供后文中调试源码使用。

值得注意的是,笔者看的是master分支的代码,其中的go mod依赖的golang版本是1.21,建议大家选择适合自己的版本分支阅读,否则可能会出现编译报错的情况。

2. 一个informer的简单用例

将下面的代码放到kubernetes_test目录下,以方便调试源码

package kubernetesimport ("fmt""k8s.io/apimachinery/pkg/labels""k8s.io/client-go/informers""k8s.io/client-go/kubernetes""k8s.io/client-go/tools/cache""k8s.io/client-go/tools/clientcmd""log""testing""time"metav1 "k8s.io/api/apps/v1"
)func getKubernetesClient(kubeconfig string) (*kubernetes.Clientset, error) {// 配置文件路径,根据你的实际情况修改// 使用指定的 kubeconfig 文件创建一个 Config 对象config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)if err != nil {return nil, err}//config.Insecure = true// 创建客户端集clientset, err := kubernetes.NewForConfig(config)if err != nil {return nil, err}return clientset, nil
}// 展示informer的核心用法
func TestInformer(t *testing.T) {client, err := getKubernetesClient("XXX这里是集群kubeconfig")if err != nil {log.Fatal(err.Error())}// 初始化 informer factory(为了测试方便这里设置每30s重新 List 一次)informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)// 对 Deployment 监听deployInformer := informerFactory.Apps().V1().Deployments()// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)informer := deployInformer.Informer()// 创建 ListerdeployLister := deployInformer.Lister()// 注册事件处理程序informer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    onAdd,UpdateFunc: onUpdate,DeleteFunc: onDelete,})stopper := make(chan struct{})defer close(stopper)// 启动 informer,List & WatchinformerFactory.Start(stopper)// 等待所有启动的 Informer 的缓存被同步informerFactory.WaitForCacheSync(stopper)// 从本地缓存中获取 default 中的所有 deployment 列表deployments, err := deployLister.Deployments("default").List(labels.Everything())if err != nil {panic(err)}for idx, deploy := range deployments {fmt.Printf("%d -> %s\n", idx+1, deploy.Name)}<-stopper}func onAdd(obj interface{}) {deploy := obj.(*metav1.Deployment)fmt.Println("add a deployment:", deploy.Name)
}func onUpdate(old, new interface{}) {oldDeploy := old.(*metav1.Deployment)newDeploy := new.(*metav1.Deployment)fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)
}func onDelete(obj interface{}) {deploy := obj.(*metav1.Deployment)fmt.Println("delete a deployment:", deploy.Name)
}

简单mark一下源码分析的链路,建议大家还是一步一步使用编辑器的debug模式来阅读。
在这里插入图片描述

3.2. informers.NewSharedInformerFactory(client, time.Second*30)

初始化一个SharedInformerFactory,传入了一个定时30进行数据同步的配置项。核心代码如下:

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {factory := &sharedInformerFactory{client:           client,namespace:        v1.NamespaceAll,defaultResync:    defaultResync,informers:        make(map[reflect.Type]cache.SharedIndexInformer),startedInformers: make(map[reflect.Type]bool),customResync:     make(map[reflect.Type]time.Duration),}// Apply all optionsfor _, opt := range options {//这一步执行,在我们追踪的链路中,options为空factory = opt(factory)}return factory
}

4.deployInformer := informerFactory.Apps().V1().Deployments()

这一步是deployInformer的初始化方式。我们展开步骤2中的初始化的SharedInformerFactory结构体会发现,下面的属性字段core,policy,apps似乎是将k8s原生的资源分组进行了类别、版本的抽象。类如通过.Apps().V1().Deployments(),其实获取的就是deploymentInformer的一个实例对象:

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

看一下deploymentInformer这个结构体的定义,这里的factory就是在informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)初始化的factory,之所以关注这个factory的实例,是因为在后文中还有提及。

type deploymentInformer struct {factory          internalinterfaces.SharedInformerFactorytweakListOptions internalinterfaces.TweakListOptionsFuncnamespace        string
}

5.informer := deployInformer.Informer()

这里调用了informerfactory结构InformerFor的方法,我们上一节已经分析过了factory的实例是什么了,我们追踪到方法中看一下这个方法是在所做什么

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {// 注册Deployment的informer到factory中return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

现在继续追踪informerFor源码,这里看起来是将deployment的cache.SharedIndexInformer注册到factory里的缓存中去。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {//  f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)f.lock.Lock()defer f.lock.Unlock()informerType := reflect.TypeOf(obj)// 1.f.informers:  informers map[reflect.Type]cache.SharedIndexInformer 这里缓存不同k8s资源的informer对象informer, exists := f.informers[informerType]if exists {return informer}resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}// 2.customResync     map[reflect.Type]time.Duration 缓存了资源的resync时间informer = newFunc(f.client, resyncPeriod)informer.SetTransform(f.transform)f.informers[informerType] = informerreturn informer
}

6.deployLister := deployInformer.Lister()

返回lister实例,这个listener主要是用来从内存中获取资源对象详细信息的客户端,类比于数据库查询客户端。
我们看一下f.Informer().GetIndexer()函数是什么,返回了一个Index结构体的实现类。

func (s *sharedIndexInformer) GetIndexer() Indexer {return s.indexer
}
// 这个类就是在集群中的类别是deployment的所有object缓存存放的结构体
type Indexer interface {Store// Index returns the stored objects whose set of indexed values// intersects the set of indexed values of the given object, for// the named indexIndex(indexName string, obj interface{}) ([]interface{}, error)// IndexKeys returns the storage keys of the stored objects whose// set of indexed values for the named index includes the given// indexed valueIndexKeys(indexName, indexedValue string) ([]string, error)// ListIndexFuncValues returns all the indexed values of the given indexListIndexFuncValues(indexName string) []string// ByIndex returns the stored objects whose set of indexed values// for the named index includes the given indexed valueByIndex(indexName, indexedValue string) ([]interface{}, error)// GetIndexers return the indexersGetIndexers() Indexers// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.AddIndexers(newIndexers Indexers) error
}

7.informer.AddEventHandler

注册事件处理回调函数,下面是核心逻辑实现的具体细节代码。

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {.....//6.1这个是一个事件处理器的生成函数listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)if !s.started {//6.2首次初始化的时候就started是false的,进入到这个逻辑里return s.processor.addListener(listener), nil}.....
}

7.1 newProcessListener

processListener实例化的方法,该实例将消息事件路由到不同的ResourceEventHandler方法中处理。它包含两个 goroutine、两个无缓冲通道和一个无界环形缓冲区区。 add(notification) 函数将给定的通知事件发送到 addCh。一个 goroutine 运行“pop()”,它使用环形缓冲区中的存储将通知从“addCh”传递到“nextCh”。另一个 goroutine 运行 run(),它接收来自 nextCh 的通知并同步调用适当的处理程序方法。

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {ret := &processorListener{// 这里是事件处理channel通道nextCh:                make(chan interface{}),// 这里是watch到k8s资源变动的接收通道addCh:                 make(chan interface{}),// 这里就是传递的handler事件处理函数handler:               handler,// 这个结构体是一个线程安全的计数器syncTracker:           &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},// 无界环,个人理解主要是用来在生产、消费者模型中速率不一致时进行缓冲的一个存储pendingNotifications:  *buffer.NewRingGrowing(bufferSize),requestedResyncPeriod: requestedResyncPeriod,resyncPeriod:          resyncPeriod,}// 生成下一次resync的时间并保存下来ret.determineNextResync(now)return ret
}

7.2 processor.addListener(listener)

这里的processor是sharedProcessor实例化的结果,实例化的过程放在
defaultInformer这个函数中,在前文informerFor中由提及,我们把这个函数展开,看一下这个processor是怎么实例化的。

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(//初始化listwatch&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)},},&appsv1.Deployment{},resyncPeriod,indexers,)
}
//中间还有一跳,我们省略了,直接找到process的实例化代码
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {realClock := &clock.RealClock{}return &sharedIndexInformer{// indexre的初始化逻辑在这里indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),//sharedProcessor初始的位置processor:                       &sharedProcessor{clock: realClock},listerWatcher:                   lw,objectType:                      exampleObject,objectDescription:               options.ObjectDescription,resyncCheckPeriod:               options.ResyncPeriod,defaultEventHandlerResyncPeriod: options.ResyncPeriod,clock:                           realClock,cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),}
}

addListener主要做了两件事情
1)listeners map[*processorListener]bool,是shardProcessor中的一个缓存,主要是保存定义的processorListener
2)如果p.listenersStarted 是true的话,这里会启动两个线程,一个线程运行listener.run,定时从nextCh这个channel中获取事件进行处理
一个线程运行listener.pop,定时将addCh这个channel中的事件转移到nextCh这个channel中去,两者之间是通过一个无界环状结构进行传递的
但是,按照前文的追踪逻辑这里的p.listenersStarted是false, p.wg.Start(listener.run和p.wg.Start(listener.pop)这两个典型的生产-消费者逻辑没有执行。

func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {p.listenersLock.Lock()defer p.listenersLock.Unlock()if p.listeners == nil {p.listeners = make(map[*processorListener]bool)}p.listeners[listener] = true//这里貌似没进来if p.listenersStarted {p.wg.Start(listener.run)p.wg.Start(listener.pop)}return listener
}

8.informerFactory.Start(stopper)

这一步是启动factory的核心逻辑,前文中主要是进行了一些结构体的定义及初始化,并没有任何执行逻辑存在。戏台子搭好了,这一个步骤应该是核心的执行逻辑了吧。核心代码在这里,主要是把所有注册的informer启动起来。

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()if f.shuttingDown {return}// 上文已经提到了,informer主要是保存k8s的资源的类runtimeObject和Informer之间的映射关系的缓存for informerType, informer := range f.informers {if !f.startedInformers[informerType] {f.wg.Add(1)// We need a new variable in each loop iteration,// otherwise the goroutine would use the loop variable// and that keeps changing.informer := informergo func() {defer f.wg.Done()//这里另起协程把factory中全部的informer启动起来//8.1 在这里启动indexInformerinformer.Run(stopCh)}()f.startedInformers[informerType] = true}}
}

8.1 informer.Run(stopCh)

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {....// s.controller = New(cfg),就是构建controller这个结构体,把shardInformer的start标识修改成truefunc() {s.startedLock.Lock()defer s.startedLock.Unlock()//7.1.1 搞了一个队列fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects:          s.indexer,EmitDeltaTypeReplaced: true,Transformer: s.transform,})//1)构建了一个配置项cfg := &Config{Queue:             fifo,ListerWatcher:     s.listerWatcher,ObjectType:        s.objectType,ObjectDescription: s.objectDescription,FullResyncPeriod:  s.resyncCheckPeriod,RetryOnError:      false,ShouldResync:      s.processor.shouldResync,Process:           s.HandleDeltas,WatchErrorHandler: s.watchErrorHandler,}//2)创建controllers.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait()              // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stop//这个不知道做了什么事情,用到的时候再说wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)//3)启动controller这里是真正启动listeners的地方wg.StartWithChannel(processorStopCh, s.processor.run)defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}()//4) 启动controllers.controller.Run(stopCh)
}

8.1.1 wg.StartWithChannel(processorStopCh, s.processor.run)

这里启动了一个协程用来执行processor.run方法,我们进到这个函数中看一下这个函数做了些什么事情.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {func() {p.listenersLock.RLock()defer p.listenersLock.RUnlock()// p.listener还记得是做什么用的吗,就是保存所有的listener的缓存for listener := range p.listeners {// 启动listener,逻辑在上文已经提及p.wg.Start(listener.run)p.wg.Start(listener.pop)}p.listenersStarted = true}()<-stopCh// 后续都是中止逻辑了...
}

8.1.2 s.controller.Run(stopCh),controller

1)构建reflector
2)启动reflector
3)启动循环处理逻辑

func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()// 1. 初始化reflect来同步服务器和store的objectr := NewReflectorWithOptions(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,ReflectorOptions{ResyncPeriod:    c.config.FullResyncPeriod,TypeDescription: c.config.ObjectDescription,Clock:           c.clock,},)r.ShouldResync = c.config.ShouldResyncr.WatchListPageSize = c.config.WatchListPageSizeif c.config.WatchErrorHandler != nil {r.watchErrorHandler = c.config.WatchErrorHandler}c.reflectorMutex.Lock()// 对reflector赋值c.reflector = rc.reflectorMutex.Unlock()var wg wait.Group// 2.启动reflectorwg.StartWithChannel(stopCh, r.Run)// 3.循环处理队列中的元素wait.Until(c.processLoop, time.Second, stopCh)wg.Wait()
}

我们来看一下reflector启动后在做些什么事情。记住一句话就好:首先从k8s的apiserver中利用list的方式获取指定资源的全部数据,并塞到deltaqueue中。后续通过watch的方式,监听资源的变动信息并同步到队列中国。


func (r *Reflector) Run(stopCh <-chan struct{}) {klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)wait.BackoffUntil(func() {// 核心逻辑入口if err := r.ListAndWatch(stopCh); err != nil {r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)var err errorvar w watch.InterfacefallbackToList := !r.UseWatchList//这里看起来不会落到这个执行逻辑中if r.UseWatchList {// 维持本地的资源缓存w, err = r.watchList(stopCh)if w == nil && err == nil {// stopCh was closedreturn nil}if err != nil {klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)fallbackToList = true// ensure that we won't accidentally pass some garbage down the watch.w = nil}}if fallbackToList {// 直接借用listwatcher的list方法,从服务器全量同步资源数据err = r.list(stopCh)if err != nil {return err}}klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)resyncerrc := make(chan error, 1)cancelCh := make(chan struct{})defer close(cancelCh)//启动一个协程定期同步deltafifo这个结构体,这里埋个坑,有空写deltafifo的时候解释go r.startResync(stopCh, cancelCh, resyncerrc)return r.watch(w, stopCh, resyncerrc)
}

err = r.list(stopCh)这个方法我们看一下里面做了什么事情。

func (r *Reflector) list(stopCh <-chan struct{}) error {.....// 启动一个线程全量同步资源go func() {.....list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options).....}()// 解析成指定的结构体items, err := meta.ExtractListWithAlloc(list) ....initTrace.Step("Objects extracted")// 将数据同步到indexer本地存储中if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("unable to sync list result: %v", err)}...
}

还有一个函数没有分析,这里的watch函数主要是从listwatch的接口中获取数据变动的事件后,动态维护DeltaFiFO队列

func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {.....w, err = r.listerWatcher.Watch(options)....err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh).....
}func watchHandler(start time.Time,w watch.Interface,store Store,expectedType reflect.Type,expectedGVK *schema.GroupVersionKind,name string,expectedTypeName string,setLastSyncResourceVersion func(string),exitOnInitialEventsEndBookmark *bool,clock clock.Clock,errc chan error,stopCh <-chan struct{},
) error {eventCount := 0if exitOnInitialEventsEndBookmark != nil {// set it to false just in case somebody// made it positive*exitOnInitialEventsEndBookmark = false}loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return err//从watch接口中获取事件case event, ok := <-w.ResultChan():......case watch.Added:err := store.Add(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))}case watch.Modified:err := store.Update(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))}case watch.Deleted:// TODO: Will any consumers need access to the "last known// state", which is passed in event.Object? If so, may need// to change this.err := store.Delete(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))}case watch.Bookmark:// A `Bookmark` means watch has synced here, just update the resourceVersionif meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {if exitOnInitialEventsEndBookmark != nil {*exitOnInitialEventsEndBookmark = true}}}

我们看一下这个c.processLoop这个循环做了些什么事情。

func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}

围绕着queue,展开了腥风血雨,看起来对于queue中元素的处理落在在c.config.Process这个函数中,我们找一下这个函数在哪里被初始化的

func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()if deltas, ok := obj.(Deltas); ok {return processDeltas(s, s.indexer, deltas, isInInitialList)}return errors.New("object given as Process argument is not Deltas")
}func processDeltas(// Object which receives event notifications from the given deltashandler ResourceEventHandler,clientState Store,deltas Deltas,isInInitialList bool,
) error {// from oldest to newestfor _, d := range deltas {obj := d.Objectswitch d.Type {case Sync, Replaced, Added, Updated:if old, exists, err := clientState.Get(obj); err == nil && exists {if err := clientState.Update(obj); err != nil {return err}handler.OnUpdate(old, obj)} else {if err := clientState.Add(obj); err != nil {return err}handler.OnAdd(obj, isInInitialList)}case Deleted:if err := clientState.Delete(obj); err != nil {return err}handler.OnDelete(obj)}}return nil
}

clientState.Update(obj)这个方法我们点击去看会发现,他更新的就是我们上文提及结构体的indexer内部缓存。这里的handler就是s *sharedIndexInformer,我们把这个结构体的OnAdd,OnUpdate,OnDelete方法截取出来。

// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {// Invocation of this function is locked under s.blockDeltas, so it is// save to distribute the notifications.cacheMutationDetector.AddObject(obj)s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {isSync := false// If is a Sync event, isSync should be true// If is a Replaced event, isSync is true if resource version is unchanged.// If RV is unchanged: this is a Sync/Replaced event, so isSync is trueif accessor, err := meta.Accessor(new); err == nil {if oldAccessor, err := meta.Accessor(old); err == nil {// Events that didn't change resourceVersion are treated as resync events// and only propagated to listeners that requested resyncisSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()}}// Invocation of this function is locked under s.blockDeltas, so it is// save to distribute the notifications.cacheMutationDetector.AddObject(new)s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {// Invocation of this function is locked under s.blockDeltas, so it is// save to distribute the notifications.processor.distribute(deleteNotification{oldObj: old}, false)
}

其中核心的方法大概就是s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false),我们看一下这个方法是在做什么。

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {p.listenersLock.RLock()defer p.listenersLock.RUnlock()for listener, isSyncing := range p.listeners {switch {case !sync:// non-sync messages are delivered to every listenerlistener.add(obj)case isSyncing:// sync messages are delivered to every syncing listenerlistener.add(obj)default:// skipping a sync obj for a non-syncing listener}}
}
// 还记得processorListener中的两个channel吗,这里就是往addCh中添加元素了
func (p *processorListener) add(notification interface{}) {if a, ok := notification.(addNotification); ok && a.isInInitialList {p.syncTracker.Start()}p.addCh <- notification
}

9.informerFactory.WaitForCacheSync(stopper)

等待所有启动的 Informer 的缓存被同步,真正的实现方法如下所示

func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {//这里是map[reflect.Type]cache.SharedIndexInformer 这个结构的一个深度拷贝,没有做额外的逻辑informers := func() map[reflect.Type]cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()informers := map[reflect.Type]cache.SharedIndexInformer{}for informerType, informer := range f.informers {if f.startedInformers[informerType] {informers[informerType] = informer}}return informers}()//这里是判断所有的informer的数据是否被完全同步res := map[reflect.Type]bool{}for informType, informer := range informers {//这是一个同步的方法,只有当每一个informer的queue都已经被完全同步,保存执行结果。如果有一个informer没有同步,就一直卡在则立res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)}return res
}

10.deployments, err := deployLister.Deployments(“default”).List(labels.Everything())

我们追踪一下这个List方法是从远程k8s apiserver获取的还是从本地的indexer存储中获取的。

func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {.....if namespace == metav1.NamespaceAll {return ListAll(indexer, selector, appendFn)}// 结论在这里items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace}).....}

写在之后

絮絮叨叨终于理完了informer的全部源码逻辑,笔者记录了自己的阅读源码的过程,期望能在未来温习的时候能够有一个参考,可能之后会更新一些流程图去加深一下记忆吧,但是至少现在,洋洋洒洒的分析确实耗尽了我的耐心。接下来有时间的话,我会继续更新controller-runtime的源码解析。如果大家觉得有什么问题,还请在评论里不吝赐教。如果大家觉得有那么一丝帮助,还请大家帮忙点赞关注,再次拜谢。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_926003.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【设计模式-03】Strategy策略模式及应用场景

一、简要描述 Java 官方文档 Overview (Java SE 18 & JDK 18)module indexhttps://docs.oracle.com/en/java/javase/18/docs/api/index.html Java中使用到的策略模式 Comparator、comparable Comparator (Java SE 18 & JDK 18)declaration: module: java.base, pa…

WPF真入门教程28--项目案例--MQTT服务器和客户端

1、先上图看帅照 这个案例还是布局加视图模型&#xff0c;样式应用&#xff0c;业务逻辑&#xff0c;该项目是一个mqtt服务器和客户端的通信工具&#xff0c;这里不去分析mqtt的通信原理&#xff0c;关注在于wpf技能的应用&#xff0c;能够掌握这个例子&#xff0c;离项目开发…

C#编程-在线程中使用同步

在线程中使用同步 在线程应用程序中,线程需要相互共享数据。但是,应用程序应该确保一个线程不更改另一个线程使用的数据。考虑有两个线程的场景。一个线程从文件读取工资,另一个线程尝试更新工资。当两个线程同时工作时,数据就会受损。下图显示了两个线程同时访问一个文件…

杨中科 .NETCORE EFCORE第七部分 一对一,多对多

一对一 一对一关系配置 1、builder.HasOne(o >o.Delivery).WithOne(d>d.Order).HasForeignKey(d>dOrderId); 2、测试插入和获取数据 示例 新建 Order 新建 Delivery DeliveryConfig OrderConfig 执行 迁移命令 查看数据库 测试数据插入 运行查看数据 多对多…

Python文件自动化处理

os模块 Python标准库和操作系统有关的操作创建、移动、复制文件和文件夹文件路径和名称处理 路径的操作 获取当前Python程序运行路径不同操作系统之间路径的表示方式 windows中采用反斜杠(\)作为文件夹之间的分隔符 Mac和Linux中采用斜杠(/)作为文件夹之间的分隔符 把文件…

Qt优秀开源项目之二十一:遇见QSkinny,一个轻量级Qt UI库

目录 一.QSkinny简介 二.工作原理 三.编译 一.QSkinny简介 QSkinny库基于Qt Graphic View和Qt/Quick中少量的核心类。它提供了一组轻量级控件&#xff0c;可以在C或QML中使用这些控件。QSkinny默认是启用硬件加速的&#xff0c;非常适合嵌入式设备&#xff0c;目前已经应用于…

[DL]深度学习_Feature Pyramid Network

FPN结构详解 目录 一、概念介绍 二、结构详解 1、对比试验 2、特征图融合 3、结构详解 4、不同尺度预测 5、Proposal映射到预测特征层 一、概念介绍 Feature Pyramid Network (FPN)是一种用于目标检测和语义分割的神经网络架构。它的目标是解决在处理不同尺度的图像时…

SQLServer 为角色开视图SELECT权限,报错提示需要开基础表权限

问题&#xff1a; 创建了个视图V&#xff0c;里面包含V库的a表&#xff0c;和T库的b表 为角色开启视图V的SELECT权限&#xff0c;提示T库的b表无SELECT权限&#xff0c;报错如下 解决方案&#xff1a; ①在T库建个视图TV&#xff0c;里面包含b表&#xff08;注意是在b表的对…

基于SSM的戏剧推广网站的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue、HTML 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是…

使用串口 DMA 模式接收不定长数据

一、简介 曾经遇到客户有一个需求&#xff0c;需要用串口 DMA 的方式接收不定长度的数据&#xff0c;DMA 有个缺点就是在每次传输前需要设定好传输的字节长度&#xff0c;这种方式显然对于接收不定长度的数据来说没有那么灵活。但 DMA 也有着显著的优点&#xff0c;如可直接访…

Linux环境搭建FastDFS文件服务器(附带Nginx安装)

本文主要介绍在linux服务器如何搭建FastDFS文件服务器。大概分为9个步骤&#xff0c;由于内容较为繁琐。下面带你入坑&#xff01; 首先简单介绍一下FastDFS是淘宝资深架构师余庆老师主导开源的一个分布式文件系统&#xff0c;用C语言编写。适应与中小企业&#xff0c;对文件不…

电脑安装 Python提示“api-ms-win-crt-process-l1-1-0.dll文件丢失,程序无法启动”,快速修复方法,完美解决

在windows 10系统安装完python后&#xff0c;启动的时候&#xff0c;Windows会弹出错误提示框“无法启动此程序&#xff0c;因为计算机中丢失了api-ms-win-crt-process-l1-1-0.dll&#xff0c;尝试重新安装该程序以解决此问题。” api-ms-win-crt-process-l1-1-0.dll是一个动态…

网络层协议及IP编址与IP路由基础华为ICT网络赛道

目录 4.网络层协议及IP编址 4.1.网络层协议 4.2.IPv4地址介绍 4.3.子网划分 4.4.ICMP协议 4.5.IPv4地址配置及基本应用 5.IP路由基础 5.1.路由概述 5.2.静态路由 5.3.动态路由 5.4.路由高阶特性 4.网络层协议及IP编址 4.1.网络层协议 IPv4(Internet Protocol Versi…

C语言——编译和链接

&#xff08;图片由AI生成&#xff09; 0.前言 C语言是最受欢迎的编程语言之一&#xff0c;以其接近硬件的能力和高效性而闻名。理解C语言的编译和链接过程对于深入了解其运行原理至关重要。本文将详细介绍C语言的翻译环境和运行环境&#xff0c;重点关注编译和链接的各个阶段…

基于JavaWeb+BS架构+SpringBoot+Vue+Hadoop短视频流量数据分析与可视化系统的设计和实现

基于JavaWebBS架构SpringBootVueHadoop短视频流量数据分析与可视化系统的设计和实现 文末获取源码Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 文末获取源码 Lun文目录 目  录 目  录 I 1绪 论 1 1.1开发背景 1 1.2开…

【国内访问github不稳定】可以尝试fastgithub解决这个问题

1、下载 https://github.com/dotnetcore/FastGithub https://github.com/dotnetcore/FastGithub/releases 官网下载即可&#xff0c;比如&#xff0c;我用的是这个&#xff1a;fastgithub_osx-x64.zip&#xff08;点这里下载&#xff09; 2、安装 如下图双击启动即可 3、…

大模型开启应用时代 数钉科技一锤定音

叮叮叮叮&#xff01;数钉智造大模型&#xff0c;“定音”强势发布&#xff01; 随着科技的飞速发展&#xff0c;大模型技术已逐渐成为推动产业变革的核心力量。在这一浪潮中&#xff0c;数钉科技凭借深厚的技术积累和敏锐的市场洞察力&#xff0c;成功利用大模型技术搭建起智能…

SSL之mkcert构建本地自签名

文章目录 1. 什么是SSL2. mkcert&#xff1a;快速生成自签名证书2.1 mkcert的工作流程如下&#xff1a;2.2 window 本地实现自签证书2.2.1 下载安装2.2.2 下载,生成本地 SSL2.2.3 生成 pem 自签证书,可供局域网内使用其他主机访问。2.2.4 使用-psck12 生成*.p12 文件 2.3 Sprin…

盲盒小程序搭建:为盲盒企业、创业者提供新的机遇

近年来&#xff0c;盲盒因其不确定性以及拆盲盒带来的惊喜感&#xff0c;受到了广大消费者的追捧&#xff01;盲盒是由各类手办等组成的&#xff0c;玩家能够以低价格拆到性价比高的盲盒商品&#xff0c;这种购物体验激发了玩家的购物乐趣&#xff0c;因此&#xff0c;盲盒行业…

植物大战僵尸-C语言搭建童年游戏(easyx)

游戏索引 游戏名称&#xff1a;植物大战僵尸 游戏介绍&#xff1a; 本游戏是在B站博主<程序员Rock>的视频指导下完成 想学的更详细的小伙伴可以移步到<程序员Rock>视频 语言项目&#xff1a;完整版植物大战僵尸&#xff01;可能是B站最好的植物大战僵尸教程了&…