diff --git a/applyconfigurations/internal/internal.go b/applyconfigurations/internal/internal.go index f14cc835..90d47547 100644 --- a/applyconfigurations/internal/internal.go +++ b/applyconfigurations/internal/internal.go @@ -13966,6 +13966,9 @@ var schemaYAML = typed.YAMLObject(`types: - name: gracePeriodSeconds type: scalar: numeric + - name: ignoreStoreReadErrorWithClusterBreakingPotential + type: + scalar: boolean - name: kind type: scalar: string diff --git a/applyconfigurations/meta/v1/deleteoptions.go b/applyconfigurations/meta/v1/deleteoptions.go index a3b2b4ea..ab398ef5 100644 --- a/applyconfigurations/meta/v1/deleteoptions.go +++ b/applyconfigurations/meta/v1/deleteoptions.go @@ -25,12 +25,13 @@ import ( // DeleteOptionsApplyConfiguration represents a declarative configuration of the DeleteOptions type for use // with apply. type DeleteOptionsApplyConfiguration struct { - TypeMetaApplyConfiguration `json:",inline"` - GracePeriodSeconds *int64 `json:"gracePeriodSeconds,omitempty"` - Preconditions *PreconditionsApplyConfiguration `json:"preconditions,omitempty"` - OrphanDependents *bool `json:"orphanDependents,omitempty"` - PropagationPolicy *metav1.DeletionPropagation `json:"propagationPolicy,omitempty"` - DryRun []string `json:"dryRun,omitempty"` + TypeMetaApplyConfiguration `json:",inline"` + GracePeriodSeconds *int64 `json:"gracePeriodSeconds,omitempty"` + Preconditions *PreconditionsApplyConfiguration `json:"preconditions,omitempty"` + OrphanDependents *bool `json:"orphanDependents,omitempty"` + PropagationPolicy *metav1.DeletionPropagation `json:"propagationPolicy,omitempty"` + DryRun []string `json:"dryRun,omitempty"` + IgnoreStoreReadErrorWithClusterBreakingPotential *bool `json:"ignoreStoreReadErrorWithClusterBreakingPotential,omitempty"` } // DeleteOptionsApplyConfiguration constructs a declarative configuration of the DeleteOptions type for use with @@ -99,3 +100,11 @@ func (b *DeleteOptionsApplyConfiguration) WithDryRun(values ...string) *DeleteOp } return b } + +// WithIgnoreStoreReadErrorWithClusterBreakingPotential sets the IgnoreStoreReadErrorWithClusterBreakingPotential field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the IgnoreStoreReadErrorWithClusterBreakingPotential field is set to the value of the last call. +func (b *DeleteOptionsApplyConfiguration) WithIgnoreStoreReadErrorWithClusterBreakingPotential(value bool) *DeleteOptionsApplyConfiguration { + b.IgnoreStoreReadErrorWithClusterBreakingPotential = &value + return b +} diff --git a/go.mod b/go.mod index c91210e6..22b6ac48 100644 --- a/go.mod +++ b/go.mod @@ -26,8 +26,8 @@ require ( golang.org/x/time v0.7.0 google.golang.org/protobuf v1.35.1 gopkg.in/evanphx/json-patch.v4 v4.12.0 - k8s.io/api v0.0.0-20241108114313-789a813a3da8 - k8s.io/apimachinery v0.0.0-20241106231735-d941d9fb4c83 + k8s.io/api v0.0.0-20241108114314-0869e9d258da + k8s.io/apimachinery v0.0.0-20241108022104-96b97de8d6ba k8s.io/klog/v2 v2.130.1 k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 diff --git a/go.sum b/go.sum index c9af92aa..46e786d7 100644 --- a/go.sum +++ b/go.sum @@ -150,10 +150,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.0.0-20241108114313-789a813a3da8 h1:+3HQBAIjBgEx+fcUE7qou+b97GTD0FwsRvdivPD4Fk8= -k8s.io/api v0.0.0-20241108114313-789a813a3da8/go.mod h1:h7yaPC7+0KxMELdLjLoo6n6m3EWq6AeHEY25PjH4cPI= -k8s.io/apimachinery v0.0.0-20241106231735-d941d9fb4c83 h1:4KfMPmiiRIpvYJQ8cBYFEFht59EKysW1anuJWzHLHNg= -k8s.io/apimachinery v0.0.0-20241106231735-d941d9fb4c83/go.mod h1:HqhdaJUgQqky29T1V0o2yFkt/pZqLFIDyn9Zi/8rxoY= +k8s.io/api v0.0.0-20241108114314-0869e9d258da h1:0IBD100isGkMfDCeLrXtOqsmKKeiy3Uj85sI50Aff7U= +k8s.io/api v0.0.0-20241108114314-0869e9d258da/go.mod h1:jw6pQTESH9mdZL2vOK3twojvpPxipl5TpLZpPyl5ZYU= +k8s.io/apimachinery v0.0.0-20241108022104-96b97de8d6ba h1:ghB5Iygt6Ge8UyIwW7C1kJx4kP7AUTCL9Qg6GCsUUOY= +k8s.io/apimachinery v0.0.0-20241108022104-96b97de8d6ba/go.mod h1:HqhdaJUgQqky29T1V0o2yFkt/pZqLFIDyn9Zi/8rxoY= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index e9b14b3a..a56fce6e 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -21,9 +21,12 @@ import ( "errors" "fmt" "math/rand" + "net/http" "reflect" goruntime "runtime" "strconv" + "sync" + "sync/atomic" "syscall" "testing" "time" @@ -1753,6 +1756,152 @@ func TestReflectorListExtract(t *testing.T) { } } +func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) { + mkPod := func(id string, rv string) *v1.Pod { + return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}} + } + mkList := func(rv string, pods ...*v1.Pod) *v1.PodList { + list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}} + for _, pod := range pods { + list.Items = append(list.Items, *pod) + } + return list + } + makeStatus := func() *metav1.Status { + return &metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusInternalServerError, + Reason: metav1.StatusReasonStoreReadError, + Message: "failed to prepare current and previous objects: corrupt object has been deleted", + } + } + + // these pods preexist and never get updated/deleted + preExisting := mkPod("foo-1", "1") + pods := []*v1.Pod{preExisting, mkPod("foo-2", "2"), mkPod("foo-3", "3")} + lastExpectedRV := "5" + lists := []*v1.PodList{ + mkList("3", pods...), // initial list + mkList(lastExpectedRV, pods...), // re-list due to watch error + } + corruptObj := mkPod("foo", "4") + events := []watch.Event{ + {Type: watch.Added, Object: corruptObj}, + // the object becomes corrupt, and it gets unsafe-deleted, and + // watch sends the following Error event, note the RV has + // advanced to "5" in the storage due to the delete operation + {Type: watch.Error, Object: makeStatus()}, + } + + s := NewFIFO(MetaNamespaceKeyFunc) + var replaceInvoked atomic.Int32 + store := &fakeStore{ + Store: s, + beforeReplace: func(list []interface{}, rv string) { + // interested in the Replace call that happens after the Error event + if rv == lastExpectedRV { + replaceInvoked.Add(1) + _, exists, err := s.Get(corruptObj) + if err != nil || !exists { + t.Errorf("expected the object to exist in the store, exists: %t, err: %v", exists, err) + } + _, exists, err = s.Get(preExisting) + if err != nil || !exists { + t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err) + } + } + }, + afterReplace: func(rv string, err error) { + if rv == lastExpectedRV { + replaceInvoked.Add(1) + if err != nil { + t.Errorf("expected Replace to have succeeded, but got error: %v", err) + } + _, exists, err := s.Get(corruptObj) + if err != nil || exists { + t.Errorf("expected the object to have been removed from the store, exists: %t, err: %v", exists, err) + } + // show that a pre-existing pod is still in the cache + _, exists, err = s.Get(preExisting) + if err != nil || !exists { + t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err) + } + } + }, + } + + var once sync.Once + lw := &testLW{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + fw := watch.NewFake() + go func() { + once.Do(func() { + for _, e := range events { + fw.Action(e.Type, e.Object) + } + }) + }() + return fw, nil + }, + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + var list runtime.Object + if len(lists) > 0 { + list = lists[0] + lists = lists[1:] + } + return list, nil + }, + } + + r := NewReflector(lw, &v1.Pod{}, store, 0) + doneCh, stopCh := make(chan struct{}), make(chan struct{}) + go func() { + defer close(doneCh) + r.Run(stopCh) + }() + + // wait for the RV to sync to the version returned by the final list + err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) { + if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err) + } + + if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got { + t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got) + } + if want, got := 2, int(replaceInvoked.Load()); want != got { + t.Errorf("expected store Delete hooks to be invoked %d times, but got: %d", want, got) + } + if want, got := len(pods), len(s.List()); want != got { + t.Errorf("expected the store to have %d objects, but got: %d", want, got) + } + + close(stopCh) + select { + case <-doneCh: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for Run to return") + } +} + +type fakeStore struct { + Store + beforeReplace func(list []interface{}, s string) + afterReplace func(rv string, err error) +} + +func (f *fakeStore) Replace(list []interface{}, rv string) error { + f.beforeReplace(list, rv) + err := f.Store.Replace(list, rv) + f.afterReplace(rv, err) + return err +} + func BenchmarkExtractList(b *testing.B) { _, _, podList := getPodListItems(0, fakeItemsNum) _, _, configMapList := getConfigmapListItems(0, fakeItemsNum)