Merge pull request #90187 from julianvmodesto/last-applied-updater

Implement server-side apply upgrade and downgrade
This commit is contained in:
Kubernetes Prow Robot 2020-07-13 16:45:20 -07:00 committed by GitHub
commit 6079cebfae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1462 additions and 65 deletions

View File

@ -6,6 +6,8 @@ go_library(
"buildmanagerinfo.go",
"capmanagers.go",
"fieldmanager.go",
"lastappliedmanager.go",
"lastappliedupdater.go",
"managedfieldsupdater.go",
"skipnonapplied.go",
"stripmeta.go",
@ -15,9 +17,11 @@ go_library(
importpath = "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal:go_default_library",
@ -50,6 +54,8 @@ go_test(
srcs = [
"capmanagers_test.go",
"fieldmanager_test.go",
"lastappliedmanager_test.go",
"lastappliedupdater_test.go",
"skipnonapplied_test.go",
],
data = [
@ -69,6 +75,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/util/proto/testing:go_default_library",

View File

@ -41,14 +41,16 @@ func (*fakeManager) Update(_, newObj runtime.Object, managed fieldmanager.Manage
return newObj, managed, nil
}
func (*fakeManager) Apply(_, _ runtime.Object, _ fieldmanager.Managed, _ string, force bool) (runtime.Object, fieldmanager.Managed, error) {
func (*fakeManager) Apply(_, _ runtime.Object, _ fieldmanager.Managed, _ string, _ bool) (runtime.Object, fieldmanager.Managed, error) {
panic("not implemented")
return nil, nil, nil
}
func TestCapManagersManagerMergesEntries(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f.fieldManager = fieldmanager.NewCapManagersManager(f.fieldManager, 3)
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"),
func(m fieldmanager.Manager) fieldmanager.Manager {
return fieldmanager.NewCapManagersManager(m, 3)
})
podWithLabels := func(labels ...string) runtime.Object {
labelMap := map[string]interface{}{}
@ -110,8 +112,10 @@ func TestCapManagersManagerMergesEntries(t *testing.T) {
}
func TestCapUpdateManagers(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f.fieldManager = fieldmanager.NewCapManagersManager(&fakeManager{}, 3)
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"),
func(m fieldmanager.Manager) fieldmanager.Manager {
return fieldmanager.NewCapManagersManager(m, 3)
})
set := func(fields ...string) *metav1.FieldsV1 {
s := fieldpath.NewSet()

View File

@ -29,6 +29,7 @@ import (
"k8s.io/klog/v2"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/v3/fieldpath"
"sigs.k8s.io/structured-merge-diff/v3/merge"
)
// DefaultMaxUpdateManagers defines the default maximum retained number of managedFields entries from updates
@ -78,31 +79,46 @@ func NewFieldManager(f Manager) *FieldManager {
// NewDefaultFieldManager creates a new FieldManager that merges apply requests
// and update managed fields for other types of requests.
func NewDefaultFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind, hub schema.GroupVersion) (*FieldManager, error) {
f, err := NewStructuredMergeManager(models, objectConverter, objectDefaulter, kind.GroupVersion(), hub)
typeConverter, err := internal.NewTypeConverter(models, false)
if err != nil {
return nil, err
}
f, err := NewStructuredMergeManager(typeConverter, objectConverter, objectDefaulter, kind.GroupVersion(), hub)
if err != nil {
return nil, fmt.Errorf("failed to create field manager: %v", err)
}
return newDefaultFieldManager(f, objectCreater, kind), nil
return newDefaultFieldManager(f, typeConverter, objectConverter, objectCreater, kind), nil
}
// NewDefaultCRDFieldManager creates a new FieldManager specifically for
// CRDs. This allows for the possibility of fields which are not defined
// in models, as well as having no models defined at all.
func NewDefaultCRDFieldManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind, hub schema.GroupVersion, preserveUnknownFields bool) (_ *FieldManager, err error) {
f, err := NewCRDStructuredMergeManager(models, objectConverter, objectDefaulter, kind.GroupVersion(), hub, preserveUnknownFields)
var typeConverter internal.TypeConverter = internal.DeducedTypeConverter{}
if models != nil {
typeConverter, err = internal.NewTypeConverter(models, preserveUnknownFields)
if err != nil {
return nil, err
}
}
f, err := NewCRDStructuredMergeManager(typeConverter, objectConverter, objectDefaulter, kind.GroupVersion(), hub, preserveUnknownFields)
if err != nil {
return nil, fmt.Errorf("failed to create field manager: %v", err)
}
return newDefaultFieldManager(f, objectCreater, kind), nil
return newDefaultFieldManager(f, typeConverter, objectConverter, objectCreater, kind), nil
}
// newDefaultFieldManager is a helper function which wraps a Manager with certain default logic.
func newDefaultFieldManager(f Manager, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind) *FieldManager {
func newDefaultFieldManager(f Manager, typeConverter internal.TypeConverter, objectConverter runtime.ObjectConvertor, objectCreater runtime.ObjectCreater, kind schema.GroupVersionKind) *FieldManager {
f = NewStripMetaManager(f)
f = NewManagedFieldsUpdater(f)
f = NewBuildManagerInfoManager(f, kind.GroupVersion())
f = NewCapManagersManager(f, DefaultMaxUpdateManagers)
f = NewProbabilisticSkipNonAppliedManager(f, objectCreater, kind, DefaultTrackOnCreateProbability)
f = NewLastAppliedManager(f, typeConverter, objectConverter, kind.GroupVersion())
f = NewLastAppliedUpdater(f)
return NewFieldManager(f)
}
@ -200,7 +216,11 @@ func (f *FieldManager) Apply(liveObj, appliedObj runtime.Object, manager string,
internal.RemoveObjectManagedFields(liveObj)
if object, managed, err = f.fieldManager.Apply(liveObj, appliedObj, managed, manager, force); err != nil {
object, managed, err = f.fieldManager.Apply(liveObj, appliedObj, managed, manager, force)
if err != nil {
if conflicts, ok := err.(merge.Conflicts); ok {
return nil, internal.NewConflictError(conflicts)
}
return nil, err
}

View File

@ -35,9 +35,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"k8s.io/kube-openapi/pkg/util/proto"
prototesting "k8s.io/kube-openapi/pkg/util/proto/testing"
"sigs.k8s.io/structured-merge-diff/v3/fieldpath"
@ -80,20 +80,24 @@ type fakeObjectDefaulter struct{}
func (d *fakeObjectDefaulter) Default(in runtime.Object) {}
type TestFieldManager struct {
fieldManager fieldmanager.Manager
fieldManager *fieldmanager.FieldManager
emptyObj runtime.Object
liveObj runtime.Object
}
func NewTestFieldManager(gvk schema.GroupVersionKind) TestFieldManager {
m := NewFakeOpenAPIModels()
tc := NewFakeTypeConverter(m)
func NewDefaultTestFieldManager(gvk schema.GroupVersionKind) TestFieldManager {
return NewTestFieldManager(gvk, nil)
}
converter := internal.NewVersionConverter(tc, &fakeObjectConvertor{}, gvk.GroupVersion())
func NewTestFieldManager(gvk schema.GroupVersionKind, chainFieldManager func(fieldmanager.Manager) fieldmanager.Manager) TestFieldManager {
m := NewFakeOpenAPIModels()
typeConverter := NewFakeTypeConverter(m)
converter := internal.NewVersionConverter(typeConverter, &fakeObjectConvertor{}, gvk.GroupVersion())
apiVersion := fieldpath.APIVersion(gvk.GroupVersion().String())
objectConverter := &fakeObjectConvertor{converter, apiVersion}
f, err := fieldmanager.NewStructuredMergeManager(
m,
&fakeObjectConvertor{converter, apiVersion},
typeConverter,
objectConverter,
&fakeObjectDefaulter{},
gvk.GroupVersion(),
gvk.GroupVersion(),
@ -107,8 +111,13 @@ func NewTestFieldManager(gvk schema.GroupVersionKind) TestFieldManager {
f = fieldmanager.NewStripMetaManager(f)
f = fieldmanager.NewManagedFieldsUpdater(f)
f = fieldmanager.NewBuildManagerInfoManager(f, gvk.GroupVersion())
f = fieldmanager.NewLastAppliedManager(f, typeConverter, objectConverter, gvk.GroupVersion())
f = fieldmanager.NewLastAppliedUpdater(f)
if chainFieldManager != nil {
f = chainFieldManager(f)
}
return TestFieldManager{
fieldManager: f,
fieldManager: fieldmanager.NewFieldManager(f),
emptyObj: live,
liveObj: live.DeepCopyObject(),
}
@ -139,7 +148,7 @@ func (f *TestFieldManager) Reset() {
}
func (f *TestFieldManager) Apply(obj runtime.Object, manager string, force bool) error {
out, err := fieldmanager.NewFieldManager(f.fieldManager).Apply(f.liveObj, obj, manager, force)
out, err := f.fieldManager.Apply(f.liveObj, obj, manager, force)
if err == nil {
f.liveObj = out
}
@ -147,7 +156,7 @@ func (f *TestFieldManager) Apply(obj runtime.Object, manager string, force bool)
}
func (f *TestFieldManager) Update(obj runtime.Object, manager string) error {
out, err := fieldmanager.NewFieldManager(f.fieldManager).Update(f.liveObj, obj, manager)
out, err := f.fieldManager.Update(f.liveObj, obj, manager)
if err == nil {
f.liveObj = out
}
@ -166,7 +175,7 @@ func (f *TestFieldManager) ManagedFields() []metav1.ManagedFieldsEntry {
// TestUpdateApplyConflict tests that applying to an object, which
// wasn't created by apply, will give conflicts
func TestUpdateApplyConflict(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
patch := []byte(`{
"apiVersion": "apps/v1",
@ -227,7 +236,7 @@ func TestUpdateApplyConflict(t *testing.T) {
}
func TestApplyStripsFields(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
newObj := &unstructured.Unstructured{
Object: map[string]interface{}{
@ -260,7 +269,7 @@ func TestApplyStripsFields(t *testing.T) {
}
func TestVersionCheck(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -300,7 +309,7 @@ func TestVersionCheck(t *testing.T) {
}
}
func TestVersionCheckDoesNotPanic(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -339,7 +348,7 @@ func TestVersionCheckDoesNotPanic(t *testing.T) {
}
func TestApplyDoesNotStripLabels(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -393,7 +402,7 @@ func TestApplyNewObject(t *testing.T) {
for _, test := range tests {
t.Run(test.gvk.String(), func(t *testing.T) {
f := NewTestFieldManager(test.gvk)
f := NewDefaultTestFieldManager(test.gvk)
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(test.obj, &appliedObj.Object); err != nil {
@ -431,7 +440,7 @@ func BenchmarkNewObject(b *testing.B) {
}
for _, test := range tests {
b.Run(test.gvk.Kind, func(b *testing.B) {
f := NewTestFieldManager(test.gvk)
f := NewDefaultTestFieldManager(test.gvk)
decoder := serializer.NewCodecFactory(scheme).UniversalDecoder(test.gvk.GroupVersion())
newObj, err := runtime.Decode(decoder, test.obj)
@ -650,7 +659,7 @@ func BenchmarkCompare(b *testing.B) {
}
func BenchmarkRepeatedUpdate(b *testing.B) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
podBytes := getObjectBytes("pod.yaml")
var obj *corev1.Pod
@ -689,7 +698,7 @@ func BenchmarkRepeatedUpdate(b *testing.B) {
}
func TestApplyFailsWithManagedFields(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -714,7 +723,7 @@ func TestApplyFailsWithManagedFields(t *testing.T) {
}
func TestApplySuccessWithNoManagedFields(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -737,7 +746,7 @@ func TestApplySuccessWithNoManagedFields(t *testing.T) {
// Run an update and apply, and make sure that nothing has changed.
func TestNoOpChanges(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
obj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -787,7 +796,7 @@ func TestNoOpChanges(t *testing.T) {
// Tests that one can reset the managedFields by sending either an empty
// list
func TestResetManagedFieldsEmptyList(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
obj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -828,7 +837,7 @@ func TestResetManagedFieldsEmptyList(t *testing.T) {
// Tests that one can reset the managedFields by sending either a list with one empty item.
func TestResetManagedFieldsEmptyItem(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
obj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -866,3 +875,221 @@ func TestResetManagedFieldsEmptyItem(t *testing.T) {
t.Fatalf("failed to reset managedFields: %v", f.ManagedFields())
}
}
func TestServerSideApplyWithInvalidLastApplied(t *testing.T) {
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
// create object with client-side apply
newObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
deployment := []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app-v1
spec:
replicas: 1
`)
if err := yaml.Unmarshal(deployment, &newObj.Object); err != nil {
t.Errorf("error decoding YAML: %v", err)
}
invalidLastApplied := "invalid-object"
if err := setLastApplied(newObj, invalidLastApplied); err != nil {
t.Errorf("failed to set last applied: %v", err)
}
if err := f.Update(newObj, "kubectl-client-side-apply-test"); err != nil {
t.Errorf("failed to update object: %v", err)
}
lastApplied, err := getLastApplied(f.liveObj)
if err != nil {
t.Errorf("failed to get last applied: %v", err)
}
if lastApplied != invalidLastApplied {
t.Errorf("expected last applied annotation to be set to %q, but got: %q", invalidLastApplied, lastApplied)
}
// upgrade management of the object from client-side apply to server-side apply
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
appliedDeployment := []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app-v2
spec:
replicas: 100
`)
if err := yaml.Unmarshal(appliedDeployment, &appliedObj.Object); err != nil {
t.Errorf("error decoding YAML: %v", err)
}
if err := f.Apply(appliedObj, "kubectl", false); err == nil || !apierrors.IsConflict(err) {
t.Errorf("expected conflict when applying with invalid last-applied annotation, but got no error for object: \n%+v", appliedObj)
}
lastApplied, err = getLastApplied(f.liveObj)
if err != nil {
t.Errorf("failed to get last applied: %v", err)
}
if lastApplied != invalidLastApplied {
t.Errorf("expected last applied annotation to be NOT be updated, but got: %q", lastApplied)
}
// force server-side apply should work and fix the annotation
if err := f.Apply(appliedObj, "kubectl", true); err != nil {
t.Errorf("failed to force server-side apply with: %v", err)
}
lastApplied, err = getLastApplied(f.liveObj)
if err != nil {
t.Errorf("failed to get last applied: %v", err)
}
if lastApplied == invalidLastApplied ||
!strings.Contains(lastApplied, "my-app-v2") {
t.Errorf("expected last applied annotation to be updated, but got: %q", lastApplied)
}
}
func TestInteropForClientSideApplyAndServerSideApply(t *testing.T) {
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
// create object with client-side apply
newObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
deployment := []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 100
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image-v1
`)
if err := yaml.Unmarshal(deployment, &newObj.Object); err != nil {
t.Errorf("error decoding YAML: %v", err)
}
if err := setLastAppliedFromEncoded(newObj, deployment); err != nil {
t.Errorf("failed to set last applied: %v", err)
}
if err := f.Update(newObj, "kubectl-client-side-apply-test"); err != nil {
t.Errorf("failed to update object: %v", err)
}
lastApplied, err := getLastApplied(f.liveObj)
if err != nil {
t.Errorf("failed to get last applied: %v", err)
}
if !strings.Contains(lastApplied, "my-image-v1") {
t.Errorf("expected last applied annotation to be set properly, but got: %q", lastApplied)
}
// upgrade management of the object from client-side apply to server-side apply
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
appliedDeployment := []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app-v2 # change
spec:
replicas: 8 # change
selector:
matchLabels:
app: my-app-v2 # change
template:
metadata:
labels:
app: my-app-v2 # change
spec:
containers:
- name: my-c
image: my-image-v2 # change
`)
if err := yaml.Unmarshal(appliedDeployment, &appliedObj.Object); err != nil {
t.Errorf("error decoding YAML: %v", err)
}
if err := f.Apply(appliedObj, "kubectl", false); err != nil {
t.Errorf("error applying object: %v", err)
}
lastApplied, err = getLastApplied(f.liveObj)
if err != nil {
t.Errorf("failed to get last applied: %v", err)
}
if !strings.Contains(lastApplied, "my-image-v2") {
t.Errorf("expected last applied annotation to be updated, but got: %q", lastApplied)
}
}
func yamlToJSON(y []byte) (string, error) {
obj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(y, &obj.Object); err != nil {
return "", fmt.Errorf("error decoding YAML: %v", err)
}
serialization, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return "", fmt.Errorf("error encoding object: %v", err)
}
json, err := yamlutil.ToJSON(serialization)
if err != nil {
return "", fmt.Errorf("error converting to json: %v", err)
}
return string(json), nil
}
func setLastAppliedFromEncoded(obj runtime.Object, lastApplied []byte) error {
lastAppliedJSON, err := yamlToJSON(lastApplied)
if err != nil {
return err
}
return setLastApplied(obj, lastAppliedJSON)
}
func setLastApplied(obj runtime.Object, lastApplied string) error {
accessor := meta.NewAccessor()
annotations, err := accessor.Annotations(obj)
if err != nil {
return fmt.Errorf("failed to access annotations: %v", err)
}
if annotations == nil {
annotations = map[string]string{}
}
annotations[corev1.LastAppliedConfigAnnotation] = lastApplied
accessor.SetAnnotations(obj, annotations)
return nil
}
func getLastApplied(obj runtime.Object) (string, error) {
accessor := meta.NewAccessor()
annotations, err := accessor.Annotations(obj)
if err != nil {
return "", fmt.Errorf("failed to access annotations: %v", err)
}
if annotations == nil {
return "", fmt.Errorf("no annotations on obj: %v", obj)
}
lastApplied, ok := annotations[corev1.LastAppliedConfigAnnotation]
if !ok {
return "", fmt.Errorf("expected last applied annotation, but got none for object: %v", obj)
}
return lastApplied, nil
}

View File

@ -0,0 +1,173 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"encoding/json"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"sigs.k8s.io/structured-merge-diff/v3/fieldpath"
"sigs.k8s.io/structured-merge-diff/v3/merge"
)
type lastAppliedManager struct {
fieldManager Manager
typeConverter internal.TypeConverter
objectConverter runtime.ObjectConvertor
groupVersion schema.GroupVersion
}
var _ Manager = &lastAppliedManager{}
// NewLastAppliedManager converts the client-side apply annotation to
// server-side apply managed fields
func NewLastAppliedManager(fieldManager Manager, typeConverter internal.TypeConverter, objectConverter runtime.ObjectConvertor, groupVersion schema.GroupVersion) Manager {
return &lastAppliedManager{
fieldManager: fieldManager,
typeConverter: typeConverter,
objectConverter: objectConverter,
groupVersion: groupVersion,
}
}
// Update implements Manager.
func (f *lastAppliedManager) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
return f.fieldManager.Update(liveObj, newObj, managed, manager)
}
// Apply will consider the last-applied annotation
// for upgrading an object managed by client-side apply to server-side apply
// without conflicts.
func (f *lastAppliedManager) Apply(liveObj, newObj runtime.Object, managed Managed, manager string, force bool) (runtime.Object, Managed, error) {
newLiveObj, newManaged, newErr := f.fieldManager.Apply(liveObj, newObj, managed, manager, force)
// Upgrade the client-side apply annotation only from kubectl server-side-apply.
// To opt-out of this behavior, users may specify a different field manager.
if manager != "kubectl" {
return newLiveObj, newManaged, newErr
}
// Check if we have conflicts
if newErr == nil {
return newLiveObj, newManaged, newErr
}
conflicts, ok := newErr.(merge.Conflicts)
if !ok {
return newLiveObj, newManaged, newErr
}
conflictSet := conflictsToSet(conflicts)
// Check if conflicts are allowed due to client-side apply,
// and if so, then force apply
allowedConflictSet, err := f.allowedConflictsFromLastApplied(liveObj)
if err != nil {
return newLiveObj, newManaged, newErr
}
if !conflictSet.Difference(allowedConflictSet).Empty() {
newConflicts := conflictsDifference(conflicts, allowedConflictSet)
return newLiveObj, newManaged, newConflicts
}
return f.fieldManager.Apply(liveObj, newObj, managed, manager, true)
}
func (f *lastAppliedManager) allowedConflictsFromLastApplied(liveObj runtime.Object) (*fieldpath.Set, error) {
var accessor, err = meta.Accessor(liveObj)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
}
// If there is no client-side apply annotation, then there is nothing to do
var annotations = accessor.GetAnnotations()
if annotations == nil {
return nil, fmt.Errorf("no last applied annotation")
}
var lastApplied, ok = annotations[corev1.LastAppliedConfigAnnotation]
if !ok || lastApplied == "" {
return nil, fmt.Errorf("no last applied annotation")
}
liveObjVersioned, err := f.objectConverter.ConvertToVersion(liveObj, f.groupVersion)
if err != nil {
return nil, fmt.Errorf("failed to convert live obj to versioned: %v", err)
}
liveObjTyped, err := f.typeConverter.ObjectToTyped(liveObjVersioned)
if err != nil {
return nil, fmt.Errorf("failed to convert live obj to typed: %v", err)
}
var lastAppliedObj = &unstructured.Unstructured{Object: map[string]interface{}{}}
err = json.Unmarshal([]byte(lastApplied), lastAppliedObj)
if err != nil {
return nil, fmt.Errorf("failed to decode last applied obj: %v in '%s'", err, lastApplied)
}
if lastAppliedObj.GetAPIVersion() != f.groupVersion.String() {
return nil, fmt.Errorf("expected version of last applied to match live object '%s', but got '%s': %v", f.groupVersion.String(), lastAppliedObj.GetAPIVersion(), err)
}
lastAppliedObjTyped, err := f.typeConverter.ObjectToTyped(lastAppliedObj)
if err != nil {
return nil, fmt.Errorf("failed to convert last applied to typed: %v", err)
}
lastAppliedObjFieldSet, err := lastAppliedObjTyped.ToFieldSet()
if err != nil {
return nil, fmt.Errorf("failed to create fieldset for last applied object: %v", err)
}
comparison, err := lastAppliedObjTyped.Compare(liveObjTyped)
if err != nil {
return nil, fmt.Errorf("failed to compare last applied object and live object: %v", err)
}
// Remove fields in last applied that are different, added, or missing in
// the live object.
// Because last-applied fields don't match the live object fields,
// then we don't own these fields.
lastAppliedObjFieldSet = lastAppliedObjFieldSet.
Difference(comparison.Modified).
Difference(comparison.Added).
Difference(comparison.Removed)
return lastAppliedObjFieldSet, nil
}
// TODO: replace with merge.Conflicts.ToSet()
func conflictsToSet(conflicts merge.Conflicts) *fieldpath.Set {
conflictSet := fieldpath.NewSet()
for _, conflict := range []merge.Conflict(conflicts) {
conflictSet.Insert(conflict.Path)
}
return conflictSet
}
func conflictsDifference(conflicts merge.Conflicts, s *fieldpath.Set) merge.Conflicts {
newConflicts := []merge.Conflict{}
for _, conflict := range []merge.Conflict(conflicts) {
if !s.Has(conflict.Path) {
newConflicts = append(newConflicts, conflict)
}
}
return newConflicts
}

View File

@ -0,0 +1,610 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager_test
import (
"fmt"
"reflect"
"testing"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
"sigs.k8s.io/structured-merge-diff/v3/fieldpath"
"sigs.k8s.io/structured-merge-diff/v3/merge"
"sigs.k8s.io/yaml"
)
// TestApplyUsingLastAppliedAnnotation tests that applying to an object
// created with the client-side apply last-applied annotation
// will not give conflicts
func TestApplyUsingLastAppliedAnnotation(t *testing.T) {
f := NewDefaultTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"))
tests := []struct {
lastApplied []byte
original []byte
applied []byte
fieldManager string
expectConflictSet *fieldpath.Set
}{
{
fieldManager: "kubectl",
lastApplied: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 3
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image-v1
- name: my-c2
image: my-image2
`),
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app # missing from last-applied
spec:
replicas: 100 # does not match last-applied
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image-v2 # does no match last-applied
# note that second container in last-applied is missing
`),
applied: []byte(`
# test conflicts due to fields not allowed by last-applied
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-new-label # NOT allowed: update label
spec:
replicas: 333 # NOT allowed: update replicas
selector:
matchLabels:
app: my-new-label # allowed: update label
template:
metadata:
labels:
app: my-new-label # allowed: update-label
spec:
containers:
- name: my-c
image: my-image-new # NOT allowed: update image
`),
expectConflictSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("metadata", "labels", "app"),
fieldpath.MakePathOrDie("spec", "replicas"),
fieldpath.MakePathOrDie("spec", "template", "spec", "containers", fieldpath.KeyByFields("name", "my-c"), "image"),
),
},
{
fieldManager: "kubectl",
lastApplied: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 3
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`),
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 100 # does not match last applied
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`),
applied: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-new-label
spec:
replicas: 3 # expect conflict
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`),
expectConflictSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("spec", "replicas"),
),
},
{
fieldManager: "kubectl",
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 100
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`),
applied: []byte(`
# applied object matches original
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 100
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`),
},
{
fieldManager: "kubectl",
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 3
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`),
applied: []byte(`
# test allowed update with no conflicts
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-new-label # update label
spec:
replicas: 333 # update replicas
selector:
matchLabels:
app: my-new-label # update label
template:
metadata:
labels:
app: my-new-label # update-label
spec:
containers:
- name: my-c
image: my-image
`),
},
{
fieldManager: "not_kubectl",
lastApplied: []byte(`
# expect conflicts because field manager is NOT kubectl
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 3
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image-v1
`),
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 100 # does not match last-applied
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image-v2 # does no match last-applied
`),
applied: []byte(`
# test conflicts due to fields not allowed by last-applied
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-new-label # update label
spec:
replicas: 333 # update replicas
selector:
matchLabels:
app: my-new-label # update label
template:
metadata:
labels:
app: my-new-label # update-label
spec:
containers:
- name: my-c
image: my-image-new # update image
`),
expectConflictSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("metadata", "labels", "app"),
fieldpath.MakePathOrDie("spec", "replicas"),
fieldpath.MakePathOrDie("spec", "selector", "matchLabels", "app"),
fieldpath.MakePathOrDie("spec", "template", "metadata", "labels", "app"),
fieldpath.MakePathOrDie("spec", "template", "spec", "containers", fieldpath.KeyByFields("name", "my-c"), "image"),
),
},
{
fieldManager: "kubectl",
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 3
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`),
applied: []byte(`
# test allowed update with no conflicts
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-new-label
spec:
replicas: 3
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-new-image # update image
`),
},
{
fieldManager: "not_kubectl",
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-app
spec:
replicas: 100
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`),
applied: []byte(`
# expect changes to fail because field manager is not kubectl
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-new-label # update label
spec:
replicas: 3 # update replicas
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-new-image # update image
`),
expectConflictSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("metadata", "labels", "app"),
fieldpath.MakePathOrDie("spec", "replicas"),
fieldpath.MakePathOrDie("spec", "template", "spec", "containers", fieldpath.KeyByFields("name", "my-c"), "image"),
),
},
{
fieldManager: "kubectl",
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 3
`),
applied: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 100 # update replicas
`),
},
{
fieldManager: "kubectl",
lastApplied: []byte(`
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 3
`),
original: []byte(`
apiVersion: apps/v1 # expect conflict due to apiVersion mismatch with last-applied
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 3
`),
applied: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 100 # update replicas
`),
expectConflictSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("spec", "replicas"),
),
},
{
fieldManager: "kubectl",
lastApplied: []byte(`
apiVerison: foo
kind: bar
spec: expect conflict due to invalid object
`),
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 3
`),
applied: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 100 # update replicas
`),
expectConflictSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("spec", "replicas"),
),
},
{
fieldManager: "kubectl",
// last-applied is empty
lastApplied: []byte{},
original: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 3
`),
applied: []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
spec:
replicas: 100 # update replicas
`),
expectConflictSet: fieldpath.NewSet(
fieldpath.MakePathOrDie("spec", "replicas"),
),
},
}
for i, test := range tests {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
f.Reset()
originalObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(test.original, &originalObj.Object); err != nil {
t.Errorf("error decoding YAML: %v", err)
}
if test.lastApplied == nil {
test.lastApplied = test.original
}
if err := setLastAppliedFromEncoded(originalObj, test.lastApplied); err != nil {
t.Errorf("failed to set last applied: %v", err)
}
if err := f.Update(originalObj, "test_client_side_apply"); err != nil {
t.Errorf("failed to apply object: %v", err)
}
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal(test.applied, &appliedObj.Object); err != nil {
t.Errorf("error decoding YAML: %v", err)
}
err := f.Apply(appliedObj, test.fieldManager, false)
if test.expectConflictSet == nil {
if err != nil {
t.Errorf("expected no error but got %v", err)
}
return
}
if err == nil || !apierrors.IsConflict(err) {
t.Errorf("expected to get conflicts but got %v", err)
}
expectedConflicts := merge.Conflicts{}
test.expectConflictSet.Iterate(func(p fieldpath.Path) {
expectedConflicts = append(expectedConflicts, merge.Conflict{
Manager: `{"manager":"test_client_side_apply","operation":"Update","apiVersion":"apps/v1"}`,
Path: p,
})
})
expectedConflictErr := internal.NewConflictError(expectedConflicts)
if !reflect.DeepEqual(expectedConflictErr, err) {
t.Errorf("expected to get\n%+v\nbut got\n%+v", expectedConflictErr, err)
}
})
}
}

