mirror of
https://github.com/kubernetes/client-go.git
synced 2026-05-16 20:37:09 +00:00
Revert reflector changes from PR #83520 from 1.17
Kubernetes-commit: a1ba57cc20e67014659db2b4f74c0b24d459e6f8
This commit is contained in:
committed by
Kubernetes Publisher
parent
9f2f78dfcb
commit
fa41b0c76b
71
tools/cache/reflector.go
vendored
71
tools/cache/reflector.go
vendored
@@ -74,9 +74,6 @@ type Reflector struct {
|
||||
// observed when doing a sync with the underlying store
|
||||
// it is thread safe, but not synchronized with the underlying store
|
||||
lastSyncResourceVersion string
|
||||
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
|
||||
// failed with an HTTP 410 (Gone) status code.
|
||||
isLastSyncResourceVersionGone bool
|
||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||
lastSyncResourceVersionMutex sync.RWMutex
|
||||
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||
@@ -188,7 +185,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
|
||||
var resourceVersion string
|
||||
|
||||
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
|
||||
// Explicitly set "0" as resource version - it's fine for the List()
|
||||
// to be served from cache and potentially be delayed relative to
|
||||
// etcd contents. Reflector framework will catch up via Watch() eventually.
|
||||
options := metav1.ListOptions{ResourceVersion: "0"}
|
||||
|
||||
if err := func() error {
|
||||
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
|
||||
@@ -211,17 +211,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if r.WatchListPageSize != 0 {
|
||||
pager.PageSize = r.WatchListPageSize
|
||||
}
|
||||
|
||||
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
|
||||
list, err = pager.List(context.Background(), options)
|
||||
if isExpiredError(err) {
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
// Retry immediately if the resource version used to list is expired.
|
||||
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
|
||||
// continuation pages, but the pager might not be enabled, or the full list might fail because the
|
||||
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
|
||||
// to recover and ensure the reflector makes forward progress.
|
||||
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
|
||||
}
|
||||
close(listCh)
|
||||
}()
|
||||
select {
|
||||
@@ -234,7 +225,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
|
||||
}
|
||||
r.setIsLastSyncResourceVersionExpired(false) // list was successful
|
||||
initTrace.Step("Objects listed")
|
||||
listMetaInterface, err := meta.ListAccessor(list)
|
||||
if err != nil {
|
||||
@@ -308,13 +298,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
|
||||
w, err := r.listerWatcher.Watch(options)
|
||||
if err != nil {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
|
||||
case err == io.EOF:
|
||||
switch err {
|
||||
case io.EOF:
|
||||
// watch closed normally
|
||||
case err == io.ErrUnexpectedEOF:
|
||||
case io.ErrUnexpectedEOF:
|
||||
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
|
||||
default:
|
||||
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
|
||||
@@ -333,8 +320,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||
if err != errorStopRequested {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
r.setIsLastSyncResourceVersionExpired(true)
|
||||
case apierrs.IsResourceExpired(err):
|
||||
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||
default:
|
||||
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
|
||||
@@ -446,42 +432,3 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
|
||||
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||
r.lastSyncResourceVersion = v
|
||||
}
|
||||
|
||||
// relistResourceVersion determines the resource version the reflector should list or relist from.
|
||||
// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
|
||||
// versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
|
||||
// in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
|
||||
// etcd via a quorum read.
|
||||
func (r *Reflector) relistResourceVersion() string {
|
||||
r.lastSyncResourceVersionMutex.RLock()
|
||||
defer r.lastSyncResourceVersionMutex.RUnlock()
|
||||
|
||||
if r.isLastSyncResourceVersionGone {
|
||||
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
|
||||
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
|
||||
// to the latest available ResourceVersion, using a consistent read from etcd.
|
||||
return ""
|
||||
}
|
||||
if r.lastSyncResourceVersion == "" {
|
||||
// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
|
||||
// be served from the watch cache if it is enabled.
|
||||
return "0"
|
||||
}
|
||||
return r.lastSyncResourceVersion
|
||||
}
|
||||
|
||||
// setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a
|
||||
// expired error: HTTP 410 (Gone) Status Code.
|
||||
func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
|
||||
r.lastSyncResourceVersionMutex.Lock()
|
||||
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||
r.isLastSyncResourceVersionGone = isExpired
|
||||
}
|
||||
|
||||
func isExpiredError(err error) bool {
|
||||
// In Kubernetes 1.17 and earlier, the api server returns both apierrs.StatusReasonExpired and
|
||||
// apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
|
||||
// and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone
|
||||
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
|
||||
return apierrs.IsResourceExpired(err) || apierrs.IsGone(err)
|
||||
}
|
||||
|
||||
192
tools/cache/reflector_test.go
vendored
192
tools/cache/reflector_test.go
vendored
@@ -26,7 +26,6 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@@ -435,197 +434,6 @@ func TestReflectorWatchListPageSize(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
|
||||
// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
|
||||
// etcd that is partitioned and serving older data than the reflector has already processed.
|
||||
func TestReflectorResyncWithResourceVersion(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
listCallRVs := []string{}
|
||||
|
||||
lw := &testLW{
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
fw := watch.NewFake()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
listCallRVs = append(listCallRVs, options.ResourceVersion)
|
||||
pods := make([]v1.Pod, 8)
|
||||
for i := 0; i < 8; i++ {
|
||||
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
|
||||
}
|
||||
switch options.ResourceVersion {
|
||||
case "0":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
|
||||
case "10":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
|
||||
default:
|
||||
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
|
||||
// Initial list should use RV=0
|
||||
r.ListAndWatch(stopCh)
|
||||
|
||||
results := s.List()
|
||||
if len(results) != 4 {
|
||||
t.Errorf("Expected 4 results, got %d", len(results))
|
||||
}
|
||||
|
||||
// relist should use lastSyncResourceVersions (RV=10)
|
||||
stopCh = make(chan struct{})
|
||||
r.ListAndWatch(stopCh)
|
||||
|
||||
results = s.List()
|
||||
if len(results) != 8 {
|
||||
t.Errorf("Expected 8 results, got %d", len(results))
|
||||
}
|
||||
|
||||
expectedRVs := []string{"0", "10"}
|
||||
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
|
||||
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of kubernetes 1.16 an earlier
|
||||
// where if the exact ResourceVersion requested is not available for a List request for a non-zero ResourceVersion,
|
||||
// an "Expired" error is returned if the ResourceVersion has expired (etcd has compacted it).
|
||||
// (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than
|
||||
// the requested ResourceVersion).
|
||||
func TestReflectorExpiredExactResourceVersion(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
listCallRVs := []string{}
|
||||
|
||||
lw := &testLW{
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
fw := watch.NewFake()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
listCallRVs = append(listCallRVs, options.ResourceVersion)
|
||||
pods := make([]v1.Pod, 8)
|
||||
for i := 0; i < 8; i++ {
|
||||
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
|
||||
}
|
||||
switch options.ResourceVersion {
|
||||
case "0":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
|
||||
case "10":
|
||||
// When watch cache is disabled, if the exact ResourceVersion requested is not available, a "Expired" error is returned.
|
||||
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
|
||||
case "":
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
|
||||
default:
|
||||
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
|
||||
// Initial list should use RV=0
|
||||
r.ListAndWatch(stopCh)
|
||||
|
||||
results := s.List()
|
||||
if len(results) != 4 {
|
||||
t.Errorf("Expected 4 results, got %d", len(results))
|
||||
}
|
||||
|
||||
// relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="".
|
||||
stopCh = make(chan struct{})
|
||||
r.ListAndWatch(stopCh)
|
||||
|
||||
results = s.List()
|
||||
if len(results) != 8 {
|
||||
t.Errorf("Expected 8 results, got %d", len(results))
|
||||
}
|
||||
|
||||
expectedRVs := []string{"0", "10", ""}
|
||||
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
|
||||
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorFullListIfExpired(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
s := NewStore(MetaNamespaceKeyFunc)
|
||||
listCallRVs := []string{}
|
||||
|
||||
lw := &testLW{
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
// Stop once the reflector begins watching since we're only interested in the list.
|
||||
close(stopCh)
|
||||
fw := watch.NewFake()
|
||||
return fw, nil
|
||||
},
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
listCallRVs = append(listCallRVs, options.ResourceVersion)
|
||||
pods := make([]v1.Pod, 8)
|
||||
for i := 0; i < 8; i++ {
|
||||
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
|
||||
}
|
||||
rvContinueLimit := func(rv, c string, l int64) metav1.ListOptions {
|
||||
return metav1.ListOptions{ResourceVersion: rv, Continue: c, Limit: l}
|
||||
}
|
||||
switch rvContinueLimit(options.ResourceVersion, options.Continue, options.Limit) {
|
||||
// initial limited list
|
||||
case rvContinueLimit("0", "", 4):
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
|
||||
// first page of the rv=10 list
|
||||
case rvContinueLimit("10", "", 4):
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
|
||||
// second page of the above list
|
||||
case rvContinueLimit("", "C1", 4):
|
||||
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
|
||||
// rv=10 unlimited list
|
||||
case rvContinueLimit("10", "", 0):
|
||||
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
|
||||
default:
|
||||
err := fmt.Errorf("unexpected list options: %#v", options)
|
||||
t.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
r.WatchListPageSize = 4
|
||||
|
||||
// Initial list should use RV=0
|
||||
if err := r.ListAndWatch(stopCh); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
results := s.List()
|
||||
if len(results) != 4 {
|
||||
t.Errorf("Expected 4 results, got %d", len(results))
|
||||
}
|
||||
|
||||
// relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10
|
||||
stopCh = make(chan struct{})
|
||||
if err := r.ListAndWatch(stopCh); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
results = s.List()
|
||||
if len(results) != 8 {
|
||||
t.Errorf("Expected 8 results, got %d", len(results))
|
||||
}
|
||||
|
||||
expectedRVs := []string{"0", "10", "", "10"}
|
||||
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
|
||||
t.Errorf("Expected series of list calls with resource versiosn of %#v but got: %#v", expectedRVs, listCallRVs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReflectorSetExpectedType(t *testing.T) {
|
||||
obj := &unstructured.Unstructured{}
|
||||
gvk := schema.GroupVersionKind{
|
||||
|
||||
Reference in New Issue
Block a user