Cap the number of managedFields entries for updates at 10

This commit is contained in:
jennybuckley 2019-09-26 12:53:28 -07:00 committed by Jennifer Buckley
parent d72260b9da
commit 61b19c7298
6 changed files with 504 additions and 1 deletions

View File

@ -4,6 +4,7 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"buildmanagerinfo.go", "buildmanagerinfo.go",
"capmanagers.go",
"fieldmanager.go", "fieldmanager.go",
"skipnonapplied.go", "skipnonapplied.go",
"stripmeta.go", "stripmeta.go",
@ -48,6 +49,7 @@ filegroup(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"capmanagers_test.go",
"fieldmanager_test.go", "fieldmanager_test.go",
"skipnonapplied_test.go", "skipnonapplied_test.go",
], ],
@ -60,6 +62,7 @@ go_test(
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -68,6 +71,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library", "//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/util/proto/testing:go_default_library", "//vendor/k8s.io/kube-openapi/pkg/util/proto/testing:go_default_library",
"//vendor/sigs.k8s.io/structured-merge-diff/fieldpath:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library",
], ],
) )

View File

@ -0,0 +1,135 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
"sort"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
type capManagersManager struct {
fieldManager Manager
maxUpdateManagers int
oldUpdatesManagerName string
}
var _ Manager = &capManagersManager{}
// NewCapManagersManager creates a new wrapped FieldManager which ensures that the number of managers from updates
// does not exceed maxUpdateManagers, by merging some of the oldest entries on each update.
func NewCapManagersManager(fieldManager Manager, maxUpdateManagers int) Manager {
return &capManagersManager{
fieldManager: fieldManager,
maxUpdateManagers: maxUpdateManagers,
oldUpdatesManagerName: "ancient-changes",
}
}
// Update implements Manager.
func (f *capManagersManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
object, managed, err := f.fieldManager.Update(liveObj, newObj, managed, manager)
if err != nil {
return object, managed, err
}
if managed, err = f.capUpdateManagers(managed); err != nil {
return nil, nil, fmt.Errorf("failed to cap update managers: %v", err)
}
return object, managed, nil
}
// Apply implements Manager.
func (f *capManagersManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, fieldManager string, force bool) (runtime.Object, Managed, error) {
return f.fieldManager.Apply(liveObj, patch, managed, fieldManager, force)
}
// capUpdateManagers merges a number of the oldest update entries into versioned buckets,
// such that the number of entries from updates does not exceed f.maxUpdateManagers.
func (f *capManagersManager) capUpdateManagers(managed Managed) (newManaged Managed, err error) {
// Gather all entries from updates
updaters := []string{}
for manager, fields := range managed.Fields() {
if fields.Applied() == false {
updaters = append(updaters, manager)
}
}
if len(updaters) <= f.maxUpdateManagers {
return managed, nil
}
// If we have more than the maximum, sort the update entries by time, oldest first.
sort.Slice(updaters, func(i, j int) bool {
iTime, jTime, nTime := managed.Times()[updaters[i]], managed.Times()[updaters[j]], &metav1.Time{Time: time.Time{}}
if iTime == nil {
iTime = nTime
}
if jTime == nil {
jTime = nTime
}
if !iTime.Equal(jTime) {
return iTime.Before(jTime)
}
return updaters[i] < updaters[j]
})
// Merge the oldest updaters with versioned bucket managers until the number of updaters is under the cap
versionToFirstManager := map[string]string{}
for i, length := 0, len(updaters); i < len(updaters) && length > f.maxUpdateManagers; i++ {
manager := updaters[i]
vs := managed.Fields()[manager]
time := managed.Times()[manager]
version := string(vs.APIVersion())
// Create a new manager identifier for the versioned bucket entry.
// The version for this manager comes from the version of the update being merged into the bucket.
bucket, err := internal.BuildManagerIdentifier(&metav1.ManagedFieldsEntry{
Manager: f.oldUpdatesManagerName,
Operation: metav1.ManagedFieldsOperationUpdate,
APIVersion: version,
})
if err != nil {
return managed, fmt.Errorf("failed to create bucket manager for version %v: %v", version, err)
}
// Merge the fieldets if this is not the first time the version was seen.
// Otherwise just record the manager name in versionToFirstManager
if first, ok := versionToFirstManager[version]; ok {
// If the bucket doesn't exists yet, create one.
if _, ok := managed.Fields()[bucket]; !ok {
s := managed.Fields()[first]
delete(managed.Fields(), first)
managed.Fields()[bucket] = s
}
managed.Fields()[bucket] = fieldpath.NewVersionedSet(vs.Set().Union(managed.Fields()[bucket].Set()), vs.APIVersion(), vs.Applied())
delete(managed.Fields(), manager)
length--
// Use the time from the update being merged into the bucket, since it is more recent.
managed.Times()[bucket] = time
} else {
versionToFirstManager[version] = manager
}
}
return managed, nil
}