View File

@ -0,0 +1,117 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)
type lastAppliedUpdater struct {
fieldManager Manager
}
var _ Manager = &lastAppliedUpdater{}
// NewLastAppliedUpdater sets the client-side apply annotation up to date with
// server-side apply managed fields
func NewLastAppliedUpdater(fieldManager Manager) Manager {
return &lastAppliedUpdater{
fieldManager: fieldManager,
}
}
// Update implements Manager.
func (f *lastAppliedUpdater) Update(liveObj, newObj runtime.Object, managed Managed, manager string) (runtime.Object, Managed, error) {
return f.fieldManager.Update(liveObj, newObj, managed, manager)
}
// server-side apply managed fields
func (f *lastAppliedUpdater) Apply(liveObj, newObj runtime.Object, managed Managed, manager string, force bool) (runtime.Object, Managed, error) {
liveObj, managed, err := f.fieldManager.Apply(liveObj, newObj, managed, manager, force)
if err != nil {
return liveObj, managed, err
}
// Sync the client-side apply annotation only from kubectl server-side apply.
// To opt-out of this behavior, users may specify a different field manager.
//
// If the client-side apply annotation doesn't exist,
// then continue because we have no annotation to update
if manager == "kubectl" && hasLastApplied(liveObj) {
lastAppliedValue, err := buildLastApplied(newObj)
if err != nil {
return nil, nil, fmt.Errorf("failed to build last-applied annotation: %v", err)
}
err = setLastApplied(liveObj, lastAppliedValue)
if err != nil {
return nil, nil, fmt.Errorf("failed to set last-applied annotation: %v", err)
}
}
return liveObj, managed, err
}
func hasLastApplied(obj runtime.Object) bool {
var accessor, err = meta.Accessor(obj)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
}
var annotations = accessor.GetAnnotations()
if annotations == nil {
return false
}
_, ok := annotations[corev1.LastAppliedConfigAnnotation]
return ok
}
func setLastApplied(obj runtime.Object, value string) error {
accessor, err := meta.Accessor(obj)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
}
var annotations = accessor.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[corev1.LastAppliedConfigAnnotation] = value
accessor.SetAnnotations(annotations)
return nil
}
func buildLastApplied(obj runtime.Object) (string, error) {
obj = obj.DeepCopyObject()
var accessor, err = meta.Accessor(obj)
if err != nil {
panic(fmt.Sprintf("couldn't get accessor: %v", err))
}
// Remove the annotation from the object before encoding the object
var annotations = accessor.GetAnnotations()
delete(annotations, corev1.LastAppliedConfigAnnotation)
accessor.SetAnnotations(annotations)
lastApplied, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return "", fmt.Errorf("couldn't encode object into last applied annotation: %v", err)
}
return string(lastApplied), nil
}

