Revert "Update daemonSet status even if syncDaemonSet fails"

This reverts commit 2ee024a4df.
Author: Maciej Szulik
Date:   2022-11-08 15:01:09 +01:00
Parent: 34ca18d1d0
Commit: 3c93d540c6
4 changed files with 24 additions and 267 deletions
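
Context for the change being undone: the reverted commit made syncDaemonSet write DaemonSet status even when the sync itself failed, preferring the sync error when both steps fail. A minimal standalone sketch of that pattern (the function names below are placeholders, not the controller's real methods):

package main

import (
	"errors"
	"fmt"
)

// syncThenAlwaysUpdateStatus mirrors the removed error handling: the status
// update runs even when sync fails, and the sync error takes precedence.
// Both parameters stand in for the controller's real steps.
func syncThenAlwaysUpdateStatus(sync, updateStatus func() error) error {
	err := sync()
	statusErr := updateStatus() // still attempted on sync failure
	switch {
	case err != nil && statusErr != nil:
		// Log the status error, surface the original sync error.
		fmt.Println("failed to update status:", statusErr)
		return err
	case err != nil:
		return err
	default:
		return statusErr // nil when both succeeded
	}
}

func main() {
	err := syncThenAlwaysUpdateStatus(
		func() error { return errors.New("sync error") },
		func() error { fmt.Println("status written"); return nil },
	)
	fmt.Println(err) // prints "status written", then "sync error"
}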

View File

@@ -887,32 +887,6 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
 	return nodesNeedingDaemonPods, podsToDelete
 }
 
-func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
-	err := dsc.manage(ctx, ds, nodeList, hash)
-	if err != nil {
-		return err
-	}
-
-	// Process rolling updates if we're ready.
-	if dsc.expectations.SatisfiedExpectations(key) {
-		switch ds.Spec.UpdateStrategy.Type {
-		case apps.OnDeleteDaemonSetStrategyType:
-		case apps.RollingUpdateDaemonSetStrategyType:
-			err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
-		}
-		if err != nil {
-			return err
-		}
-	}
-
-	err = dsc.cleanupHistory(ctx, ds, old)
-	if err != nil {
-		return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
-	}
-
-	return nil
-}
-
 // manage manages the scheduling and running of Pods of ds on nodes.
 // After figuring out which nodes should run a Pod of ds but not yet running one and
 // which nodes should not run a Pod of ds but currently running one, it calls function
@@ -1162,7 +1136,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *
 	err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
 	if err != nil {
-		return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err)
+		return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
 	}
 
 	// Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew.
@@ -1236,21 +1210,29 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
 		return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
 	}
 
-	err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
-	statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
-	switch {
-	case err != nil && statusErr != nil:
-		// If there was an error, and we failed to update status,
-		// log it and return the original error.
-		klog.ErrorS(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
-		return err
-	case err != nil:
-		return err
-	case statusErr != nil:
-		return statusErr
-	}
-
-	return nil
+	err = dsc.manage(ctx, ds, nodeList, hash)
+	if err != nil {
+		return err
+	}
+
+	// Process rolling updates if we're ready.
+	if dsc.expectations.SatisfiedExpectations(dsKey) {
+		switch ds.Spec.UpdateStrategy.Type {
+		case apps.OnDeleteDaemonSetStrategyType:
+		case apps.RollingUpdateDaemonSetStrategyType:
+			err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
+		}
+		if err != nil {
+			return err
+		}
+	}
+
+	err = dsc.cleanupHistory(ctx, ds, old)
+	if err != nil {
+		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
+	}
+
+	return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
 }
 
 // NodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
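
With the revert applied, syncDaemonSet returns on the first failure from manage, rollingUpdate, or cleanupHistory, so the trailing updateDaemonSetStatus call is skipped on error. A small sketch of that restored flow, again with placeholder names rather than the controller's real methods:

package main

import (
	"errors"
	"fmt"
)

// syncThenUpdateStatusOnSuccess mirrors the restored flow: any failure during
// the sync steps returns immediately and the status write is skipped.
func syncThenUpdateStatusOnSuccess(sync, updateStatus func() error) error {
	if err := sync(); err != nil {
		return err // updateStatus below never runs
	}
	return updateStatus()
}

func main() {
	err := syncThenUpdateStatusOnSuccess(
		func() error { return errors.New("sync error") },
		func() error { fmt.Println("status written"); return nil },
	)
	fmt.Println(err) // prints only "sync error"; "status written" never appears
}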

View File

@@ -18,7 +18,6 @@ package daemon
 import (
 	"context"
-	"errors"
 	"fmt"
 	"reflect"
 	"sort"
@@ -256,7 +255,7 @@ func (f *fakePodControl) CreatePods(ctx context.Context, namespace string, templ
 	f.Lock()
 	defer f.Unlock()
 	if err := f.FakePodControl.CreatePods(ctx, namespace, template, object, controllerRef); err != nil {
-		return fmt.Errorf("failed to create pod for DaemonSet: %w", err)
+		return fmt.Errorf("failed to create pod for DaemonSet")
 	}
 
 	pod := &v1.Pod{
@@ -388,11 +387,6 @@ func validateSyncDaemonSets(manager *daemonSetsController, fakePodControl *fakeP
 }
 
 func expectSyncDaemonSets(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int) {
 	t.Helper()
-	expectSyncDaemonSetsWithError(t, manager, ds, podControl, expectedCreates, expectedDeletes, expectedEvents, nil)
-}
-
-func expectSyncDaemonSetsWithError(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int, expectedError error) {
-	t.Helper()
 	key, err := controller.KeyFunc(ds)
 	if err != nil {
@@ -400,11 +394,7 @@ func expectSyncDaemonSetsWithError(t *testing.T, manager *daemonSetsController,
 	}
 
 	err = manager.syncHandler(context.TODO(), key)
-	if expectedError != nil && !errors.Is(err, expectedError) {
-		t.Fatalf("Unexpected error returned from syncHandler: %v", err)
-	}
-	if expectedError == nil && err != nil {
+	if err != nil {
 		t.Log(err)
 	}
@@ -781,7 +771,7 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
 	for _, strategy := range updateStrategies() {
 		ds := newDaemonSet("foo")
 		ds.Spec.UpdateStrategy = *strategy
-		manager, podControl, clientset, err := newTestController(ds)
+		manager, podControl, _, err := newTestController(ds)
 		if err != nil {
 			t.Fatalf("error creating DaemonSets controller: %v", err)
 		}
@@ -792,17 +782,6 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		var updated *apps.DaemonSet
-		clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
-			if action.GetSubresource() != "status" {
-				return false, nil, nil
-			}
-			if u, ok := action.(core.UpdateAction); ok {
-				updated = u.GetObject().(*apps.DaemonSet)
-			}
-			return false, nil, nil
-		})
-
 		expectSyncDaemonSets(t, manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0, 0)
 
 		expectedLimit := 0
@@ -812,18 +791,6 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
 		if podControl.FakePodControl.CreateCallCount > expectedLimit {
 			t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", podControl.FakePodControl.CreateLimit*2, podControl.FakePodControl.CreateCallCount)
 		}
-		if updated == nil {
-			t.Fatalf("Failed to get updated status")
-		}
-		if got, want := updated.Status.DesiredNumberScheduled, int32(podControl.FakePodControl.CreateLimit)*10; got != want {
-			t.Errorf("Status.DesiredNumberScheduled = %v, want %v", got, want)
-		}
-		if got, want := updated.Status.CurrentNumberScheduled, int32(podControl.FakePodControl.CreateLimit); got != want {
-			t.Errorf("Status.CurrentNumberScheduled = %v, want %v", got, want)
-		}
-		if got, want := updated.Status.UpdatedNumberScheduled, int32(podControl.FakePodControl.CreateLimit); got != want {
-			t.Errorf("Status.UpdatedNumberScheduled = %v, want %v", got, want)
-		}
 	}
 }
@@ -889,74 +856,6 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
 	}
 }
 
-func TestSimpleDaemonSetUpdatesStatusError(t *testing.T) {
-	var (
-		syncErr   = fmt.Errorf("sync error")
-		statusErr = fmt.Errorf("status error")
-	)
-
-	testCases := []struct {
-		desc         string
-		hasSyncErr   bool
-		hasStatusErr bool
-		expectedErr  error
-	}{
-		{
-			desc:         "sync error",
-			hasSyncErr:   true,
-			hasStatusErr: false,
-			expectedErr:  syncErr,
-		},
-		{
-			desc:         "status error",
-			hasSyncErr:   false,
-			hasStatusErr: true,
-			expectedErr:  statusErr,
-		},
-		{
-			desc:         "sync and status error",
-			hasSyncErr:   true,
-			hasStatusErr: true,
-			expectedErr:  syncErr,
-		},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.desc, func(t *testing.T) {
-			for _, strategy := range updateStrategies() {
-				ds := newDaemonSet("foo")
-				ds.Spec.UpdateStrategy = *strategy
-				manager, podControl, clientset, err := newTestController(ds)
-				if err != nil {
-					t.Fatalf("error creating DaemonSets controller: %v", err)
-				}
-
-				if tc.hasSyncErr {
-					podControl.FakePodControl.Err = syncErr
-				}
-
-				clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
-					if action.GetSubresource() != "status" {
-						return false, nil, nil
-					}
-
-					if tc.hasStatusErr {
-						return true, nil, statusErr
-					} else {
-						return false, nil, nil
-					}
-				})
-
-				manager.dsStore.Add(ds)
-				addNodes(manager.nodeStore, 0, 1, nil)
-				expectSyncDaemonSetsWithError(t, manager, ds, podControl, 1, 0, 0, tc.expectedErr)
-			}
-		})
-	}
-}
-
 // DaemonSets should do nothing if there aren't any nodes
 func TestNoNodesDoesNothing(t *testing.T) {
 	for _, strategy := range updateStrategies() {
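
The PrependReactor calls deleted above use the stock fake-clientset API from k8s.io/client-go/testing rather than anything specific to this change. A standalone sketch of intercepting a DaemonSet status update that way (the toy object and error text are illustrative):

package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	k8stesting "k8s.io/client-go/testing"
)

func main() {
	client := fake.NewSimpleClientset()

	// Fail every status update on daemonsets; other verbs pass through.
	client.PrependReactor("update", "daemonsets", func(action k8stesting.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() != "status" {
			return false, nil, nil // not handled, fall through to the tracker
		}
		return true, nil, fmt.Errorf("status error")
	})

	ds := &appsv1.DaemonSet{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}}
	if _, err := client.AppsV1().DaemonSets("default").Create(context.TODO(), ds, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	_, err := client.AppsV1().DaemonSets("default").UpdateStatus(context.TODO(), ds, metav1.UpdateOptions{})
	fmt.Println(err) // prints "status error"
}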

View File

@@ -43,7 +43,6 @@ import (
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/daemon"
-	"k8s.io/kubernetes/pkg/controlplane"
 	"k8s.io/kubernetes/pkg/scheduler"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
 	labelsutil "k8s.io/kubernetes/pkg/util/labels"
@@ -993,78 +992,3 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
 		validateDaemonSetStatus(dsClient, ds.Name, 2, t)
 	})
 }
-
-func TestUpdateStatusDespitePodCreationFailure(t *testing.T) {
-	forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
-		limitedPodNumber := 2
-		// use framework.StartTestServer to inject admission control
-		// e.g. to emulate the resource quota behavior
-		c, config, closeFn := framework.StartTestServer(t, framework.TestServerSetup{
-			ModifyServerConfig: func(config *controlplane.Config) {
-				config.GenericConfig.AdmissionControl = &fakePodFailAdmission{
-					limitedPodNumber: limitedPodNumber,
-				}
-			},
-		})
-		defer closeFn()
-
-		resyncPeriod := 12 * time.Hour
-		informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "daemonset-informers")), resyncPeriod)
-		dc, err := daemon.NewDaemonSetsController(
-			informers.Apps().V1().DaemonSets(),
-			informers.Apps().V1().ControllerRevisions(),
-			informers.Core().V1().Pods(),
-			informers.Core().V1().Nodes(),
-			clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "daemonset-controller")),
-			flowcontrol.NewBackOff(5*time.Second, 15*time.Minute),
-		)
-		if err != nil {
-			t.Fatalf("error creating DaemonSets controller: %v", err)
-		}
-
-		ctx, cancel := context.WithCancel(context.Background())
-		defer cancel()
-
-		eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
-			Interface: c.EventsV1(),
-		})
-		sched, err := scheduler.New(
-			c,
-			informers,
-			nil,
-			profile.NewRecorderFactory(eventBroadcaster),
-			ctx.Done(),
-		)
-		if err != nil {
-			t.Fatalf("Couldn't create scheduler: %v", err)
-		}
-		eventBroadcaster.StartRecordingToSink(ctx.Done())
-		defer eventBroadcaster.Shutdown()
-		go sched.Run(ctx)
-
-		ns := framework.CreateNamespaceOrDie(c, "update-status-despite-pod-failure", t)
-		defer framework.DeleteNamespaceOrDie(c, ns, t)
-
-		dsClient := c.AppsV1().DaemonSets(ns.Name)
-		podClient := c.CoreV1().Pods(ns.Name)
-		nodeClient := c.CoreV1().Nodes()
-		podInformer := informers.Core().V1().Pods().Informer()
-
-		informers.Start(ctx.Done())
-		go dc.Run(ctx, 2)
-
-		ds := newDaemonSet("foo", ns.Name)
-		ds.Spec.UpdateStrategy = *strategy
-		_, err = dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
-		if err != nil {
-			t.Fatalf("Failed to create DaemonSet: %v", err)
-		}
-		defer cleanupDaemonSets(t, c, ds)
-
-		addNodes(nodeClient, 0, 5, nil, t)
-
-		validateDaemonSetPodsAndMarkReady(podClient, podInformer, limitedPodNumber, t)
-		validateDaemonSetStatus(dsClient, ds.Name, int32(limitedPodNumber), t)
-	})
-}

View File

@@ -1,48 +0,0 @@
-/*
-Copyright 2022 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package daemonset
-
-import (
-	"context"
-	"fmt"
-
-	"k8s.io/apiserver/pkg/admission"
-	api "k8s.io/kubernetes/pkg/apis/core"
-)
-
-var _ admission.ValidationInterface = &fakePodFailAdmission{}
-
-type fakePodFailAdmission struct {
-	limitedPodNumber int
-	succeedPodsCount int
-}
-
-func (f *fakePodFailAdmission) Handles(operation admission.Operation) bool {
-	return operation == admission.Create
-}
-
-func (f *fakePodFailAdmission) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) {
-	if attr.GetKind().GroupKind() != api.Kind("Pod") {
-		return nil
-	}
-	if f.succeedPodsCount >= f.limitedPodNumber {
-		return fmt.Errorf("fakePodFailAdmission error")
-	}
-
-	f.succeedPodsCount++
-	return nil
-}