fix namespace termination conditions to be consistent and correct

This commit is contained in:
David Eads 2019-09-03 14:01:49 -04:00
parent 975d0736b3
commit 076bf949d7
9 changed files with 454 additions and 80 deletions

View File

@ -30,7 +30,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["namespaced_resources_deleter_test.go"],
srcs = [
"namespaced_resources_deleter_test.go",
"status_condition_utils_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",

View File

@ -515,16 +515,18 @@ func (d *namespacedResourcesDeleter) deleteAllContent(ns *v1.Namespace) (int64,
}
}
if len(errs) > 0 {
if hasChanged := conditionUpdater.Update(ns); hasChanged {
if _, err = d.nsClient.UpdateStatus(ns); err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't update status condition for namespace %q: %v", namespace, err))
}
// we always want to update the conditions because if we have set a condition to "it worked" after it was previously, "it didn't work",
// we need to reflect that information. Recall that additional finalizers can be set on namespaces, so this finalizer may clear itself and
// NOT remove the resource instance.
if hasChanged := conditionUpdater.Update(ns); hasChanged {
if _, err = d.nsClient.UpdateStatus(ns); err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't update status condition for namespace %q: %v", namespace, err))
}
return estimate, utilerrors.NewAggregate(errs)
}
klog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v", namespace, estimate)
return estimate, nil
// if len(errs)==0, NewAggregate returns nil.
klog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v, errors: %v", namespace, estimate, utilerrors.NewAggregate(errs))
return estimate, utilerrors.NewAggregate(errs)
}
// estimateGrracefulTermination will estimate the graceful termination required for the specific entity in the namespace

View File