View File

@ -0,0 +1,286 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager_test
import (
"bytes"
"encoding/json"
"fmt"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
type fakeManager struct{}
var _ fieldmanager.Manager = &fakeManager{}
func (*fakeManager) Update(_, newObj runtime.Object, managed fieldmanager.Managed, _ string) (runtime.Object, fieldmanager.Managed, error) {
return newObj, managed, nil
}
func (*fakeManager) Apply(_ runtime.Object, _ []byte, _ fieldmanager.Managed, _ string, force bool) (runtime.Object, fieldmanager.Managed, error) {
panic("not implemented")
return nil, nil, nil
}
func TestCapManagersManagerMergesEntries(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f.fieldManager = fieldmanager.NewCapManagersManager(f.fieldManager, 3)
podWithLabels := func(labels ...string) runtime.Object {
labelMap := map[string]interface{}{}
for _, key := range labels {
labelMap[key] = "true"
}
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"labels": labelMap,
},
},
}
obj.SetKind("Pod")
obj.SetAPIVersion("v1")
return obj
}
if err := f.Update(podWithLabels("one"), "fieldmanager_test_update_1"); err != nil {
t.Fatalf("failed to update object: %v", err)
}
expectIdempotence(t, f)
if err := f.Update(podWithLabels("one", "two"), "fieldmanager_test_update_2"); err != nil {
t.Fatalf("failed to update object: %v", err)
}
expectIdempotence(t, f)
if err := f.Update(podWithLabels("one", "two", "three"), "fieldmanager_test_update_3"); err != nil {
t.Fatalf("failed to update object: %v", err)
}
expectIdempotence(t, f)
if err := f.Update(podWithLabels("one", "two", "three", "four"), "fieldmanager_test_update_4"); err != nil {
t.Fatalf("failed to update object: %v", err)
}
expectIdempotence(t, f)
if e, a := 3, len(f.ManagedFields()); e != a {
t.Fatalf("exected %v entries in managedFields, but got %v: %#v", e, a, f.ManagedFields())
}
if e, a := "ancient-changes", f.ManagedFields()[0].Manager; e != a {
t.Fatalf("exected first manager name to be %v, but got %v: %#v", e, a, f.ManagedFields())
}
if e, a := "fieldmanager_test_update_3", f.ManagedFields()[1].Manager; e != a {
t.Fatalf("exected second manager name to be %v, but got %v: %#v", e, a, f.ManagedFields())
}
if e, a := "fieldmanager_test_update_4", f.ManagedFields()[2].Manager; e != a {
t.Fatalf("exected third manager name to be %v, but got %v: %#v", e, a, f.ManagedFields())
}
expectManagesField(t, f, "ancient-changes", fieldpath.MakePathOrDie("metadata", "labels", "one"))
expectManagesField(t, f, "ancient-changes", fieldpath.MakePathOrDie("metadata", "labels", "two"))
expectManagesField(t, f, "fieldmanager_test_update_3", fieldpath.MakePathOrDie("metadata", "labels", "three"))
expectManagesField(t, f, "fieldmanager_test_update_4", fieldpath.MakePathOrDie("metadata", "labels", "four"))
}
func TestCapUpdateManagers(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f.fieldManager = fieldmanager.NewCapManagersManager(&fakeManager{}, 3)
set := func(fields ...string) *metav1.FieldsV1 {
s := fieldpath.NewSet()
for _, f := range fields {
s.Insert(fieldpath.MakePathOrDie(f))
}
b, err := s.ToJSON()
if err != nil {
panic(fmt.Sprintf("error building ManagedFieldsEntry for test: %v", err))
}
return &metav1.FieldsV1{Raw: b}
}
entry := func(name string, version string, order int, fields *metav1.FieldsV1) metav1.ManagedFieldsEntry {
return metav1.ManagedFieldsEntry{
Manager: name,
APIVersion: version,
Operation: "Update",
FieldsType: "FieldsV1",
FieldsV1: fields,
Time: &metav1.Time{Time: time.Time{}.Add(time.Hour * time.Duration(order))},
}
}
testCases := []struct {
name string
input []metav1.ManagedFieldsEntry
expected []metav1.ManagedFieldsEntry
}{
{
name: "one version, no ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("update-manager1", "v1", 1, set("a")),
entry("update-manager2", "v1", 2, set("b")),
entry("update-manager3", "v1", 3, set("c")),
entry("update-manager4", "v1", 4, set("d")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 2, set("a", "b")),
entry("update-manager3", "v1", 3, set("c")),
entry("update-manager4", "v1", 4, set("d")),
},
}, {
name: "one version, one ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 2, set("a", "b")),
entry("update-manager3", "v1", 3, set("c")),
entry("update-manager4", "v1", 4, set("d")),
entry("update-manager5", "v1", 5, set("e")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 3, set("a", "b", "c")),
entry("update-manager4", "v1", 4, set("d")),
entry("update-manager5", "v1", 5, set("e")),
},
}, {
name: "two versions, no ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("update-manager1", "v1", 1, set("a")),
entry("update-manager2", "v2", 2, set("b")),
entry("update-manager3", "v1", 3, set("c")),
entry("update-manager4", "v1", 4, set("d")),
entry("update-manager5", "v1", 5, set("e")),
},
expected: []metav1.ManagedFieldsEntry{
entry("update-manager2", "v2", 2, set("b")),
entry("ancient-changes", "v1", 4, set("a", "c", "d")),
entry("update-manager5", "v1", 5, set("e")),
},
}, {
name: "three versions, one ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("update-manager2", "v2", 2, set("b")),
entry("ancient-changes", "v1", 4, set("a", "c", "d")),
entry("update-manager5", "v1", 5, set("e")),
entry("update-manager6", "v3", 6, set("f")),
entry("update-manager7", "v2", 7, set("g")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("update-manager6", "v3", 6, set("f")),
entry("ancient-changes", "v2", 7, set("b", "g")),
},
}, {
name: "three versions, two ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("update-manager6", "v3", 6, set("f")),
entry("ancient-changes", "v2", 7, set("b", "g")),
entry("update-manager8", "v3", 8, set("h")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("ancient-changes", "v2", 7, set("b", "g")),
entry("ancient-changes", "v3", 8, set("f", "h")),
},
}, {
name: "four versions, two ancient changes",
input: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("update-manager6", "v3", 6, set("f")),
entry("ancient-changes", "v2", 7, set("b", "g")),
entry("update-manager8", "v4", 8, set("h")),
},
expected: []metav1.ManagedFieldsEntry{
entry("ancient-changes", "v1", 5, set("a", "c", "d", "e")),
entry("update-manager6", "v3", 6, set("f")),
entry("ancient-changes", "v2", 7, set("b", "g")),
entry("update-manager8", "v4", 8, set("h")),
},
},
}
for _, tc := range testCases {
f.Reset()
accessor, err := meta.Accessor(f.liveObj)
if err != nil {
t.Fatalf("%v: couldn't get accessor: %v", tc.name, err)
}
accessor.SetManagedFields(tc.input)
if err := f.Update(f.liveObj, "no-op-update"); err != nil {
t.Fatalf("%v: failed to do no-op update to object: %v", tc.name, err)
}
if e, a := tc.expected, f.ManagedFields(); !apiequality.Semantic.DeepEqual(e, a) {
t.Errorf("%v: unexpected value for managedFields:\nexpected: %v\n but got: %v", tc.name, mustMarshal(e), mustMarshal(a))
}
expectIdempotence(t, f)
}
}
// expectIdempotence does a no-op update and ensures that managedFields doesn't change by calling capUpdateManagers.
func expectIdempotence(t *testing.T, f TestFieldManager) {
before := []metav1.ManagedFieldsEntry{}
for _, m := range f.ManagedFields() {
before = append(before, *m.DeepCopy())
}
if err := f.Update(f.liveObj, "no-op-update"); err != nil {
t.Fatalf("failed to do no-op update to object: %v", err)
}
if after := f.ManagedFields(); !apiequality.Semantic.DeepEqual(before, after) {
t.Fatalf("exected idempotence, but managedFields changed:\nbefore: %v\n after: %v", mustMarshal(before), mustMarshal(after))
}
}
// expectManagesField ensures that manager m currently manages field path p.
func expectManagesField(t *testing.T, f TestFieldManager, m string, p fieldpath.Path) {
for _, e := range f.ManagedFields() {
if e.Manager == m {
var s fieldpath.Set
err := s.FromJSON(bytes.NewReader(e.FieldsV1.Raw))
if err != nil {
t.Fatalf("error parsing managedFields for %v: %v: %#v", m, err, f.ManagedFields())
}
if !s.Has(p) {
t.Fatalf("expected managedFields for %v to contain %v, but got:\n%v", m, p.String(), s.String())
}
return
}
}
t.Fatalf("exected to find manager name %v, but got: %#v", m, f.ManagedFields())
}
func mustMarshal(i interface{}) string {
b, err := json.MarshalIndent(i, "", " ")
if err != nil {
panic(fmt.Sprintf("error marshalling %v to json: %v", i, err))
}
return string(b)
}

