1. What is Reflector
Reflector watches a specified resource and causes all changes to be reflected in the given store.
K8s's description of the Reflector is admirably terse: the Reflector is the component that watches a given resource and "reflects" the changes into a store.
Beyond the Reflector itself, the one extra thing to pay attention to is the Store interface. For now we just glance at its definition to get a rough idea of what it is:
```go
// Store is a generic object storage and processing interface.  A
// Store holds a map from string keys to accumulators, and has
// operations to add, update, and delete a given object to/from the
// accumulator currently associated with a given key.  A Store also
// knows how to extract the key from a given object, so many operations
// are given only the object.
//
// In the simplest Store implementations each accumulator is simply
// the last given object, or empty after Delete, and thus the Store's
// behavior is simple storage.
//
// Reflector knows how to watch a server and update a Store.  This
// package provides a variety of implementations of Store.
type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// List returns a list of all the currently non-empty accumulators
	List() []interface{}

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}
```
In plain terms, a Store is a generic interface for object storage and processing: it holds a mapping from keys to accumulators and supports add, update, delete, and lookup.
In the simplest kind of case, the accumulator can just be the object itself and the storage behavior plain storage. The Indexer implementation (the cache in client-go/tools/cache/store.go), for instance, simply maintains a built-in map (strictly speaking a ThreadSafeStore, see client-go/tools/cache/thread_safe_store.go, which layers concurrency safety and indexing on top of a map).
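To get a feel for the interface, here is a minimal sketch using the stock implementation from client-go (the Pod object is made-up test data; `cache.NewStore` and `cache.MetaNamespaceKeyFunc` are existing client-go helpers):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// The simplest Store implementation: each accumulator is just the last object.
	// The keyFunc here is client-go's MetaNamespaceKeyFunc ("namespace/name").
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
	_ = store.Add(pod)

	// GetByKey retrieves the accumulator (here, the object itself) by "namespace/name".
	obj, exists, _ := store.GetByKey("default/nginx")
	fmt.Println(exists, obj.(*v1.Pod).Name) // true nginx
}
```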
2. Preparation Logic: List
Back to the main thread: Reflector's primary operating logic is ListAndWatch. How are the resource watch and the Store updates implemented? Start from the entry point:
```go
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
```
The core logic here is clearly r.ListAndWatch(), wrapped in a backoff manager used for backing off and recovering after crashes; we won't dwell on it here.
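For reference, a minimal sketch of how `wait.BackoffUntil` behaves. The parameters mirror the ones Reflector itself uses; note that this assumes a recent k8s.io/utils/clock, as the clock import path has moved between versions:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/clock"
)

func main() {
	stopCh := make(chan struct{})
	// Reflector uses a manager like this one: 800ms initial delay, capped at 30s,
	// factor 2.0, reset to the initial delay after 2 minutes without failures.
	backoff := wait.NewExponentialBackoffManager(
		800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, clock.RealClock{})

	go wait.BackoffUntil(func() {
		// Stand-in for one ListAndWatch attempt; when it returns,
		// BackoffUntil waits out the backoff interval and calls it again.
		fmt.Println("ListAndWatch attempt")
	}, backoff, true /* sliding: the timer starts after f returns */, stopCh)

	time.Sleep(3 * time.Second)
	close(stopCh)
}
```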
The structure of ListAndWatch is shown below; as the comment makes clear, a full list happens first, followed by a watch based on the resource version. For the initial full list, a Pager proxies the listerWatcher's list logic; with the result in hand, r.syncWith is called to store it into the Store:
```go
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	if err := func() error {
		... // elided
		var list runtime.Object
		go func() {
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			list, paginatedResult, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		listMetaInterface, err := meta.ListAccessor(list)
		resourceVersion = listMetaInterface.GetResourceVersion()
		items, err := meta.ExtractList(list)
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		r.setLastSyncResourceVersion(resourceVersion)
		... // elided
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() { ...Block1... }()

	for { ...Block2... }
}
```
The logic of r.syncWith is simple; essentially it just calls store.Replace:
```go
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}
```
2.1 ListWatcher
Step by step: first the innermost executor, r.listerWatcher.List(opts). It has many implementations; in essence it is about how the initial fetch is performed. Two examples give a direct sense of what it does:
The first is how a test case declares a listerWatcher implementation: ListFunc trivially returns a list object, while WatchFunc simply declares a chan-based implementation:
```go
func TestReflectorResync(t *testing.T) {
	iteration := 0
	stopCh := make(chan struct{})
	rerr := errors.New("expected resync reached")
	s := &FakeCustomStore{
		ResyncFunc: func() error {
			iteration++
			if iteration == 2 {
				return rerr
			}
			return nil
		},
	}

	lw := &testLW{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			fw := watch.NewFake()
			return fw, nil
		},
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "0"}}, nil
		},
	}
	resyncPeriod := 1 * time.Millisecond
	r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
	if err := r.ListAndWatch(stopCh); err != nil {
		// error from Resync is not propaged up to here.
		t.Errorf("expected error %v", err)
	}
	if iteration != 2 {
		t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
	}
}

//
// fw:
//
func (f *FakeWatcher) ResultChan() <-chan Event {
	return f.result
}

// Add sends an add event.
func (f *FakeWatcher) Add(obj runtime.Object) {
	f.result <- Event{Added, obj}
}
```
The other is how the usual NewDeploymentInformer is declared; as you can see, it is built on nothing more than the basic client:
```go
// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.ExtensionsV1beta1().Deployments(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.ExtensionsV1beta1().Deployments(namespace).Watch(context.TODO(), options)
			},
		},
		&extensionsv1beta1.Deployment{},
		resyncPeriod,
		indexers,
	)
}
```
2.2 Pager
With listerWatcher's responsibilities clear, the other piece is the Pager. The Pager essentially controls whether, and how, the List action is split into batches; it is easy to picture a for loop repeatedly fetching data, and the code is indeed exactly that, so we skip it. Worth noting is the configuration applied before paging starts, such as pager.PageSize, options.ResourceVersion and Continue, which feed into the pager's setup. Right before the Pager runs, we can see:
```go
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
	return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
	pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
	// We got a paginated result initially. Assume this resource and server honor
	// paging requests (i.e. watch cache is probably disabled) and leave the default
	// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
	// User didn't explicitly request pagination.
	//
	// With ResourceVersion != "", we have a possibility to list from watch cache,
	// but we do that (for ResourceVersion != "0") only if Limit is unset.
	// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
	// switch off pagination to force listing from watch cache (if enabled).
	// With the existing semantic of RV (result is at least as fresh as provided RV),
	// this is correct and doesn't lead to going back in time.
	//
	// We also don't turn off pagination for ResourceVersion="0", since watch cache
	// is ignoring Limit in that case anyway, and if watch cache is not enabled
	// we don't introduce regression.
	pager.PageSize = 0
}
```
Broadly, this revolves around two questions: whether to paginate, and how large each page should be.
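Outside of Reflector you can also drive the Pager directly. A sketch under assumptions (`client` is an already-initialized clientset; `PageSize` maps to `ListOptions.Limit`, and the pager internally loops on the returned Continue token):

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/pager"
)

func listAllPods(ctx context.Context, client kubernetes.Interface) error {
	p := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().Pods("default").List(ctx, opts)
	}))
	p.PageSize = 500 // fetch 500 items per page; 0 disables pagination entirely

	list, paginated, err := p.List(ctx, metav1.ListOptions{ResourceVersion: "0"})
	if err != nil {
		return err
	}
	fmt.Printf("paginated=%v type=%T\n", paginated, list)
	return nil
}
```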
First, the doc comment on pagination. It warns that pagination should be used with care: paginated lists are always served directly from etcd, which may cause performance problems; that is the first case above, the one we should try to avoid. Pagination itself is driven by Continue:
```go
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from watch cache.
// NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
```
Now the server-side (ApiServer) code, found in apiserver/pkg/storage/cacher/cacher.go. With APIListChunking enabled, using Continue, leaving the RV unspecified, or using Limit with an RV other than "0" all push the load down to etcd:
```go
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit {
	// If resourceVersion is not specified, serve it from underlying
	// storage (for backward compatibility). If a continuation is
	// requested, serve it from the underlying storage as well.
	// Limits are only sent to storage when resourceVersion is non-zero
	// since the watch cache isn't able to perform continuations, and
	// limits are ignored when resource version is zero.
	return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}
```
So, all things considered, the usual sweet spot is the case of setting an RV ("0" or a known value) and not using Limit. Here is what the official documentation says about RV semantics:
For get and list, the semantics of resourceVersion are:

get:

| resourceVersion unset | resourceVersion="0" | resourceVersion="<non-zero>" |
| --- | --- | --- |
| most recent version | any version | not older than the given version |
list:
Unless you have very strong consistency requirements, using resourceVersionMatch=NotOlderThan together with a known resourceVersion is the preferred interaction: compared to leaving resourceVersion and resourceVersionMatch unset, it yields better cluster performance and scalability, since the latter requires quorum reads.
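In client terms, the recommended interaction looks roughly like this (a sketch; `client` and `rv` are assumed inputs):

```go
package main

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listNotOlderThan lists with a known RV plus NotOlderThan, which lets the
// ApiServer serve the request from its watch cache instead of a quorum read.
func listNotOlderThan(ctx context.Context, client kubernetes.Interface, rv string) (*v1.PodList, error) {
	return client.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
		ResourceVersion:      rv, // e.g. "0" (any version) or a known non-zero value
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
	})
}
```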
2.3 Store and RV
Once the Pager has the data, three things remain:
- store the data
- record the latest resource version (RV) of the data
- call watch with that latest RV
The first two first:
```go
listMetaInterface, err := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err := r.syncWith(items, resourceVersion); err != nil {
	return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
r.setLastSyncResourceVersion(resourceVersion)
```
- [Store] Storing the data is easy to understand and looks "simple": the fetched data is handed to the Reflector's important Store member. The Store behind a Reflector is actually fairly involved and deserves a long discussion of its own; for now it is fine to picture it as an in-memory KV mapping:
```go
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}
```
- [RV] Fundamentally this is the basic device of MVCC: a logical timestamp, corresponding to etcd's revision. Attaching a monotonically increasing logical timestamp to every operation serializes the whole system. It serves other purposes as well, such as isolation levels and consistent views; in K8s you most often run into RV as a means of concurrency control: when updating a resource, how do you know whether someone else has modified it in the meantime? By comparing RVs.
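The canonical client-side pattern built on this is optimistic concurrency with retry. A sketch (`client`, the namespace, and the Deployment name "demo" are assumptions):

```go
package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

// scaleDeployment relies on the RV carried by the object from Get: if someone
// else updated it in between, the ApiServer answers 409 Conflict, and
// RetryOnConflict re-runs the closure (fresh Get, fresh RV).
func scaleDeployment(ctx context.Context, client kubernetes.Interface, replicas int32) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		d, err := client.AppsV1().Deployments("default").Get(ctx, "demo", metav1.GetOptions{})
		if err != nil {
			return err
		}
		d.Spec.Replicas = &replicas
		_, err = client.AppsV1().Deployments("default").Update(ctx, d, metav1.UpdateOptions{})
		return err // a Conflict error here triggers the retry
	})
}
```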
3. Operating Logic: Watch
3.1 Watch
With the list result in hand, the RV can be extracted from it; Watch is then simply an incremental watch based on that RV:
```go
... // elided
options = metav1.ListOptions{
	ResourceVersion: resourceVersion,
	// We want to avoid situations of hanging watchers. Stop any wachers that do not
	// receive any events within the timeout window.
	TimeoutSeconds: &timeoutSeconds,
	// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
	// Reflector doesn't assume bookmarks are returned at all (if the server do not support
	// watch bookmarks, it will ignore this field).
	AllowWatchBookmarks: true,
}

// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh)
... // elided
```
r.listerWatcher.Watch(options) is the same story as the list described earlier: in essence it is implemented on top of the basic client.
We won't dig into the implementation of watch here; the core is Transfer-Encoding: chunked plus a long-lived connection. The format is simple: keep appending chunks to the body, each starting with a hex length, a newline, then the content, then another newline to end it. Flexible, simple, efficient:
```
25
This is the data in the first chunk
1C
and this is the second one
3
con
8
sequence
```
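To see the mechanism outside of K8s, here is a minimal chunked-streaming server in pure standard-library Go (my own illustration, not client-go code): Go's http server switches to `Transfer-Encoding: chunked` automatically when no Content-Length is set, and each `Flush` emits one chunk:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		for i := 0; i < 3; i++ {
			fmt.Fprintf(w, `{"type":"ADDED","object":{"seq":%d}}`+"\n", i)
			flusher.Flush() // ship the buffered bytes as one chunk right away
			time.Sleep(time.Second)
		}
	})
	http.ListenAndServe(":8080", nil)
}
```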
There are plenty of similar streaming techniques; I never quite figured out why this particular one, presumably simplicity. With the above in mind, it is easy to see what client.watch does: open an HTTP long-lived connection, obtain the right decoder for the mediaType, and read the stream with a frameReader:
```go
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	... // elided
	resp, err := client.Do(req)

	contentType := resp.Header.Get("Content-Type")
	mediaType, params, err := mime.ParseMediaType(contentType)
	objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)

	frameReader := framer.NewFrameReader(resp.Body)
	watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)

	return watch.NewStreamWatcher(
		restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
		// use 500 to indicate that the cause of the error is unknown - other error codes
		// are more specific to HTTP interactions, and set a reason
		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
	), nil
}
```
NewStreamWatcher starts a goroutine that keeps reading from the Reader. The content consists of two parts: an action (the event type) and a runtime.Object. After decoding and deserialization, the result is pushed into a channel:
```go
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	sw := &StreamWatcher{
		source:   d,
		reporter: r,
		// It's easy for a consumer to add buffering via an extra
		// goroutine/channel, but impossible for them to remove it,
		// so nonbuffered is better.
		result: make(chan Event),
	}
	go sw.receive()
	return sw
}

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
	Error    EventType = "ERROR"

	DefaultChanSize int32 = 100
)

// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
	defer close(sw.result)
	defer sw.Stop()
	defer utilruntime.HandleCrash()
	for {
		action, obj, err := sw.source.Decode()
		if err != nil {
			// Ignore expected error.
			if sw.stopping() {
				return
			}
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
			default:
				if net.IsProbableEOF(err) || net.IsTimeout(err) {
					klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
				} else {
					sw.result <- Event{
						Type:   Error,
						Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
					}
				}
			}
			return
		}
		sw.result <- Event{
			Type:   action,
			Object: obj,
		}
	}
}
```
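From the caller's perspective, consuming that channel looks like this (a sketch; `client` is an assumed initialized clientset, and the events flowing out of `ResultChan` are exactly what the StreamWatcher above decoded):

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

func watchPods(ctx context.Context, client kubernetes.Interface, rv string) error {
	w, err := client.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{ResourceVersion: rv})
	if err != nil {
		return err
	}
	defer w.Stop()
	for ev := range w.ResultChan() {
		switch ev.Type {
		case watch.Added, watch.Modified, watch.Deleted:
			fmt.Println(ev.Type, ev.Object.(*v1.Pod).Name)
		case watch.Error:
			return fmt.Errorf("watch error: %v", ev.Object)
		}
	}
	return nil // server closed the stream; callers typically re-watch from the last RV
}
```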
3.2 WatchHandler
As noted above, the Watcher decodes and deserializes the content of the HTTP long-lived connection and pushes the results into a channel. The consumer of that channel is called watchHandler in Reflector. The overall logic is clear and simple: dispatch on the Event type, call the corresponding Store method, and keep the RV up to date. The code looks long but is essentially trivial:
```go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
```
4. DeltaFIFO
From the analysis so far, we know that Reflector's core logic is to first perform a List:
- use the Pager together with listerWatcher#ListFunc to fetch as much data as possible
- store the data into the Store (Replace)
and then Watch:
- hit the ApiServer via listerWatcher#WatchFunc and keep parsing the body into Events
- in watchHandler, dispatch on the event type and invoke the Store's corresponding add/delete/update (Add, Delete, Update)
That leaves one open question: what exactly happens inside the Store? Like the listerWatcher, the Store is a very open abstraction; let's look at how it works in the informer implementation.
There is a lot of code, but only the Store part matters: while initializing the informer, a DeltaFIFO is created and injected into the Config; when the informer runs, that DeltaFIFO is passed into the Reflector constructor as its Store.
```go
// newInformer returns a controller for populating the store while also
// providing event notifications.
//
// Parameters
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//  * clientState is the store you want to populate
//
func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
) Controller {
	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			if deltas, ok := obj.(Deltas); ok {
				return processDeltas(h, clientState, transformer, deltas)
			}
			return errors.New("object given as Process argument is not Deltas")
		},
	}
	return New(cfg)
}

// New makes a new Controller from the given Config.
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}
```
4.1 DeltaFIFO
Let's look at what role the Store in the informer, i.e. the DeltaFIFO, plays, starting with its design. DeltaFIFO is essentially a FIFO, with two differences:
- first, the V in the K -> V mapping is not the object itself but a Deltas, a slice of wrapped incremental changes (Delta) for that object
- second, there is an additional Sync mechanism
Together these solve the following problems:
- no change in the operation history is ever missed
- when processing a V, you can see every operation that happened between the previous processing and this one
- deletions are observable
- objects can be periodically reprocessed
There are many ways to implement this; here is how K8s does it. The main declared members:
- the KV mapping: items
- the queue: queue
- how to turn an object into a key: keyFunc
- how to fetch objects by key and enumerate all keys: knownObjects
keyFunc is the simplest one: by default it takes the object's meta.GetNamespace() + "/" + meta.GetName(), or just meta.GetName(). No need for detail; it is simply how an object is uniquely identified.
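Concretely (the Pod/Node objects are made-up data; in the informer setup the keyFunc passed in is `cache.MetaNamespaceKeyFunc`):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
	key, _ := cache.MetaNamespaceKeyFunc(pod)
	fmt.Println(key) // default/nginx

	// Cluster-scoped resources have no namespace, so only the name remains.
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	key, _ = cache.MetaNamespaceKeyFunc(node)
	fmt.Println(key) // node-1
}
```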
knownObjects is essentially an Indexer (in the informer context), the very one introduced at the top of this article. Both DeltaFIFO and Indexer implement the Store interface. One way to put it: the Indexer is the member that does the actual object storage for DeltaFIFO, while DeltaFIFO contributes the incremental changes in between. It is simple enough that we'll skip it; see the cache in client-go/tools/cache/store.go if interested.
```go
// DeltaFIFO is like FIFO, but differs in two ways.  One is that the
// accumulator associated with a given object's key is not that object
// but rather a Deltas, which is a slice of Delta values for that
// object.  Applying an object to a Deltas means to append a Delta
// except when the potentially appended Delta is a Deleted and the
// Deltas already ends with a Deleted.  In that case the Deltas does
// not grow, although the terminal Deleted will be replaced by the new
// Deleted if the older Deleted's object is a
// DeletedFinalStateUnknown.
//
// The other difference is that DeltaFIFO has an additional way that
// an object can be applied to an accumulator, called Sync.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of some of the objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key.  The objects in
// question are called "known objects" and this set of objects
// modifies the behavior of the Delete, Replace, and Resync methods
// (each in a different way).
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool
}
```
4.2 Deltas at Work: Add, Update, Delete
A quick pass over Add/Update/Delete shows that the real work invariably lands in queueActionLocked. Delete carries a little extra logic, essentially an idempotency check: when the delete arrives and the object exists in neither the Indexer (knownObjects) nor the items map, nothing is done. Everything else is accomplished through queueActionLocked:
```go
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}

// Delete is just like Add, but makes a Deleted Delta. If the given
// object does not already exist, it will be ignored. (It may have
// already been deleted by a Replace (re-list), for example.)  In this
// method `f.knownObjects`, if not nil, provides (via GetByKey)
// _additional_ objects that are considered to already exist.
func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	if f.knownObjects == nil {
		if _, exists := f.items[id]; !exists {
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	} else {
		// We only want to skip the "deletion" action if the object doesn't
		// exist in knownObjects and it doesn't have corresponding item in items.
		// Note that even if there is a "deletion" action in items, we can ignore it,
		// because it will be deduped automatically in "queueActionLocked"
		_, exists, err := f.knownObjects.GetByKey(id)
		_, itemsExist := f.items[id]
		if err == nil && !exists && !itemsExist {
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	}

	return f.queueActionLocked(Deleted, obj)
}
```
So we only need to focus on queueActionLocked. It looks up the Deltas slice for the object's unique key and appends this Object together with the ActionType onto it (followed by deduplication). Note that adds, deletes, and updates alike each land in the Deltas slice as one more record:
```go
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// But if somehow it ever does return an empty list, then
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(f.items, id)
	}
	return nil
}
```
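A tiny, hypothetical demo makes the accumulation visible: Add then Update the same object, and a single Pop hands you the whole time-ordered Deltas list. This assumes the older single-argument `PopProcessFunc` signature matching the client-go vintage quoted in this article; recent versions added an `isInInitialList` parameter:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	f := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction: cache.MetaNamespaceKeyFunc,
	})

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
	_ = f.Add(pod)    // items["default/nginx"] = [{Added, pod}]
	_ = f.Update(pod) // items["default/nginx"] = [{Added, pod}, {Updated, pod}]

	// One Pop drains every Delta accumulated for that key, oldest first.
	_, _ = f.Pop(cache.PopProcessFunc(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type) // Added, then Updated
		}
		return nil
	}))
}
```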
4.3 DeltaFIFO Replace
Reflector has a case where it periodically re-pulls data from the ApiServer; that part is simple, so we skip it. What deserves attention is DeltaFIFO's Replace operation, i.e. what DeltaFIFO does once such a re-list has been triggered.
The re-listed data is pushed into DeltaFIFO in full as new events. Anything present locally (in the Indexer) but absent on the server is enqueued as the placeholder DeletedFinalStateUnknown instead. That way, when this kind of inconsistency shows up, a synthetic delete event is produced for the upper layers to handle:
```go
... // elided
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
	// keep backwards compat for old clients
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// Add Sync/Replaced action for each new item.
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}

	// Detect deletions not already in the queue.
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if keys.Has(k) {
			continue
		}

		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}
	return nil
}
```
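Consumers of delete events therefore have to expect this placeholder. The typical defensive pattern looks like this (my sketch; the `*v1.Pod` type is an assumption):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// onDelete shows the usual tombstone handling: a delete synthesized by
// Replace carries a DeletedFinalStateUnknown, not the object itself.
func onDelete(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			return // tombstone.Obj can itself be nil if the knownObjects lookup failed
		}
	}
	fmt.Println("deleted:", pod.Name)
}
```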
At this point things still feel unfinished: the Indexer seems to play no part here. When does data actually land in the Indexer? And where is DeltaFIFO's consumer? In fact Reflector defines none of that; it only produces messages, and how they get consumed is left wide open.
5. The Message Consumer
In section 4 we met the party that defines the Store: the informer (the concrete type is called controller), which manages the Reflector's lifecycle. It defines Reflector's components, such as the DeltaFIFO, and it also defines the consumption logic. The newInformer/New/Run code already appeared in the previous section; the relevant pieces here are processDeltas and processLoop:
```go
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
	// Object which receives event notifications from the given deltas
	handler ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
	deltas Deltas,
) error {
	// from oldest to newest
	for _, d := range deltas {
		obj := d.Object
		if transformer != nil {
			var err error
			obj, err = transformer(obj)
			if err != nil {
				return err
			}
		}

		switch d.Type {
		case Sync, Replaced, Added, Updated:
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				if err := clientState.Update(obj); err != nil {
					return err
				}
				handler.OnUpdate(old, obj)
			} else {
				if err := clientState.Add(obj); err != nil {
					return err
				}
				handler.OnAdd(obj)
			}
		case Deleted:
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			handler.OnDelete(obj)
		}
	}
	return nil
}

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
```
As you can see, the informer keeps popping Deltas out of the DeltaFIFO in a loop and hands each batch to the PopProcessFunc, i.e. the consumption logic.
It is in the informer's consumption logic that we finally see the Indexer (clientState) being updated: whenever new events arrive, process walks the Deltas from oldest to newest, applies each event with its object to the local store, and invokes the registered handlers accordingly.
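Putting the whole chain together from the user's side, a sketch (assumptions: `client` is an initialized clientset; handler signatures follow the pre-1.27 client-go API):

```go
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// runPodInformer: Reflector produces Deltas -> DeltaFIFO -> processLoop pops
// them -> the Indexer is updated and the registered handlers are invoked.
func runPodInformer(client kubernetes.Interface, stopCh <-chan struct{}) {
	informer := cache.NewSharedIndexInformer(
		cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", "default", fields.Everything()),
		&v1.Pod{},
		30*time.Second, // resyncPeriod
		cache.Indexers{},
	)
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("add:", obj.(*v1.Pod).Name) },
		UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("update:", newObj.(*v1.Pod).Name) },
		DeleteFunc: func(obj interface{}) { fmt.Println("delete") },
	})
	informer.Run(stopCh) // starts the Reflector and the processing loop internally
}
```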
6. Self-Reflective
These are a few thoughts from reading the source. I had read this code long ago without looking closely; reading it carefully raised some questions, and I thank my colleagues for answering them. A few I am still not sure about myself; answers from those who know better are welcome.
Is saving state changes even necessary? For declarative resources, isn't a controller only supposed to care about the "desired end state"?
That was a preconception on my part, an over-generalization. In fact plenty of controllers need to know about every change; mechanisms like Deployment's Revision and Rollback must be able to account for each individual change afterwards.
Why does processLoop need a requeue mechanism?
Preconception again: I reasoned backwards from one implementation (the Indexer) and assumed Store operations never fail. In reality the Store can be any implementation, and for others a successful write is not guaranteed.
We know pagination puts pressure on etcd; when is pagination actually needed, and when not?
?