mirror of
				https://github.com/kubernetes/client-go.git
				synced 2025-10-30 21:30:26 +00:00 
			
		
		
		
	Also brushed up some other informer comments for readability and nitpicking accuracy. Kubernetes-commit: c4774de94a80bb7d0f54a8a728ad3e97ad1a07b9
		
			
				
	
	
		
			411 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			411 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2015 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package cache
 | |
| 
 | |
| import (
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/clock"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| )
 | |
| 
 | |
// This file implements a low-level controller that is used in
// sharedIndexInformer, which is an implementation of
// SharedIndexInformer.  Such informers, in turn, are key components
// in the high level controllers that form the backbone of the
// Kubernetes control plane.  Look at those for examples, or see the
// workqueue example at
// https://github.com/kubernetes/client-go/tree/master/examples/workqueue
 | |
| 
 | |
| // Config contains all the settings for one of these low-level controllers.
 | |
| type Config struct {
 | |
| 	// The queue for your objects - has to be a DeltaFIFO due to
 | |
| 	// assumptions in the implementation. Your Process() function
 | |
| 	// should accept the output of this Queue's Pop() method.
 | |
| 	Queue
 | |
| 
 | |
| 	// Something that can list and watch your objects.
 | |
| 	ListerWatcher
 | |
| 
 | |
| 	// Something that can process a popped Deltas.
 | |
| 	Process ProcessFunc
 | |
| 
 | |
| 	// ObjectType is an example object of the type this controller is
 | |
| 	// expected to handle.  Only the type needs to be right, except
 | |
| 	// that when that is `unstructured.Unstructured` the object's
 | |
| 	// `"apiVersion"` and `"kind"` must also be right.
 | |
| 	ObjectType runtime.Object
 | |
| 
 | |
| 	// FullResyncPeriod is the period at which ShouldResync is considered.
 | |
| 	FullResyncPeriod time.Duration
 | |
| 
 | |
| 	// ShouldResync is periodically used by the reflector to determine
 | |
| 	// whether to Resync the Queue. If ShouldResync is `nil` or
 | |
| 	// returns true, it means the reflector should proceed with the
 | |
| 	// resync.
 | |
| 	ShouldResync ShouldResyncFunc
 | |
| 
 | |
| 	// If true, when Process() returns an error, re-enqueue the object.
 | |
| 	// TODO: add interface to let you inject a delay/backoff or drop
 | |
| 	//       the object completely if desired. Pass the object in
 | |
| 	//       question to this interface as a parameter.  This is probably moot
 | |
| 	//       now that this functionality appears at a higher level.
 | |
| 	RetryOnError bool
 | |
| }
 | |
| 
 | |
// ShouldResyncFunc is a type of function that indicates if a reflector
// should perform a resync or not. It can be used by a shared informer to
// support multiple event handlers with custom resync periods.
type ShouldResyncFunc func() bool
 | |
| 
 | |
// ProcessFunc processes a single object. The controller hands the
// Config's ProcessFunc to the Queue's Pop() method, and a non-nil error
// result is what triggers the Config's RetryOnError handling.
type ProcessFunc func(obj interface{}) error
 | |
| 
 | |
| // `*controller` implements Controller
 | |
| type controller struct {
 | |
| 	config         Config
 | |
| 	reflector      *Reflector
 | |
| 	reflectorMutex sync.RWMutex
 | |
| 	clock          clock.Clock
 | |
| }
 | |
| 
 | |
// Controller is a low-level controller that is parameterized by a
// Config and used in sharedIndexInformer.
type Controller interface {
	// Run does two things.  One is to construct and run a Reflector
	// to pump objects/notifications from the Config's ListerWatcher
	// to the Config's Queue and possibly invoke the occasional Resync
	// on that Queue.  The other is to repeatedly Pop from the Queue
	// and process with the Config's ProcessFunc.  Both of these
	// continue until `stopCh` is closed.
	Run(stopCh <-chan struct{})

	// HasSynced delegates to the Config's Queue
	HasSynced() bool

	// LastSyncResourceVersion delegates to the Reflector when there
	// is one, otherwise returns the empty string
	LastSyncResourceVersion() string
}
 | |
| 
 | |
| // New makes a new Controller from the given Config.
 | |
| func New(c *Config) Controller {
 | |
| 	ctlr := &controller{
 | |
| 		config: *c,
 | |
| 		clock:  &clock.RealClock{},
 | |
| 	}
 | |
| 	return ctlr
 | |
| }
 | |
| 
 | |
| // Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
 | |
| // It's an error to call Run more than once.
 | |
| // Run blocks; call via go.
 | |
| func (c *controller) Run(stopCh <-chan struct{}) {
 | |
| 	defer utilruntime.HandleCrash()
 | |
| 	go func() {
 | |
| 		<-stopCh
 | |
| 		c.config.Queue.Close()
 | |
| 	}()
 | |
| 	r := NewReflector(
 | |
| 		c.config.ListerWatcher,
 | |
| 		c.config.ObjectType,
 | |
| 		c.config.Queue,
 | |
| 		c.config.FullResyncPeriod,
 | |
| 	)
 | |
| 	r.ShouldResync = c.config.ShouldResync
 | |
| 	r.clock = c.clock
 | |
| 
 | |
| 	c.reflectorMutex.Lock()
 | |
| 	c.reflector = r
 | |
| 	c.reflectorMutex.Unlock()
 | |
| 
 | |
| 	var wg wait.Group
 | |
| 	defer wg.Wait()
 | |
| 
 | |
| 	wg.StartWithChannel(stopCh, r.Run)
 | |
| 
 | |
| 	wait.Until(c.processLoop, time.Second, stopCh)
 | |
| }
 | |
| 
 | |
| // Returns true once this controller has completed an initial resource listing
 | |
| func (c *controller) HasSynced() bool {
 | |
| 	return c.config.Queue.HasSynced()
 | |
| }
 | |
| 
 | |
| func (c *controller) LastSyncResourceVersion() string {
 | |
| 	c.reflectorMutex.RLock()
 | |
| 	defer c.reflectorMutex.RUnlock()
 | |
| 	if c.reflector == nil {
 | |
| 		return ""
 | |
| 	}
 | |
| 	return c.reflector.LastSyncResourceVersion()
 | |
| }
 | |
| 
 | |
| // processLoop drains the work queue.
 | |
| // TODO: Consider doing the processing in parallel. This will require a little thought
 | |
| // to make sure that we don't end up processing the same object multiple times
 | |
| // concurrently.
 | |
| //
 | |
| // TODO: Plumb through the stopCh here (and down to the queue) so that this can
 | |
| // actually exit when the controller is stopped. Or just give up on this stuff
 | |
| // ever being stoppable. Converting this whole package to use Context would
 | |
| // also be helpful.
 | |
| func (c *controller) processLoop() {
 | |
| 	for {
 | |
| 		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
 | |
| 		if err != nil {
 | |
| 			if err == ErrFIFOClosed {
 | |
| 				return
 | |
| 			}
 | |
| 			if c.config.RetryOnError {
 | |
| 				// This is the safe way to re-enqueue.
 | |
| 				c.config.Queue.AddIfNotPresent(obj)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
// ResourceEventHandler can handle notifications for events that
// happen to a resource. The events are informational only, so you
// can't return an error.  The handlers MUST NOT modify the objects
// received; this concerns not only the top level of structure but all
// the data structures reachable from it.
//   - OnAdd is called when an object is added.
//   - OnUpdate is called when an object is modified. Note that oldObj is the
//     last known state of the object-- it is possible that several changes
//     were combined together, so you can't use this to see every single
//     change. OnUpdate is also called when a re-list happens, and it will
//     get called even if nothing changed. This is useful for periodically
//     evaluating or syncing something.
//   - OnDelete will get the final state of the item if it is known, otherwise
//     it will get an object of type DeletedFinalStateUnknown. This can
//     happen if the watch is closed and misses the delete event and we don't
//     notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}
 | |
| 
 | |
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.  This adapter does not remove the prohibition against
// modifying the objects.
//
// Any field left nil makes the corresponding notification a no-op, so the
// zero value is a handler that ignores every event.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc != nil {
		r.AddFunc(obj)
	}
}

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc != nil {
		r.UpdateFunc(oldObj, newObj)
	}
}

// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if r.DeleteFunc != nil {
		r.DeleteFunc(obj)
	}
}
 | |