View File

@ -28,6 +28,11 @@ import (
"sigs.k8s.io/structured-merge-diff/fieldpath" "sigs.k8s.io/structured-merge-diff/fieldpath"
) )
// DefaultMaxUpdateManagers defines the default maximum retained number of managedFields entries from updates
// if the number of update managers exceeds this, the oldest entries will be merged until the number is below the maximum.
// TODO(jennybuckley): Determine if this is really the best value. Ideally we wouldn't unnecessarily merge too many entries.
const DefaultMaxUpdateManagers int = 10
// Managed groups a fieldpath.ManagedFields together with the timestamps associated with each operation. // Managed groups a fieldpath.ManagedFields together with the timestamps associated with each operation.
type Managed interface { type Managed interface {
// Fields gets the fieldpath.ManagedFields. // Fields gets the fieldpath.ManagedFields.
@ -86,6 +91,7 @@ func NewDefaultCRDFieldManager(models openapiproto.Models, objectConverter runti
func newDefaultFieldManager(f Manager, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind) *FieldManager { func newDefaultFieldManager(f Manager, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind) *FieldManager {
f = NewStripMetaManager(f) f = NewStripMetaManager(f)
f = NewBuildManagerInfoManager(f, kind.GroupVersion()) f = NewBuildManagerInfoManager(f, kind.GroupVersion())
f = NewCapManagersManager(f, DefaultMaxUpdateManagers)
f = NewSkipNonAppliedManager(f, objectCreater, kind) f = NewSkipNonAppliedManager(f, objectCreater, kind)
return NewFieldManager(f) return NewFieldManager(f)
} }

View File

@ -205,7 +205,10 @@ func sortEncodedManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry)
return p.Time.Before(q.Time) return p.Time.Before(q.Time)
} }
return p.Manager < q.Manager if p.Manager != q.Manager {
return p.Manager < q.Manager
}
return p.APIVersion < q.APIVersion
}) })
return encodedManagedFields, nil return encodedManagedFields, nil

