/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"net/url"
	"reflect"
	goruntime "runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/golang/glog"
	"k8s.io/kubernetes/pkg/api"
	apierrs "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/meta"
	"k8s.io/kubernetes/pkg/runtime"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/watch"
)

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options api.ListOptions) (runtime.Object, error)
	// Watch should begin a watch at the specified version.
	Watch(options api.ListOptions) (watch.Interface, error)
}
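
// A minimal sketch of a ListerWatcher built from function literals, assuming
// the ListWatch helper defined elsewhere in this package and a hypothetical
// podClient that can list and watch pods:
//
//	lw := &ListWatch{
//		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
//			return podClient.List(options)
//		},
//		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
//			return podClient.Watch(options)
//		},
//	}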

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string

	// The type of object we expect to place in the store.
	expectedType reflect.Type
	// The destination to sync up with the watch source.
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// period controls timing between one watch ending and
	// the beginning of the next one.
	period       time.Duration
	resyncPeriod time.Duration
	// now() returns the current time - exposed for testing purposes.
	now func() time.Time
	// nextResync is the approximate time of the next resync (zero if none is scheduled).
	nextResync time.Time
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store.
	// It is thread-safe, but not synchronized with the underlying store.
	lastSyncResourceVersion string
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion.
	lastSyncResourceVersionMutex sync.RWMutex
}

var (
	// We try to spread the load on the apiserver by setting timeouts for
	// watch requests - the timeout is random in [minWatchTimeout, 2*minWatchTimeout).
	// However, it can be shortened so that a periodic resync does not break
	// the TCP connection.
	minWatchTimeout = 5 * time.Minute
	// If we are within 'forceResyncThreshold' of the next planned resync
	// and are just about to issue Watch(), the resync will be forced now.
	forceResyncThreshold = 3 * time.Second
	// We try to set timeouts for Watch() so that it finishes about
	// 'timeoutThreshold' before the next planned periodic resync.
	timeoutThreshold = 1 * time.Second
)

// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector.
// The indexer is configured to key on namespace.
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
	indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
	reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
	return indexer, reflector
}
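
// A minimal usage sketch (illustrative only; lw is any ListerWatcher, for
// example one built as sketched above):
//
//	indexer, reflector := NewNamespaceKeyedIndexerAndReflector(lw, &api.Pod{}, 0)
//	reflector.Run()
//	// objects can now be looked up per namespace through the indexer.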

// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
}
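
// A minimal sketch of wiring a Reflector to a plain Store, assuming NewStore
// and MetaNamespaceKeyFunc from this package and an lw as sketched above:
//
//	store := NewStore(MetaNamespaceKeyFunc)
//	r := NewReflector(lw, &api.Pod{}, store, 30*time.Second)
//	r.Run()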

// NewNamedReflector is the same as NewReflector, but with a specified name for logging.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		expectedType:  reflect.TypeOf(expectedType),
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
		now:           time.Now,
	}
	return r
}

// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the
// common call chains to NewReflector, so they'd be low-entropy names for reflectors.
var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/"}

// getDefaultReflectorName walks back through the call stack until it finds a caller from outside of the ignoredPackages.
// It returns a shortpath/filename:line string to aid in identification of this reflector when it starts logging.
func getDefaultReflectorName(ignoredPackages ...string) string {
	name := "????"
outer:
	for i := 1; i < 10; i++ {
		_, file, line, ok := goruntime.Caller(i)
		if !ok {
			break
		}
		for _, ignoredPackage := range ignoredPackages {
			if strings.Contains(file, ignoredPackage) {
				continue outer
			}
		}

		pkgLocation := strings.LastIndex(file, "/pkg/")
		if pkgLocation >= 0 {
			file = file[pkgLocation+1:]
		}
		name = fmt.Sprintf("%s:%d", file, line)
		break
	}

	return name
}

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately.
func (r *Reflector) Run() {
	go wait.Until(func() { r.ListAndWatch(wait.NeverStop) }, r.period, wait.NeverStop)
}

// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
	go wait.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh)
}
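
// A minimal stop-channel sketch for RunUntil (hypothetical caller code):
//
//	stopCh := make(chan struct{})
//	r.RunUntil(stopCh)
//	// ... later, closing the channel stops the reflector's goroutine:
//	close(stopCh)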

var (
	// nothing will ever be sent down this channel
	neverExitWatch <-chan time.Time = make(chan time.Time)

	// Used to indicate that watching stopped so that a resync could happen.
	errorResyncRequested = errors.New("resync channel fired")

	// Used to indicate that watching stopped because of a signal from the stop
	// channel passed in from a client of the reflector.
	errorStopRequested = errors.New("Stop requested")
)

// resyncChan returns a channel which will receive something when a resync is
// required, and a cleanup function.
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
	if r.resyncPeriod == 0 {
		r.nextResync = time.Time{}
		return neverExitWatch, func() bool { return false }
	}
	// The cleanup function is required: imagine the scenario where watches
	// always fail so we end up listing frequently. Then, if we don't
	// manually stop the timer, we could end up with many timers active
	// concurrently.
	r.nextResync = r.now().Add(r.resyncPeriod)
	t := time.NewTimer(r.resyncPeriod)
	return t.C, t.Stop
}
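
// Typical call pattern (as used by ListAndWatch below) - the cleanup function
// should run even on early return so that the timer is released:
//
//	resyncCh, cleanup := r.resyncChan()
//	defer cleanup()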

// We want to avoid situations where a periodic resync breaks the TCP
// connection.
// If a response's body is not read to completion before calling body.Close(),
// that TCP connection will not be reused in the future - see issue #15664
// for more details.
// Thus, we set the timeout for watch requests to be smaller than the remaining
// time until the next periodic resync and force the resync ourselves to avoid
// breaking the TCP connection.
//
// TODO: This should be parametrizable based on server load.
func (r *Reflector) timeoutForWatch() *int64 {
	randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0))
	timeout := r.nextResync.Sub(r.now()) - timeoutThreshold
	if timeout < 0 || randTimeout < timeout {
		timeout = randTimeout
	}
	timeoutSeconds := int64(timeout.Seconds())
	return &timeoutSeconds
}
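
// Worked example with illustrative numbers: with minWatchTimeout = 5m, the
// random timeout falls in [5m, 10m). If the next resync is 2m away, the resync
// budget is 2m - 1s, which is non-negative and smaller than any possible random
// timeout, so the watch is capped at 1m59s and ends just before the resync.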

// Returns true if we are close enough to the next planned periodic resync
// that we can force that resync to happen now.
func (r *Reflector) canForceResyncNow() bool {
	if r.nextResync.IsZero() {
		return false
	}
	return r.now().Add(forceResyncThreshold).After(r.nextResync)
}

// ListAndWatch first lists all items and gets the resource version at the moment of the call,
// and then uses that resource version to watch.
// It returns an error if ListAndWatch didn't even try to initialize the watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	var resourceVersion string
	resyncCh, cleanup := r.resyncChan()
	defer cleanup()

	// Explicitly set "0" as the resource version - it's fine for the List()
	// to be served from cache and potentially be delayed relative to
	// etcd contents. The reflector framework will catch up via Watch() eventually.
	options := api.ListOptions{ResourceVersion: "0"}
	list, err := r.listerWatcher.List(options)
	if err != nil {
		return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
	}
	metaInterface, err := meta.Accessor(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)
	}
	resourceVersion = metaInterface.GetResourceVersion()
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
	}
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
	}
	r.setLastSyncResourceVersion(resourceVersion)

	for {
		options := api.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations where resyncing breaks the TCP connection
			// - see the comment for 'timeoutForWatch()' for more details.
			TimeoutSeconds: r.timeoutForWatch(),
		}
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
			}
			// If this is a "connection refused" error, the apiserver is most likely not
			// responsive. It doesn't make sense to re-list all objects because we will
			// most likely be able to restart the watch where we ended.
			// If that's the case, wait and resend the watch request.
			if urlError, ok := err.(*url.Error); ok {
				if opError, ok := urlError.Err.(*net.OpError); ok {
					if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
						time.Sleep(time.Second)
						continue
					}
				}
			}
			return nil
		}
		if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
			if err != errorResyncRequested && err != errorStopRequested {
				glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
			}
			return nil
		}
		if r.canForceResyncNow() {
			glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
			return nil
		}
	}
}
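
// ListAndWatch can also be driven by hand instead of through Run or RunUntil
// (an illustrative sketch; in practice wait.Until provides this retry loop):
//
//	for {
//		if err := r.ListAndWatch(stopCh); err != nil {
//			// the initial list failed; retry after r.period
//		}
//		time.Sleep(r.period)
//	}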

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
	start := time.Now()
	eventCount := 0

	// Stopping the watcher should be idempotent, and if we return from this
	// function there's no way we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case <-resyncCh:
			return errorResyncRequested
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}
			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
				utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
				continue
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				r.store.Add(event.Object)
			case watch.Modified:
				r.store.Update(event.Object)
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				r.store.Delete(event.Object)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := time.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
		return errors.New("very short watch")
	}
	glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
	return nil
}

// LastSyncResourceVersion is the resource version observed when the underlying store was last synced.
// The value returned is thread-safe, but it is not synchronized with access to the underlying store.
func (r *Reflector) LastSyncResourceVersion() string {
	r.lastSyncResourceVersionMutex.RLock()
	defer r.lastSyncResourceVersionMutex.RUnlock()
	return r.lastSyncResourceVersion
}

func (r *Reflector) setLastSyncResourceVersion(v string) {
	r.lastSyncResourceVersionMutex.Lock()
	defer r.lastSyncResourceVersionMutex.Unlock()
	r.lastSyncResourceVersion = v
}