Merge pull request #112127 from gjkim42/update-status-despite-error

Update daemonSet status even if syncDaemonSet fails
Kubernetes Prow Robot 2022-11-07 16:00:28 -08:00 committed by GitHub
commit b4f42864f5
4 changed files with 267 additions and 24 deletions
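
The gist of the change: syncDaemonSet now always attempts a status update, even when the sync work itself fails, and the sync error takes precedence over a status-update error. A minimal, self-contained sketch of that error-precedence pattern (illustrative only; reconcile/sync/updateStatus are made-up names, not code from this PR):

package main

import (
    "errors"
    "fmt"
)

// reconcile runs the main sync work and then always tries to report status.
// When both fail, the status failure is only logged and the original sync
// error is returned.
func reconcile(sync, updateStatus func() error) error {
    err := sync()
    statusErr := updateStatus() // attempted even if sync failed
    switch {
    case err != nil && statusErr != nil:
        fmt.Println("failed to update status:", statusErr) // log, don't mask
        return err
    case err != nil:
        return err
    case statusErr != nil:
        return statusErr
    }
    return nil
}

func main() {
    err := reconcile(
        func() error { return errors.New("sync error") },
        func() error { return errors.New("status error") },
    )
    fmt.Println(err) // prints "sync error"
}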


@@ -887,6 +887,32 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
    return nodesNeedingDaemonPods, podsToDelete
}

+func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
+    err := dsc.manage(ctx, ds, nodeList, hash)
+    if err != nil {
+        return err
+    }
+
+    // Process rolling updates if we're ready.
+    if dsc.expectations.SatisfiedExpectations(key) {
+        switch ds.Spec.UpdateStrategy.Type {
+        case apps.OnDeleteDaemonSetStrategyType:
+        case apps.RollingUpdateDaemonSetStrategyType:
+            err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
+        }
+        if err != nil {
+            return err
+        }
+    }
+
+    err = dsc.cleanupHistory(ctx, ds, old)
+    if err != nil {
+        return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
+    }
+
+    return nil
+}
+
// manage manages the scheduling and running of Pods of ds on nodes.
// After figuring out which nodes should run a Pod of ds but not yet running one and
// which nodes should not run a Pod of ds but currently running one, it calls function
@@ -1136,7 +1162,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *
    err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
    if err != nil {
-       return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
+       return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err)
    }

    // Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew.
@@ -1210,29 +1236,21 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
        return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
    }

-   err = dsc.manage(ctx, ds, nodeList, hash)
-   if err != nil {
-       return err
-   }
-
-   // Process rolling updates if we're ready.
-   if dsc.expectations.SatisfiedExpectations(dsKey) {
-       switch ds.Spec.UpdateStrategy.Type {
-       case apps.OnDeleteDaemonSetStrategyType:
-       case apps.RollingUpdateDaemonSetStrategyType:
-           err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
-       }
-       if err != nil {
-           return err
-       }
-   }
-
-   err = dsc.cleanupHistory(ctx, ds, old)
-   if err != nil {
-       return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
-   }
-
-   return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
+   err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
+   statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
+   switch {
+   case err != nil && statusErr != nil:
+       // If there was an error, and we failed to update status,
+       // log it and return the original error.
+       klog.ErrorS(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
+       return err
+   case err != nil:
+       return err
+   case statusErr != nil:
+       return statusErr
+   }
+
+   return nil
}

// NodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a


@@ -18,6 +18,7 @@ package daemon
import (
    "context"
+   "errors"
    "fmt"
    "reflect"
    "sort"
@@ -255,7 +256,7 @@ func (f *fakePodControl) CreatePods(ctx context.Context, namespace string, templ
    f.Lock()
    defer f.Unlock()
    if err := f.FakePodControl.CreatePods(ctx, namespace, template, object, controllerRef); err != nil {
-       return fmt.Errorf("failed to create pod for DaemonSet")
+       return fmt.Errorf("failed to create pod for DaemonSet: %w", err)
    }

    pod := &v1.Pod{
@@ -387,6 +388,11 @@ func validateSyncDaemonSets(manager *daemonSetsController, fakePodControl *fakeP
}

func expectSyncDaemonSets(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int) {
+   t.Helper()
+   expectSyncDaemonSetsWithError(t, manager, ds, podControl, expectedCreates, expectedDeletes, expectedEvents, nil)
+}
+
+func expectSyncDaemonSetsWithError(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int, expectedError error) {
    t.Helper()
    key, err := controller.KeyFunc(ds)
    if err != nil {
@@ -394,7 +400,11 @@ func expectSyncDaemonSets(t *testing.T, manager *daemonSetsController, ds *apps.
    }

    err = manager.syncHandler(context.TODO(), key)
-   if err != nil {
+   if expectedError != nil && !errors.Is(err, expectedError) {
+       t.Fatalf("Unexpected error returned from syncHandler: %v", err)
+   }
+
+   if expectedError == nil && err != nil {
        t.Log(err)
    }
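
Side note on the errors.Is check above: it only matches when the injected error is returned unchanged or wrapped with %w, which is why this PR also switches several fmt.Errorf verbs from %v to %w. A standalone illustration (not part of the patch):

package main

import (
    "errors"
    "fmt"
)

func main() {
    base := errors.New("status error")

    wrapped := fmt.Errorf("error storing status for daemon set: %w", base)
    fmt.Println(errors.Is(wrapped, base)) // true: %w keeps the error chain

    opaque := fmt.Errorf("error storing status for daemon set: %v", base)
    fmt.Println(errors.Is(opaque, base)) // false: %v only formats the message
}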
@@ -771,7 +781,7 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
    for _, strategy := range updateStrategies() {
        ds := newDaemonSet("foo")
        ds.Spec.UpdateStrategy = *strategy
-       manager, podControl, _, err := newTestController(ds)
+       manager, podControl, clientset, err := newTestController(ds)
        if err != nil {
            t.Fatalf("error creating DaemonSets controller: %v", err)
        }
@@ -782,6 +792,17 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
            t.Fatal(err)
        }

+       var updated *apps.DaemonSet
+       clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+           if action.GetSubresource() != "status" {
+               return false, nil, nil
+           }
+           if u, ok := action.(core.UpdateAction); ok {
+               updated = u.GetObject().(*apps.DaemonSet)
+           }
+           return false, nil, nil
+       })
+
        expectSyncDaemonSets(t, manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0, 0)

        expectedLimit := 0
@@ -791,6 +812,18 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
        if podControl.FakePodControl.CreateCallCount > expectedLimit {
            t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", podControl.FakePodControl.CreateLimit*2, podControl.FakePodControl.CreateCallCount)
        }

+       if updated == nil {
+           t.Fatalf("Failed to get updated status")
+       }
+       if got, want := updated.Status.DesiredNumberScheduled, int32(podControl.FakePodControl.CreateLimit)*10; got != want {
+           t.Errorf("Status.DesiredNumberScheduled = %v, want %v", got, want)
+       }
+       if got, want := updated.Status.CurrentNumberScheduled, int32(podControl.FakePodControl.CreateLimit); got != want {
+           t.Errorf("Status.CurrentNumberScheduled = %v, want %v", got, want)
+       }
+       if got, want := updated.Status.UpdatedNumberScheduled, int32(podControl.FakePodControl.CreateLimit); got != want {
+           t.Errorf("Status.UpdatedNumberScheduled = %v, want %v", got, want)
+       }
    }
}
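
The reactor used in this test is the standard client-go fake-clientset pattern: a prepended reactor sees each request first and can observe it, then return handled=false so the fake object tracker still applies it. A self-contained sketch of that pattern, assuming only stock k8s.io/client-go and k8s.io/api packages (not code from this PR):

package main

import (
    "context"
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/kubernetes/fake"
    k8stesting "k8s.io/client-go/testing"
)

func main() {
    ds := &appsv1.DaemonSet{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}}
    client := fake.NewSimpleClientset(ds)

    // Capture DaemonSet status updates without changing the fake's behavior.
    var observed *appsv1.DaemonSet
    client.PrependReactor("update", "daemonsets", func(action k8stesting.Action) (bool, runtime.Object, error) {
        if action.GetSubresource() != "status" {
            return false, nil, nil
        }
        if u, ok := action.(k8stesting.UpdateAction); ok {
            observed = u.GetObject().(*appsv1.DaemonSet)
        }
        return false, nil, nil // handled=false: let the object tracker proceed
    })

    ds.Status.DesiredNumberScheduled = 3
    if _, err := client.AppsV1().DaemonSets("default").UpdateStatus(context.TODO(), ds, metav1.UpdateOptions{}); err != nil {
        panic(err)
    }
    fmt.Println(observed.Status.DesiredNumberScheduled) // 3
}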
@@ -856,6 +889,74 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
    }
}

+func TestSimpleDaemonSetUpdatesStatusError(t *testing.T) {
+    var (
+        syncErr   = fmt.Errorf("sync error")
+        statusErr = fmt.Errorf("status error")
+    )
+
+    testCases := []struct {
+        desc         string
+        hasSyncErr   bool
+        hasStatusErr bool
+        expectedErr  error
+    }{
+        {
+            desc:         "sync error",
+            hasSyncErr:   true,
+            hasStatusErr: false,
+            expectedErr:  syncErr,
+        },
+        {
+            desc:         "status error",
+            hasSyncErr:   false,
+            hasStatusErr: true,
+            expectedErr:  statusErr,
+        },
+        {
+            desc:         "sync and status error",
+            hasSyncErr:   true,
+            hasStatusErr: true,
+            expectedErr:  syncErr,
+        },
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.desc, func(t *testing.T) {
+            for _, strategy := range updateStrategies() {
+                ds := newDaemonSet("foo")
+                ds.Spec.UpdateStrategy = *strategy
+                manager, podControl, clientset, err := newTestController(ds)
+                if err != nil {
+                    t.Fatalf("error creating DaemonSets controller: %v", err)
+                }
+
+                if tc.hasSyncErr {
+                    podControl.FakePodControl.Err = syncErr
+                }
+
+                clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+                    if action.GetSubresource() != "status" {
+                        return false, nil, nil
+                    }
+
+                    if tc.hasStatusErr {
+                        return true, nil, statusErr
+                    } else {
+                        return false, nil, nil
+                    }
+                })
+
+                manager.dsStore.Add(ds)
+                addNodes(manager.nodeStore, 0, 1, nil)
+                expectSyncDaemonSetsWithError(t, manager, ds, podControl, 1, 0, 0, tc.expectedErr)
+            }
+        })
+    }
+}
+
// DaemonSets should do nothing if there aren't any nodes
func TestNoNodesDoesNothing(t *testing.T) {
    for _, strategy := range updateStrategies() {


@@ -43,6 +43,7 @@ import (
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/daemon"
+   "k8s.io/kubernetes/pkg/controlplane"
    "k8s.io/kubernetes/pkg/scheduler"
    "k8s.io/kubernetes/pkg/scheduler/profile"
    labelsutil "k8s.io/kubernetes/pkg/util/labels"
@@ -992,3 +993,78 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
        validateDaemonSetStatus(dsClient, ds.Name, 2, t)
    })
}
+
+func TestUpdateStatusDespitePodCreationFailure(t *testing.T) {
+    forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
+        limitedPodNumber := 2
+        // use framework.StartTestServer to inject admission control
+        // e.g. to emulate the resource quota behavior
+        c, config, closeFn := framework.StartTestServer(t, framework.TestServerSetup{
+            ModifyServerConfig: func(config *controlplane.Config) {
+                config.GenericConfig.AdmissionControl = &fakePodFailAdmission{
+                    limitedPodNumber: limitedPodNumber,
+                }
+            },
+        })
+        defer closeFn()
+
+        resyncPeriod := 12 * time.Hour
+        informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "daemonset-informers")), resyncPeriod)
+        dc, err := daemon.NewDaemonSetsController(
+            informers.Apps().V1().DaemonSets(),
+            informers.Apps().V1().ControllerRevisions(),
+            informers.Core().V1().Pods(),
+            informers.Core().V1().Nodes(),
+            clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "daemonset-controller")),
+            flowcontrol.NewBackOff(5*time.Second, 15*time.Minute),
+        )
+        if err != nil {
+            t.Fatalf("error creating DaemonSets controller: %v", err)
+        }
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
+            Interface: c.EventsV1(),
+        })
+        sched, err := scheduler.New(
+            c,
+            informers,
+            nil,
+            profile.NewRecorderFactory(eventBroadcaster),
+            ctx.Done(),
+        )
+        if err != nil {
+            t.Fatalf("Couldn't create scheduler: %v", err)
+        }
+
+        eventBroadcaster.StartRecordingToSink(ctx.Done())
+        defer eventBroadcaster.Shutdown()
+        go sched.Run(ctx)
+
+        ns := framework.CreateNamespaceOrDie(c, "update-status-despite-pod-failure", t)
+        defer framework.DeleteNamespaceOrDie(c, ns, t)
+
+        dsClient := c.AppsV1().DaemonSets(ns.Name)
+        podClient := c.CoreV1().Pods(ns.Name)
+        nodeClient := c.CoreV1().Nodes()
+        podInformer := informers.Core().V1().Pods().Informer()
+        informers.Start(ctx.Done())
+        go dc.Run(ctx, 2)
+
+        ds := newDaemonSet("foo", ns.Name)
+        ds.Spec.UpdateStrategy = *strategy
+        _, err = dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
+        if err != nil {
+            t.Fatalf("Failed to create DaemonSet: %v", err)
+        }
+        defer cleanupDaemonSets(t, c, ds)
+
+        addNodes(nodeClient, 0, 5, nil, t)
+
+        validateDaemonSetPodsAndMarkReady(podClient, podInformer, limitedPodNumber, t)
+        validateDaemonSetStatus(dsClient, ds.Name, int32(limitedPodNumber), t)
+    })
+}


@@ -0,0 +1,48 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package daemonset
+
+import (
+    "context"
+    "fmt"
+
+    "k8s.io/apiserver/pkg/admission"
+    api "k8s.io/kubernetes/pkg/apis/core"
+)
+
+var _ admission.ValidationInterface = &fakePodFailAdmission{}
+
+type fakePodFailAdmission struct {
+    limitedPodNumber int
+    succeedPodsCount int
+}
+
+func (f *fakePodFailAdmission) Handles(operation admission.Operation) bool {
+    return operation == admission.Create
+}
+
+func (f *fakePodFailAdmission) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) {
+    if attr.GetKind().GroupKind() != api.Kind("Pod") {
+        return nil
+    }
+
+    if f.succeedPodsCount >= f.limitedPodNumber {
+        return fmt.Errorf("fakePodFailAdmission error")
+    }
+    f.succeedPodsCount++
+    return nil
+}
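
Rough usage sketch (a hypothetical helper in the same daemonset test package, not part of the PR): the plugin admits the first limitedPodNumber pod creations and rejects the rest, which is what caps the DaemonSet at two running pods in the integration test above.

// examplePodFailAdmission is hypothetical; attribute fields other than the
// kind are placeholders that Validate does not inspect.
func examplePodFailAdmission() {
    f := &fakePodFailAdmission{limitedPodNumber: 2}
    attrs := admission.NewAttributesRecord(
        nil, nil, // object and old object are not used by Validate
        api.Kind("Pod").WithVersion("v1"),
        "default", "pod-0",
        api.Resource("pods").WithVersion("v1"),
        "", admission.Create, nil, false, nil,
    )
    for i := 0; i < 3; i++ {
        // The first two creates are admitted; the third returns "fakePodFailAdmission error".
        fmt.Printf("create %d: %v\n", i, f.Validate(context.TODO(), attrs, nil))
    }
}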