mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-28 07:57:20 +00:00
reflector: introduce watchList
Kubernetes-commit: 2fed6b8a19dde62fb6824aa9d008baedf51c2466
This commit is contained in:
parent
d2ebc4d27c
commit
d1c260eddc
144
tools/cache/reflector.go
vendored
144
tools/cache/reflector.go
vendored
@ -41,6 +41,7 @@ import (
|
|||||||
"k8s.io/client-go/tools/pager"
|
"k8s.io/client-go/tools/pager"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/utils/clock"
|
"k8s.io/utils/clock"
|
||||||
|
"k8s.io/utils/pointer"
|
||||||
"k8s.io/utils/trace"
|
"k8s.io/utils/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -392,8 +393,9 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
|
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
|
||||||
// Ensure that watch will not be reused across iterations.
|
// Ensure that watch will not be reused across iterations.
|
||||||
|
w.Stop()
|
||||||
w = nil
|
w = nil
|
||||||
retry.After(err)
|
retry.After(err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -528,6 +530,114 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// watchList establishes a stream to get a consistent snapshot of data
|
||||||
|
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
|
||||||
|
//
|
||||||
|
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
|
||||||
|
// Establishes a consistent stream with the server.
|
||||||
|
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
|
||||||
|
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
|
||||||
|
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
|
||||||
|
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
|
||||||
|
// It replaces its internal store with the collected items and
|
||||||
|
// reuses the current watch requests for getting further events.
|
||||||
|
//
|
||||||
|
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
|
||||||
|
// Establishes a stream with the server at the provided resource version.
|
||||||
|
// To establish the initial state the server begins with synthetic "Added" events.
|
||||||
|
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
|
||||||
|
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
|
||||||
|
// It replaces its internal store with the collected items and
|
||||||
|
// reuses the current watch requests for getting further events.
|
||||||
|
func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
|
||||||
|
var w watch.Interface
|
||||||
|
var err error
|
||||||
|
var temporaryStore Store
|
||||||
|
var resourceVersion string
|
||||||
|
// TODO(#115478): see if this function could be turned
|
||||||
|
// into a method and see if error handling
|
||||||
|
// could be unified with the r.watch method
|
||||||
|
isErrorRetriableWithSideEffectsFn := func(err error) bool {
|
||||||
|
if canRetry := isWatchErrorRetriable(err); canRetry {
|
||||||
|
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
|
||||||
|
<-r.initConnBackoffManager.Backoff().C()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
|
||||||
|
// we tried to re-establish a watch request but the provided RV
|
||||||
|
// has either expired or it is greater than the server knows about.
|
||||||
|
// In that case we reset the RV and
|
||||||
|
// try to get a consistent snapshot from the watch cache (case 1)
|
||||||
|
r.setIsLastSyncResourceVersionUnavailable(true)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
|
||||||
|
defer initTrace.LogIfLong(10 * time.Second)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
return nil, nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceVersion = ""
|
||||||
|
lastKnownRV := r.rewatchResourceVersion()
|
||||||
|
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
|
||||||
|
// TODO(#115478): large "list", slow clients, slow network, p&f
|
||||||
|
// might slow down streaming and eventually fail.
|
||||||
|
// maybe in such a case we should retry with an increased timeout?
|
||||||
|
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
|
||||||
|
options := metav1.ListOptions{
|
||||||
|
ResourceVersion: lastKnownRV,
|
||||||
|
AllowWatchBookmarks: true,
|
||||||
|
SendInitialEvents: pointer.Bool(true),
|
||||||
|
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||||
|
TimeoutSeconds: &timeoutSeconds,
|
||||||
|
}
|
||||||
|
start := r.clock.Now()
|
||||||
|
|
||||||
|
w, err = r.listerWatcher.Watch(options)
|
||||||
|
if err != nil {
|
||||||
|
if isErrorRetriableWithSideEffectsFn(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
bookmarkReceived := pointer.Bool(false)
|
||||||
|
err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
|
||||||
|
func(rv string) { resourceVersion = rv },
|
||||||
|
bookmarkReceived,
|
||||||
|
r.clock, make(chan error), stopCh)
|
||||||
|
if err != nil {
|
||||||
|
w.Stop() // stop and retry with clean state
|
||||||
|
if err == errorStopRequested {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if isErrorRetriableWithSideEffectsFn(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if *bookmarkReceived {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We successfully got initial state from watch-list confirmed by the
|
||||||
|
// "k8s.io/initial-events-end" bookmark.
|
||||||
|
initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
|
||||||
|
r.setIsLastSyncResourceVersionUnavailable(false)
|
||||||
|
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
|
||||||
|
}
|
||||||
|
initTrace.Step("SyncWith done")
|
||||||
|
r.setLastSyncResourceVersion(resourceVersion)
|
||||||
|
|
||||||
|
return w, nil
|
||||||
|
}
|
||||||
|
|
||||||
// syncWith replaces the store's items with the given list.
|
// syncWith replaces the store's items with the given list.
|
||||||
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
|
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
|
||||||
found := make([]interface{}, 0, len(items))
|
found := make([]interface{}, 0, len(items))
|
||||||
@ -546,15 +656,17 @@ func watchHandler(start time.Time,
|
|||||||
name string,
|
name string,
|
||||||
expectedTypeName string,
|
expectedTypeName string,
|
||||||
setLastSyncResourceVersion func(string),
|
setLastSyncResourceVersion func(string),
|
||||||
|
exitOnInitialEventsEndBookmark *bool,
|
||||||
clock clock.Clock,
|
clock clock.Clock,
|
||||||
errc chan error,
|
errc chan error,
|
||||||
stopCh <-chan struct{},
|
stopCh <-chan struct{},
|
||||||
) error {
|
) error {
|
||||||
eventCount := 0
|
eventCount := 0
|
||||||
|
if exitOnInitialEventsEndBookmark != nil {
|
||||||
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
// set it to false just in case somebody
|
||||||
// we're coming back in with the same watch interface.
|
// made it positive
|
||||||
defer w.Stop()
|
*exitOnInitialEventsEndBookmark = false
|
||||||
|
}
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
@ -609,6 +721,11 @@ loop:
|
|||||||
}
|
}
|
||||||
case watch.Bookmark:
|
case watch.Bookmark:
|
||||||
// A `Bookmark` means watch has synced here, just update the resourceVersion
|
// A `Bookmark` means watch has synced here, just update the resourceVersion
|
||||||
|
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
|
||||||
|
if exitOnInitialEventsEndBookmark != nil {
|
||||||
|
*exitOnInitialEventsEndBookmark = true
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
|
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
|
||||||
}
|
}
|
||||||
@ -617,6 +734,11 @@ loop:
|
|||||||
rvu.UpdateResourceVersion(resourceVersion)
|
rvu.UpdateResourceVersion(resourceVersion)
|
||||||
}
|
}
|
||||||
eventCount++
|
eventCount++
|
||||||
|
if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
|
||||||
|
watchDuration := clock.Since(start)
|
||||||
|
klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -665,6 +787,18 @@ func (r *Reflector) relistResourceVersion() string {
|
|||||||
return r.lastSyncResourceVersion
|
return r.lastSyncResourceVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rewatchResourceVersion determines the resource version the reflector should start streaming from.
|
||||||
|
func (r *Reflector) rewatchResourceVersion() string {
|
||||||
|
r.lastSyncResourceVersionMutex.RLock()
|
||||||
|
defer r.lastSyncResourceVersionMutex.RUnlock()
|
||||||
|
if r.isLastSyncResourceVersionUnavailable {
|
||||||
|
// initial stream should return data at the most recent resource version.
|
||||||
|
// the returned data must be consistent i.e. as if served from etcd via a quorum read
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return r.lastSyncResourceVersion
|
||||||
|
}
|
||||||
|
|
||||||
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
|
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
|
||||||
// "expired" or "too large resource version" error.
|
// "expired" or "too large resource version" error.
|
||||||
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
|
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
|
||||||
|
6
tools/cache/reflector_test.go
vendored
6
tools/cache/reflector_test.go
vendored
@ -138,7 +138,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop)
|
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("unexpected non-error")
|
t.Errorf("unexpected non-error")
|
||||||
}
|
}
|
||||||
@ -157,7 +157,7 @@ func TestReflectorWatchHandler(t *testing.T) {
|
|||||||
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
|
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
|
||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop)
|
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
|
|||||||
fw := watch.NewFake()
|
fw := watch.NewFake()
|
||||||
stopWatch := make(chan struct{}, 1)
|
stopWatch := make(chan struct{}, 1)
|
||||||
stopWatch <- struct{}{}
|
stopWatch <- struct{}{}
|
||||||
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch)
|
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch)
|
||||||
if err != errorStopRequested {
|
if err != errorStopRequested {
|
||||||
t.Errorf("expected stop error, got %q", err)
|
t.Errorf("expected stop error, got %q", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user