| 
 | |
| // FilteringResourceEventHandler applies the provided filter to all events coming
 | |
| // in, ensuring the appropriate nested handler method is invoked. An object
 | |
| // that starts passing the filter after an update is considered an add, and an
 | |
| // object that stops passing the filter after an update is considered a delete.
 | |
| // Like the handlers, the filter MUST NOT modify the objects it is given.
 | |
| type FilteringResourceEventHandler struct {
 | |
| 	FilterFunc func(obj interface{}) bool
 | |
| 	Handler    ResourceEventHandler
 | |
| }
 | |
| 
 | |
| // OnAdd calls the nested handler only if the filter succeeds
 | |
| func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
 | |
| 	if !r.FilterFunc(obj) {
 | |
| 		return
 | |
| 	}
 | |
| 	r.Handler.OnAdd(obj)
 | |
| }
 | |
| 
 | |
| // OnUpdate ensures the proper handler is called depending on whether the filter matches
 | |
| func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
 | |
| 	newer := r.FilterFunc(newObj)
 | |
| 	older := r.FilterFunc(oldObj)
 | |
| 	switch {
 | |
| 	case newer && older:
 | |
| 		r.Handler.OnUpdate(oldObj, newObj)
 | |
| 	case newer && !older:
 | |
| 		r.Handler.OnAdd(newObj)
 | |
| 	case !newer && older:
 | |
| 		r.Handler.OnDelete(oldObj)
 | |
| 	default:
 | |
| 		// do nothing
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // OnDelete calls the nested handler only if the filter succeeds
 | |
| func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
 | |
| 	if !r.FilterFunc(obj) {
 | |
| 		return
 | |
| 	}
 | |
| 	r.Handler.OnDelete(obj)
 | |
| }
 | |
