Merge pull request #67817 from tnozicka/fix-rollout-status-wait

Automatic merge from submit-queue (batch tested with PRs 67986, 68210, 67817). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Fix waiting in kubectl rollout status

**What this PR does / why we need it**:
Because `kubectl rollout status` is based on `UntilWithoutRetry`, it fails whenever the watcher is closed - caused by the closing of the underlying connection - e.g. on API timeout, LB timeout, ...

Using `UntilWithSync` (based on an informer) allows it to recover from all of these failures and truly work through unlimited timeouts.

(Split from https://github.com/kubernetes/kubernetes/pull/50102)

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes https://github.com/kubernetes/kubernetes/issues/40224

**Special notes for your reviewer**:

**Release note**:
```release-note
`kubectl rollout status` now works for unlimited timeouts.
```
This commit is contained in:
Kubernetes Submit Queue 2018-09-04 04:19:36 -07:00 committed by GitHub
commit 674401ace1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 140 additions and 66 deletions

View File

@ -122,6 +122,7 @@ go_library(
"//pkg/controller/deployment/util:go_default_library",
"//pkg/credentialprovider:go_default_library",
"//pkg/kubectl/apps:go_default_library",
"//pkg/kubectl/scheme:go_default_library",
"//pkg/kubectl/util:go_default_library",
"//pkg/kubectl/util/hash:go_default_library",
"//pkg/kubectl/util/slice:go_default_library",

View File

@ -28,13 +28,20 @@ go_library(
"//pkg/kubectl/scheme:go_default_library",
"//pkg/kubectl/util/i18n:go_default_library",
"//pkg/util/interrupt:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/printers:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/resource:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//vendor/github.com/renstrom/dedent:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",

View File

@ -23,10 +23,17 @@ import (
"github.com/spf13/cobra"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericclioptions/resource"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
@ -62,9 +69,11 @@ type RolloutStatusOptions struct {
Watch bool
Revision int64
Timeout time.Duration
StatusViewer func(*meta.RESTMapping) (kubectl.StatusViewer, error)
Builder func() *resource.Builder
StatusViewer func(*meta.RESTMapping) (kubectl.StatusViewer, error)
Builder func() *resource.Builder
DynamicClient dynamic.Interface
FilenameOptions *resource.FilenameOptions
genericclioptions.IOStreams
@ -76,6 +85,7 @@ func NewRolloutStatusOptions(streams genericclioptions.IOStreams) *RolloutStatus
FilenameOptions: &resource.FilenameOptions{},
IOStreams: streams,
Watch: true,
Timeout: 0,
}
}
@ -102,6 +112,7 @@ func NewCmdRolloutStatus(f cmdutil.Factory, streams genericclioptions.IOStreams)
cmdutil.AddFilenameOptionFlags(cmd, o.FilenameOptions, usage)
cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "Watch the status of the rollout until it's done.")
cmd.Flags().Int64Var(&o.Revision, "revision", o.Revision, "Pin to a specific revision for showing its status. Defaults to 0 (last revision).")
cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).")
return cmd
}
@ -119,6 +130,17 @@ func (o *RolloutStatusOptions) Complete(f cmdutil.Factory, args []string) error
o.StatusViewer = func(mapping *meta.RESTMapping) (kubectl.StatusViewer, error) {
return polymorphichelpers.StatusViewerFn(f, mapping)
}
clientConfig, err := f.ToRESTConfig()
if err != nil {
return err
}
o.DynamicClient, err = dynamic.NewForConfig(clientConfig)
if err != nil {
return err
}
return nil
}
@ -158,60 +180,68 @@ func (o *RolloutStatusOptions) Run() error {
info := infos[0]
mapping := info.ResourceMapping()
obj, err := r.Object()
if err != nil {
return err
}
rv, err := meta.NewAccessor().ResourceVersion(obj)
if err != nil {
return err
}
statusViewer, err := o.StatusViewer(mapping)
if err != nil {
return err
}
// check if deployment's has finished the rollout
status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision)
if err != nil {
return err
}
fmt.Fprintf(o.Out, "%s", status)
if done {
return nil
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(options)
},
}
shouldWatch := o.Watch
if !shouldWatch {
return nil
}
preconditionFunc := func(store cache.Store) (bool, error) {
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
if err != nil {
return true, err
}
if !exists {
// We need to make sure we see the object in the cache before we start waiting for events
// or we would be waiting for the timeout if such object didn't exist.
return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
}
// watch for changes to the deployment
w, err := r.Watch(rv)
if err != nil {
return err
return false, nil
}
// if the rollout isn't done yet, keep watching deployment status
// TODO: expose timeout
timeout := 0 * time.Second
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
intr := interrupt.New(nil, cancel)
return intr.Run(func() error {
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
// print deployment's status
status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision)
if err != nil {
return false, err
_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(e watch.Event) (bool, error) {
switch t := e.Type; t {
case watch.Added, watch.Modified:
status, done, err := statusViewer.Status(e.Object.(runtime.Unstructured), o.Revision)
if err != nil {
return false, err
}
fmt.Fprintf(o.Out, "%s", status)
// Quit waiting if the rollout is done
if done {
return true, nil
}
shouldWatch := o.Watch
if !shouldWatch {
return true, nil
}
return false, nil
case watch.Deleted:
// We need to abort to avoid cases of recreation and not to silently watch the wrong (new) object
return true, fmt.Errorf("object has been deleted")
default:
return true, fmt.Errorf("internal error: unexpected event %#v", e)
}
fmt.Fprintf(o.Out, "%s", status)
// Quit waiting if the rollout is done
if done {
return true, nil
}
return false, nil
})
return err
})

