diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/BUILD index 89632b17253..dad87445a77 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/BUILD @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "buildmanagerinfo.go", + "capmanagers.go", "fieldmanager.go", "skipnonapplied.go", "stripmeta.go", @@ -48,6 +49,7 @@ filegroup( go_test( name = "go_default_test", srcs = [ + "capmanagers_test.go", "fieldmanager_test.go", "skipnonapplied_test.go", ], @@ -60,6 +62,7 @@ go_test( embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -68,6 +71,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library", "//vendor/k8s.io/kube-openapi/pkg/util/proto/testing:go_default_library", + "//vendor/sigs.k8s.io/structured-merge-diff/fieldpath:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/capmanagers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/capmanagers.go new file mode 100644 index 00000000000..8c38461613f --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/capmanagers.go @@ -0,0 +1,135 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fieldmanager + +import ( + "fmt" + "sort" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal" + "sigs.k8s.io/structured-merge-diff/fieldpath" +) + +type capManagersManager struct { + fieldManager Manager + maxUpdateManagers int + oldUpdatesManagerName string +} + +var _ Manager = &capManagersManager{} + +// NewCapManagersManager creates a new wrapped FieldManager which ensures that the number of managers from updates +// does not exceed maxUpdateManagers, by merging some of the oldest entries on each update. +func NewCapManagersManager(fieldManager Manager, maxUpdateManagers int) Manager { + return &capManagersManager{ + fieldManager: fieldManager, + maxUpdateManagers: maxUpdateManagers, + oldUpdatesManagerName: "ancient-changes", + } +} + +// Update implements Manager. +func (f *capManagersManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) { + object, managed, err := f.fieldManager.Update(liveObj, newObj, managed, manager) + if err != nil { + return object, managed, err + } + if managed, err = f.capUpdateManagers(managed); err != nil { + return nil, nil, fmt.Errorf("failed to cap update managers: %v", err) + } + return object, managed, nil +} + +// Apply implements Manager. +func (f *capManagersManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, fieldManager string, force bool) (runtime.Object, Managed, error) { + return f.fieldManager.Apply(liveObj, patch, managed, fieldManager, force) +} + +// capUpdateManagers merges a number of the oldest update entries into versioned buckets, +// such that the number of entries from updates does not exceed f.maxUpdateManagers. +func (f *capManagersManager) capUpdateManagers(managed Managed) (newManaged Managed, err error) { + // Gather all entries from updates + updaters := []string{} + for manager, fields := range managed.Fields() { + if fields.Applied() == false { + updaters = append(updaters, manager) + } + } + if len(updaters) <= f.maxUpdateManagers { + return managed, nil + } + + // If we have more than the maximum, sort the update entries by time, oldest first. + sort.Slice(updaters, func(i, j int) bool { + iTime, jTime, nTime := managed.Times()[updaters[i]], managed.Times()[updaters[j]], &metav1.Time{Time: time.Time{}} + if iTime == nil { + iTime = nTime + } + if jTime == nil { + jTime = nTime + } + if !iTime.Equal(jTime) { + return iTime.Before(jTime) + } + return updaters[i] < updaters[j] + }) + + // Merge the oldest updaters with versioned bucket managers until the number of updaters is under the cap + versionToFirstManager := map[string]string{} + for i, length := 0, len(updaters); i < len(updaters) && length > f.maxUpdateManagers; i++ { + manager := updaters[i] + vs := managed.Fields()[manager] + time := managed.Times()[manager] + version := string(vs.APIVersion()) + + // Create a new manager identifier for the versioned bucket entry. + // The version for this manager comes from the version of the update being merged into the bucket. + bucket, err := internal.BuildManagerIdentifier(&metav1.ManagedFieldsEntry{ + Manager: f.oldUpdatesManagerName, + Operation: metav1.ManagedFieldsOperationUpdate, + APIVersion: version, + }) + if err != nil { + return managed, fmt.Errorf("failed to create bucket manager for version %v: %v", version, err) + } + + // Merge the fieldets if this is not the first time the version was seen. + // Otherwise just record the manager name in versionToFirstManager + if first, ok := versionToFirstManager[version]; ok { + // If the bucket doesn't exists yet, create one. + if _, ok := managed.Fields()[bucket]; !ok { + s := managed.Fields()[first] + delete(managed.Fields(), first) + managed.Fields()[bucket] = s + } + + managed.Fields()[bucket] = fieldpath.NewVersionedSet(vs.Set().Union(managed.Fields()[bucket].Set()), vs.APIVersion(), vs.Applied()) + delete(managed.Fields(), manager) + length-- + + // Use the time from the update being merged into the bucket, since it is more recent. + managed.Times()[bucket] = time + } else { + versionToFirstManager[version] = manager + } + } + + return managed, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/capmanagers_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/capmanagers_test.go new file mode 100644 index 00000000000..e64a2b3c492 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/capmanagers_test.go @@ -0,0 +1,286 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fieldmanager_test + +import ( + "bytes" + "encoding/json" + "fmt" + "testing" + "time" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + "sigs.k8s.io/structured-merge-diff/fieldpath" +) + +type fakeManager struct{} + +var _ fieldmanager.Manager = &fakeManager{} + +func (*fakeManager) Update(_, newObj runtime.Object, managed fieldmanager.Managed, _ string) (runtime.Object, fieldmanager.Managed, error) { + return newObj, managed, nil +} + +func (*fakeManager) Apply(_ runtime.Object, _ []byte, _ fieldmanager.Managed, _ string, force bool) (runtime.Object, fieldmanager.Managed, error) { + panic("not implemented") + return nil, nil, nil +} + +func TestCapManagersManagerMergesEntries(t *testing.T) { + f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod")) + f.fieldManager = fieldmanager.NewCapManagersManager(f.fieldManager, 3) + + podWithLabels := func(labels ...string) runtime.Object { + labelMap := map[string]interface{}{} + for _, key := range labels { + labelMap[key] = "true" + } + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": labelMap, + }, + }, + } + obj.SetKind("Pod") + obj.SetAPIVersion("v1") + return obj + } + + if err := f.Update(podWithLabels("one"), "fieldmanager_test_update_1"); err != nil { + t.Fatalf("failed to update object: %v", err) + } + expectIdempotence(t, f) + + if err := f.Update(podWithLabels("one", "two"), "fieldmanager_test_update_2"); err != nil { + t.Fatalf("failed to update object: %v", err) + } + expectIdempotence(t, f) + + if err := f.Update(podWithLabels("one", "two", "three"), "fieldmanager_test_update_3"); err != nil { + t.Fatalf("failed to update object: %v", err) + } + expectIdempotence(t, f) + + if err := f.Update(podWithLabels("one", "two", "three", "four"), "fieldmanager_test_update_4"); err != nil { + t.Fatalf("failed to update object: %v", err) + } + expectIdempotence(t, f) + + if e, a := 3, len(f.ManagedFields()); e != a { + t.Fatalf("exected %v entries in managedFields, but got %v: %#v", e, a, f.ManagedFields()) + } + + if e, a := "ancient-changes", f.ManagedFields()[0].Manager; e != a { + t.Fatalf("exected first manager name to be %v, but got %v: %#v", e, a, f.ManagedFields()) + } + + if e, a := "fieldmanager_test_update_3", f.ManagedFields()[1].Manager; e != a { + t.Fatalf("exected second manager name to be %v, but got %v: %#v", e, a, f.ManagedFields()) + } + + if e, a := "fieldmanager_test_update_4", f.ManagedFields()[2].Manager; e != a { + t.Fatalf("exected third manager name to be %v, but got %v: %#v", e, a, f.ManagedFields()) + } + + expectManagesField(t, f, "ancient-changes", fieldpath.MakePathOrDie("metadata", "labels", "one")) + expectManagesField(t, f, "ancient-changes", fieldpath.MakePathOrDie("metadata", "labels", "two")) + expectManagesField(t, f, "fieldmanager_test_update_3", fieldpath.MakePathOrDie("metadata", "labels", "three")) + expectManagesField(t, f, "fieldmanager_test_update_4", fieldpath.MakePathOrDie("metadata", "labels", "four")) +} + +func TestCapUpdateManagers(t *testing.T) { + f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod")) + f.fieldManager = fieldmanager.NewCapManagersManager(&fakeManager{}, 3) + + set := func(fields ...string) *metav1.FieldsV1 { + s := fieldpath.NewSet() + for _, f := range fields { + s.Insert(fieldpath.MakePathOrDie(f)) + } + b, err := s.ToJSON() + if err != nil { + panic(fmt.Sprintf("error building ManagedFieldsEntry for test: %v", err)) + } + return &metav1.FieldsV1{Raw: b} + } + + entry := func(name string, version string, order int, fields *metav1.FieldsV1) metav1.ManagedFieldsEntry { + return metav1.ManagedFieldsEntry{ + Manager: name, + APIVersion: version, + Operation: "Update", + FieldsType: "FieldsV1", + FieldsV1: fields, + Time: &metav1.Time{Time: time.Time{}.Add(time.Hour * time.Duration(order))}, + } + } + + testCases := []struct { + name string + input []metav1.ManagedFieldsEntry + expected []metav1.ManagedFieldsEntry + }{ + { + name: "one version, no ancient changes", + input: []metav1.ManagedFieldsEntry{ + entry("update-manager1", "v1", 1, set("a")), + entry("update-manager2", "v1", 2, set("b")), + entry("update-manager3", "v1", 3, set("c")), + entry("update-manager4", "v1", 4, set("d")), + }, + expected: []metav1.ManagedFieldsEntry{ + entry("ancient-changes", "v1", 2, set("a", "b")), + entry("update-manager3", "v1", 3, set("c")), + entry("update-manager4", "v1", 4, set("d")), + }, + }, { + name: "one version, one ancient changes", + input: []metav1.ManagedFieldsEntry{ + entry("ancient-changes", "v1", 2, set("a", "b")), + entry("update-manager3", "v1", 3, set("c")), + entry("update-manager4", "v1", 4, set("d")), + entry("update-manager5", "v1", 5, set("e")), + }, + expected: []metav1.ManagedFieldsEntry{ + entry("ancient-changes", "v1", 3, set("a", "b", "c")), + entry("update-manager4", "v1", 4, set("d")), + entry("update-manager5", "v1", 5, set("e")), + }, + }, { + name: "two versions, no ancient changes", + input: []metav1.ManagedFieldsEntry{ + entry("update-manager1", "v1", 1, set("a")), + entry("update-manager2", "v2", 2, set("b")), + entry("update-manager3", "v1", 3, set("c")), + entry("update-manager4", "v1", 4, set("d")), + entry("update-manager5", "v1", 5, set("e")), + }, + expected: []metav1.ManagedFieldsEntry{ + entry("update-manager2", "v2", 2, set("b")), + entry("ancient-changes", "v1", 4, set("a", "c", "d")), + entry("update-manager5", "v1", 5, set("e")), + }, + }, { + name: "three versions, one ancient changes", + input: []metav1.ManagedFieldsEntry{ + entry("update-manager2", "v2", 2, set("b")), + entry("ancient-changes", "v1", 4, set("a", "c", "d")), + entry("update-manager5", "v1", 5, set("e")), + entry("update-manager6", "v3", 6, set("f")), + entry("update-manager7", "v2", 7, set("g")), + }, + expected: []metav1.ManagedFieldsEntry{ + entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")), + entry("update-manager6", "v3", 6, set("f")), + entry("ancient-changes", "v2", 7, set("b", "g")), + }, + }, { + name: "three versions, two ancient changes", + input: []metav1.ManagedFieldsEntry{ + entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")), + entry("update-manager6", "v3", 6, set("f")), + entry("ancient-changes", "v2", 7, set("b", "g")), + entry("update-manager8", "v3", 8, set("h")), + }, + expected: []metav1.ManagedFieldsEntry{ + entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")), + entry("ancient-changes", "v2", 7, set("b", "g")), + entry("ancient-changes", "v3", 8, set("f", "h")), + }, + }, { + name: "four versions, two ancient changes", + input: []metav1.ManagedFieldsEntry{ + entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")), + entry("update-manager6", "v3", 6, set("f")), + entry("ancient-changes", "v2", 7, set("b", "g")), + entry("update-manager8", "v4", 8, set("h")), + }, + expected: []metav1.ManagedFieldsEntry{ + entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")), + entry("update-manager6", "v3", 6, set("f")), + entry("ancient-changes", "v2", 7, set("b", "g")), + entry("update-manager8", "v4", 8, set("h")), + }, + }, + } + + for _, tc := range testCases { + f.Reset() + accessor, err := meta.Accessor(f.liveObj) + if err != nil { + t.Fatalf("%v: couldn't get accessor: %v", tc.name, err) + } + accessor.SetManagedFields(tc.input) + if err := f.Update(f.liveObj, "no-op-update"); err != nil { + t.Fatalf("%v: failed to do no-op update to object: %v", tc.name, err) + } + + if e, a := tc.expected, f.ManagedFields(); !apiequality.Semantic.DeepEqual(e, a) { + t.Errorf("%v: unexpected value for managedFields:\nexpected: %v\n but got: %v", tc.name, mustMarshal(e), mustMarshal(a)) + } + expectIdempotence(t, f) + } +} + +// expectIdempotence does a no-op update and ensures that managedFields doesn't change by calling capUpdateManagers. +func expectIdempotence(t *testing.T, f TestFieldManager) { + before := []metav1.ManagedFieldsEntry{} + for _, m := range f.ManagedFields() { + before = append(before, *m.DeepCopy()) + } + + if err := f.Update(f.liveObj, "no-op-update"); err != nil { + t.Fatalf("failed to do no-op update to object: %v", err) + } + + if after := f.ManagedFields(); !apiequality.Semantic.DeepEqual(before, after) { + t.Fatalf("exected idempotence, but managedFields changed:\nbefore: %v\n after: %v", mustMarshal(before), mustMarshal(after)) + } +} + +// expectManagesField ensures that manager m currently manages field path p. +func expectManagesField(t *testing.T, f TestFieldManager, m string, p fieldpath.Path) { + for _, e := range f.ManagedFields() { + if e.Manager == m { + var s fieldpath.Set + err := s.FromJSON(bytes.NewReader(e.FieldsV1.Raw)) + if err != nil { + t.Fatalf("error parsing managedFields for %v: %v: %#v", m, err, f.ManagedFields()) + } + if !s.Has(p) { + t.Fatalf("expected managedFields for %v to contain %v, but got:\n%v", m, p.String(), s.String()) + } + return + } + } + t.Fatalf("exected to find manager name %v, but got: %#v", m, f.ManagedFields()) +} + +func mustMarshal(i interface{}) string { + b, err := json.MarshalIndent(i, "", " ") + if err != nil { + panic(fmt.Sprintf("error marshalling %v to json: %v", i, err)) + } + return string(b) +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/fieldmanager.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/fieldmanager.go index ca9d681d951..3f2b08984ea 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/fieldmanager.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/fieldmanager.go @@ -28,6 +28,11 @@ import ( "sigs.k8s.io/structured-merge-diff/fieldpath" ) +// DefaultMaxUpdateManagers defines the default maximum retained number of managedFields entries from updates +// if the number of update managers exceeds this, the oldest entries will be merged until the number is below the maximum. +// TODO(jennybuckley): Determine if this is really the best value. Ideally we wouldn't unnecessarily merge too many entries. +const DefaultMaxUpdateManagers int = 10 + // Managed groups a fieldpath.ManagedFields together with the timestamps associated with each operation. type Managed interface { // Fields gets the fieldpath.ManagedFields. @@ -86,6 +91,7 @@ func NewDefaultCRDFieldManager(models openapiproto.Models, objectConverter runti func newDefaultFieldManager(f Manager, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind) *FieldManager { f = NewStripMetaManager(f) f = NewBuildManagerInfoManager(f, kind.GroupVersion()) + f = NewCapManagersManager(f, DefaultMaxUpdateManagers) f = NewSkipNonAppliedManager(f, objectCreater, kind) return NewFieldManager(f) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal/managedfields.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal/managedfields.go index fcaf3a13c92..74e30a4e9cc 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal/managedfields.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal/managedfields.go @@ -205,7 +205,10 @@ func sortEncodedManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry) return p.Time.Before(q.Time) } - return p.Manager < q.Manager + if p.Manager != q.Manager { + return p.Manager < q.Manager + } + return p.APIVersion < q.APIVersion }) return encodedManagedFields, nil diff --git a/test/integration/apiserver/apply/apply_test.go b/test/integration/apiserver/apply/apply_test.go index a639b84118e..b1c206db12c 100644 --- a/test/integration/apiserver/apply/apply_test.go +++ b/test/integration/apiserver/apply/apply_test.go @@ -369,6 +369,75 @@ func TestApplyUpdateApplyConflictForced(t *testing.T) { } } +// TestApplyGroupsManySeparateUpdates tests that when many different managers update the same object, +// the number of managedFields entries will only grow to a certain size. +func TestApplyGroupsManySeparateUpdates(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)() + + _, client, closeFn := setup(t) + defer closeFn() + + obj := []byte(`{ + "apiVersion": "admissionregistration.k8s.io/v1", + "kind": "ValidatingWebhookConfiguration", + "metadata": { + "name": "webhook", + "labels": {"applier":"true"}, + }, + }`) + + object, err := client.CoreV1().RESTClient().Patch(types.ApplyPatchType). + AbsPath("/apis/admissionregistration.k8s.io/v1"). + Resource("validatingwebhookconfigurations"). + Name("webhook"). + Param("fieldManager", "apply_test"). + Body(obj).Do().Get() + if err != nil { + t.Fatalf("Failed to create object using Apply patch: %v", err) + } + + for i := 0; i < 20; i++ { + unique := fmt.Sprintf("updater%v", i) + version := "v1" + if i%2 == 0 { + version = "v1beta1" + } + object, err = client.CoreV1().RESTClient().Patch(types.MergePatchType). + AbsPath("/apis/admissionregistration.k8s.io/"+version). + Resource("validatingwebhookconfigurations"). + Name("webhook"). + Param("fieldManager", unique). + Body([]byte(`{"metadata":{"labels":{"` + unique + `":"new"}}}`)).Do().Get() + if err != nil { + t.Fatalf("Failed to patch object: %v", err) + } + } + + accessor, err := meta.Accessor(object) + if err != nil { + t.Fatalf("Failed to get meta accessor: %v", err) + } + + // Expect 11 entries, because the cap for update entries is 10, and 1 apply entry + if actual, expected := len(accessor.GetManagedFields()), 11; actual != expected { + if b, err := json.MarshalIndent(object, "\t", "\t"); err == nil { + t.Fatalf("Object expected to contain %v entries in managedFields, but got %v:\n%v", expected, actual, string(b)) + } else { + t.Fatalf("Object expected to contain %v entries in managedFields, but got %v: error marshalling object: %v", expected, actual, err) + } + } + + // Expect the first entry to have the manager name "apply_test" + if actual, expected := accessor.GetManagedFields()[0].Manager, "apply_test"; actual != expected { + t.Fatalf("Expected first manager to be named %v but got %v", expected, actual) + } + + // Expect the second entry to have the manager name "ancient-changes" + if actual, expected := accessor.GetManagedFields()[1].Manager, "ancient-changes"; actual != expected { + t.Fatalf("Expected first manager to be named %v but got %v", expected, actual) + } +} + // TestApplyManagedFields makes sure that managedFields api does not change func TestApplyManagedFields(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()