| 
 | |
| // DeletionHandlingMetaNamespaceKeyFunc checks for
 | |
| // DeletedFinalStateUnknown objects before calling
 | |
| // MetaNamespaceKeyFunc.
 | |
| func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
 | |
| 	if d, ok := obj.(DeletedFinalStateUnknown); ok {
 | |
| 		return d.Key, nil
 | |
| 	}
 | |
| 	return MetaNamespaceKeyFunc(obj)
 | |
| }
 | |
| 
 | |
| // NewInformer returns a Store and a controller for populating the store
 | |
| // while also providing event notifications. You should only used the returned
 | |
| // Store for Get/List operations; Add/Modify/Deletes will cause the event
 | |
| // notifications to be faulty.
 | |
| //
 | |
| // Parameters:
 | |
| //  * lw is list and watch functions for the source of the resource you want to
 | |
| //    be informed of.
 | |
| //  * objType is an object of the type that you expect to receive.
 | |
| //  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
 | |
| //    calls, even if nothing changed). Otherwise, re-list will be delayed as
 | |
| //    long as possible (until the upstream source closes the watch or times out,
 | |
| //    or you stop the controller).
 | |
| //  * h is the object you want notifications sent to.
 | |
| //
 | |
| func NewInformer(
 | |
| 	lw ListerWatcher,
 | |
| 	objType runtime.Object,
 | |
| 	resyncPeriod time.Duration,
 | |
| 	h ResourceEventHandler,
 | |
| ) (Store, Controller) {
 | |
| 	// This will hold the client state, as we know it.
 | |
| 	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | |
| 
 | |
| 	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
 | |
| }
 | |
