mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-26 02:55:32 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			322 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			322 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2015 The Kubernetes Authors All rights reserved.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package framework
 | |
| 
 | |
| import (
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"k8s.io/kubernetes/pkg/client/cache"
 | |
| 	"k8s.io/kubernetes/pkg/runtime"
 | |
| 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 | |
| 	"k8s.io/kubernetes/pkg/util/wait"
 | |
| )
 | |
| 
 | |
| // Config contains all the settings for a Controller; New stores a copy of it.
 | |
| type Config struct {
 | |
| 	// The queue for your objects; either a cache.FIFO or
 | |
| 	// a cache.DeltaFIFO. Your Process() function should accept
 | |
| 	// the output of this Queue's Pop() method.
 | |
| 	cache.Queue
 | |
| 
 | |
| 	// Something that can list and watch your objects; Run hands it to the reflector.
 | |
| 	cache.ListerWatcher
 | |
| 
 | |
| 	// Something that can process your objects; called with every item popped from the queue.
 | |
| 	Process ProcessFunc
 | |
| 
 | |
| 	// The type of your objects; Run hands it to the reflector.
 | |
| 	ObjectType runtime.Object
 | |
| 
 | |
| 	// Reprocess everything at least this often; Run hands it to the reflector.
 | |
| 	// Note that if it takes longer for you to clear the queue than this
 | |
| 	// period, you will end up processing items in the order determined
 | |
| 	// by cache.FIFO.Replace(). Currently, this is random. If this is a
 | |
| 	// problem, we can change that replacement policy to append new
 | |
| 	// things to the end of the queue instead of replacing the entire
 | |
| 	// queue.
 | |
| 	FullResyncPeriod time.Duration
 | |
| 
 | |
| 	// If true, when Process() returns an error, re-enqueue the object with AddIfNotPresent.
 | |
| 	// TODO: add interface to let you inject a delay/backoff or drop
 | |
| 	//       the object completely if desired. Pass the object in
 | |
| 	//       question to this interface as a parameter.
 | |
| 	RetryOnError bool
 | |
| }
 | |
| 
 | |
| // ProcessFunc processes a single object popped from the queue; a non-nil error triggers a re-enqueue when Config.RetryOnError is set.
 | |
| type ProcessFunc func(obj interface{}) error
 | |
| 
 | |
| // Controller is a generic controller framework; build one with New and start it with Run.
 | |
| type Controller struct {
 | |
| 	config         Config // copy of the Config handed to New
 | |
| 	reflector      *cache.Reflector // created and assigned by Run
 | |
| 	reflectorMutex sync.RWMutex // held by Run while it assigns reflector
 | |
| }
 | |
| 
 | |
| // New makes a new Controller from the given Config.
 | |
| func New(c *Config) *Controller {
 | |
| 	ctlr := &Controller{
 | |
| 		config: *c,
 | |
| 	}
 | |
| 	return ctlr
 | |
| }
 | |
| 
 | |
| // Run begins processing items, and will continue until a value is sent down stopCh.
 | |
| // It's an error to call Run more than once.
 | |
| // Run blocks; call via go.
 | |
| func (c *Controller) Run(stopCh <-chan struct{}) {
 | |
| 	defer utilruntime.HandleCrash()
 | |
| 	r := cache.NewReflector(
 | |
| 		c.config.ListerWatcher,
 | |
| 		c.config.ObjectType,
 | |
| 		c.config.Queue,
 | |
| 		c.config.FullResyncPeriod,
 | |
| 	)
 | |
| 
 | |
| 	c.reflectorMutex.Lock()
 | |
| 	c.reflector = r
 | |
| 	c.reflectorMutex.Unlock()
 | |
| 
 | |
| 	r.RunUntil(stopCh)
 | |
| 
 | |
| 	wait.Until(c.processLoop, time.Second, stopCh)
 | |
| }
 | |
| 
 | |
| // Returns true once this controller has completed an initial resource listing
 | |
| func (c *Controller) HasSynced() bool {
 | |
| 	return c.config.Queue.HasSynced()
 | |
| }
 | |
| 
 | |
| // Requeue adds the provided object back into the queue if it does not already exist.
 | |
