Merge pull request #96185 from roycaihw/dedup-owner-references

Apiserver dedups owner references before creating/updating objects
This commit is contained in:
Kubernetes Prow Robot 2020-11-19 14:54:52 -08:00 committed by GitHub
commit d0398c395a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 777 additions and 6 deletions

View File

@ -65,6 +65,7 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers",
importpath = "k8s.io/apiserver/pkg/endpoints/handlers",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
@ -103,6 +104,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/wsstream:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/warning:go_default_library",
"//vendor/github.com/evanphx/json-patch:go_default_library",
"//vendor/golang.org/x/net/websocket:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",

View File

@ -152,6 +152,8 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
options,
)
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
if scope.FieldManager != nil {
liveObj, err := scope.Creater.New(scope.Kind)
@ -165,6 +167,8 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
return nil, err
}
}
// Dedup owner references again after mutating admission happens
dedupOwnerReferencesAndAddWarning(obj, req.Context(), true)
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.

View File

@ -569,9 +569,14 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
default:
return nil, false, fmt.Errorf("%v: unimplemented patch type", p.patchType)
}
dedupOwnerReferencesTransformer := func(_ context.Context, obj, _ runtime.Object) (runtime.Object, error) {
// Dedup owner references after mutating admission happens
dedupOwnerReferencesAndAddWarning(obj, ctx, true)
return obj, nil
}
wasCreated := false
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission, dedupOwnerReferencesTransformer)
requestFunc := func() (runtime.Object, error) {
// Pass in UpdateOptions to override UpdateStrategy.AllowUpdateOnCreate
options := patchToUpdateOptions(p.options)
@ -585,11 +590,15 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
// it is safe to remove managedFields (which can be large) and try again.
if isTooLargeError(err) && p.patchType != types.ApplyPatchType {
if _, accessorErr := meta.Accessor(p.restPatcher.New()); accessorErr == nil {
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission, func(_ context.Context, obj, _ runtime.Object) (runtime.Object, error) {
accessor, _ := meta.Accessor(obj)
accessor.SetManagedFields(nil)
return obj, nil
})
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil,
p.applyPatch,
p.applyAdmission,
dedupOwnerReferencesTransformer,
func(_ context.Context, obj, _ runtime.Object) (runtime.Object, error) {
accessor, _ := meta.Accessor(obj)
accessor.SetManagedFields(nil)
return obj, nil
})
result, err = requestFunc()
}
}

View File