View File

@ -21,17 +21,18 @@ import (
appsv1 "k8s.io/api/apps/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/kubectl/scheme"
)
// StatusViewer provides an interface for resources that have rollout status.
type StatusViewer interface {
Status(namespace, name string, revision int64) (string, bool, error)
Status(obj runtime.Unstructured, revision int64) (string, bool, error)
}
// StatusViewerFor returns a StatusViewer for the resource specified by kind.
@ -63,11 +64,13 @@ type StatefulSetStatusViewer struct {
}
// Status returns a message describing deployment status, and a bool value indicating if the status is considered done.
func (s *DeploymentStatusViewer) Status(namespace, name string, revision int64) (string, bool, error) {
deployment, err := s.c.Deployments(namespace).Get(name, metav1.GetOptions{})
func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
deployment := &appsv1.Deployment{}
err := scheme.Scheme.Convert(obj, deployment, nil)
if err != nil {
return "", false, err
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, deployment, err)
}
if revision > 0 {
deploymentRev, err := util.Revision(deployment)
if err != nil {
@ -80,51 +83,55 @@ func (s *DeploymentStatusViewer) Status(namespace, name string, revision int64)
if deployment.Generation <= deployment.Status.ObservedGeneration {
cond := util.GetDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
if cond != nil && cond.Reason == util.TimedOutReason {
return "", false, fmt.Errorf("deployment %q exceeded its progress deadline", name)
return "", false, fmt.Errorf("deployment %q exceeded its progress deadline", deployment.Name)
}
if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil
}
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil
}
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil
return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil
}
return fmt.Sprintf("deployment %q successfully rolled out\n", name), true, nil
return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), true, nil
}
return fmt.Sprintf("Waiting for deployment spec update to be observed...\n"), false, nil
}
// Status returns a message describing daemon set status, and a bool value indicating if the status is considered done.
func (s *DaemonSetStatusViewer) Status(namespace, name string, revision int64) (string, bool, error) {
func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
//ignoring revision as DaemonSets does not have history yet
daemon, err := s.c.DaemonSets(namespace).Get(name, metav1.GetOptions{})
daemon := &appsv1.DaemonSet{}
err := scheme.Scheme.Convert(obj, daemon, nil)
if err != nil {
return "", false, err
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, daemon, err)
}
if daemon.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType)
}
if daemon.Generation <= daemon.Status.ObservedGeneration {
if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled {
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d out of %d new pods have been updated...\n", name, daemon.Status.UpdatedNumberScheduled, daemon.Status.DesiredNumberScheduled), false, nil
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d out of %d new pods have been updated...\n", daemon.Name, daemon.Status.UpdatedNumberScheduled, daemon.Status.DesiredNumberScheduled), false, nil
}
if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled {
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d of %d updated pods are available...\n", name, daemon.Status.NumberAvailable, daemon.Status.DesiredNumberScheduled), false, nil
return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d of %d updated pods are available...\n", daemon.Name, daemon.Status.NumberAvailable, daemon.Status.DesiredNumberScheduled), false, nil
}
return fmt.Sprintf("daemon set %q successfully rolled out\n", name), true, nil
return fmt.Sprintf("daemon set %q successfully rolled out\n", daemon.Name), true, nil
}
return fmt.Sprintf("Waiting for daemon set spec update to be observed...\n"), false, nil
}
// Status returns a message describing statefulset status, and a bool value indicating if the status is considered done.
func (s *StatefulSetStatusViewer) Status(namespace, name string, revision int64) (string, bool, error) {
sts, err := s.c.StatefulSets(namespace).Get(name, metav1.GetOptions{})
func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) {
sts := &appsv1.StatefulSet{}
err := scheme.Scheme.Convert(obj, sts, nil)
if err != nil {
return "", false, err
return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, sts, err)
}
if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType)
}
@ -138,7 +145,7 @@ func (s *StatefulSetStatusViewer) Status(namespace, name string, revision int64)
if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
sts.Status.UpdatedReplicas, (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition)), false, nil
sts.Status.UpdatedReplicas, *sts.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil
}
}
return fmt.Sprintf("partitioned roll out complete: %d new pods have been updated...\n",

View File

@ -23,7 +23,9 @@ import (
apps "k8s.io/api/apps/v1"
api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/kubectl/scheme"
)
func TestDeploymentStatusViewerStatus(t *testing.T) {
@ -126,9 +128,15 @@ func TestDeploymentStatusViewerStatus(t *testing.T) {
},
Status: test.status,
}
unstructuredD := &unstructured.Unstructured{}
err := scheme.Scheme.Convert(d, unstructuredD, nil)
if err != nil {
t.Fatal(err)
}
client := fake.NewSimpleClientset(d).Apps()
dsv := &DeploymentStatusViewer{c: client}
msg, done, err := dsv.Status("bar", "foo", 0)
msg, done, err := dsv.Status(unstructuredD, 0)
if err != nil {
t.Fatalf("DeploymentStatusViewer.Status(): %v", err)
}
@ -225,9 +233,16 @@ func TestDaemonSetStatusViewerStatus(t *testing.T) {
},
Status: test.status,
}
unstructuredD := &unstructured.Unstructured{}
err := scheme.Scheme.Convert(d, unstructuredD, nil)
if err != nil {
t.Fatal(err)
}
client := fake.NewSimpleClientset(d).Apps()
dsv := &DaemonSetStatusViewer{c: client}
msg, done, err := dsv.Status("bar", "foo", 0)
msg, done, err := dsv.Status(unstructuredD, 0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -370,9 +385,16 @@ func TestStatefulSetStatusViewerStatus(t *testing.T) {
s.Status = test.status
s.Spec.UpdateStrategy = test.strategy
s.Generation = test.generation
unstructuredS := &unstructured.Unstructured{}
err := scheme.Scheme.Convert(s, unstructuredS, nil)
if err != nil {
t.Fatal(err)
}
client := fake.NewSimpleClientset(s).AppsV1()
dsv := &StatefulSetStatusViewer{c: client}
msg, done, err := dsv.Status(s.Namespace, s.Name, 0)
msg, done, err := dsv.Status(unstructuredS, 0)
if test.err && err == nil {
t.Fatalf("%s: expected error", test.name)
}
@ -402,9 +424,16 @@ func TestDaemonSetStatusViewerStatusWithWrongUpdateStrategyType(t *testing.T) {
},
},
}
unstructuredD := &unstructured.Unstructured{}
err := scheme.Scheme.Convert(d, unstructuredD, nil)
if err != nil {
t.Fatal(err)
}
client := fake.NewSimpleClientset(d).Apps()
dsv := &DaemonSetStatusViewer{c: client}
msg, done, err := dsv.Status("bar", "foo", 0)
msg, done, err := dsv.Status(unstructuredD, 0)
errMsg := "rollout status is only available for RollingUpdate strategy type"
if err == nil || err.Error() != errMsg {
t.Errorf("Status for daemon sets with UpdateStrategy type different than RollingUpdate should return error. Instead got: msg: %s\ndone: %t\n err: %v", msg, done, err)