mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #14297 from derekwaynecarr/improve_namespace_controller
Improve conflict errors in namespace controller
This commit is contained in:
commit
296888fb70
@ -59,7 +59,7 @@ func NewNamespaceController(kubeClient client.Interface, experimentalMode bool,
|
|||||||
framework.ResourceEventHandlerFuncs{
|
framework.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) {
|
AddFunc: func(obj interface{}) {
|
||||||
namespace := obj.(*api.Namespace)
|
namespace := obj.(*api.Namespace)
|
||||||
if err := syncNamespace(kubeClient, experimentalMode, *namespace); err != nil {
|
if err := syncNamespace(kubeClient, experimentalMode, namespace); err != nil {
|
||||||
if estimate, ok := err.(*contentRemainingError); ok {
|
if estimate, ok := err.(*contentRemainingError); ok {
|
||||||
go func() {
|
go func() {
|
||||||
// Estimate is the aggregate total of TerminationGracePeriodSeconds, which defaults to 30s
|
// Estimate is the aggregate total of TerminationGracePeriodSeconds, which defaults to 30s
|
||||||
@ -81,7 +81,7 @@ func NewNamespaceController(kubeClient client.Interface, experimentalMode bool,
|
|||||||
},
|
},
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
namespace := newObj.(*api.Namespace)
|
namespace := newObj.(*api.Namespace)
|
||||||
if err := syncNamespace(kubeClient, experimentalMode, *namespace); err != nil {
|
if err := syncNamespace(kubeClient, experimentalMode, namespace); err != nil {
|
||||||
if estimate, ok := err.(*contentRemainingError); ok {
|
if estimate, ok := err.(*contentRemainingError); ok {
|
||||||
go func() {
|
go func() {
|
||||||
t := estimate.Estimate/2 + 1
|
t := estimate.Estimate/2 + 1
|
||||||
@ -121,12 +121,12 @@ func (nm *NamespaceController) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// finalized returns true if the spec.finalizers is empty list
|
// finalized returns true if the spec.finalizers is empty list
|
||||||
func finalized(namespace api.Namespace) bool {
|
func finalized(namespace *api.Namespace) bool {
|
||||||
return len(namespace.Spec.Finalizers) == 0
|
return len(namespace.Spec.Finalizers) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// finalize will finalize the namespace for kubernetes
|
// finalize will finalize the namespace for kubernetes
|
||||||
func finalize(kubeClient client.Interface, namespace api.Namespace) (*api.Namespace, error) {
|
func finalizeNamespaceFunc(kubeClient client.Interface, namespace *api.Namespace) (*api.Namespace, error) {
|
||||||
namespaceFinalize := api.Namespace{}
|
namespaceFinalize := api.Namespace{}
|
||||||
namespaceFinalize.ObjectMeta = namespace.ObjectMeta
|
namespaceFinalize.ObjectMeta = namespace.ObjectMeta
|
||||||
namespaceFinalize.Spec = namespace.Spec
|
namespaceFinalize.Spec = namespace.Spec
|
||||||
@ -209,25 +209,56 @@ func deleteAllContent(kubeClient client.Interface, experimentalMode bool, namesp
|
|||||||
return estimate, nil
|
return estimate, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncNamespace makes namespace life-cycle decisions
|
// updateNamespaceFunc is a function that makes an update to a namespace
|
||||||
func syncNamespace(kubeClient client.Interface, experimentalMode bool, namespace api.Namespace) (err error) {
|
type updateNamespaceFunc func(kubeClient client.Interface, namespace *api.Namespace) (*api.Namespace, error)
|
||||||
|
|
||||||
|
// retryOnConflictError retries the specified fn if there was a conflict error
|
||||||
|
// TODO RetryOnConflict should be a generic concept in client code
|
||||||
|
func retryOnConflictError(kubeClient client.Interface, namespace *api.Namespace, fn updateNamespaceFunc) (result *api.Namespace, err error) {
|
||||||
|
latestNamespace := namespace
|
||||||
|
for {
|
||||||
|
result, err = fn(kubeClient, latestNamespace)
|
||||||
|
if err == nil {
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
if !errors.IsConflict(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
latestNamespace, err = kubeClient.Namespaces().Get(latestNamespace.Name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateNamespaceStatusFunc will verify that the status of the namespace is correct
|
||||||
|
func updateNamespaceStatusFunc(kubeClient client.Interface, namespace *api.Namespace) (*api.Namespace, error) {
|
||||||
|
if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == api.NamespaceTerminating {
|
||||||
|
return namespace, nil
|
||||||
|
}
|
||||||
|
newNamespace := api.Namespace{}
|
||||||
|
newNamespace.ObjectMeta = namespace.ObjectMeta
|
||||||
|
newNamespace.Status = namespace.Status
|
||||||
|
newNamespace.Status.Phase = api.NamespaceTerminating
|
||||||
|
return kubeClient.Namespaces().Status(&newNamespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncNamespace orchestrates deletion of a Namespace and its associated content.
|
||||||
|
func syncNamespace(kubeClient client.Interface, experimentalMode bool, namespace *api.Namespace) (err error) {
|
||||||
if namespace.DeletionTimestamp == nil {
|
if namespace.DeletionTimestamp == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("Syncing namespace %s", namespace.Name)
|
glog.V(4).Infof("Syncing namespace %s", namespace.Name)
|
||||||
|
|
||||||
// if there is a deletion timestamp, and the status is not terminating, then update status
|
// ensure that the status is up to date on the namespace
|
||||||
if !namespace.DeletionTimestamp.IsZero() && namespace.Status.Phase != api.NamespaceTerminating {
|
// if we get a not found error, we assume the namespace is truly gone
|
||||||
newNamespace := api.Namespace{}
|
namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc)
|
||||||
newNamespace.ObjectMeta = namespace.ObjectMeta
|
if err != nil {
|
||||||
newNamespace.Status = namespace.Status
|
if errors.IsNotFound(err) {
|
||||||
newNamespace.Status.Phase = api.NamespaceTerminating
|
return nil
|
||||||
result, err := kubeClient.Namespaces().Status(&newNamespace)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
// work with the latest copy so we can proceed to clean up right away without another interval
|
return err
|
||||||
namespace = *result
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the namespace is already finalized, delete it
|
// if the namespace is already finalized, delete it
|
||||||
@ -249,13 +280,13 @@ func syncNamespace(kubeClient client.Interface, experimentalMode bool, namespace
|
|||||||
}
|
}
|
||||||
|
|
||||||
// we have removed content, so mark it finalized by us
|
// we have removed content, so mark it finalized by us
|
||||||
result, err := finalize(kubeClient, namespace)
|
result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// now check if all finalizers have reported that we delete now
|
// now check if all finalizers have reported that we delete now
|
||||||
if finalized(*result) {
|
if finalized(result) {
|
||||||
err = kubeClient.Namespaces().Delete(namespace.Name)
|
err = kubeClient.Namespaces().Delete(namespace.Name)
|
||||||
if err != nil && !errors.IsNotFound(err) {
|
if err != nil && !errors.IsNotFound(err) {
|
||||||
return err
|
return err
|
||||||
|
@ -17,18 +17,21 @@ limitations under the License.
|
|||||||
package namespacecontroller
|
package namespacecontroller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
|
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFinalized(t *testing.T) {
|
func TestFinalized(t *testing.T) {
|
||||||
testNamespace := api.Namespace{
|
testNamespace := &api.Namespace{
|
||||||
Spec: api.NamespaceSpec{
|
Spec: api.NamespaceSpec{
|
||||||
Finalizers: []api.FinalizerName{"a", "b"},
|
Finalizers: []api.FinalizerName{"a", "b"},
|
||||||
},
|
},
|
||||||
@ -42,9 +45,9 @@ func TestFinalized(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFinalize(t *testing.T) {
|
func TestFinalizeNamespaceFunc(t *testing.T) {
|
||||||
mockClient := &testclient.Fake{}
|
mockClient := &testclient.Fake{}
|
||||||
testNamespace := api.Namespace{
|
testNamespace := &api.Namespace{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
Name: "test",
|
Name: "test",
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
@ -53,7 +56,7 @@ func TestFinalize(t *testing.T) {
|
|||||||
Finalizers: []api.FinalizerName{"kubernetes", "other"},
|
Finalizers: []api.FinalizerName{"kubernetes", "other"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
finalize(mockClient, testNamespace)
|
finalizeNamespaceFunc(mockClient, testNamespace)
|
||||||
actions := mockClient.Actions()
|
actions := mockClient.Actions()
|
||||||
if len(actions) != 1 {
|
if len(actions) != 1 {
|
||||||
t.Errorf("Expected 1 mock client action, but got %v", len(actions))
|
t.Errorf("Expected 1 mock client action, but got %v", len(actions))
|
||||||
@ -73,7 +76,7 @@ func TestFinalize(t *testing.T) {
|
|||||||
func testSyncNamespaceThatIsTerminating(t *testing.T, experimentalMode bool) {
|
func testSyncNamespaceThatIsTerminating(t *testing.T, experimentalMode bool) {
|
||||||
mockClient := &testclient.Fake{}
|
mockClient := &testclient.Fake{}
|
||||||
now := unversioned.Now()
|
now := unversioned.Now()
|
||||||
testNamespace := api.Namespace{
|
testNamespace := &api.Namespace{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
Name: "test",
|
Name: "test",
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
@ -125,6 +128,26 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, experimentalMode bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetryOnConflictError(t *testing.T) {
|
||||||
|
mockClient := &testclient.Fake{}
|
||||||
|
numTries := 0
|
||||||
|
retryOnce := func(kubeClient client.Interface, namespace *api.Namespace) (*api.Namespace, error) {
|
||||||
|
numTries++
|
||||||
|
if numTries <= 1 {
|
||||||
|
return namespace, errors.NewConflict(namespace.Kind, namespace.Name, fmt.Errorf("ERROR!"))
|
||||||
|
}
|
||||||
|
return namespace, nil
|
||||||
|
}
|
||||||
|
namespace := &api.Namespace{}
|
||||||
|
_, err := retryOnConflictError(mockClient, namespace, retryOnce)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %v", err)
|
||||||
|
}
|
||||||
|
if numTries != 2 {
|
||||||
|
t.Errorf("Expected %v, but got %v", 2, numTries)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSyncNamespaceThatIsTerminatingNonExperimental(t *testing.T) {
|
func TestSyncNamespaceThatIsTerminatingNonExperimental(t *testing.T) {
|
||||||
testSyncNamespaceThatIsTerminating(t, false)
|
testSyncNamespaceThatIsTerminating(t, false)
|
||||||
}
|
}
|
||||||
@ -135,7 +158,7 @@ func TestSyncNamespaceThatIsTerminatingExperimental(t *testing.T) {
|
|||||||
|
|
||||||
func TestSyncNamespaceThatIsActive(t *testing.T) {
|
func TestSyncNamespaceThatIsActive(t *testing.T) {
|
||||||
mockClient := &testclient.Fake{}
|
mockClient := &testclient.Fake{}
|
||||||
testNamespace := api.Namespace{
|
testNamespace := &api.Namespace{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
Name: "test",
|
Name: "test",
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
|
Loading…
Reference in New Issue
Block a user