View File

@ -0,0 +1,91 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fieldmanager_test
import (
"strings"
"testing"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"sigs.k8s.io/yaml"
)
func TestLastAppliedUpdater(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("apps/v1", "Deployment"),
func(m fieldmanager.Manager) fieldmanager.Manager {
return fieldmanager.NewLastAppliedUpdater(m)
})
originalLastApplied := `nonempty`
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
appliedDeployment := []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
annotations:
"kubectl.kubernetes.io/last-applied-configuration": "` + originalLastApplied + `"
labels:
app: my-app
spec:
replicas: 20
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`)
if err := yaml.Unmarshal(appliedDeployment, &appliedObj.Object); err != nil {
t.Errorf("error decoding YAML: %v", err)
}
if err := f.Apply(appliedObj, "NOT-KUBECTL", false); err != nil {
t.Errorf("error applying object: %v", err)
}
lastApplied, err := getLastApplied(f.liveObj)
if err != nil {
t.Errorf("failed to get last applied: %v", err)
}
if lastApplied != originalLastApplied {
t.Errorf("expected last applied annotation to be %q and NOT be updated, but got: %q", originalLastApplied, lastApplied)
}
if err := f.Apply(appliedObj, "kubectl", false); err != nil {
t.Errorf("error applying object: %v", err)
}
lastApplied, err = getLastApplied(f.liveObj)
if err != nil {
t.Errorf("failed to get last applied: %v", err)
}
if lastApplied == originalLastApplied ||
!strings.Contains(lastApplied, "my-app") ||
!strings.Contains(lastApplied, "my-image") {
t.Errorf("expected last applied annotation to be updated, but got: %q", lastApplied)
}
}

View File

@ -43,12 +43,13 @@ func (f *fakeObjectCreater) New(_ schema.GroupVersionKind) (runtime.Object, erro
}
func TestNoUpdateBeforeFirstApply(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f.fieldManager = fieldmanager.NewSkipNonAppliedManager(
f.fieldManager,
&fakeObjectCreater{gvk: schema.GroupVersionKind{Version: "v1", Kind: "Pod"}},
schema.GroupVersionKind{},
)
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"), func(m fieldmanager.Manager) fieldmanager.Manager {
return fieldmanager.NewSkipNonAppliedManager(
m,
&fakeObjectCreater{gvk: schema.GroupVersionKind{Version: "v1", Kind: "Pod"}},
schema.GroupVersionKind{},
)
})
appliedObj := &unstructured.Unstructured{Object: map[string]interface{}{}}
if err := yaml.Unmarshal([]byte(`{
@ -82,12 +83,13 @@ func TestNoUpdateBeforeFirstApply(t *testing.T) {
}
func TestUpdateBeforeFirstApply(t *testing.T) {
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"))
f.fieldManager = fieldmanager.NewSkipNonAppliedManager(
f.fieldManager,
&fakeObjectCreater{gvk: schema.GroupVersionKind{Version: "v1", Kind: "Pod"}},
schema.GroupVersionKind{},
)
f := NewTestFieldManager(schema.FromAPIVersionAndKind("v1", "Pod"), func(m fieldmanager.Manager) fieldmanager.Manager {
return fieldmanager.NewSkipNonAppliedManager(
m,
&fakeObjectCreater{gvk: schema.GroupVersionKind{Version: "v1", Kind: "Pod"}},
schema.GroupVersionKind{},
)
})
updatedObj := &corev1.Pod{}
updatedObj.Kind = "Pod"

View File

@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/structured-merge-diff/v3/fieldpath"
"sigs.k8s.io/structured-merge-diff/v3/merge"
)
@ -42,12 +41,7 @@ var _ Manager = &structuredMergeManager{}
// NewStructuredMergeManager creates a new Manager that merges apply requests
// and update managed fields for other types of requests.
func NewStructuredMergeManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion) (Manager, error) {
typeConverter, err := internal.NewTypeConverter(models, false)
if err != nil {
return nil, err
}
func NewStructuredMergeManager(typeConverter internal.TypeConverter, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion) (Manager, error) {
return &structuredMergeManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
@ -63,14 +57,7 @@ func NewStructuredMergeManager(models openapiproto.Models, objectConverter runti
// NewCRDStructuredMergeManager creates a new Manager specifically for
// CRDs. This allows for the possibility of fields which are not defined
// in models, as well as having no models defined at all.
func NewCRDStructuredMergeManager(models openapiproto.Models, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion, preserveUnknownFields bool) (_ Manager, err error) {
var typeConverter internal.TypeConverter = internal.DeducedTypeConverter{}
if models != nil {
typeConverter, err = internal.NewTypeConverter(models, preserveUnknownFields)
if err != nil {
return nil, err
}
}
func NewCRDStructuredMergeManager(typeConverter internal.TypeConverter, objectConverter runtime.ObjectConvertor, objectDefaulter runtime.ObjectDefaulter, gv schema.GroupVersion, hub schema.GroupVersion, preserveUnknownFields bool) (_ Manager, err error) {
return &structuredMergeManager{
typeConverter: typeConverter,
objectConverter: objectConverter,
@ -149,9 +136,6 @@ func (f *structuredMergeManager) Apply(liveObj, patchObj runtime.Object, managed
apiVersion := fieldpath.APIVersion(f.groupVersion.String())
newObjTyped, managedFields, err := f.updater.Apply(liveObjTyped, patchObjTyped, apiVersion, managed.Fields(), manager, force)
if err != nil {
if conflicts, ok := err.(merge.Conflicts); ok {
return nil, nil, internal.NewConflictError(conflicts)
}
return nil, nil, err
}
managed = internal.NewManaged(managedFields, managed.Times())

View File

@ -423,6 +423,7 @@ type applyPatcher struct {
creater runtime.ObjectCreater
kind schema.GroupVersionKind
fieldManager *fieldmanager.FieldManager
userAgent string
}
func (p *applyPatcher) applyPatchToCurrentObject(obj runtime.Object) (runtime.Object, error) {
@ -571,6 +572,7 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
options: p.options,
creater: p.creater,
kind: p.kind,
userAgent: p.userAgent,
}
p.forceAllowCreate = true
default:

View File

@ -393,6 +393,34 @@ run_kubectl_server_side_apply_tests() {
# clean-up
kubectl delete -f hack/testdata/pod.yaml "${kube_flags[@]:?}"
## kubectl apply upgrade
# Pre-Condition: no POD exists
kube::test::get_object_assert pods "{{range.items}}{{${id_field:?}}}:{{end}}" ''
kube::log::status "Testing upgrade kubectl client-side apply to server-side apply"
# run client-side apply
kubectl apply -f hack/testdata/pod.yaml "${kube_flags[@]:?}"
# test upgrade does not work with non-standard server-side apply field manager
! kubectl apply --server-side --field-manager="not-kubectl" -f hack/testdata/pod-apply.yaml "${kube_flags[@]:?}" || exit 1
# test upgrade from client-side apply to server-side apply
kubectl apply --server-side -f hack/testdata/pod-apply.yaml "${kube_flags[@]:?}"
# Post-Condition: pod "test-pod" has configuration annotation
grep -q kubectl.kubernetes.io/last-applied-configuration <<< "$(kubectl get pods test-pod -o yaml "${kube_flags[@]:?}")"
output_message=$(kubectl apply view-last-applied pod/test-pod -o json 2>&1 "${kube_flags[@]:?}")
kube::test::if_has_string "${output_message}" '"name": "test-pod-applied"'
kube::log::status "Testing downgrade kubectl server-side apply to client-side apply"
# test downgrade from server-side apply to client-side apply
kubectl apply --server-side -f hack/testdata/pod.yaml "${kube_flags[@]:?}"
# Post-Condition: pod "test-pod" has configuration annotation
grep -q kubectl.kubernetes.io/last-applied-configuration <<< "$(kubectl get pods test-pod -o yaml "${kube_flags[@]:?}")"
output_message=$(kubectl apply view-last-applied pod/test-pod -o json 2>&1 "${kube_flags[@]:?}")
kube::test::if_has_string "${output_message}" '"name": "test-pod-label"'
kubectl apply -f hack/testdata/pod-apply.yaml "${kube_flags[@]:?}"
# clean-up
kubectl delete -f hack/testdata/pod.yaml "${kube_flags[@]:?}"
## kubectl apply dry-run on CR
# Create CRD
kubectl "${kube_flags_with_token[@]}" create -f - << __EOF__

View File

@ -28,6 +28,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
@ -2464,3 +2465,133 @@ func benchRepeatedUpdate(client kubernetes.Interface, podName string) func(*test
}
}
}
func TestUpgradeClientSideToServerSideApply(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()
_, client, closeFn := setup(t)
defer closeFn()
obj := []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
annotations:
"kubectl.kubernetes.io/last-applied-configuration": |
{"kind":"Deployment","apiVersion":"apps/v1","metadata":{"name":"my-deployment","labels":{"app":"my-app"}},"spec":{"replicas": 3,"template":{"metadata":{"labels":{"app":"my-app"}},"spec":{"containers":[{"name":"my-c","image":"my-image"}]}}}}
labels:
app: my-app
spec:
replicas: 100000
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`)
deployment, err := yamlutil.ToJSON(obj)
if err != nil {
t.Fatalf("Failed marshal yaml: %v", err)
}
_, err = client.CoreV1().RESTClient().Post().
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
Body(deployment).Do(context.TODO()).Get()
if err != nil {
t.Fatalf("Failed to create object: %v", err)
}
obj = []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-new-label
spec:
replicas: 3 # expect conflict
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image
`)
deployment, err = yamlutil.ToJSON(obj)
if err != nil {
t.Fatalf("Failed marshal yaml: %v", err)
}
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
Name("my-deployment").
Param("fieldManager", "kubectl").
Body(deployment).
Do(context.TODO()).
Get()
if !apierrors.IsConflict(err) {
t.Fatalf("Expected conflict error but got: %v", err)
}
obj = []byte(`
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-deployment
labels:
app: my-new-label
spec:
template:
metadata:
labels:
app: my-app
spec:
containers:
- name: my-c
image: my-image-new
`)
deployment, err = yamlutil.ToJSON(obj)
if err != nil {
t.Fatalf("Failed marshal yaml: %v", err)
}
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").
Namespace("default").
Resource("deployments").
Name("my-deployment").
Param("fieldManager", "kubectl").
Body(deployment).
Do(context.TODO()).
Get()
if err != nil {
t.Fatalf("Failed to apply object: %v", err)
}
deploymentObj, err := client.AppsV1().Deployments("default").Get(context.TODO(), "my-deployment", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get object: %v", err)
}
if *deploymentObj.Spec.Replicas != 100000 {
t.Fatalf("expected to get obj with replicas %d, but got %d", 100000, *deploymentObj.Spec.Replicas)
}
if deploymentObj.Spec.Template.Spec.Containers[0].Image != "my-image-new" {
t.Fatalf("expected to get obj with image %s, but got %s", "my-image-new", deploymentObj.Spec.Template.Spec.Containers[0].Image)
}
}