Refactor fieldmanager to be more modular

This commit is contained in:
jennybuckley 2019-09-19 13:14:36 -07:00 committed by Jennifer Buckley
parent 3b58b35921
commit d72260b9da
14 changed files with 538 additions and 324 deletions

View File

@ -823,18 +823,18 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
}
if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) {
reqScope := *requestScopes[v.Name]
fm, err := fieldmanager.NewCRDFieldManager(
reqScope.FieldManager, err = fieldmanager.NewDefaultCRDFieldManager(
openAPIModels,
reqScope.Convertor,
reqScope.Defaulter,
reqScope.Kind.GroupVersion(),
reqScope.Creater,
reqScope.Kind,
reqScope.HubGroupVersion,
*crd.Spec.PreserveUnknownFields,
)
if err != nil {
return nil, err
}
reqScope.FieldManager = fieldmanager.NewSkipNonAppliedManager(fm, reqScope.Creater, reqScope.Kind)
requestScopes[v.Name] = &reqScope
}

View File

@ -3,8 +3,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"buildmanagerinfo.go",
"fieldmanager.go",
"skipnonapplied.go",
"stripmeta.go",
"structuredmerge.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager",
importpath = "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager",

View File

@ -0,0 +1,72 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
)
type buildManagerInfoManager struct {
fieldManager Manager
groupVersion schema.GroupVersion
}
var _ Manager = &buildManagerInfoManager{}
// NewBuildManagerInfoManager creates a new Manager that converts the manager name into a unique identifier
// combining operation and version for update requests, and just operation for apply requests.
func NewBuildManagerInfoManager(f Manager, gv schema.GroupVersion) Manager {
return &buildManagerInfoManager{
fieldManager: f,
groupVersion: gv,
}
}
// Update implements Manager.
func (f *buildManagerInfoManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
manager, err := f.buildManagerInfo(manager, metav1.ManagedFieldsOperationUpdate)
if err != nil {
return nil, nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
return f.fieldManager.Update(liveObj, newObj, managed, manager)
}
// Apply implements Manager.
func (f *buildManagerInfoManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, manager string, force bool) (runtime.Object, Managed, error) {
manager, err := f.buildManagerInfo(manager, metav1.ManagedFieldsOperationApply)
if err != nil {
return nil, nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
return f.fieldManager.Apply(liveObj, patch, managed, manager, force)
}
func (f *buildManagerInfoManager) buildManagerInfo(prefix string, operation metav1.ManagedFieldsOperationType) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: prefix,
Operation: operation,
APIVersion: f.groupVersion.String(),
}
if managerInfo.Manager == "" {
managerInfo.Manager = "unknown"
}
return internal.BuildManagerIdentifier(&managerInfo)
}

View File