@ -31,12 +31,14 @@ import (
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
@ -46,6 +48,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/warning"
"k8s.io/klog/v2"
)
@ -53,6 +56,14 @@ const (
// 34 chose as a number close to 30 that is likely to be unique enough to jump out at me the next time I see a timeout.
// Everyone chooses 30.
requestTimeout = 34 * time.Second
// DuplicateOwnerReferencesWarningFormat is the warning that a client receives when a create/update request contains
// duplicate owner reference entries.
DuplicateOwnerReferencesWarningFormat = ".metadata.ownerReferences contains duplicate entries; API server dedups owner references in 1.20+, and may reject such requests as early as 1.24; please fix your requests; duplicate UID(s) observed: %v"
// DuplicateOwnerReferencesAfterMutatingAdmissionWarningFormat indicates the duplication was observed
// after mutating admission.
// NOTE: For CREATE and UPDATE requests the API server dedups both before and after mutating admission.
// For PATCH request the API server only dedups after mutating admission.
DuplicateOwnerReferencesAfterMutatingAdmissionWarningFormat = ".metadata.ownerReferences contains duplicate entries after mutating admission happens; API server dedups owner references in 1.20+, and may reject such requests as early as 1.24; please fix your requests; duplicate UID(s) observed: %v"
)
// RequestScope encapsulates common fields across all RESTful handler methods.
@ -329,6 +340,68 @@ func checkName(obj runtime.Object, name, namespace string, namer ScopeNamer) err
return nil
}
// dedupOwnerReferences dedups owner references over the entire entry.
// NOTE: We don't know enough about the existing cases of owner references
// sharing the same UID but different fields. Nor do we know what might break.
// In the future we may just dedup/reject owner references with the same UID.
func dedupOwnerReferences(refs []metav1.OwnerReference) ([]metav1.OwnerReference, []string) {
var result []metav1.OwnerReference
var duplicates []string
seen := make(map[types.UID]struct{})
for _, ref := range refs {
_, ok := seen[ref.UID]
// Short-circuit if we haven't seen the UID before. Otherwise
// check the entire list we have so far.
if !ok || !hasOwnerReference(result, ref) {
seen[ref.UID] = struct{}{}
result = append(result, ref)
} else {
duplicates = append(duplicates, string(ref.UID))
}
}
return result, duplicates
}
// hasOwnerReference returns true if refs has an item equal to ref. The function
// focuses on semantic equality instead of memory equality, to catch duplicates
// with different pointer addresses. The function uses apiequality.Semantic
// instead of implementing its own comparison, to tolerate API changes to
// metav1.OwnerReference.
// NOTE: This is expensive, but we accept it because we've made sure it only
// happens to owner references containing duplicate UIDs, plus typically the
// number of items in the list should be small.
func hasOwnerReference(refs []metav1.OwnerReference, ref metav1.OwnerReference) bool {
for _, r := range refs {
if apiequality.Semantic.DeepEqual(r, ref) {
return true
}
}
return false
}
// dedupOwnerReferencesAndAddWarning dedups owner references in the object metadata.
// If duplicates are found, the function records a warning to the provided context.
func dedupOwnerReferencesAndAddWarning(obj runtime.Object, requestContext context.Context, afterMutatingAdmission bool) {
accessor, err := meta.Accessor(obj)
if err != nil {
// The object doesn't have metadata. Nothing we need to do here.
return
}
refs := accessor.GetOwnerReferences()
deduped, duplicates := dedupOwnerReferences(refs)
if len(duplicates) > 0 {
// NOTE: For CREATE and UPDATE requests the API server dedups both before and after mutating admission.
// For PATCH request the API server only dedups after mutating admission.
format := DuplicateOwnerReferencesWarningFormat
if afterMutatingAdmission {
format = DuplicateOwnerReferencesAfterMutatingAdmissionWarningFormat
}
warning.AddWarning(requestContext, "", fmt.Sprintf(format,
strings.Join(duplicates, ", ")))
accessor.SetOwnerReferences(deduped)
}
}
// setObjectSelfLink sets the self link of an object as needed.
// TODO: remove the need for the namer LinkSetters by requiring objects implement either Object or List
// interfaces

View File

@ -1113,3 +1113,230 @@ want: %#+v`, got, converted)
})
}
}
func TestDedupOwnerReferences(t *testing.T) {
falseA := false
falseB := false
testCases := []struct {
name string
ownerReferences []metav1.OwnerReference
expected []metav1.OwnerReference
}{
{
name: "simple multiple duplicates",
ownerReferences: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "2",
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "2",
},
},
expected: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "2",
},
},
},
{
name: "don't dedup same uid different name entries",
ownerReferences: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name1",
UID: "1",
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name2",
UID: "1",
},
},
expected: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name1",
UID: "1",
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name2",
UID: "1",
},
},
},
{
name: "don't dedup same uid different API version entries",
ownerReferences: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion1",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
{
APIVersion: "customresourceVersion2",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
},
expected: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion1",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
{
APIVersion: "customresourceVersion2",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
},
},
{
name: "dedup memory-equal entries",
ownerReferences: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
Controller: &falseA,
BlockOwnerDeletion: &falseA,
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
Controller: &falseA,
BlockOwnerDeletion: &falseA,
},
},
expected: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
Controller: &falseA,
BlockOwnerDeletion: &falseA,
},
},
},
{
name: "dedup semantic-equal entries",
ownerReferences: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
Controller: &falseA,
BlockOwnerDeletion: &falseA,
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
Controller: &falseB,
BlockOwnerDeletion: &falseB,
},
},
expected: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
Controller: &falseA,
BlockOwnerDeletion: &falseA,
},
},
},
{
name: "don't dedup semantic-different entries",
ownerReferences: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
Controller: &falseA,
BlockOwnerDeletion: &falseA,
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
},
expected: []metav1.OwnerReference{
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
Controller: &falseA,
BlockOwnerDeletion: &falseA,
},
{
APIVersion: "customresourceVersion",
Kind: "customresourceKind",
Name: "name",
UID: "1",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
deduped, _ := dedupOwnerReferences(tc.ownerReferences)
if !apiequality.Semantic.DeepEqual(deduped, tc.expected) {
t.Errorf("diff: %v", diff.ObjectReflectDiff(deduped, tc.expected))
}
})
}
}