| 
 | |
| // NewIndexerInformer returns a Indexer and a controller for populating the index
 | |
| // while also providing event notifications. You should only used the returned
 | |
| // Index for Get/List operations; Add/Modify/Deletes will cause the event
 | |
| // notifications to be faulty.
 | |
| //
 | |
| // Parameters:
 | |
| //  * lw is list and watch functions for the source of the resource you want to
 | |
| //    be informed of.
 | |
| //  * objType is an object of the type that you expect to receive.
 | |
| //  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
 | |
| //    calls, even if nothing changed). Otherwise, re-list will be delayed as
 | |
| //    long as possible (until the upstream source closes the watch or times out,
 | |
| //    or you stop the controller).
 | |
| //  * h is the object you want notifications sent to.
 | |
| //  * indexers is the indexer for the received object type.
 | |
| //
 | |
| func NewIndexerInformer(
 | |
| 	lw ListerWatcher,
 | |
| 	objType runtime.Object,
 | |
| 	resyncPeriod time.Duration,
 | |
| 	h ResourceEventHandler,
 | |
| 	indexers Indexers,
 | |
| ) (Indexer, Controller) {
 | |
| 	// This will hold the client state, as we know it.
 | |
| 	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
 | |
| 
 | |
| 	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
 | |
| }
 | |
| 
 | |
| // newInformer returns a controller for populating the store while also
 | |
| // providing event notifications.
 | |
| //
 | |
| // Parameters
 | |
| //  * lw is list and watch functions for the source of the resource you want to
 | |
| //    be informed of.
 | |
| //  * objType is an object of the type that you expect to receive.
 | |
| //  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
 | |
| //    calls, even if nothing changed). Otherwise, re-list will be delayed as
 | |
| //    long as possible (until the upstream source closes the watch or times out,
 | |
| //    or you stop the controller).
 | |
| //  * h is the object you want notifications sent to.
 | |
| //  * clientState is the store you want to populate
 | |
| //
 | |
| func newInformer(
 | |
| 	lw ListerWatcher,
 | |
| 	objType runtime.Object,
 | |
| 	resyncPeriod time.Duration,
 | |
| 	h ResourceEventHandler,
 | |
| 	clientState Store,
 | |
| ) Controller {
 | |
| 	// This will hold incoming changes. Note how we pass clientState in as a
 | |
| 	// KeyLister, that way resync operations will result in the correct set
 | |
| 	// of update/delete deltas.
 | |
| 	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
 | |
| 		KnownObjects:          clientState,
 | |
| 		EmitDeltaTypeReplaced: true,
 | |
| 	})
 | |
| 
 | |
| 	cfg := &Config{
 | |
| 		Queue:            fifo,
 | |
| 		ListerWatcher:    lw,
 | |
| 		ObjectType:       objType,
 | |
| 		FullResyncPeriod: resyncPeriod,
 | |
| 		RetryOnError:     false,
 | |
| 
 | |
| 		Process: func(obj interface{}) error {
 | |
| 			// from oldest to newest
 | |
| 			for _, d := range obj.(Deltas) {
 | |
| 				switch d.Type {
 | |
| 				case Sync, Replaced, Added, Updated:
 | |
| 					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
 | |
| 						if err := clientState.Update(d.Object); err != nil {
 | |
| 							return err
 | |
| 						}
 | |
| 						h.OnUpdate(old, d.Object)
 | |
| 					} else {
 | |
| 						if err := clientState.Add(d.Object); err != nil {
 | |
| 							return err
 | |
| 						}
 | |
| 						h.OnAdd(d.Object)
 | |
| 					}
 | |
| 				case Deleted:
 | |
| 					if err := clientState.Delete(d.Object); err != nil {
 | |
| 						return err
 | |
| 					}
 | |
| 					h.OnDelete(d.Object)
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		},
 | |
| 	}
 | |
| 	return New(cfg)
 | |
| }
 |