View File

@ -369,6 +369,75 @@ func TestApplyUpdateApplyConflictForced(t *testing.T) {
} }
} }
// TestApplyGroupsManySeparateUpdates tests that when many different managers update the same object,
// the number of managedFields entries will only grow to a certain size.
func TestApplyGroupsManySeparateUpdates(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()
_, client, closeFn := setup(t)
defer closeFn()
obj := []byte(`{
"apiVersion": "admissionregistration.k8s.io/v1",
"kind": "ValidatingWebhookConfiguration",
"metadata": {
"name": "webhook",
"labels": {"applier":"true"},
},
}`)
object, err := client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath("/apis/admissionregistration.k8s.io/v1").
Resource("validatingwebhookconfigurations").
Name("webhook").
Param("fieldManager", "apply_test").
Body(obj).Do().Get()
if err != nil {
t.Fatalf("Failed to create object using Apply patch: %v", err)
}
for i := 0; i < 20; i++ {
unique := fmt.Sprintf("updater%v", i)
version := "v1"
if i%2 == 0 {
version = "v1beta1"
}
object, err = client.CoreV1().RESTClient().Patch(types.MergePatchType).
AbsPath("/apis/admissionregistration.k8s.io/"+version).
Resource("validatingwebhookconfigurations").
Name("webhook").
Param("fieldManager", unique).
Body([]byte(`{"metadata":{"labels":{"` + unique + `":"new"}}}`)).Do().Get()
if err != nil {
t.Fatalf("Failed to patch object: %v", err)
}
}
accessor, err := meta.Accessor(object)
if err != nil {
t.Fatalf("Failed to get meta accessor: %v", err)
}
// Expect 11 entries, because the cap for update entries is 10, and 1 apply entry
if actual, expected := len(accessor.GetManagedFields()), 11; actual != expected {
if b, err := json.MarshalIndent(object, "\t", "\t"); err == nil {
t.Fatalf("Object expected to contain %v entries in managedFields, but got %v:\n%v", expected, actual, string(b))
} else {
t.Fatalf("Object expected to contain %v entries in managedFields, but got %v: error marshalling object: %v", expected, actual, err)
}
}
// Expect the first entry to have the manager name "apply_test"
if actual, expected := accessor.GetManagedFields()[0].Manager, "apply_test"; actual != expected {
t.Fatalf("Expected first manager to be named %v but got %v", expected, actual)
}
// Expect the second entry to have the manager name "ancient-changes"
if actual, expected := accessor.GetManagedFields()[1].Manager, "ancient-changes"; actual != expected {
t.Fatalf("Expected first manager to be named %v but got %v", expected, actual)
}
}
// TestApplyManagedFields makes sure that managedFields api does not change // TestApplyManagedFields makes sure that managedFields api does not change
func TestApplyManagedFields(t *testing.T) { func TestApplyManagedFields(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()