From 5ad518cd2b180db8be9c0f4be341b896b6778101 Mon Sep 17 00:00:00 2001 From: bindata-mockuser Date: Mon, 8 Aug 2016 19:56:19 +0200 Subject: [PATCH 1/2] Switch kubectl to use watch.Until --- pkg/client/restclient/request.go | 5 +- pkg/kubectl/cmd/get.go | 24 +++-- pkg/kubectl/cmd/rollout/rollout_status.go | 29 +++--- pkg/kubectl/cmd/run.go | 113 +++++++++++----------- pkg/kubectl/watchloop.go | 45 --------- pkg/watch/until.go | 3 +- pkg/watch/until_test.go | 27 ++++-- 7 files changed, 116 insertions(+), 130 deletions(-) delete mode 100644 pkg/kubectl/watchloop.go diff --git a/pkg/client/restclient/request.go b/pkg/client/restclient/request.go index 49fcf18cff0..1431ad4f3cb 100644 --- a/pkg/client/restclient/request.go +++ b/pkg/client/restclient/request.go @@ -357,8 +357,9 @@ var fieldMappings = versionToResourceToFieldMapping{ nodeUnschedulable: nodeUnschedulable, }, "pods": clientFieldNameToAPIVersionFieldName{ - podHost: podHost, - podStatus: podStatus, + objectNameField: objectNameField, + podHost: podHost, + podStatus: podStatus, }, "secrets": clientFieldNameToAPIVersionFieldName{ secretType: secretType, diff --git a/pkg/kubectl/cmd/get.go b/pkg/kubectl/cmd/get.go index 1ac627dd441..7b803a6f73a 100644 --- a/pkg/kubectl/cmd/get.go +++ b/pkg/kubectl/cmd/get.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/runtime" utilerrors "k8s.io/kubernetes/pkg/util/errors" + "k8s.io/kubernetes/pkg/util/interrupt" "k8s.io/kubernetes/pkg/watch" ) @@ -266,17 +267,22 @@ func RunGet(f *cmdutil.Factory, out io.Writer, errOut io.Writer, cmd *cobra.Comm first := true filteredResourceCount = 0 - kubectl.WatchLoop(w, func(e watch.Event) error { - if !isList && first { - // drop the initial watch event in the single resource case - first = false - return nil - } - err := printer.PrintObj(e.Object, out) - if err == nil { + intr := interrupt.New(nil, w.Stop) + intr.Run(func() error { + _, err := watch.Until(0, w, func(e watch.Event) (bool, error) { + if !isList && first { + // drop the initial watch event in the single resource case + first = false + return false, nil + } + err := printer.PrintObj(e.Object, out) + if err != nil { + return false, err + } filteredResourceCount++ cmdutil.PrintFilterCount(filteredResourceCount, mapping.Resource, errOut, filterOpts) - } + return false, nil + }) return err }) return nil diff --git a/pkg/kubectl/cmd/rollout/rollout_status.go b/pkg/kubectl/cmd/rollout/rollout_status.go index 490540bd27d..86b9aeb5a5a 100644 --- a/pkg/kubectl/cmd/rollout/rollout_status.go +++ b/pkg/kubectl/cmd/rollout/rollout_status.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" + "k8s.io/kubernetes/pkg/util/interrupt" "k8s.io/kubernetes/pkg/watch" "github.com/spf13/cobra" @@ -125,18 +126,22 @@ func RunStatus(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, args []str } // if the rollout isn't done yet, keep watching deployment status - kubectl.WatchLoop(w, func(e watch.Event) error { - // print deployment's status - status, done, err := statusViewer.Status(cmdNamespace, info.Name) - if err != nil { - return err - } - fmt.Fprintf(out, "%s", status) - // Quit waiting if the rollout is done - if done { - w.Stop() - } - return nil + intr := interrupt.New(nil, w.Stop) + intr.Run(func() error { + _, err := watch.Until(0, w, func(e watch.Event) (bool, error) { + // print deployment's status + status, done, err := statusViewer.Status(cmdNamespace, info.Name) + if err != nil { + return false, err + } + fmt.Fprintf(out, "%s", status) + // Quit waiting if the rollout is done + if done { + return true, nil + } + return false, nil + }) + return err }) return nil } diff --git a/pkg/kubectl/cmd/run.go b/pkg/kubectl/cmd/run.go index b6fce73636a..24d8f88c8b5 100644 --- a/pkg/kubectl/cmd/run.go +++ b/pkg/kubectl/cmd/run.go @@ -28,16 +28,19 @@ import ( "github.com/docker/distribution/reference" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + conditions "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/kubectl" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/runtime" uexec "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/interrupt" "k8s.io/kubernetes/pkg/watch" ) @@ -372,90 +375,90 @@ func contains(resourcesList map[string]*unversioned.APIResourceList, resource un // waitForPod watches the given pod until the exitCondition is true. Each two seconds // the tick function is called e.g. for progress output. -func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition func(*api.Pod) bool, tick func(*api.Pod)) (*api.Pod, error) { - pod, err := podClient.Pods(ns).Get(name) - if err != nil { - return nil, err - } - if exitCondition(pod) { - return pod, nil - } - - tick(pod) - - w, err := podClient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: pod.Name, ResourceVersion: pod.ResourceVersion})) +func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition watch.ConditionFunc, tick func(*api.Pod)) (*api.Pod, error) { + w, err := podClient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: name})) if err != nil { return nil, err } - t := time.NewTicker(2 * time.Second) - defer t.Stop() + pods := make(chan *api.Pod) // observed pods passed to the exitCondition + defer close(pods) + + // wait for the first event, then start the 2 sec ticker and loop go func() { - for range t.C { - tick(pod) + pod := <-pods + if pod == nil { + return + } + tick(pod) + + t := time.NewTicker(2 * time.Second) + defer t.Stop() + + for { + select { + case pod = <-pods: + if pod == nil { + return + } + case _, ok := <-t.C: + if !ok { + return + } + tick(pod) + } } }() - err = nil - result := pod - kubectl.WatchLoop(w, func(ev watch.Event) error { - switch ev.Type { - case watch.Added, watch.Modified: - pod = ev.Object.(*api.Pod) - if exitCondition(pod) { - result = pod - w.Stop() + intr := interrupt.New(nil, w.Stop) + var result *api.Pod + intr.Run(func() error { + ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) { + c, err := exitCondition(ev) + if c == false && err == nil { + pods <- ev.Object.(*api.Pod) // send to ticker } - case watch.Deleted: - w.Stop() - case watch.Error: - result = nil - err = fmt.Errorf("failed to watch pod %s/%s", ns, name) - w.Stop() - } - return nil + return c, err + }) + result = ev.Object.(*api.Pod) + return err }) - return result, err } func waitForPodRunning(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) { - exitCondition := func(pod *api.Pod) bool { - switch pod.Status.Phase { - case api.PodRunning: - for _, status := range pod.Status.ContainerStatuses { - if !status.Ready { - return false - } - } - return true - case api.PodSucceeded, api.PodFailed: - return true - default: - return false - } - } - return waitForPod(podClient, ns, name, exitCondition, func(pod *api.Pod) { + pod, err := waitForPod(podClient, ns, name, conditions.PodRunningAndReady, func(pod *api.Pod) { if !quiet { fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: false\n", pod.Namespace, pod.Name, pod.Status.Phase) } }) + + // fix generic not found error with empty name in PodRunningAndReady + if err != nil && errors.IsNotFound(err) { + return nil, errors.NewNotFound(api.Resource("pods"), name) + } + + return pod, err } func waitForPodTerminated(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) { - exitCondition := func(pod *api.Pod) bool { - return pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed - } - return waitForPod(podClient, ns, name, exitCondition, func(pod *api.Pod) { + pod, err := waitForPod(podClient, ns, name, conditions.PodCompleted, func(pod *api.Pod) { if !quiet { fmt.Fprintf(out, "Waiting for pod %s/%s to terminate, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase) } }) + + // fix generic not found error with empty name in PodCompleted + if err != nil && errors.IsNotFound(err) { + return nil, errors.NewNotFound(api.Resource("pods"), name) + } + + return pod, err } func handleAttachPod(f *cmdutil.Factory, podClient coreclient.PodsGetter, ns, name string, opts *AttachOptions, quiet bool) error { pod, err := waitForPodRunning(podClient, ns, name, opts.Out, quiet) - if err != nil { + if err != nil && err != conditions.ErrPodCompleted { return err } ctrName, err := opts.GetContainerName(pod) diff --git a/pkg/kubectl/watchloop.go b/pkg/kubectl/watchloop.go deleted file mode 100644 index 2f814a61ccf..00000000000 --- a/pkg/kubectl/watchloop.go +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubectl - -import ( - "os" - "os/signal" - - "k8s.io/kubernetes/pkg/watch" -) - -// WatchLoop loops, passing events in w to fn. -// If user sends interrupt signal, shut down cleanly. Otherwise, never return. -func WatchLoop(w watch.Interface, fn func(watch.Event) error) { - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - defer signal.Stop(signals) - for { - select { - case event, ok := <-w.ResultChan(): - if !ok { - return - } - if err := fn(event); err != nil { - w.Stop() - } - case <-signals: - w.Stop() - } - } -} diff --git a/pkg/watch/until.go b/pkg/watch/until.go index 4259f51bb5c..da4ebc49eef 100644 --- a/pkg/watch/until.go +++ b/pkg/watch/until.go @@ -32,6 +32,7 @@ type ConditionFunc func(event Event) (bool, error) // encountered. The first condition that returns an error terminates the watch (and the event is also returned). // If no event has been received, the returned event will be nil. // Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. +// A zero timeout means to wait forever. func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) { ch := watcher.ResultChan() defer watcher.Stop() @@ -40,7 +41,7 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc after = time.After(timeout) } else { ch := make(chan time.Time) - close(ch) + defer close(ch) after = ch } var lastEvent *Event diff --git a/pkg/watch/until_test.go b/pkg/watch/until_test.go index bfd5ce70fa5..1e72f70cc3c 100644 --- a/pkg/watch/until_test.go +++ b/pkg/watch/until_test.go @@ -23,7 +23,6 @@ import ( "time" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/util/wait" ) func TestUntil(t *testing.T) { @@ -83,17 +82,33 @@ func TestUntilMultipleConditions(t *testing.T) { func TestUntilTimeout(t *testing.T) { fw := NewFake() + go func() { + var obj *api.Pod + fw.Add(obj) + fw.Modify(obj) + }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event Event) (bool, error) { + return event.Type == Added, nil + }, + func(event Event) (bool, error) { + return event.Type == Modified, nil + }, } timeout := time.Duration(0) lastEvent, err := Until(timeout, fw, conditions...) - if err != wait.ErrWaitTimeout { - t.Fatalf("expected ErrWaitTimeout error, got %#v", err) + if err != nil { + t.Fatalf("expected nil error, got %#v", err) } - if lastEvent != nil { - t.Fatalf("expected nil event, got %#v", lastEvent) + if lastEvent == nil { + t.Fatal("expected an event") + } + if lastEvent.Type != Modified { + t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) + } + if got, isPod := lastEvent.Object.(*api.Pod); !isPod { + t.Fatalf("expected a pod event, got %#v", got) } } From 40b7fabaf25858bbef6bc492ab0e5a472784d8da Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Mon, 3 Oct 2016 17:41:49 +0200 Subject: [PATCH 2/2] kubectl: return exit status appropriately after running watch.Until --- pkg/kubectl/cmd/rollout/rollout_status.go | 3 +-- pkg/kubectl/cmd/run.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/kubectl/cmd/rollout/rollout_status.go b/pkg/kubectl/cmd/rollout/rollout_status.go index 86b9aeb5a5a..1d47e82a7d0 100644 --- a/pkg/kubectl/cmd/rollout/rollout_status.go +++ b/pkg/kubectl/cmd/rollout/rollout_status.go @@ -127,7 +127,7 @@ func RunStatus(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, args []str // if the rollout isn't done yet, keep watching deployment status intr := interrupt.New(nil, w.Stop) - intr.Run(func() error { + return intr.Run(func() error { _, err := watch.Until(0, w, func(e watch.Event) (bool, error) { // print deployment's status status, done, err := statusViewer.Status(cmdNamespace, info.Name) @@ -143,5 +143,4 @@ func RunStatus(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, args []str }) return err }) - return nil } diff --git a/pkg/kubectl/cmd/run.go b/pkg/kubectl/cmd/run.go index 24d8f88c8b5..c6e0d3e7e64 100644 --- a/pkg/kubectl/cmd/run.go +++ b/pkg/kubectl/cmd/run.go @@ -412,7 +412,7 @@ func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition intr := interrupt.New(nil, w.Stop) var result *api.Pod - intr.Run(func() error { + err = intr.Run(func() error { ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) { c, err := exitCondition(ev) if c == false && err == nil {