@ -18,302 +18,138 @@ package fieldmanager
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"k8s.io/klog"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/fieldpath"
"sigs.k8s.io/structured-merge-diff/merge"
"sigs.k8s.io/yaml"
)
// FieldManager updates the managed fields and merge applied
// configurations.
type FieldManager interface {
// Managed groups a fieldpath.ManagedFields together with the timestamps associated with each operation.
type Managed interface {
// Fields gets the fieldpath.ManagedFields.
Fields() fieldpath.ManagedFields
// Times gets the timestamps associated with each operation.
Times() map[string]*metav1.Time
}
// Manager updates the managed fields and merges applied configurations.
type Manager interface {
// Update is used when the object has already been merged (non-apply
// use-case), and simply updates the managed fields in the output
// object.
Update(liveObj, newObj runtime.Object, manager string) (runtime.Object, error)
Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error)
// Apply is used when server-side apply is called, as it merges the
// object and update the managed fields.
Apply(liveObj runtime.Object, patch []byte, fieldManager string, force bool) (runtime.Object, error)
// object and updates the managed fields.
Apply(liveObj runtime.Object, patch []byte, managed Managed, fieldManager string, force bool) (runtime.Object, Managed, error)
}
type fieldManager struct {
typeConverter internal.TypeConverter
objectConverter runtime.ObjectConvertor
objectDefaulter runtime.ObjectDefaulter
groupVersion schema.GroupVersion
hubVersion schema.GroupVersion
updater merge.Updater
// FieldManager updates the managed fields and merge applied
// configurations.
type FieldManager struct {
fieldManager Manager
}
var _ FieldManager = &fieldManager{}
// NewFieldManager creates a new FieldManager that decodes, manages, then re-encodes managedFields
// on update and apply requests.
func NewFieldManager(f Manager) *FieldManager {
return &FieldManager{f}
}
// NewFieldManager creates a new FieldManager that merges apply requests
// NewDefaultFieldManager creates a new FieldManager that merges apply requests
// and update managed fields for other types of requests.
func NewFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion) (FieldManager, error) {
typeConverter, err := internal.NewTypeConverter(models, false)
func NewDefaultFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind, hub schema.GroupVersion) (*FieldManager, error) {
f, err := NewStructuredMergeManager(models, objectConverter, objectDefaulter, kind.GroupVersion(), hub)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create field manager: %v", err)
}
return &fieldManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
return newDefaultFieldManager(f, objectCreater, kind), nil
}
// NewCRDFieldManager creates a new FieldManager specifically for
// NewDefaultCRDFieldManager creates a new FieldManager specifically for
// CRDs. This allows for the possibility of fields which are not defined
// in models, as well as having no models defined at all.
func NewCRDFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion, preserveUnknownFields bool) (_ FieldManager, err error) {
var typeConverter internal.TypeConverter = internal.DeducedTypeConverter{}
if models != nil {
typeConverter, err = internal.NewTypeConverter(models, preserveUnknownFields)
if err != nil {
return nil, err
}
func NewDefaultCRDFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind, hub schema.GroupVersion, preserveUnknownFields bool) (_ *FieldManager, err error) {
f, err := NewCRDStructuredMergeManager(models, objectConverter, objectDefaulter, kind.GroupVersion(), hub, preserveUnknownFields)
if err != nil {
return nil, fmt.Errorf("failed to create field manager: %v", err)
}
return &fieldManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewCRDVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
return newDefaultFieldManager(f, objectCreater, kind), nil
}
// Update implements FieldManager.
func (f *fieldManager) Update(liveObj, newObj runtime.Object, manager string) (runtime.Object, error) {
// newDefaultFieldManager is a helper function which wraps a Manager with certain default logic.
func newDefaultFieldManager(f Manager, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind) *FieldManager {
f = NewStripMetaManager(f)
f = NewBuildManagerInfoManager(f, kind.GroupVersion())
f = NewSkipNonAppliedManager(f, objectCreater, kind)
return NewFieldManager(f)
}
// Update is used when the object has already been merged (non-apply
// use-case), and simply updates the managed fields in the output
// object.
func (f *FieldManager) Update(liveObj, newObj runtime.Object, manager string) (object runtime.Object, err error) {
// If the object doesn't have metadata, we should just return without trying to
// set the managedFields at all, so creates/updates/patches will work normally.
if _, err := meta.Accessor(newObj); err != nil {
if _, err = meta.Accessor(newObj); err != nil {
return newObj, nil
}
// First try to decode the managed fields provided in the update,
// This is necessary to allow directly updating managed fields.
managed, err := internal.DecodeObjectManagedFields(newObj)
// If the managed field is empty or we failed to decode it,
// let's try the live object. This is to prevent clients who
// don't understand managedFields from deleting it accidentally.
if err != nil || len(managed.Fields) == 0 {
var managed Managed
if managed, err = internal.DecodeObjectManagedFields(newObj); err != nil || len(managed.Fields()) == 0 {
// If the managed field is empty or we failed to decode it,
// let's try the live object. This is to prevent clients who
// don't understand managedFields from deleting it accidentally.
managed, err = internal.DecodeObjectManagedFields(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to decode managed fields: %v", err)
}
}
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
internal.RemoveObjectManagedFields(liveObjVersioned)
internal.RemoveObjectManagedFields(newObjVersioned)
newObjTyped, err := f.typeConverter.ObjectToTyped(newObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed new object: %v", err)
return newObj, nil
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed live object: %v", err)
return newObj, nil
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
// TODO(apelisse) use the first return value when unions are implemented
_, managed.Fields, err = f.updater.Update(liveObjTyped, newObjTyped, apiVersion, managed.Fields, manager)
if err != nil {
return nil, fmt.Errorf("failed to update ManagedFields: %v", err)
}
managed.Fields = f.stripFields(managed.Fields, manager)
internal.RemoveObjectManagedFields(liveObj)
internal.RemoveObjectManagedFields(newObj)
// If the current operation took any fields from anything, it means the object changed,
// so update the timestamp of the managedFieldsEntry and merge with any previous updates from the same manager
if vs, ok := managed.Fields[manager]; ok {
delete(managed.Fields, manager)
// Build a manager identifier which will only match previous updates from the same manager
manager, err = f.buildManagerInfo(manager, metav1.ManagedFieldsOperationUpdate)
if err != nil {
return nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
managed.Times[manager] = &metav1.Time{Time: time.Now().UTC()}
if previous, ok := managed.Fields[manager]; ok {
managed.Fields[manager] = fieldpath.NewVersionedSet(vs.Set().Union(previous.Set()), vs.APIVersion(), vs.Applied())
} else {
managed.Fields[manager] = vs
}
if object, managed, err = f.fieldManager.Update(liveObj, newObj, managed, manager); err != nil {
return nil, err
}
if err := internal.EncodeObjectManagedFields(newObj, managed); err != nil {
if err = internal.EncodeObjectManagedFields(object, managed); err != nil {
return nil, fmt.Errorf("failed to encode managed fields: %v", err)
}
return newObj, nil
return object, nil
}
// Apply implements FieldManager.
func (f *fieldManager) Apply(liveObj runtime.Object, patch []byte, fieldManager string, force bool) (runtime.Object, error) {
// Apply is used when server-side apply is called, as it merges the
// object and updates the managed fields.
func (f *FieldManager) Apply(liveObj runtime.Object, patch []byte, manager string, force bool) (object runtime.Object, err error) {
// If the object doesn't have metadata, apply isn't allowed.
_, err := meta.Accessor(liveObj)
if err != nil {
if _, err = meta.Accessor(liveObj); err != nil {
return nil, fmt.Errorf("couldn't get accessor: %v", err)
}
managed, err := internal.DecodeObjectManagedFields(liveObj)
if err != nil {
// Decode the managed fields in the live object, since it isn't allowed in the patch.
var managed Managed
if managed, err = internal.DecodeObjectManagedFields(liveObj); err != nil {
return nil, fmt.Errorf("failed to decode managed fields: %v", err)
}
// Check that the patch object has the same version as the live object
patchObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(patch, &patchObj.Object); err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error decoding YAML: %v", err))
}
internal.RemoveObjectManagedFields(liveObj)
if patchObj.GetManagedFields() != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("metadata.managedFields must be nil"))
}
if patchObj.GetAPIVersion() != f.groupVersion.String() {
return nil,
errors.NewBadRequest(
fmt.Sprintf("Incorrect version specified in apply patch. "+
"Specified patch version: %s, expected: %s",
patchObj.GetAPIVersion(), f.groupVersion.String()))
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
internal.RemoveObjectManagedFields(liveObjVersioned)
patchObjTyped, err := f.typeConverter.ObjectToTyped(patchObj)
if err != nil {
return nil, fmt.Errorf("failed to create typed patch object: %v", err)
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
return nil, fmt.Errorf("failed to create typed live object: %v", err)
}
manager, err := f.buildManagerInfo(fieldManager, metav1.ManagedFieldsOperationApply)
if err != nil {
return nil, fmt.Errorf("failed to build manager identifier: %v", err)
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
newObjTyped, managedFields, err := f.updater.Apply(liveObjTyped, patchObjTyped, apiVersion, managed.Fields, manager, force)
if err != nil {
if conflicts, ok := err.(merge.Conflicts); ok {
return nil, internal.NewConflictError(conflicts)
}
if object, managed, err = f.fieldManager.Apply(liveObj, patch, managed, manager, force); err != nil {
return nil, err
}
managed.Fields = f.stripFields(managedFields, manager)
// Update the time in the managedFieldsEntry for this operation
managed.Times[manager] = &metav1.Time{Time: time.Now().UTC()}
newObj, err := f.typeConverter.TypedToObject(newObjTyped)
if err != nil {
return nil, fmt.Errorf("failed to convert new typed object to object: %v", err)
}
if err := internal.EncodeObjectManagedFields(newObj, managed); err != nil {
if err = internal.EncodeObjectManagedFields(object, managed); err != nil {
return nil, fmt.Errorf("failed to encode managed fields: %v", err)
}
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
f.objectDefaulter.Default(newObjVersioned)
newObjUnversioned, err := f.toUnversioned(newObjVersioned)
if err != nil {
return nil, fmt.Errorf("failed to convert to unversioned: %v", err)
}
return newObjUnversioned, nil
}
func (f *fieldManager) toVersioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.groupVersion)
}
func (f *fieldManager) toUnversioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.hubVersion)
}
func (f *fieldManager) buildManagerInfo(prefix string, operation metav1.ManagedFieldsOperationType) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: prefix,
Operation: operation,
APIVersion: f.groupVersion.String(),
}
if managerInfo.Manager == "" {
managerInfo.Manager = "unknown"
}
return internal.BuildManagerIdentifier(&managerInfo)
}
// stripSet is the list of fields that should never be part of a mangedFields.
var stripSet = fieldpath.NewSet(
fieldpath.MakePathOrDie("apiVersion"),
fieldpath.MakePathOrDie("kind"),
fieldpath.MakePathOrDie("metadata"),
fieldpath.MakePathOrDie("metadata", "name"),
fieldpath.MakePathOrDie("metadata", "namespace"),
fieldpath.MakePathOrDie("metadata", "creationTimestamp"),
fieldpath.MakePathOrDie("metadata", "selfLink"),
fieldpath.MakePathOrDie("metadata", "uid"),
fieldpath.MakePathOrDie("metadata", "clusterName"),
fieldpath.MakePathOrDie("metadata", "generation"),
fieldpath.MakePathOrDie("metadata", "managedFields"),
fieldpath.MakePathOrDie("metadata", "resourceVersion"),
)
// stripFields removes a predefined set of paths found in typed from managed and returns the updated ManagedFields
func (f *fieldManager) stripFields(managed fieldpath.ManagedFields, manager string) fieldpath.ManagedFields {
vs, ok := managed[manager]
if ok {
if vs == nil {
panic(fmt.Sprintf("Found unexpected nil manager which should never happen: %s", manager))
}
newSet := vs.Set().Difference(stripSet)
if newSet.Empty() {
delete(managed, manager)
} else {
managed[manager] = fieldpath.NewVersionedSet(newSet, vs.APIVersion(), vs.Applied())
}
}
return managed
return object, nil
}

View File

@ -65,7 +65,7 @@ type fakeObjectDefaulter struct{}
func (d *fakeObjectDefaulter) Default(in runtime.Object) {}
type TestFieldManager struct {
fieldManager fieldmanager.FieldManager
fieldManager fieldmanager.Manager
emptyObj runtime.Object
liveObj runtime.Object
}
@ -80,7 +80,7 @@ func NewTestFieldManager(gvk schema.GroupVersionKind) TestFieldManager {
panic(err)
}
f, err := fieldmanager.NewFieldManager(
f, err := fieldmanager.NewStructuredMergeManager(
m,
&fakeObjectConvertor{},
&fakeObjectDefaulter{},
@ -93,6 +93,8 @@ func NewTestFieldManager(gvk schema.GroupVersionKind) TestFieldManager {
live := &unstructured.Unstructured{}
live.SetKind(gvk.Kind)
live.SetAPIVersion(gvk.GroupVersion().String())
f = fieldmanager.NewStripMetaManager(f)
f = fieldmanager.NewBuildManagerInfoManager(f, gvk.GroupVersion())
return TestFieldManager{
fieldManager: f,
emptyObj: live,
@ -105,7 +107,7 @@ func (f *TestFieldManager) Reset() {
}
func (f *TestFieldManager) Apply(obj []byte, manager string, force bool) error {
out, err := f.fieldManager.Apply(f.liveObj, obj, manager, force)
out, err := fieldmanager.NewFieldManager(f.fieldManager).Apply(f.liveObj, obj, manager, force)
if err == nil {
f.liveObj = out
}
@ -113,7 +115,7 @@ func (f *TestFieldManager) Apply(obj []byte, manager string, force bool) error {
}
func (f *TestFieldManager) Update(obj runtime.Object, manager string) error {
out, err := f.fieldManager.Update(f.liveObj, obj, manager)
out, err := fieldmanager.NewFieldManager(f.fieldManager).Update(f.liveObj, obj, manager)
if err == nil {
f.liveObj = out
}

View File

@ -28,10 +28,38 @@ import (
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
// Managed groups a fieldpath.ManagedFields together with the timestamps associated with each operation.
type Managed struct {
Fields fieldpath.ManagedFields
Times map[string]*metav1.Time
// ManagedInterface groups a fieldpath.ManagedFields together with the timestamps associated with each operation.
type ManagedInterface interface {
// Fields gets the fieldpath.ManagedFields.
Fields() fieldpath.ManagedFields
// Times gets the timestamps associated with each operation.
Times() map[string]*metav1.Time
}
type managedStruct struct {
fields fieldpath.ManagedFields
times map[string]*metav1.Time
}
var _ ManagedInterface = &managedStruct{}
// Fields implements ManagedInterface.
func (m *managedStruct) Fields() fieldpath.ManagedFields {
return m.fields
}
// Times implements ManagedInterface.
func (m *managedStruct) Times() map[string]*metav1.Time {
return m.times
}
// NewManaged creates a ManagedInterface from a fieldpath.ManagedFields and the timestamps associated with each operation.
func NewManaged(f fieldpath.ManagedFields, t map[string]*metav1.Time) ManagedInterface {
return &managedStruct{
fields: f,
times: t,
}
}
// RemoveObjectManagedFields removes the ManagedFields from the object
@ -46,9 +74,9 @@ func RemoveObjectManagedFields(obj runtime.Object) {
}
// DecodeObjectManagedFields extracts and converts the objects ManagedFields into a fieldpath.ManagedFields.
func DecodeObjectManagedFields(from runtime.Object) (Managed, error) {
func DecodeObjectManagedFields(from runtime.Object) (ManagedInterface, error) {
if from == nil {
return Managed{}, nil
return &managedStruct{}, nil
}
accessor, err := meta.Accessor(from)
if err != nil {
@ -57,13 +85,13 @@ func DecodeObjectManagedFields(from runtime.Object) (Managed, error) {
managed, err := decodeManagedFields(accessor.GetManagedFields())
if err != nil {
return Managed{}, fmt.Errorf("failed to convert managed fields from API: %v", err)
return nil, fmt.Errorf("failed to convert managed fields from API: %v", err)
}
return managed, err
return &managed, nil
}
// EncodeObjectManagedFields converts and stores the fieldpathManagedFields into the objects ManagedFields
func EncodeObjectManagedFields(obj runtime.Object, managed Managed) error {
func EncodeObjectManagedFields(obj runtime.Object, managed ManagedInterface) error {
accessor, err := meta.Accessor(obj)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
@ -80,19 +108,19 @@ func EncodeObjectManagedFields(obj runtime.Object, managed Managed) error {
// decodeManagedFields converts ManagedFields from the wire format (api format)
// to the format used by sigs.k8s.io/structured-merge-diff
func decodeManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry) (managed Managed, err error) {
managed.Fields = make(fieldpath.ManagedFields, len(encodedManagedFields))
managed.Times = make(map[string]*metav1.Time, len(encodedManagedFields))
func decodeManagedFields(encodedManagedFields []metav1.ManagedFieldsEntry) (managed managedStruct, err error) {
managed.fields = make(fieldpath.ManagedFields, len(encodedManagedFields))
managed.times = make(map[string]*metav1.Time, len(encodedManagedFields))
for _, encodedVersionedSet := range encodedManagedFields {
manager, err := BuildManagerIdentifier(&encodedVersionedSet)
if err != nil {
return Managed{}, fmt.Errorf("error decoding manager from %v: %v", encodedVersionedSet, err)
return managedStruct{}, fmt.Errorf("error decoding manager from %v: %v", encodedVersionedSet, err)
}
managed.Fields[manager], err = decodeVersionedSet(&encodedVersionedSet)
managed.fields[manager], err = decodeVersionedSet(&encodedVersionedSet)
if err != nil {
return Managed{}, fmt.Errorf("error decoding versioned set from %v: %v", encodedVersionedSet, err)
return managedStruct{}, fmt.Errorf("error decoding versioned set from %v: %v", encodedVersionedSet, err)
}
managed.Times[manager] = encodedVersionedSet.Time
managed.times[manager] = encodedVersionedSet.Time
}
return managed, nil
}
@ -139,15 +167,18 @@ func decodeVersionedSet(encodedVersionedSet *metav1.ManagedFieldsEntry) (version
// encodeManagedFields converts ManagedFields from the format used by
// sigs.k8s.io/structured-merge-diff to the wire format (api format)
func encodeManagedFields(managed Managed) (encodedManagedFields []metav1.ManagedFieldsEntry, err error) {
func encodeManagedFields(managed ManagedInterface) (encodedManagedFields []metav1.ManagedFieldsEntry, err error) {
if len(managed.Fields()) == 0 {
return nil, nil
}
encodedManagedFields = []metav1.ManagedFieldsEntry{}
for manager := range managed.Fields {
versionedSet := managed.Fields[manager]
for manager := range managed.Fields() {
versionedSet := managed.Fields()[manager]
v, err := encodeManagerVersionedSet(manager, versionedSet)
if err != nil {
return nil, fmt.Errorf("error encoding versioned set for %v: %v", manager, err)
}
if t, ok := managed.Times[manager]; ok {
if t, ok := managed.Times()[manager]; ok {
v.Time = t
}
encodedManagedFields = append(encodedManagedFields, *v)

View File

@ -31,6 +31,8 @@ import (
// (api format) to the format used by sigs.k8s.io/structured-merge-diff and back
func TestRoundTripManagedFields(t *testing.T) {
tests := []string{
`null
`,
`- apiVersion: v1
fieldsType: FieldsV1
fieldsV1:
@ -146,7 +148,7 @@ func TestRoundTripManagedFields(t *testing.T) {
if err != nil {
t.Fatalf("did not expect decoding error but got: %v", err)
}
encoded, err := encodeManagedFields(decoded)
encoded, err := encodeManagedFields(&decoded)
if err != nil {
t.Fatalf("did not expect encoding error but got: %v", err)
}

View File

@ -19,61 +19,48 @@ package fieldmanager
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type skipNonAppliedManager struct {
fieldManager FieldManager
objectCreater runtime.ObjectCreater
gvk schema.GroupVersionKind
fieldManager Manager
objectCreater runtime.ObjectCreater
gvk schema.GroupVersionKind
beforeApplyManagerName string
}
var _ FieldManager = &skipNonAppliedManager{}
var _ Manager = &skipNonAppliedManager{}
// NewSkipNonAppliedManager creates a new wrapped FieldManager that only starts tracking managers after the first apply
func NewSkipNonAppliedManager(fieldManager FieldManager, objectCreater runtime.ObjectCreater, gvk schema.GroupVersionKind) FieldManager {
// NewSkipNonAppliedManager creates a new wrapped FieldManager that only starts tracking managers after the first apply.
func NewSkipNonAppliedManager(fieldManager Manager, objectCreater runtime.ObjectCreater, gvk schema.GroupVersionKind) Manager {
return &skipNonAppliedManager{
fieldManager: fieldManager,
objectCreater: objectCreater,
gvk: gvk,
fieldManager: fieldManager,
objectCreater: objectCreater,
gvk: gvk,
beforeApplyManagerName: "before-first-apply",
}
}
// Update implements FieldManager.
func (f *skipNonAppliedManager) Update(liveObj, newObj runtime.Object, manager string) (runtime.Object, error) {
liveObjAccessor, err := meta.Accessor(liveObj)
if err != nil {
return newObj, nil
// Update implements Manager.
func (f *skipNonAppliedManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
if len(managed.Fields()) == 0 {
return newObj, managed, nil
}
newObjAccessor, err := meta.Accessor(newObj)
if err != nil {
return newObj, nil
}
if len(liveObjAccessor.GetManagedFields()) == 0 && len(newObjAccessor.GetManagedFields()) == 0 {
return newObj, nil
}
return f.fieldManager.Update(liveObj, newObj, manager)
return f.fieldManager.Update(liveObj, newObj, managed, manager)
}
// Apply implements FieldManager.
func (f *skipNonAppliedManager) Apply(liveObj runtime.Object, patch []byte, fieldManager string, force bool) (runtime.Object, error) {
liveObjAccessor, err := meta.Accessor(liveObj)
if err != nil {
return nil, fmt.Errorf("couldn't get accessor: %v", err)
}
if len(liveObjAccessor.GetManagedFields()) == 0 {
// Apply implements Manager.
func (f *skipNonAppliedManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, fieldManager string, force bool) (runtime.Object, Managed, error) {
if len(managed.Fields()) == 0 {
emptyObj, err := f.objectCreater.New(f.gvk)
if err != nil {
return nil, fmt.Errorf("failed to create empty object of type %v: %v", f.gvk, err)
return nil, nil, fmt.Errorf("failed to create empty object of type %v: %v", f.gvk, err)
}
liveObj, err = f.fieldManager.Update(emptyObj, liveObj, "before-first-apply")
liveObj, managed, err = f.fieldManager.Update(emptyObj, liveObj, managed, f.beforeApplyManagerName)
if err != nil {
return nil, fmt.Errorf("failed to create manager for existing fields: %v", err)
return nil, nil, fmt.Errorf("failed to create manager for existing fields: %v", err)
}
}
return f.fieldManager.Apply(liveObj, patch, fieldManager, force)
return f.fieldManager.Apply(liveObj, patch, managed, fieldManager, force)
}

View File

@ -0,0 +1,90 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/structured-merge-diff/fieldpath"
)
type stripMetaManager struct {
fieldManager Manager
// stripSet is the list of fields that should never be part of a mangedFields.
stripSet *fieldpath.Set
}
var _ Manager = &stripMetaManager{}
// NewStripMetaManager creates a new Manager that strips metadata and typemeta fields from the manager's fieldset.
func NewStripMetaManager(fieldManager Manager) Manager {
return &stripMetaManager{
fieldManager: fieldManager,
stripSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("apiVersion"),
fieldpath.MakePathOrDie("kind"),
fieldpath.MakePathOrDie("metadata"),
fieldpath.MakePathOrDie("metadata", "name"),
fieldpath.MakePathOrDie("metadata", "namespace"),
fieldpath.MakePathOrDie("metadata", "creationTimestamp"),
fieldpath.MakePathOrDie("metadata", "selfLink"),
fieldpath.MakePathOrDie("metadata", "uid"),
fieldpath.MakePathOrDie("metadata", "clusterName"),
fieldpath.MakePathOrDie("metadata", "generation"),
fieldpath.MakePathOrDie("metadata", "managedFields"),
fieldpath.MakePathOrDie("metadata", "resourceVersion"),
),
}
}
// Update implements Manager.
func (f *stripMetaManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
newObj, managed, err := f.fieldManager.Update(liveObj, newObj, managed, manager)
if err != nil {
return nil, nil, err
}
f.stripFields(managed.Fields(), manager)
return newObj, managed, nil
}
// Apply implements Manager.
func (f *stripMetaManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, manager string, force bool) (runtime.Object, Managed, error) {
newObj, managed, err := f.fieldManager.Apply(liveObj, patch, managed, manager, force)
if err != nil {
return nil, nil, err
}
f.stripFields(managed.Fields(), manager)
return newObj, managed, nil
}
// stripFields removes a predefined set of paths found in typed from managed
func (f *stripMetaManager) stripFields(managed fieldpath.ManagedFields, manager string) {
vs, ok := managed[manager]
if ok {
if vs == nil {
panic(fmt.Sprintf("Found unexpected nil manager which should never happen: %s", manager))
}
newSet := vs.Set().Difference(f.stripSet)
if newSet.Empty() {
delete(managed, manager)
} else {
managed[manager] = fieldpath.NewVersionedSet(newSet, vs.APIVersion(), vs.Applied())
}
}
}

View File

@ -0,0 +1,209 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"k8s.io/klog"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/fieldpath"
"sigs.k8s.io/structured-merge-diff/merge"
"sigs.k8s.io/yaml"
)
type structuredMergeManager struct {
typeConverter internal.TypeConverter
objectConverter runtime.ObjectConvertor
objectDefaulter runtime.ObjectDefaulter
groupVersion schema.GroupVersion
hubVersion schema.GroupVersion
updater merge.Updater
}
var _ Manager = &structuredMergeManager{}
// NewStructuredMergeManager creates a new Manager that merges apply requests
// and update managed fields for other types of requests.
func NewStructuredMergeManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion) (Manager, error) {
typeConverter, err := internal.NewTypeConverter(models, false)
if err != nil {
return nil, err
}
return &structuredMergeManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
}
// NewCRDStructuredMergeManager creates a new Manager specifically for
// CRDs. This allows for the possibility of fields which are not defined
// in models, as well as having no models defined at all.
func NewCRDStructuredMergeManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion, preserveUnknownFields bool) (_ Manager, err error) {
var typeConverter internal.TypeConverter = internal.DeducedTypeConverter{}
if models != nil {
typeConverter, err = internal.NewTypeConverter(models, preserveUnknownFields)
if err != nil {
return nil, err
}
}
return &structuredMergeManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
objectDefaulter: objectDefaulter,
groupVersion: gv,
hubVersion: hub,
updater: merge.Updater{
Converter: internal.NewCRDVersionConverter(typeConverter, objectConverter, hub),
},
}, nil
}
// Update implements Manager.
func (f *structuredMergeManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
newObjTyped, err := f.typeConverter.ObjectToTyped(newObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed new object: %v", err)
return newObj, managed, nil
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
// Return newObj and just by-pass fields update. This really shouldn't happen.
klog.Errorf("[SHOULD NOT HAPPEN] failed to create typed live object: %v", err)
return newObj, managed, nil
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
// TODO(apelisse) use the first return value when unions are implemented
self := "current-operation"
_, managedFields, err := f.updater.Update(liveObjTyped, newObjTyped, apiVersion, managed.Fields(), self)
if err != nil {
return nil, nil, fmt.Errorf("failed to update ManagedFields: %v", err)
}
managed = internal.NewManaged(managedFields, managed.Times())
// If the current operation took any fields from anything, it means the object changed,
// so update the timestamp of the managedFieldsEntry and merge with any previous updates from the same manager
if vs, ok := managed.Fields()[self]; ok {
delete(managed.Fields(), self)
managed.Times()[manager] = &metav1.Time{Time: time.Now().UTC()}
if previous, ok := managed.Fields()[manager]; ok {
managed.Fields()[manager] = fieldpath.NewVersionedSet(vs.Set().Union(previous.Set()), vs.APIVersion(), vs.Applied())
} else {
managed.Fields()[manager] = vs
}
}
return newObj, managed, nil
}
// Apply implements Manager.
func (f *structuredMergeManager) Apply(liveObj runtime.Object, patch []byte, managed Managed, manager string, force bool) (runtime.Object, Managed, error) {
patchObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(patch, &patchObj.Object); err != nil {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("error decoding YAML: %v", err))
}
// Check that the patch object has the same version as the live object
if patchObj.GetAPIVersion() != f.groupVersion.String() {
return nil, nil,
errors.NewBadRequest(
fmt.Sprintf("Incorrect version specified in apply patch. "+
"Specified patch version: %s, expected: %s",
patchObj.GetAPIVersion(), f.groupVersion.String()))
}
if patchObj.GetManagedFields() != nil {
return nil, nil, errors.NewBadRequest(fmt.Sprintf("metadata.managedFields must be nil"))
}
liveObjVersioned, err := f.toVersioned(liveObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert live object to proper version: %v", err)
}
patchObjTyped, err := f.typeConverter.ObjectToTyped(patchObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to create typed patch object: %v", err)
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
return nil, nil, fmt.Errorf("failed to create typed live object: %v", err)
}
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
newObjTyped, managedFields, err := f.updater.Apply(liveObjTyped, patchObjTyped, apiVersion, managed.Fields(), manager, force)
if err != nil {
if conflicts, ok := err.(merge.Conflicts); ok {
return nil, nil, internal.NewConflictError(conflicts)
}
return nil, nil, err
}
managed = internal.NewManaged(managedFields, managed.Times())
// Update the time in the managedFieldsEntry for this operation
managed.Times()[manager] = &metav1.Time{Time: time.Now().UTC()}
newObj, err := f.typeConverter.TypedToObject(newObjTyped)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert new typed object to object: %v", err)
}
newObjVersioned, err := f.toVersioned(newObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert new object to proper version: %v", err)
}
f.objectDefaulter.Default(newObjVersioned)
newObjUnversioned, err := f.toUnversioned(newObjVersioned)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert to unversioned: %v", err)
}
return newObjUnversioned, managed, nil
}
func (f *structuredMergeManager) toVersioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.groupVersion)
}
func (f *structuredMergeManager) toUnversioned(obj runtime.Object) (runtime.Object, error) {
return f.objectConverter.ConvertToVersion(obj, f.hubVersion)
}

View File

@ -296,7 +296,7 @@ type patchMechanism interface {
type jsonPatcher struct {
*patcher
fieldManager fieldmanager.FieldManager
fieldManager *fieldmanager.FieldManager
}
func (p *jsonPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) {
@ -382,7 +382,7 @@ type smpPatcher struct {
// Schema
schemaReferenceObj runtime.Object
fieldManager fieldmanager.FieldManager
fieldManager *fieldmanager.FieldManager
}
func (p *smpPatcher) applyPatchToCurrentObject(currentObject runtime.Object) (runtime.Object, error) {
@ -422,7 +422,7 @@ type applyPatcher struct {
options *metav1.PatchOptions
creater runtime.ObjectCreater
kind schema.GroupVersionKind
fieldManager fieldmanager.FieldManager
fieldManager *fieldmanager.FieldManager
}
func (p *applyPatcher) applyPatchToCurrentObject(obj runtime.Object) (runtime.Object, error) {

View File

@ -67,7 +67,7 @@ type RequestScope struct {
EquivalentResourceMapper runtime.EquivalentResourceMapper
TableConvertor rest.TableConvertor
FieldManager fieldmanager.FieldManager
FieldManager *fieldmanager.FieldManager
Resource schema.GroupVersionResource
Kind schema.GroupVersionKind

View File

@ -550,18 +550,17 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
reqScope.MetaGroupVersion = *a.group.MetaGroupVersion
}
if a.group.OpenAPIModels != nil && utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) {
fm, err := fieldmanager.NewFieldManager(
reqScope.FieldManager, err = fieldmanager.NewDefaultFieldManager(
a.group.OpenAPIModels,
a.group.UnsafeConvertor,
a.group.Defaulter,
fqKindToRegister.GroupVersion(),
a.group.Creater,
fqKindToRegister,
reqScope.HubGroupVersion,
)
if err != nil {
return nil, fmt.Errorf("failed to create field manager: %v", err)
}
fm = fieldmanager.NewSkipNonAppliedManager(fm, a.group.Creater, fqKindToRegister)
reqScope.FieldManager = fm
}
for _, action := range actions {
producedObject := storageMeta.ProducesObject(action.Verb)

View File

@ -180,7 +180,7 @@ func TestNoOpUpdateSameResourceVersion(t *testing.T) {
}
}`)
o, err := client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
_, err := client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
Namespace("default").
Param("fieldManager", "apply_test").
Resource(podResource).
@ -192,23 +192,6 @@ func TestNoOpUpdateSameResourceVersion(t *testing.T) {
t.Fatalf("Failed to create object: %v", err)
}
// Need to update once for some reason
// TODO (#82042): Remove this update once possible
b, err := json.MarshalIndent(o, "\t", "\t")
if err != nil {
t.Fatalf("Failed to marshal created object: %v", err)
}
_, err = client.CoreV1().RESTClient().Put().
Namespace("default").
Resource(podResource).
Name(podName).
Body(b).
Do().
Get()
if err != nil {
t.Fatalf("Failed to apply first no-op update: %v", err)
}
// Sleep for one second to make sure that the times of each update operation is different.
time.Sleep(1 * time.Second)
@ -941,7 +924,7 @@ func TestApplyConvertsManagedFieldsVersion(t *testing.T) {
Time: actual.Time,
FieldsType: "FieldsV1",
FieldsV1: &metav1.FieldsV1{
Raw: []byte(`{"f:metadata":{"f:labels":{"f:sidecar_version":{}}},"f:spec":{"f:template":{"f:spec":{"f:containers":{"k:{\"name\":\"sidecar\"}":{".":{},"f:image":{},"f:name":{}}}}}}}`),
Raw: []byte(`{"f:metadata":{"f:labels":{"f:sidecar_version":{}}},"f:spec":{"f:template":{"f:spec":{"f:containers":{"k:{\"name\":\"sidecar\"}":{"f:image":{},"f:name":{},".":{}}}}}}}`),
},
}