View File

@ -150,6 +150,11 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
}
return newObj, nil
})
transformers = append(transformers, func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {
// Dedup owner references again after mutating admission happens
dedupOwnerReferencesAndAddWarning(newObj, req.Context(), true)
return newObj, nil
})
}
createAuthorizerAttributes := authorizer.AttributesRecord{
@ -185,6 +190,8 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
wasCreated = created
return obj, err
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,

View File

@ -24,6 +24,7 @@ go_test(
],
deps = [
"//cmd/kube-apiserver/app/options:go_default_library",
"//cmd/kube-apiserver/app/testing:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/controlplane:go_default_library",
"//pkg/controlplane/reconcilers:go_default_library",
@ -60,8 +61,10 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
@ -77,6 +80,7 @@ go_test(
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/kubectl/pkg/cmd/util:go_default_library",
"//test/integration:go_default_library",
"//test/integration/etcd:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/github.com/google/uuid:go_default_library",
"//vendor/k8s.io/gengo/examples/set-gen/sets:go_default_library",

View File

@ -4,6 +4,7 @@ go_test(
name = "go_default_test",
srcs = [
"admission_test.go",
"apiserver_handler_test.go",
"broken_webhook_test.go",
"client_auth_test.go",
"load_balance_test.go",
@ -36,9 +37,11 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -0,0 +1,233 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admissionwebhook
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
v1 "k8s.io/api/admission/v1"
admissionv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/handlers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
// TestMutatingWebhookDuplicateOwnerReferences ensures that the API server
// handler correctly deduplicates owner references if a mutating webhook
// patches create/update requests with duplicate owner references.
func TestMutatingWebhookDuplicateOwnerReferences(t *testing.T) {
roots := x509.NewCertPool()
if !roots.AppendCertsFromPEM(localhostCert) {
t.Fatal("Failed to append Cert from PEM")
}
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
if err != nil {
t.Fatalf("Failed to build cert with error: %+v", err)
}
webhookServer := httptest.NewUnstartedServer(newDuplicateOwnerReferencesWebhookHandler(t))
webhookServer.TLS = &tls.Config{
RootCAs: roots,
Certificates: []tls.Certificate{cert},
}
webhookServer.StartTLS()
defer webhookServer.Close()
s := kubeapiservertesting.StartTestServerOrDie(t,
kubeapiservertesting.NewDefaultTestServerOptions(), []string{
"--disable-admission-plugins=ServiceAccount",
}, framework.SharedEtcd())
defer s.TearDownFn()
b := &bytes.Buffer{}
warningWriter := restclient.NewWarningWriter(b, restclient.WarningWriterOptions{})
s.ClientConfig.WarningHandler = warningWriter
client := clientset.NewForConfigOrDie(s.ClientConfig)
if _, err := client.CoreV1().Pods("default").Create(
context.TODO(), duplicateOwnerReferencesMarkerFixture, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
fail := admissionv1.Fail
none := admissionv1.SideEffectClassNone
mutatingCfg, err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(context.TODO(), &admissionv1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{Name: "dup-owner-references.admission.integration.test"},
Webhooks: []admissionv1.MutatingWebhook{{
Name: "dup-owner-references.admission.integration.test",
ClientConfig: admissionv1.WebhookClientConfig{
URL: &webhookServer.URL,
CABundle: localhostCert,
},
Rules: []admissionv1.RuleWithOperations{{
Operations: []admissionv1.OperationType{admissionv1.Create, admissionv1.Update},
Rule: admissionv1.Rule{APIGroups: []string{""}, APIVersions: []string{"v1"}, Resources: []string{"pods"}},
}},
FailurePolicy: &fail,
AdmissionReviewVersions: []string{"v1", "v1beta1"},
SideEffects: &none,
}},
}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
defer func() {
err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(context.TODO(), mutatingCfg.GetName(), metav1.DeleteOptions{})
if err != nil {
t.Fatal(err)
}
}()
// Make sure dedup happens in patch requests
var pod *corev1.Pod
var lastErr string
// wait until new webhook is called
expectedWarning := fmt.Sprintf(handlers.DuplicateOwnerReferencesAfterMutatingAdmissionWarningFormat,
duplicateOwnerReferencesMarkerFixture.OwnerReferences[0].UID)
if err := wait.PollImmediate(time.Millisecond*5, wait.ForeverTestTimeout, func() (bool, error) {
pod, err = client.CoreV1().Pods("default").Patch(context.TODO(), duplicateOwnerReferencesMarkerFixture.Name, types.JSONPatchType, []byte("[]"), metav1.PatchOptions{})
if err != nil {
return false, err
}
if warningWriter.WarningCount() == 0 {
lastErr = fmt.Sprintf("no warning, owner references: %v", pod.OwnerReferences)
return false, nil
}
if !strings.Contains(b.String(), expectedWarning) {
lastErr = fmt.Sprintf("unexpected warning, expected: %v, got: %v",
expectedWarning, b.String())
return false, nil
}
if len(pod.OwnerReferences) != 1 {
lastErr = fmt.Sprintf("unexpected owner references, expected one entry, got: %v",
pod.OwnerReferences)
return false, nil
}
return true, nil
}); err != nil {
t.Fatalf("failed to wait for apiserver handling webhook mutation: %v, last error: %v", err, lastErr)
}
if strings.Contains(b.String(), ".metadata.ownerReferences contains duplicate entries,") {
t.Errorf("unexpected warning happened before mutating admission")
}
if warningWriter.WarningCount() != 1 {
t.Errorf("expected one warning, got: %v", warningWriter.WarningCount())
}
b.Reset()
// Make sure dedup happens in update requests
pod, err = client.CoreV1().Pods("default").Update(context.TODO(), pod, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
if warningWriter.WarningCount() != 2 {
t.Errorf("expected two warnings, got: %v", warningWriter.WarningCount())
}
if !strings.Contains(b.String(), expectedWarning) {
t.Errorf("unexpected warning, expected: %v, got: %v",
expectedWarning, b.String())
}
if strings.Contains(b.String(), ".metadata.ownerReferences contains duplicate entries,") {
t.Errorf("unexpected warning happened before mutating admission")
}
b.Reset()
if err := client.CoreV1().Pods("default").Delete(context.TODO(), duplicateOwnerReferencesMarkerFixture.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("failed to delete marker pod: %v", err)
}
// expect no more warning
if warningWriter.WarningCount() != 2 {
t.Errorf("expected two warnings, got: %v", warningWriter.WarningCount())
}
}
func newDuplicateOwnerReferencesWebhookHandler(t *testing.T) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
review := v1.AdmissionReview{}
if err := json.Unmarshal(data, &review); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
if len(review.Request.Object.Raw) == 0 {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
pod := &corev1.Pod{}
if err := json.Unmarshal(review.Request.Object.Raw, pod); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
review.Response = &v1.AdmissionResponse{
Allowed: true,
UID: review.Request.UID,
Result: &metav1.Status{Message: "admitted"},
}
if len(pod.OwnerReferences) > 0 {
review.Response.Patch = []byte(fmt.Sprintf(`[{"op":"add","path":"/metadata/ownerReferences/-","value":{"apiVersion":"v1", "kind": "Node", "name": "fake-node", "uid": "%v"}}]`, pod.OwnerReferences[0].UID))
jsonPatch := v1.PatchTypeJSONPatch
review.Response.PatchType = &jsonPatch
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(review); err != nil {
t.Errorf("Marshal of response failed with error: %v", err)
}
})
}
var duplicateOwnerReferencesMarkerFixture = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "duplicate-owner-references-test-marker",
OwnerReferences: []metav1.OwnerReference{{
APIVersion: "v1",
Kind: "Node",
Name: "fake-node",
UID: uuid.NewUUID(),
}},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "fake-name",
Image: "fakeimage",
}},
},
}

View File

@ -49,8 +49,10 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
@ -61,9 +63,11 @@ import (
"k8s.io/client-go/tools/pager"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
)
@ -2136,3 +2140,208 @@ func expectPartialObjectMetaV1EventsProtobuf(t *testing.T, r io.Reader, values .
}
}
}
func TestDedupOwnerReferences(t *testing.T) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
etcd.CreateTestCRDs(t, apiextensionsclient.NewForConfigOrDie(server.ClientConfig), false, etcd.GetCustomResourceDefinitionData()[0])
b := &bytes.Buffer{}
warningWriter := restclient.NewWarningWriter(b, restclient.WarningWriterOptions{})
server.ClientConfig.WarningHandler = warningWriter
client := clientset.NewForConfigOrDie(server.ClientConfig)
dynamicClient := dynamic.NewForConfigOrDie(server.ClientConfig)
ns := "test-dedup-owner-references"
// create test namespace
_, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create test ns: %v", err)
}
// some fake owner references
fakeRefA := metav1.OwnerReference{
APIVersion: "v1",
Kind: "ConfigMap",
Name: "fake-configmap",
UID: uuid.NewUUID(),
}
fakeRefB := metav1.OwnerReference{
APIVersion: "v1",
Kind: "Node",
Name: "fake-node",
UID: uuid.NewUUID(),
}
fakeRefC := metav1.OwnerReference{
APIVersion: "cr.bar.com/v1",
Kind: "Foo",
Name: "fake-foo",
UID: uuid.NewUUID(),
}
tcs := []struct {
gvr schema.GroupVersionResource
kind string
}{
{
gvr: schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "configmaps",
},
kind: "ConfigMap",
},
{
gvr: schema.GroupVersionResource{
Group: "cr.bar.com",
Version: "v1",
Resource: "foos",
},
kind: "Foo",
},
}
for i, tc := range tcs {
t.Run(tc.gvr.String(), func(t *testing.T) {
previousWarningCount := i * 3
c := &dependentClient{
t: t,
client: dynamicClient.Resource(tc.gvr).Namespace(ns),
gvr: tc.gvr,
kind: tc.kind,
}
klog.Infof("creating dependent with duplicate owner references")
dependent := c.createDependentWithOwners([]metav1.OwnerReference{fakeRefA, fakeRefA})
assertManagedFields(t, dependent)
expectedWarning := fmt.Sprintf(handlers.DuplicateOwnerReferencesWarningFormat, fakeRefA.UID)
assertOwnerReferences(t, dependent, []metav1.OwnerReference{fakeRefA})
assertWarningCount(t, warningWriter, previousWarningCount+1)
assertWarningMessage(t, b, expectedWarning)
klog.Infof("updating dependent with duplicate owner references")
dependent = c.updateDependentWithOwners(dependent, []metav1.OwnerReference{fakeRefA, fakeRefA})
assertManagedFields(t, dependent)
assertOwnerReferences(t, dependent, []metav1.OwnerReference{fakeRefA})
assertWarningCount(t, warningWriter, previousWarningCount+2)
assertWarningMessage(t, b, expectedWarning)
klog.Infof("patching dependent with duplicate owner reference")
dependent = c.patchDependentWithOwner(dependent, fakeRefA)
// TODO: currently a patch request that duplicates owner references can still
// wipe out managed fields. Note that this happens to built-in resources but
// not custom resources. In future we should either dedup before writing manage
// fields, or stop deduping and reject the request.
// assertManagedFields(t, dependent)
expectedPatchWarning := fmt.Sprintf(handlers.DuplicateOwnerReferencesAfterMutatingAdmissionWarningFormat, fakeRefA.UID)
assertOwnerReferences(t, dependent, []metav1.OwnerReference{fakeRefA})
assertWarningCount(t, warningWriter, previousWarningCount+3)
assertWarningMessage(t, b, expectedPatchWarning)
klog.Infof("updating dependent with different owner references")
dependent = c.updateDependentWithOwners(dependent, []metav1.OwnerReference{fakeRefA, fakeRefB})
assertOwnerReferences(t, dependent, []metav1.OwnerReference{fakeRefA, fakeRefB})
assertWarningCount(t, warningWriter, previousWarningCount+3)
assertWarningMessage(t, b, "")
klog.Infof("patching dependent with different owner references")
dependent = c.patchDependentWithOwner(dependent, fakeRefC)
assertOwnerReferences(t, dependent, []metav1.OwnerReference{fakeRefA, fakeRefB, fakeRefC})
assertWarningCount(t, warningWriter, previousWarningCount+3)
assertWarningMessage(t, b, "")
klog.Infof("deleting dependent")
c.deleteDependent()
assertWarningCount(t, warningWriter, previousWarningCount+3)
assertWarningMessage(t, b, "")
})
}
// cleanup
if err := client.CoreV1().Namespaces().Delete(context.TODO(), ns, metav1.DeleteOptions{}); err != nil {
t.Fatalf("failed to delete test ns: %v", err)
}
}
type dependentClient struct {
t *testing.T
client dynamic.ResourceInterface
gvr schema.GroupVersionResource
kind string
}
func (c *dependentClient) createDependentWithOwners(refs []metav1.OwnerReference) *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetName("dependent")
obj.SetOwnerReferences(refs)
obj.SetKind(c.kind)
obj.SetAPIVersion(fmt.Sprintf("%s/%s", c.gvr.Group, c.gvr.Version))
obj, err := c.client.Create(context.TODO(), obj, metav1.CreateOptions{})
if err != nil {
c.t.Fatalf("failed to create dependent with owner references %v: %v", refs, err)
}
return obj
}
func (c *dependentClient) updateDependentWithOwners(obj *unstructured.Unstructured, refs []metav1.OwnerReference) *unstructured.Unstructured {
obj.SetOwnerReferences(refs)
obj, err := c.client.Update(context.TODO(), obj, metav1.UpdateOptions{})
if err != nil {
c.t.Fatalf("failed to update dependent with owner references %v: %v", refs, err)
}
return obj
}
func (c *dependentClient) patchDependentWithOwner(obj *unstructured.Unstructured, ref metav1.OwnerReference) *unstructured.Unstructured {
patch := []byte(fmt.Sprintf(`[{"op":"add","path":"/metadata/ownerReferences/-","value":{"apiVersion":"%v", "kind": "%v", "name": "%v", "uid": "%v"}}]`, ref.APIVersion, ref.Kind, ref.Name, ref.UID))
obj, err := c.client.Patch(context.TODO(), obj.GetName(), types.JSONPatchType, patch, metav1.PatchOptions{})
if err != nil {
c.t.Fatalf("failed to append owner reference to dependent with owner reference %v, patch %v: %v",
ref, patch, err)
}
return obj
}
func (c *dependentClient) deleteDependent() {
if err := c.client.Delete(context.TODO(), "dependent", metav1.DeleteOptions{}); err != nil {
c.t.Fatalf("failed to delete dependent: %v", err)
}
}
type warningCounter interface {
WarningCount() int
}
func assertOwnerReferences(t *testing.T, obj *unstructured.Unstructured, refs []metav1.OwnerReference) {
if !reflect.DeepEqual(obj.GetOwnerReferences(), refs) {
t.Errorf("unexpected owner references, expected: %v, got: %v", refs, obj.GetOwnerReferences())
}
}
func assertWarningCount(t *testing.T, counter warningCounter, expected int) {
if counter.WarningCount() != expected {
t.Errorf("unexpected warning count, expected: %v, got: %v", expected, counter.WarningCount())
}
}
func assertWarningMessage(t *testing.T, b *bytes.Buffer, expected string) {
defer b.Reset()
actual := b.String()
if len(expected) == 0 && len(actual) != 0 {
t.Errorf("unexpected warning message, expected no warning, got: %v", actual)
}
if len(expected) == 0 {
return
}
if !strings.Contains(actual, expected) {
t.Errorf("unexpected warning message, expected: %v, got: %v", expected, actual)
}
}
func assertManagedFields(t *testing.T, obj *unstructured.Unstructured) {
if len(obj.GetManagedFields()) == 0 {
t.Errorf("unexpected empty managed fields in object: %v", obj)
}
}