@ -146,6 +146,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"create", "namespaces", "finalize"}, "-"),
strings.Join([]string{"list", "pods", ""}, "-"),
strings.Join([]string{"update", "namespaces", "status"}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"),
),
metadataClientActionSet: metadataClientActionSet,
@ -187,68 +188,66 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
}
for scenario, testInput := range scenarios {
testHandler := &fakeActionHandler{statusCode: 200}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
t.Run(scenario, func(t *testing.T) {
testHandler := &fakeActionHandler{statusCode: 200}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
mockClient := fake.NewSimpleClientset(testInput.testNamespace)
metadataClient, err := metadata.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
fn := func() ([]*metav1.APIResourceList, error) {
return resources, testInput.gvrError
}
d := NewNamespacedResourcesDeleter(mockClient.CoreV1().Namespaces(), metadataClient, mockClient.CoreV1(), fn, v1.FinalizerKubernetes, true)
if err := d.Delete(testInput.testNamespace.Name); !matchErrors(err, testInput.expectErrorOnDelete) {
t.Errorf("scenario %s - expected error %q when syncing namespace, got %q, %v", scenario, testInput.expectErrorOnDelete, err, testInput.expectErrorOnDelete == err)
}
// validate traffic from kube client
actionSet := sets.NewString()
for _, action := range mockClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
}
if !actionSet.Equal(testInput.kubeClientActionSet) {
t.Errorf("scenario %s - mock client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario,
testInput.kubeClientActionSet, actionSet, testInput.kubeClientActionSet.Difference(actionSet))
}
// validate traffic from metadata client
actionSet = sets.NewString()
for _, action := range testHandler.actions {
actionSet.Insert(action.String())
}
if !actionSet.Equal(testInput.metadataClientActionSet) {
t.Errorf("scenario %s - metadata client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario,
testInput.metadataClientActionSet, actionSet, testInput.metadataClientActionSet.Difference(actionSet))
}
// validate status conditions
if testInput.expectStatus != nil {
obj, err := mockClient.Tracker().Get(schema.GroupVersionResource{Version: "v1", Resource: "namespaces"}, testInput.testNamespace.Namespace, testInput.testNamespace.Name)
mockClient := fake.NewSimpleClientset(testInput.testNamespace)
metadataClient, err := metadata.NewForConfig(clientConfig)
if err != nil {
t.Errorf("Unexpected error in getting the namespace: %v", err)
continue
t.Fatal(err)
}
ns, ok := obj.(*v1.Namespace)
if !ok {
t.Errorf("Expected a namespace but received %v", obj)
continue
fn := func() ([]*metav1.APIResourceList, error) {
return resources, testInput.gvrError
}
if ns.Status.Phase != testInput.expectStatus.Phase {
t.Errorf("Expected namespace status phase %v but received %v", testInput.expectStatus.Phase, ns.Status.Phase)
continue
d := NewNamespacedResourcesDeleter(mockClient.CoreV1().Namespaces(), metadataClient, mockClient.CoreV1(), fn, v1.FinalizerKubernetes, true)
if err := d.Delete(testInput.testNamespace.Name); !matchErrors(err, testInput.expectErrorOnDelete) {
t.Errorf("expected error %q when syncing namespace, got %q, %v", testInput.expectErrorOnDelete, err, testInput.expectErrorOnDelete == err)
}
for _, expCondition := range testInput.expectStatus.Conditions {
nsCondition := getCondition(ns.Status.Conditions, expCondition.Type)
if nsCondition == nil {
t.Errorf("Missing namespace status condition %v", expCondition.Type)
continue
// validate traffic from kube client
actionSet := sets.NewString()
for _, action := range mockClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
}
if !actionSet.Equal(testInput.kubeClientActionSet) {
t.Errorf("mock client expected actions:\n%v\n but got:\n%v\nDifference:\n%v",
testInput.kubeClientActionSet, actionSet, testInput.kubeClientActionSet.Difference(actionSet))
}
// validate traffic from metadata client
actionSet = sets.NewString()
for _, action := range testHandler.actions {
actionSet.Insert(action.String())
}
if !actionSet.Equal(testInput.metadataClientActionSet) {
t.Errorf(" metadata client expected actions:\n%v\n but got:\n%v\nDifference:\n%v",
testInput.metadataClientActionSet, actionSet, testInput.metadataClientActionSet.Difference(actionSet))
}
// validate status conditions
if testInput.expectStatus != nil {
obj, err := mockClient.Tracker().Get(schema.GroupVersionResource{Version: "v1", Resource: "namespaces"}, testInput.testNamespace.Namespace, testInput.testNamespace.Name)
if err != nil {
t.Fatalf("Unexpected error in getting the namespace: %v", err)
}
ns, ok := obj.(*v1.Namespace)
if !ok {
t.Fatalf("Expected a namespace but received %v", obj)
}
if ns.Status.Phase != testInput.expectStatus.Phase {
t.Fatalf("Expected namespace status phase %v but received %v", testInput.expectStatus.Phase, ns.Status.Phase)
}
for _, expCondition := range testInput.expectStatus.Conditions {
nsCondition := getCondition(ns.Status.Conditions, expCondition.Type)
if nsCondition == nil {
t.Fatalf("Missing namespace status condition %v", expCondition.Type)
}
}
}
}
})
}
}

View File

@ -21,7 +21,7 @@ import (
"sort"
"strings"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
)
@ -128,25 +128,18 @@ func makeDeleteContentCondition(err []error) *v1.NamespaceCondition {
func updateConditions(status *v1.NamespaceStatus, newConditions []v1.NamespaceCondition) (hasChanged bool) {
for _, conditionType := range conditionTypes {
newCondition := getCondition(newConditions, conditionType)
oldCondition := getCondition(status.Conditions, conditionType)
if newCondition == nil && oldCondition == nil {
// both are nil, no update necessary
continue
// if we weren't failing, then this returned nil. We should set the "ok" variant of the condition
if newCondition == nil {
newCondition = newSuccessfulCondition(conditionType)
}
oldCondition := getCondition(status.Conditions, conditionType)
// only new condition of this type exists, add to the list
if oldCondition == nil {
// only new condition of this type exists, add to the list
status.Conditions = append(status.Conditions, *newCondition)
hasChanged = true
} else if newCondition == nil {
// only old condition of this type exists, set status to false
if oldCondition.Status != v1.ConditionFalse {
oldCondition.Status = v1.ConditionFalse
oldCondition.Message = okMessages[conditionType]
oldCondition.Reason = okReasons[conditionType]
oldCondition.LastTransitionTime = metav1.Now()
hasChanged = true
}
} else if oldCondition.Message != newCondition.Message {
} else if oldCondition.Status != newCondition.Status || oldCondition.Message != newCondition.Message || oldCondition.Reason != newCondition.Reason {
// old condition needs to be updated
if oldCondition.Status != newCondition.Status {
oldCondition.LastTransitionTime = metav1.Now()
@ -161,6 +154,16 @@ func updateConditions(status *v1.NamespaceStatus, newConditions []v1.NamespaceCo
return
}
func newSuccessfulCondition(conditionType v1.NamespaceConditionType) *v1.NamespaceCondition {
return &v1.NamespaceCondition{
Type: conditionType,
Status: v1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: okReasons[conditionType],
Message: okMessages[conditionType],
}
}
func getCondition(conditions []v1.NamespaceCondition, conditionType v1.NamespaceConditionType) *v1.NamespaceCondition {
for i := range conditions {
if conditions[i].Type == conditionType {

View File

@ -0,0 +1,137 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deletion
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
)
func TestUpdateConditions(t *testing.T) {
tests := []struct {
name string
newConditions []v1.NamespaceCondition
startingStatus *v1.NamespaceStatus
expecteds []v1.NamespaceCondition
}{
{
name: "leave unknown",
newConditions: []v1.NamespaceCondition{},
startingStatus: &v1.NamespaceStatus{
Conditions: []v1.NamespaceCondition{
{Type: "unknown", Status: v1.ConditionTrue},
},
},
expecteds: []v1.NamespaceCondition{
{Type: "unknown", Status: v1.ConditionTrue},
*newSuccessfulCondition(v1.NamespaceDeletionDiscoveryFailure),
*newSuccessfulCondition(v1.NamespaceDeletionGVParsingFailure),
*newSuccessfulCondition(v1.NamespaceDeletionContentFailure),
},
},
{
name: "replace with success",
newConditions: []v1.NamespaceCondition{},
startingStatus: &v1.NamespaceStatus{
Conditions: []v1.NamespaceCondition{
{Type: v1.NamespaceDeletionDiscoveryFailure, Status: v1.ConditionTrue},
},
},
expecteds: []v1.NamespaceCondition{
*newSuccessfulCondition(v1.NamespaceDeletionDiscoveryFailure),
*newSuccessfulCondition(v1.NamespaceDeletionGVParsingFailure),
*newSuccessfulCondition(v1.NamespaceDeletionContentFailure),
},
},
{
name: "leave different order",
newConditions: []v1.NamespaceCondition{},
startingStatus: &v1.NamespaceStatus{
Conditions: []v1.NamespaceCondition{
{Type: v1.NamespaceDeletionGVParsingFailure, Status: v1.ConditionTrue},
{Type: v1.NamespaceDeletionDiscoveryFailure, Status: v1.ConditionTrue},
},
},
expecteds: []v1.NamespaceCondition{
*newSuccessfulCondition(v1.NamespaceDeletionGVParsingFailure),
*newSuccessfulCondition(v1.NamespaceDeletionDiscoveryFailure),
*newSuccessfulCondition(v1.NamespaceDeletionContentFailure),
},
},
{
name: "overwrite with failure",
newConditions: []v1.NamespaceCondition{
{Type: v1.NamespaceDeletionGVParsingFailure, Status: v1.ConditionTrue, Reason: "foo", Message: "bar"},
},
startingStatus: &v1.NamespaceStatus{
Conditions: []v1.NamespaceCondition{
{Type: v1.NamespaceDeletionGVParsingFailure, Status: v1.ConditionFalse},
{Type: v1.NamespaceDeletionDiscoveryFailure, Status: v1.ConditionTrue},
},
},
expecteds: []v1.NamespaceCondition{
{Type: v1.NamespaceDeletionGVParsingFailure, Status: v1.ConditionTrue, Reason: "foo", Message: "bar"},
*newSuccessfulCondition(v1.NamespaceDeletionDiscoveryFailure),
*newSuccessfulCondition(v1.NamespaceDeletionContentFailure),
},
},
{
name: "write new failure",
newConditions: []v1.NamespaceCondition{
{Type: v1.NamespaceDeletionGVParsingFailure, Status: v1.ConditionTrue, Reason: "foo", Message: "bar"},
},
startingStatus: &v1.NamespaceStatus{
Conditions: []v1.NamespaceCondition{
{Type: v1.NamespaceDeletionDiscoveryFailure, Status: v1.ConditionTrue},
},
},
expecteds: []v1.NamespaceCondition{
*newSuccessfulCondition(v1.NamespaceDeletionDiscoveryFailure),
{Type: v1.NamespaceDeletionGVParsingFailure, Status: v1.ConditionTrue, Reason: "foo", Message: "bar"},
*newSuccessfulCondition(v1.NamespaceDeletionContentFailure),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
updateConditions(test.startingStatus, test.newConditions)
actuals := test.startingStatus.Conditions
if len(actuals) != len(test.expecteds) {
t.Fatal(actuals)
}
for i := range actuals {
actual := actuals[i]
expected := test.expecteds[i]
expected.LastTransitionTime = actual.LastTransitionTime
if !reflect.DeepEqual(expected, actual) {
t.Error(actual)
}
}
})
}
}

View File

@ -58,6 +58,7 @@ filegroup(
"//test/integration/kubelet:all-srcs",
"//test/integration/master:all-srcs",
"//test/integration/metrics:all-srcs",
"//test/integration/namespace:all-srcs",
"//test/integration/objectmeta:all-srcs",
"//test/integration/openshift:all-srcs",
"//test/integration/pods:all-srcs",

View File

@ -0,0 +1,42 @@
package(default_visibility = ["//visibility:public"])
load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_test(
name = "go_default_test",
size = "large",
srcs = [
"main_test.go",
"ns_conditions_test.go",
],
tags = ["integration"],
deps = [
"//pkg/controller/namespace:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/metadata:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//test/integration/etcd:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,27 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"encoding/json"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/controller/namespace"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
)
func TestNamespaceCondition(t *testing.T) {
closeFn, nsController, informers, kubeClient, dynamicClient := namespaceLifecycleSetup(t)
defer closeFn()
nsName := "test-namespace-conditions"
_, err := kubeClient.CoreV1().Namespaces().Create(&corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: nsName,
},
})
if err != nil {
t.Fatal(err)
}
// Start informer and controllers
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go nsController.Run(5, stopCh)
data := etcd.GetEtcdStorageDataForNamespace(nsName)
podJSON, err := jsonToUnstructured(data[corev1.SchemeGroupVersion.WithResource("pods")].Stub, "v1", "Pod")
if err != nil {
t.Fatal(err)
}
_, err = dynamicClient.Resource(corev1.SchemeGroupVersion.WithResource("pods")).Namespace(nsName).Create(podJSON, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
deploymentJSON, err := jsonToUnstructured(data[appsv1.SchemeGroupVersion.WithResource("deployments")].Stub, "apps/v1", "Deployment")
if err != nil {
t.Fatal(err)
}
deploymentJSON.SetFinalizers([]string{"custom.io/finalizer"})
_, err = dynamicClient.Resource(appsv1.SchemeGroupVersion.WithResource("deployments")).Namespace(nsName).Create(deploymentJSON, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
if err = kubeClient.CoreV1().Namespaces().Delete(nsName, nil); err != nil {
t.Fatal(err)
}
err = wait.PollImmediate(1*time.Second, 60*time.Second, func() (bool, error) {
curr, err := kubeClient.CoreV1().Namespaces().Get(nsName, metav1.GetOptions{})
if err != nil {
return false, err
}
foundContentCondition := false
foundFinalizerCondition := false
for _, condition := range curr.Status.Conditions {
if condition.Type == corev1.NamespaceDeletionGVParsingFailure && condition.Message == `All legacy kube types successfully parsed` {
foundContentCondition = true
}
if condition.Type == corev1.NamespaceDeletionDiscoveryFailure && condition.Message == `All resources successfully discovered` {
foundFinalizerCondition = true
}
if condition.Type == corev1.NamespaceDeletionContentFailure && condition.Message == `All content successfully deleted` {
foundFinalizerCondition = true
}
}
t.Log(spew.Sdump(curr))
return foundContentCondition && foundFinalizerCondition, nil
})
if err != nil {
t.Fatal(err)
}
}
// JSONToUnstructured converts a JSON stub to unstructured.Unstructured and
// returns a dynamic resource client that can be used to interact with it
func jsonToUnstructured(stub, version, kind string) (*unstructured.Unstructured, error) {
typeMetaAdder := map[string]interface{}{}
if err := json.Unmarshal([]byte(stub), &typeMetaAdder); err != nil {
return nil, err
}
// we don't require GVK on the data we provide, so we fill it in here. We could, but that seems extraneous.
typeMetaAdder["apiVersion"] = version
typeMetaAdder["kind"] = kind
return &unstructured.Unstructured{Object: typeMetaAdder}, nil
}
func namespaceLifecycleSetup(t *testing.T) (framework.CloseFunc, *namespace.NamespaceController, informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, s, closeFn := framework.RunAMaster(masterConfig)
config := restclient.Config{Host: s.URL}
config.QPS = 10000
config.Burst = 10000
clientSet, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("error in create clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "deployment-informers")), resyncPeriod)
metadataClient, err := metadata.NewForConfig(&config)
if err != nil {
panic(err)
}
discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources
controller := namespace.NewNamespaceController(
clientSet,
metadataClient,
discoverResourcesFn,
informers.Core().V1().Namespaces(),
10*time.Hour,
corev1.FinalizerKubernetes)
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
return closeFn, controller, informers, clientSet, dynamic.NewForConfigOrDie(&config)
}