
K8s Core Component Deep Dive: Reflector Source Code Analysis (Based on the Informer Implementation)


1. What Is a Reflector

Reflector watches a specified resource and causes all changes to be reflected in the given store. 

The K8s description of Reflector is admirably terse: Reflector is the component that watches a resource and "reflects" its changes into a store.

Besides the Reflector itself, the one extra thing to pay attention to is the Store interface. For now let's just skim its definition to get a rough sense of what it is:

// Store is a generic object storage and processing interface.  A
// Store holds a map from string keys to accumulators, and has
// operations to add, update, and delete a given object to/from the
// accumulator currently associated with a given key.  A Store also
// knows how to extract the key from a given object, so many operations
// are given only the object.
//
// In the simplest Store implementations each accumulator is simply
// the last given object, or empty after Delete, and thus the Store's
// behavior is simple storage.
//
// Reflector knows how to watch a server and update a Store.  This
// package provides a variety of implementations of Store.
type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// List returns a list of all the currently non-empty accumulators
	List() []interface{}

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}

In plain terms, Store is a generic interface for storing and processing objects: it holds a mapping from keys to accumulators and supports add, update, delete, and lookup.

In the simplest case, the accumulator is just the last object stored, and the storage behavior is plain storage. The Indexer implementation in client-go/tools/cache/store#cache, for example, simply maintains an internal map (strictly speaking, a ThreadSafeStore from client-go/tools/cache/thread_safe_store, which adds concurrency safety and indexing on top of a map).
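As a concrete illustration, here is a minimal sketch of my own (only the public cache.NewStore constructor and key func come from client-go) that exercises this simplest kind of Store:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// The simplest Store: each accumulator is just the last object stored
	// under its key, so this behaves like a concurrency-safe keyed map.
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	_ = store.Add(pod)

	item, exists, _ := store.GetByKey("default/demo")
	fmt.Println(exists, item.(*v1.Pod).Name) // true demo
}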

2. Preparation Logic: List

Back to the main thread: the Reflector's principal loop is ListAndWatch. How are watching the resource and updating the store actually implemented? Start from the entry point:

// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

The core here is r.ListAndWatch(), wrapped in a backoff manager that handles backing off and recovering after crashes; we won't dwell on that.
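For orientation, wiring a Reflector together by hand looks roughly like the sketch below (my own example; the clientset is assumed to be built elsewhere, and an informer would normally do this wiring for you):

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// buildAndRun hand-wires a Reflector that mirrors all pods into a plain Store.
func buildAndRun(clientset kubernetes.Interface, stopCh <-chan struct{}) cache.Store {
	lw := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), "pods",
		metav1.NamespaceAll, fields.Everything(),
	)
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)

	// resyncPeriod 0 means no periodic resync.
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	go r.Run(stopCh) // Run blocks until stopCh is closed
	return store
}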

The structure of ListAndWatch is shown below. As the comment makes clear, it first performs a full list, then watches from the returned resource version. For the initial full list, a Pager proxies the listerWatcher's list logic; once the result arrives, r.syncWith stores it into the Store:

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	if err := func() error {
		... // abridged

		var list runtime.Object
		go func() {
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			list, paginatedResult, err = pager.List(context.Background(), options)
			close(listCh)
		}()

		listMetaInterface, err := meta.ListAccessor(list)
		resourceVersion = listMetaInterface.GetResourceVersion()
		items, err := meta.ExtractList(list)
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}

		r.setLastSyncResourceVersion(resourceVersion)
		... // abridged
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() { ...Block1... }()

	for { ...Block2... }
}

The logic of r.syncWith is trivial: it essentially just calls store.Replace:

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}

2.1 ListWatcher

Step by step. First, the innermost worker: r.listerWatcher.List(opts). It has many implementations; in essence it decides how the initial fetch is performed. Two examples make it quite clear what it does:

The first is how a test case declares its listerWatcher implementation: ListFunc simply returns a list object, while WatchFunc declares a simple channel-based fake:

func TestReflectorResync(t *testing.T) {
	iteration := 0
	stopCh := make(chan struct{})
	rerr := errors.New("expected resync reached")
	s := &FakeCustomStore{
		ResyncFunc: func() error {
			iteration++
			if iteration == 2 {
				return rerr
			}
			return nil
		},
	}

	lw := &testLW{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			fw := watch.NewFake()
			return fw, nil
		},
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "0"}}, nil
		},
	}
	resyncPeriod := 1 * time.Millisecond
	r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
	if err := r.ListAndWatch(stopCh); err != nil {
		// error from Resync is not propaged up to here.
		t.Errorf("expected error %v", err)
	}
	if iteration != 2 {
		t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
	}
}

//
// fw:
//
func (f *FakeWatcher) ResultChan() <-chan Event {
	return f.result
}

// Add sends an add event.
func (f *FakeWatcher) Add(obj runtime.Object) {
	f.result <- Event{Added, obj}
}

The second is how the usual NewDeploymentInformer declares it; as you can see, it simply relies on the basic client:

// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.ExtensionsV1beta1().Deployments(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.ExtensionsV1beta1().Deployments(namespace).Watch(context.TODO(), options)
			},
		},
		&extensionsv1beta1.Deployment{},
		resyncPeriod,
		indexers,
	)
}

2.2 Pager

With the listerWatcher's job covered, the other piece is the Pager. The Pager essentially controls whether, and how, the List action is executed in chunks; it's easy to picture a for loop repeatedly scooping up data.

The code is indeed just that, so we won't belabor it. What is worth noting is the configuration code that runs before paging starts, such as pager.PageSize, options.ResourceVersion, and Continue; these settings feed into the pager's behavior. Right before the Pager starts, you can see:

	// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
	// list request will return the full response.
	pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
		return r.listerWatcher.List(opts)
	}))
	switch {
	case r.WatchListPageSize != 0:
		pager.PageSize = r.WatchListPageSize
	case r.paginatedResult:
		// We got a paginated result initially. Assume this resource and server honor
		// paging requests (i.e. watch cache is probably disabled) and leave the default
		// pager size set.
	case options.ResourceVersion != "" && options.ResourceVersion != "0":
		// User didn't explicitly request pagination.
		//
		// With ResourceVersion != "", we have a possibility to list from watch cache,
		// but we do that (for ResourceVersion != "0") only if Limit is unset.
		// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
		// switch off pagination to force listing from watch cache (if enabled).
		// With the existing semantic of RV (result is at least as fresh as provided RV),
		// this is correct and doesn't lead to going back in time.
		//
		// We also don't turn off pagination for ResourceVersion="0", since watch cache
		// is ignoring Limit in that case anyway, and if watch cache is not enabled
		// we don't introduce regression.
		pager.PageSize = 0
	}

It all revolves around whether to paginate, and with what page size.
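What the pager does is morally the same as the hand-rolled loop below (a sketch using the typed client rather than the pager package; Limit and Continue live on metav1.ListOptions/ListMeta):

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listAllPods pulls pods in chunks of 500, resuming from the server-issued
// continue token until the last chunk arrives.
func listAllPods(ctx context.Context, client kubernetes.Interface) ([]corev1.Pod, error) {
	var all []corev1.Pod
	opts := metav1.ListOptions{Limit: 500}
	for {
		list, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, opts)
		if err != nil {
			return nil, err
		}
		all = append(all, list.Items...)
		if list.Continue == "" {
			return all, nil // empty token: that was the last chunk
		}
		opts.Continue = list.Continue
	}
}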

First, the description of pagination. It notes that pagination should be used with care: paginated lists are always served directly by etcd and may cause performance problems, so the first case above is one we should avoid where possible. Pagination itself is driven by Continue:

	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64

Now the server side (the apiserver); the code lives in apiserver/pkg/storage/cacher/cacher.go. With APIListChunking enabled, sending a Continue token, leaving the RV unspecified, or setting a Limit with RV != "0" all push the load down to etcd:

	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	hasContinuation := pagingEnabled && len(pred.Continue) > 0
	hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
	if resourceVersion == "" || hasContinuation || hasLimit {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility). If a continuation is
		// requested, serve it from the underlying storage as well.
		// Limits are only sent to storage when resourceVersion is non-zero
		// since the watch cache isn't able to perform continuations, and
		// limits are ignored when resource version is zero.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}

Putting it together, the usual sweet spot is the case where an RV is set ("0" or newer) and no Limit is used. Here's what the official documentation says about RV:


For get and list, the semantics of resourceVersion are:

get:

resourceVersion unset: the most recent version
resourceVersion="0": any version
resourceVersion="<non-zero>": not older than the given version

list:

Unless you have very strong consistency requirements, using resourceVersionMatch=NotOlderThan with a known resourceVersion is the preferred interaction, since it achieves better cluster performance and scalability than leaving resourceVersion and resourceVersionMatch unset, which requires a quorum read to be served.


2.3 Store and RV

Once the Pager has the data, three things remain to do:

  • First, store the data away
  • Second, record the data's latest version (the RV)
  • Third, call watch with that latest RV

Let's look at the first two:

		listMetaInterface, err := meta.ListAccessor(list)
		resourceVersion = listMetaInterface.GetResourceVersion()

		items, err := meta.ExtractList(list)

		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}

		r.setLastSyncResourceVersion(resourceVersion)
  • [Store] Storing the data is easy to understand and looks "simple": the Reflector's key member Store is called and the fetched data written in. The Store inside a Reflector is actually fairly involved and deserves its own long discussion; for now, think of it as an in-memory KV mapping:
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}
  • [RV] At heart this is basic MVCC design: a logical timestamp, corresponding to etcd's revision. Every operation carries an increasing logical timestamp, which serializes the whole system. It has other uses too (isolation levels, consistent views, and so on); in K8s the most common encounter with RV is for concurrency control.

    When you update a resource, how do you know whether someone else modified it in the meantime? By comparing RVs.
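That comparison is what surfaces as an optimistic-concurrency conflict in practice; the canonical client-side pattern is retry.RetryOnConflict (a sketch; the deployment name and namespace are made up):

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

func scale(client kubernetes.Interface, replicas int32) error {
	// Re-run the closure whenever the apiserver answers 409 Conflict,
	// i.e. our object's RV is stale because someone updated it in between.
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		d, err := client.AppsV1().Deployments("default").Get(context.TODO(), "demo", metav1.GetOptions{})
		if err != nil {
			return err
		}
		d.Spec.Replicas = &replicas
		// The update carries d.ResourceVersion; the server compares it
		// against the current RV and rejects the write if they differ.
		_, err = client.AppsV1().Deployments("default").Update(context.TODO(), d, metav1.UpdateOptions{})
		return err
	})
}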

3. Running Logic: Watch

3.1 Watch

With the list result in hand, the RV can be extracted from it; Watch then performs an incremental watch based on that RV:

		... // abridged
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
		w, err := r.listerWatcher.Watch(options)

		r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh)
		... // abridged

r.listerWatcher.Watch(options) is just like the list described earlier: ultimately it is implemented with the basic client.

We won't say much about how watch itself is implemented; the core is Transfer-Encoding: chunked plus a long-lived connection. The mechanics are simple: keep appending entries like the following to the body, each starting with a length, then a newline and the content, then another newline to terminate. Flexible, simple, efficient:

25
This is the data in the first chunk
1C
and this is the second one
3
con
8
sequence
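To see chunked transfer from the producing side, the Go standard library emits it automatically once a handler flushes before the body length is known; a minimal sketch (plain net/http, not apiserver code):

package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		// No Content-Length is set, so net/http switches the response to
		// Transfer-Encoding: chunked on the first Flush.
		for i := 0; i < 3; i++ {
			fmt.Fprintf(w, "event %d\n", i)
			flusher.Flush() // each flush reaches the client as one chunk
			time.Sleep(time.Second)
		}
	})
	_ = http.ListenAndServe(":8080", nil)
}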

There are many similar streaming techniques, and I haven't quite worked out why this one in particular; probably simplicity. With that background, it's easy to see what client.watch does: open a long-lived HTTP connection, negotiate a decoder from the mediaType, and read the stream with a frameReader:

func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	resp, err := client.Do(req)

	contentType := resp.Header.Get("Content-Type")

	mediaType, params, err := mime.ParseMediaType(contentType)

	objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)

	frameReader := framer.NewFrameReader(resp.Body)
	watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)

	return watch.NewStreamWatcher(
		restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
		// use 500 to indicate that the cause of the error is unknown - other error codes
		// are more specific to HTTP interactions, and set a reason
		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
	), nil
}

NewStreamWatcher spins up a goroutine that keeps reading from the Reader. Each item has two parts: an action and a runtime.Object. After decoding and deserialization, the event is pushed into a channel:

// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	sw := &StreamWatcher{
		source:   d,
		reporter: r,
		// It's easy for a consumer to add buffering via an extra
		// goroutine/channel, but impossible for them to remove it,
		// so nonbuffered is better.
		result: make(chan Event),
	}
	go sw.receive()
	return sw
}

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
	Error    EventType = "ERROR"

	DefaultChanSize int32 = 100
)

// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
	defer close(sw.result)
	defer sw.Stop()
	defer utilruntime.HandleCrash()
	for {
		action, obj, err := sw.source.Decode()
		if err != nil {
			// Ignore expected error.
			if sw.stopping() {
				return
			}
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
			default:
				if net.IsProbableEOF(err) || net.IsTimeout(err) {
					klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
				} else {
					sw.result <- Event{
						Type:   Error,
						Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
					}
				}
			}
			return
		}
		sw.result <- Event{
			Type:   action,
			Object: obj,
		}
	}
}
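From a caller's point of view all this machinery surfaces as a plain channel; consuming a watch directly looks like this sketch (typed client, namespace made up):

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// printPodEvents drains the Event channel that StreamWatcher.receive feeds.
func printPodEvents(ctx context.Context, client kubernetes.Interface) error {
	w, err := client.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	defer w.Stop()

	for event := range w.ResultChan() {
		pod, ok := event.Object.(*corev1.Pod)
		if !ok {
			continue // e.g. an Error event carrying a Status object
		}
		fmt.Println(event.Type, pod.Namespace+"/"+pod.Name) // e.g. ADDED default/demo
	}
	return nil // channel closed: the server ended the watch
}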

3.2 WatchHandler

As described above, the Watcher decodes and deserializes the content of the long-lived HTTP connection and pushes the results into a channel. The place that consumes the channel is called watchHandler in the Reflector. The overall logic is clear and simple: based on the Event type, call the corresponding Store method. The code looks long but is essentially trivial:

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}

4. DeltaFIFO

From the preceding analysis we know the Reflector's core logic first performs a List:

  • Using the Pager together with listerWatcher#ListFunc, fetch as much of the data as possible
  • Store the data into the Store (Replace)

and then Watches:

  • Through listerWatcher#WatchFunc it requests the ApiServer and keeps parsing the body, producing Events
  • watchHandler processes each event by type, calling the Store's corresponding Add, Delete, or Update

One mystery remains: what exactly happens inside the Store? Like the listerWatcher, the Store is a deliberately open abstraction; let's look at how it works in the informer implementation.

There is a lot of code, but only the Store part needs attention: while initializing the informer, a DeltaFIFO is created and injected into the Config; then, when the informer runs, that DeltaFIFO is passed to the Reflector's constructor as its Store.

// newInformer returns a controller for populating the store while also
// providing event notifications.
//
// Parameters
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//  * clientState is the store you want to populate
//
func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
) Controller {
	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			if deltas, ok := obj.(Deltas); ok {
				return processDeltas(h, clientState, transformer, deltas)
			}
			return errors.New("object given as Process argument is not Deltas")
		},
	}
	return New(cfg)
}

// New makes a new Controller from the given Config.
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

4.1 DeltaFIFO

Now let's look at the role the Informer's Store, the DeltaFIFO, plays, starting with its design. DeltaFIFO is essentially like a FIFO, with two differences:

  • First, the value in its K -> V storage is not the object itself but a Deltas value, which you can think of as a wrapper (a change record) around it
  • Second, it has a Sync mechanism

These are used to solve the following problems:

  • Not missing the processing of any operation in the history
  • When processing an object, seeing every operation that happened to it since it was last processed
  • Observable deletions
  • Periodic reprocessing

There are many possible implementations; let's see how K8s implements this logic. The main member variables:

  • The KV mapping: items
  • The queue: queue
  • How to derive a key from an object: keyFunc
  • How to get an object by key and track all known keys: knownObjects

keyFunc is the simplest: by default it takes the object's meta.GetNamespace() + "/" + meta.GetName(), or just meta.GetName(). We won't go into detail; essentially it answers how to uniquely identify an object.
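Concretely, the default cache.MetaNamespaceKeyFunc behaves like this:

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func keyExamples() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "kube-system"}}
	key, _ := cache.MetaNamespaceKeyFunc(pod)
	fmt.Println(key) // "kube-system/demo"

	// Cluster-scoped objects carry no namespace, so the key is just the name.
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	key, _ = cache.MetaNamespaceKeyFunc(node)
	fmt.Println(key) // "node-1"
}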

knownObjects is essentially an Indexer (in the informer context). That Indexer was introduced at the very start of this article; both DeltaFIFO and Indexer implement the Store interface. You can think of it this way: the Indexer is the member of DeltaFIFO that actually stores the objects, while DeltaFIFO supplements it with the incremental changes in between. Again, not much more to say; it's simple enough, see client-go/tools/cache/store#cache if you're curious.

// DeltaFIFO is like FIFO, but differs in two ways.  One is that the
// accumulator associated with a given object's key is not that object
// but rather a Deltas, which is a slice of Delta values for that
// object.  Applying an object to a Deltas means to append a Delta
// except when the potentially appended Delta is a Deleted and the
// Deltas already ends with a Deleted.  In that case the Deltas does
// not grow, although the terminal Deleted will be replaced by the new
// Deleted if the older Deleted's object is a
// DeletedFinalStateUnknown.
//
// The other difference is that DeltaFIFO has an additional way that
// an object can be applied to an accumulator, called Sync.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of some of the objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key.  The objects in
// question are called "known objects" and this set of objects
// modifies the behavior of the Delete, Replace, and Resync methods
// (each in a different way).
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool
}

4.2 Deltas at Work: Add, Update, Delete

A quick scan of Add/Update/Delete shows the real work in all of them is queueActionLocked. Delete carries a little extra logic, essentially an idempotency guard: if the object is already gone from both the Indexer and the queue when the delete arrives, nothing is done. Everything else is handled through queueActionLocked:

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}

// Delete is just like Add, but makes a Deleted Delta. If the given
// object does not already exist, it will be ignored. (It may have
// already been deleted by a Replace (re-list), for example.)  In this
// method `f.knownObjects`, if not nil, provides (via GetByKey)
// _additional_ objects that are considered to already exist.
func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	if f.knownObjects == nil {
		if _, exists := f.items[id]; !exists {
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	} else {
		// We only want to skip the "deletion" action if the object doesn't
		// exist in knownObjects and it doesn't have corresponding item in items.
		// Note that even if there is a "deletion" action in items, we can ignore it,
		// because it will be deduped automatically in "queueActionLocked"
		_, exists, err := f.knownObjects.GetByKey(id)
		_, itemsExist := f.items[id]
		if err == nil && !exists && !itemsExist {
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	}

	return f.queueActionLocked(Deleted, obj)
}

So we only need to focus on queueActionLocked. It first looks up the object's existing Deltas slice by its unique key, then appends the current Object together with its ActionType to that slice. Whether it's an add, a delete, or an update, each operation lands as one record in the Deltas slice:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// But if somehow it ever does return an empty list, then
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(f.items, id)
	}
	return nil
}
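A tiny standalone sketch of that accumulation (the Pop signature below matches the client-go vintage this article quotes; newer releases changed PopProcessFunc):

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func deltasDemo() {
	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction: cache.MetaNamespaceKeyFunc,
	})

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	_ = fifo.Add(pod)    // items["default/demo"] = [{Added pod}]
	_ = fifo.Update(pod) // items["default/demo"] = [{Added pod} {Updated pod}]

	// Pop hands the accumulated Deltas slice for one key to the process func.
	_, _ = fifo.Pop(cache.PopProcessFunc(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type) // Added, then Updated
		}
		return nil
	}))
}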

4.3 DeltaFIFO Replace

The Reflector also has a case where it periodically re-pulls data from the API; it's simple enough that we'll skip it. What deserves attention is DeltaFIFO's Replace operation, that is, what DeltaFIFO does after a re-list has been triggered.

The re-listed data is pushed into the DeltaFIFO in full as new events. Objects that exist locally (in the Indexer) but are gone on the server are instead enqueued as DeletedFinalStateUnknown placeholders. That way, when this kind of inconsistency occurs, a synthetic delete event is produced for the upper layer to handle:

... // abridged
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {

	// keep backwards compat for old clients
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// Add Sync/Replaced action for each new item.
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}

	// Detect deletions not already in the queue.
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if keys.Has(k) {
			continue
		}

		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}

	return nil
}
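This is why, on the consumer side, delete handlers conventionally check for the tombstone type; a typical sketch:

// onPodDelete tolerates the DeletedFinalStateUnknown tombstones that
// Replace can enqueue in place of the real final object state.
func onPodDelete(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return // neither a Pod nor a tombstone: nothing usable
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			return // the tombstone carried no usable object
		}
	}
	fmt.Println("deleted:", pod.Namespace+"/"+pod.Name)
}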

At this point things still feel a bit incomplete: the Indexer doesn't seem to be involved anywhere. So when does data actually land in the Indexer, and where is DeltaFIFO's consumer? In fact the Reflector defines none of this; it only produces messages, and how they are consumed is left wide open.

5. The Message Consumer

In section 4 we mentioned that the informer (the concrete type is controller), which defines the Store, manages the Reflector's lifecycle. It defines the Reflector's components, such as the DeltaFIFO, and it also defines how messages are consumed (newInformer, New, and controller.Run are the same as shown in section 4; the consumption-specific pieces are processDeltas and processLoop):

// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
	// Object which receives event notifications from the given deltas
	handler ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
	deltas Deltas,
) error {
	// from oldest to newest
	for _, d := range deltas {
		obj := d.Object
		if transformer != nil {
			var err error
			obj, err = transformer(obj)
			if err != nil {
				return err
			}
		}

		switch d.Type {
		case Sync, Replaced, Added, Updated:
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				if err := clientState.Update(obj); err != nil {
					return err
				}
				handler.OnUpdate(old, obj)
			} else {
				if err := clientState.Add(obj); err != nil {
					return err
				}
				handler.OnAdd(obj)
			}
		case Deleted:
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			handler.OnDelete(obj)
		}
	}
	return nil
}

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

As you can see, the informer starts a goroutine that continuously Pops Events out of the DeltaFIFO and hands them to the PopProcessFunc, i.e. the consumption logic.

In the Informer's consumption logic we finally see how the Indexer (clientState) is updated: whenever a new item arrives, the process logic walks the deltas from oldest to newest, applies each event and its object to the local store, and invokes the corresponding registered handler.
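In everyday use you meet this whole pipeline through a handler registration like the following sketch (shared informer factory; onPodDelete is the tombstone-aware handler sketched earlier):

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

func runPodInformer(clientset kubernetes.Interface) {
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* reached via processDeltas -> OnAdd */ },
		UpdateFunc: func(oldObj, newObj interface{}) { /* OnUpdate */ },
		DeleteFunc: onPodDelete,
	})

	stopCh := make(chan struct{})
	factory.Start(stopCh) // controller.Run starts the Reflector under the hood
	factory.WaitForCacheSync(stopCh)
}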

6. Self-Reflective

Some reflections from reading the source. I had actually read this code long ago, just not carefully; a careful read raised some questions, and thanks to my colleagues for answering them. A few I'm still not clear on myself, and answers are welcome.

Is it necessary to keep the history of state changes at all? For declarative resources, isn't a controller only responsible for the "desired end state"?

Here I had been jumping to conclusions and over-generalizing. In fact quite a few controllers do need to know about each change; mechanisms like Deployment's Revision and Rollback need to be able to handle every single change precisely.

Why does processLoop need a requeue mechanism?

Jumping to conclusions again: I had reasoned backwards from one implementation (the Indexer) to the abstraction and assumed store operations can't fail. In reality, Store can be any implementation, and for other implementations a successful write is not guaranteed.

Given that pagination puts pressure on etcd, when is pagination needed and when is it not?

?
