diff --git a/pkg/kubectl/BUILD b/pkg/kubectl/BUILD index d87986be3a8..d9cb4e0774b 100644 --- a/pkg/kubectl/BUILD +++ b/pkg/kubectl/BUILD @@ -122,6 +122,7 @@ go_library( "//pkg/controller/deployment/util:go_default_library", "//pkg/credentialprovider:go_default_library", "//pkg/kubectl/apps:go_default_library", + "//pkg/kubectl/scheme:go_default_library", "//pkg/kubectl/util:go_default_library", "//pkg/kubectl/util/hash:go_default_library", "//pkg/kubectl/util/slice:go_default_library", diff --git a/pkg/kubectl/cmd/rollout/BUILD b/pkg/kubectl/cmd/rollout/BUILD index c53a88500e0..3860f24fdac 100644 --- a/pkg/kubectl/cmd/rollout/BUILD +++ b/pkg/kubectl/cmd/rollout/BUILD @@ -28,13 +28,20 @@ go_library( "//pkg/kubectl/scheme:go_default_library", "//pkg/kubectl/util/i18n:go_default_library", "//pkg/util/interrupt:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/printers:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/resource:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//vendor/github.com/renstrom/dedent:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", diff --git a/pkg/kubectl/cmd/rollout/rollout_status.go b/pkg/kubectl/cmd/rollout/rollout_status.go index e231497d6a8..160f4e845b3 100644 --- a/pkg/kubectl/cmd/rollout/rollout_status.go +++ b/pkg/kubectl/cmd/rollout/rollout_status.go @@ -23,10 +23,17 @@ import ( "github.com/spf13/cobra" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" @@ -62,9 +69,11 @@ type RolloutStatusOptions struct { Watch bool Revision int64 + Timeout time.Duration - StatusViewer func(*meta.RESTMapping) (kubectl.StatusViewer, error) - Builder func() *resource.Builder + StatusViewer func(*meta.RESTMapping) (kubectl.StatusViewer, error) + Builder func() *resource.Builder + DynamicClient dynamic.Interface FilenameOptions *resource.FilenameOptions genericclioptions.IOStreams @@ -76,6 +85,7 @@ func NewRolloutStatusOptions(streams genericclioptions.IOStreams) *RolloutStatus FilenameOptions: &resource.FilenameOptions{}, IOStreams: streams, Watch: true, + Timeout: 0, } } @@ -102,6 +112,7 @@ func NewCmdRolloutStatus(f cmdutil.Factory, streams genericclioptions.IOStreams) cmdutil.AddFilenameOptionFlags(cmd, o.FilenameOptions, usage) cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "Watch the status of the rollout until it's done.") cmd.Flags().Int64Var(&o.Revision, "revision", o.Revision, "Pin to a specific revision for showing its status. Defaults to 0 (last revision).") + cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).") return cmd } @@ -119,6 +130,17 @@ func (o *RolloutStatusOptions) Complete(f cmdutil.Factory, args []string) error o.StatusViewer = func(mapping *meta.RESTMapping) (kubectl.StatusViewer, error) { return polymorphichelpers.StatusViewerFn(f, mapping) } + + clientConfig, err := f.ToRESTConfig() + if err != nil { + return err + } + + o.DynamicClient, err = dynamic.NewForConfig(clientConfig) + if err != nil { + return err + } + return nil } @@ -158,60 +180,68 @@ func (o *RolloutStatusOptions) Run() error { info := infos[0] mapping := info.ResourceMapping() - obj, err := r.Object() - if err != nil { - return err - } - rv, err := meta.NewAccessor().ResourceVersion(obj) - if err != nil { - return err - } - statusViewer, err := o.StatusViewer(mapping) if err != nil { return err } - // check if deployment's has finished the rollout - status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision) - if err != nil { - return err - } - fmt.Fprintf(o.Out, "%s", status) - if done { - return nil + fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector + return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector + return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(options) + }, } - shouldWatch := o.Watch - if !shouldWatch { - return nil - } + preconditionFunc := func(store cache.Store) (bool, error) { + _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) + if err != nil { + return true, err + } + if !exists { + // We need to make sure we see the object in the cache before we start waiting for events + // or we would be waiting for the timeout if such object didn't exist. + return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name) + } - // watch for changes to the deployment - w, err := r.Watch(rv) - if err != nil { - return err + return false, nil } // if the rollout isn't done yet, keep watching deployment status - // TODO: expose timeout - timeout := 0 * time.Second - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) intr := interrupt.New(nil, cancel) return intr.Run(func() error { - _, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { - // print deployment's status - status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision) - if err != nil { - return false, err + _, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(e watch.Event) (bool, error) { + switch t := e.Type; t { + case watch.Added, watch.Modified: + status, done, err := statusViewer.Status(e.Object.(runtime.Unstructured), o.Revision) + if err != nil { + return false, err + } + fmt.Fprintf(o.Out, "%s", status) + // Quit waiting if the rollout is done + if done { + return true, nil + } + + shouldWatch := o.Watch + if !shouldWatch { + return true, nil + } + + return false, nil + + case watch.Deleted: + // We need to abort to avoid cases of recreation and not to silently watch the wrong (new) object + return true, fmt.Errorf("object has been deleted") + + default: + return true, fmt.Errorf("internal error: unexpected event %#v", e) } - fmt.Fprintf(o.Out, "%s", status) - // Quit waiting if the rollout is done - if done { - return true, nil - } - return false, nil }) return err }) diff --git a/pkg/kubectl/rollout_status.go b/pkg/kubectl/rollout_status.go index fbaa0419f12..184cb892494 100644 --- a/pkg/kubectl/rollout_status.go +++ b/pkg/kubectl/rollout_status.go @@ -21,17 +21,18 @@ import ( appsv1 "k8s.io/api/apps/v1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/controller/deployment/util" + "k8s.io/kubernetes/pkg/kubectl/scheme" ) // StatusViewer provides an interface for resources that have rollout status. type StatusViewer interface { - Status(namespace, name string, revision int64) (string, bool, error) + Status(obj runtime.Unstructured, revision int64) (string, bool, error) } // StatusViewerFor returns a StatusViewer for the resource specified by kind. @@ -63,11 +64,13 @@ type StatefulSetStatusViewer struct { } // Status returns a message describing deployment status, and a bool value indicating if the status is considered done. -func (s *DeploymentStatusViewer) Status(namespace, name string, revision int64) (string, bool, error) { - deployment, err := s.c.Deployments(namespace).Get(name, metav1.GetOptions{}) +func (s *DeploymentStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) { + deployment := &appsv1.Deployment{} + err := scheme.Scheme.Convert(obj, deployment, nil) if err != nil { - return "", false, err + return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, deployment, err) } + if revision > 0 { deploymentRev, err := util.Revision(deployment) if err != nil { @@ -80,51 +83,55 @@ func (s *DeploymentStatusViewer) Status(namespace, name string, revision int64) if deployment.Generation <= deployment.Status.ObservedGeneration { cond := util.GetDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing) if cond != nil && cond.Reason == util.TimedOutReason { - return "", false, fmt.Errorf("deployment %q exceeded its progress deadline", name) + return "", false, fmt.Errorf("deployment %q exceeded its progress deadline", deployment.Name) } if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas { - return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil + return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n", deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), false, nil } if deployment.Status.Replicas > deployment.Status.UpdatedReplicas { - return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil + return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n", deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), false, nil } if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas { - return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil + return fmt.Sprintf("Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n", deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), false, nil } - return fmt.Sprintf("deployment %q successfully rolled out\n", name), true, nil + return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), true, nil } return fmt.Sprintf("Waiting for deployment spec update to be observed...\n"), false, nil } // Status returns a message describing daemon set status, and a bool value indicating if the status is considered done. -func (s *DaemonSetStatusViewer) Status(namespace, name string, revision int64) (string, bool, error) { +func (s *DaemonSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) { //ignoring revision as DaemonSets does not have history yet - daemon, err := s.c.DaemonSets(namespace).Get(name, metav1.GetOptions{}) + daemon := &appsv1.DaemonSet{} + err := scheme.Scheme.Convert(obj, daemon, nil) if err != nil { - return "", false, err + return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, daemon, err) } + if daemon.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType { return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType) } if daemon.Generation <= daemon.Status.ObservedGeneration { if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled { - return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d out of %d new pods have been updated...\n", name, daemon.Status.UpdatedNumberScheduled, daemon.Status.DesiredNumberScheduled), false, nil + return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d out of %d new pods have been updated...\n", daemon.Name, daemon.Status.UpdatedNumberScheduled, daemon.Status.DesiredNumberScheduled), false, nil } if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled { - return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d of %d updated pods are available...\n", name, daemon.Status.NumberAvailable, daemon.Status.DesiredNumberScheduled), false, nil + return fmt.Sprintf("Waiting for daemon set %q rollout to finish: %d of %d updated pods are available...\n", daemon.Name, daemon.Status.NumberAvailable, daemon.Status.DesiredNumberScheduled), false, nil } - return fmt.Sprintf("daemon set %q successfully rolled out\n", name), true, nil + return fmt.Sprintf("daemon set %q successfully rolled out\n", daemon.Name), true, nil } return fmt.Sprintf("Waiting for daemon set spec update to be observed...\n"), false, nil } // Status returns a message describing statefulset status, and a bool value indicating if the status is considered done. -func (s *StatefulSetStatusViewer) Status(namespace, name string, revision int64) (string, bool, error) { - sts, err := s.c.StatefulSets(namespace).Get(name, metav1.GetOptions{}) +func (s *StatefulSetStatusViewer) Status(obj runtime.Unstructured, revision int64) (string, bool, error) { + sts := &appsv1.StatefulSet{} + err := scheme.Scheme.Convert(obj, sts, nil) if err != nil { - return "", false, err + return "", false, fmt.Errorf("failed to convert %T to %T: %v", obj, sts, err) } + if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType { return "", true, fmt.Errorf("rollout status is only available for %s strategy type", appsv1.RollingUpdateStatefulSetStrategyType) } @@ -138,7 +145,7 @@ func (s *StatefulSetStatusViewer) Status(namespace, name string, revision int64) if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil { if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) { return fmt.Sprintf("Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n", - sts.Status.UpdatedReplicas, (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition)), false, nil + sts.Status.UpdatedReplicas, *sts.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition), false, nil } } return fmt.Sprintf("partitioned roll out complete: %d new pods have been updated...\n", diff --git a/pkg/kubectl/rollout_status_test.go b/pkg/kubectl/rollout_status_test.go index 2f7951326b9..08ff610dd7c 100644 --- a/pkg/kubectl/rollout_status_test.go +++ b/pkg/kubectl/rollout_status_test.go @@ -23,7 +23,9 @@ import ( apps "k8s.io/api/apps/v1" api "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/kubectl/scheme" ) func TestDeploymentStatusViewerStatus(t *testing.T) { @@ -126,9 +128,15 @@ func TestDeploymentStatusViewerStatus(t *testing.T) { }, Status: test.status, } + unstructuredD := &unstructured.Unstructured{} + err := scheme.Scheme.Convert(d, unstructuredD, nil) + if err != nil { + t.Fatal(err) + } + client := fake.NewSimpleClientset(d).Apps() dsv := &DeploymentStatusViewer{c: client} - msg, done, err := dsv.Status("bar", "foo", 0) + msg, done, err := dsv.Status(unstructuredD, 0) if err != nil { t.Fatalf("DeploymentStatusViewer.Status(): %v", err) } @@ -225,9 +233,16 @@ func TestDaemonSetStatusViewerStatus(t *testing.T) { }, Status: test.status, } + + unstructuredD := &unstructured.Unstructured{} + err := scheme.Scheme.Convert(d, unstructuredD, nil) + if err != nil { + t.Fatal(err) + } + client := fake.NewSimpleClientset(d).Apps() dsv := &DaemonSetStatusViewer{c: client} - msg, done, err := dsv.Status("bar", "foo", 0) + msg, done, err := dsv.Status(unstructuredD, 0) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -370,9 +385,16 @@ func TestStatefulSetStatusViewerStatus(t *testing.T) { s.Status = test.status s.Spec.UpdateStrategy = test.strategy s.Generation = test.generation + + unstructuredS := &unstructured.Unstructured{} + err := scheme.Scheme.Convert(s, unstructuredS, nil) + if err != nil { + t.Fatal(err) + } + client := fake.NewSimpleClientset(s).AppsV1() dsv := &StatefulSetStatusViewer{c: client} - msg, done, err := dsv.Status(s.Namespace, s.Name, 0) + msg, done, err := dsv.Status(unstructuredS, 0) if test.err && err == nil { t.Fatalf("%s: expected error", test.name) } @@ -402,9 +424,16 @@ func TestDaemonSetStatusViewerStatusWithWrongUpdateStrategyType(t *testing.T) { }, }, } + + unstructuredD := &unstructured.Unstructured{} + err := scheme.Scheme.Convert(d, unstructuredD, nil) + if err != nil { + t.Fatal(err) + } + client := fake.NewSimpleClientset(d).Apps() dsv := &DaemonSetStatusViewer{c: client} - msg, done, err := dsv.Status("bar", "foo", 0) + msg, done, err := dsv.Status(unstructuredD, 0) errMsg := "rollout status is only available for RollingUpdate strategy type" if err == nil || err.Error() != errMsg { t.Errorf("Status for daemon sets with UpdateStrategy type different than RollingUpdate should return error. Instead got: msg: %s\ndone: %t\n err: %v", msg, done, err)