| func (c *Controller) Requeue(obj interface{}) error {
 | |
| 	return c.config.Queue.AddIfNotPresent(cache.Deltas{
 | |
| 		cache.Delta{
 | |
| 			Type:   cache.Sync,
 | |
| 			Object: obj,
 | |
| 		},
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // processLoop drains the work queue.
 | |
| // TODO: Consider doing the processing in parallel. This will require a little thought
 | |
| // to make sure that we don't end up processing the same object multiple times
 | |
| // concurrently.
 | |
| func (c *Controller) processLoop() {
 | |
| 	for {
 | |
| 		obj := c.config.Queue.Pop()
 | |
| 		err := c.config.Process(obj)
 | |
| 		if err != nil {
 | |
| 			if c.config.RetryOnError {
 | |
| 				// This is the safe way to re-enqueue.
 | |
| 				c.config.Queue.AddIfNotPresent(obj)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // ResourceEventHandler can handle notifications for events that happen to a
 | |
| // resource. The events are informational only, so you can't return an
 | |
| // error.
 | |
| //  * OnAdd is called when an object is added.
 | |
| //  * OnUpdate is called when an object is modified. Note that oldObj is the
 | |
| //      last known state of the object -- it is possible that several changes
 | |
| //      were combined together, so you can't use this to see every single
 | |
| //      change. OnUpdate is also called when a re-list happens, and it will
 | |
| //      get called even if nothing changed. This is useful for periodically
 | |
| //      evaluating or syncing something.
 | |
| //  * OnDelete will get the final state of the item if it is known, otherwise
 | |
| //      it will get an object of type cache.DeletedFinalStateUnknown. This can
 | |
| //      happen if the watch is closed and misses the delete event and we don't
 | |
| //      notice the deletion until the subsequent re-list.
 | |
| type ResourceEventHandler interface {
 | |
| 	OnAdd(obj interface{})
 | |
| 	OnUpdate(oldObj, newObj interface{})
 | |
| 	OnDelete(obj interface{})
 | |
| }
 | |
| 
 | |
// ResourceEventHandlerFuncs adapts plain functions to the OnAdd, OnUpdate and
// OnDelete methods, so you can supply as many or as few of the notification
// callbacks as you want. Any callback left nil is simply skipped.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd forwards obj to AddFunc; it does nothing when AddFunc is nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc == nil {
		return
	}
	r.AddFunc(obj)
}

// OnUpdate forwards both objects to UpdateFunc; it does nothing when
// UpdateFunc is nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc == nil {
		return
	}
	r.UpdateFunc(oldObj, newObj)
}

// OnDelete forwards obj to DeleteFunc; it does nothing when DeleteFunc is nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if r.DeleteFunc == nil {
		return
	}
	r.DeleteFunc(obj)
}
 | |
| 
 | |
| // DeletionHandlingMetaNamespaceKeyFunc checks for
 | |
| // cache.DeletedFinalStateUnknown objects before calling
 | |
| // cache.MetaNamespaceKeyFunc.
 | |
| func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
 | |
| 	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
 | |
| 		return d.Key, nil
 | |
| 	}
 | |
| 	return cache.MetaNamespaceKeyFunc(obj)
 | |
| }
 | |
| 
 | |
| // NewInformer returns a cache.Store and a controller for populating the store
 | |
| // while also providing event notifications. You should only used the returned
 | |
| // cache.Store for Get/List operations; Add/Modify/Deletes will cause the event
 | |
| // notifications to be faulty.
 | |
| //
 | |
| // Parameters:
 | |
| //  * lw is list and watch functions for the source of the resource you want to
 | |
| //    be informed of.
 | |
| //  * objType is an object of the type that you expect to receive.
 | |
| //  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
 | |
| //    calls, even if nothing changed). Otherwise, re-list will be delayed as
 | |
| //    long as possible (until the upstream source closes the watch or times out,
 | |
| //    or you stop the controller).
 | |
| //  * h is the object you want notifications sent to.
 | |
| //
 | |
| func NewInformer(
 | |
| 	lw cache.ListerWatcher,
 | |
| 	objType runtime.Object,
 | |
| 	resyncPeriod time.Duration,
 | |
| 	h ResourceEventHandler,
 | |
| ) (cache.Store, *Controller) {
 | |
| 	// This will hold the client state, as we know it.
 | |
| 	clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 | |
| 
 | |
| 	// This will hold incoming changes. Note how we pass clientState in as a
 | |
| 	// KeyLister, that way resync operations will result in the correct set
 | |
| 	// of update/delete deltas.
 | |
| 	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)
 | |
| 
 | |
| 	cfg := &Config{
 | |
| 		Queue:            fifo,
 | |
| 		ListerWatcher:    lw,
 | |
| 		ObjectType:       objType,
 | |
| 		FullResyncPeriod: resyncPeriod,
 | |
| 		RetryOnError:     false,
 | |
| 
 | |
| 		Process: func(obj interface{}) error {
 | |
| 			// from oldest to newest
 | |
| 			for _, d := range obj.(cache.Deltas) {
 | |
| 				switch d.Type {
 | |
| 				case cache.Sync, cache.Added, cache.Updated:
 | |
| 					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
 | |
| 						if err := clientState.Update(d.Object); err != nil {
 | |
| 							return err
 | |
| 						}
 | |
| 						h.OnUpdate(old, d.Object)
 | |
| 					} else {
 | |
| 						if err := clientState.Add(d.Object); err != nil {
 | |
| 							return err
 | |
| 						}
 | |
| 						h.OnAdd(d.Object)
 | |
| 					}
 | |
| 				case cache.Deleted:
 | |
| 					if err := clientState.Delete(d.Object); err != nil {
 | |
| 						return err
 | |
| 					}
 | |
| 					h.OnDelete(d.Object)
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		},
 | |
| 	}
 | |
| 	return clientState, New(cfg)
 | |
| }
 | |
| 
 | |
| // NewIndexerInformer returns a cache.Indexer and a controller for populating the index
 | |
| // while also providing event notifications. You should only used the returned
 | |
| // cache.Index for Get/List operations; Add/Modify/Deletes will cause the event
 | |
| // notifications to be faulty.
 | |
| //
 | |
| // Parameters:
 | |
| //  * lw is list and watch functions for the source of the resource you want to
 | |
| //    be informed of.
 | |
| //  * objType is an object of the type that you expect to receive.
 | |
| //  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
 | |
| //    calls, even if nothing changed). Otherwise, re-list will be delayed as
 | |
| //    long as possible (until the upstream source closes the watch or times out,
 | |
| //    or you stop the controller).
 | |
| //  * h is the object you want notifications sent to.
 | |
| //
 | |
| func NewIndexerInformer(
 | |
| 	lw cache.ListerWatcher,
 | |
| 	objType runtime.Object,
 | |
| 	resyncPeriod time.Duration,
 | |
| 	h ResourceEventHandler,
 | |
| 	indexers cache.Indexers,
 | |
| ) (cache.Indexer, *Controller) {
 | |
| 	// This will hold the client state, as we know it.
 | |
| 	clientState := cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
 | |
| 
 | |
| 	// This will hold incoming changes. Note how we pass clientState in as a
 | |
| 	// KeyLister, that way resync operations will result in the correct set
 | |
| 	// of update/delete deltas.
 | |
| 	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)
 | |
| 
 | |
| 	cfg := &Config{
 | |
| 		Queue:            fifo,
 | |
| 		ListerWatcher:    lw,
 | |
| 		ObjectType:       objType,
 | |
| 		FullResyncPeriod: resyncPeriod,
 | |
| 		RetryOnError:     false,
 | |
| 
 | |
| 		Process: func(obj interface{}) error {
 | |
| 			// from oldest to newest
 | |
| 			for _, d := range obj.(cache.Deltas) {
 | |
| 				switch d.Type {
 | |
| 				case cache.Sync, cache.Added, cache.Updated:
 | |
| 					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
 | |
| 						if err := clientState.Update(d.Object); err != nil {
 | |
| 							return err
 | |
| 						}
 | |
| 						h.OnUpdate(old, d.Object)
 | |
| 					} else {
 | |
| 						if err := clientState.Add(d.Object); err != nil {
 | |
| 							return err
 | |
| 						}
 | |
| 						h.OnAdd(d.Object)
 | |
| 					}
 | |
| 				case cache.Deleted:
 | |
| 					if err := clientState.Delete(d.Object); err != nil {
 | |
| 						return err
 | |
| 					}
 | |
| 					h.OnDelete(d.Object)
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		},
 | |
| 	}
 | |
| 	return clientState, New(cfg)
 | |
| }
 |