mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Merge pull request #33942 from kargakis/sttts-kubectl-run-watch-until
Automatic merge from submit-queue Switch kubectl to use watch.Until Squashed version of https://github.com/kubernetes/kubernetes/pull/30235 @ncdc PTAL cc: @kubernetes/kubectl
This commit is contained in:
commit
0798f02391
@ -357,6 +357,7 @@ var fieldMappings = versionToResourceToFieldMapping{
|
|||||||
nodeUnschedulable: nodeUnschedulable,
|
nodeUnschedulable: nodeUnschedulable,
|
||||||
},
|
},
|
||||||
"pods": clientFieldNameToAPIVersionFieldName{
|
"pods": clientFieldNameToAPIVersionFieldName{
|
||||||
|
objectNameField: objectNameField,
|
||||||
podHost: podHost,
|
podHost: podHost,
|
||||||
podStatus: podStatus,
|
podStatus: podStatus,
|
||||||
},
|
},
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubectl/resource"
|
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/util/interrupt"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -266,17 +267,22 @@ func RunGet(f *cmdutil.Factory, out io.Writer, errOut io.Writer, cmd *cobra.Comm
|
|||||||
|
|
||||||
first := true
|
first := true
|
||||||
filteredResourceCount = 0
|
filteredResourceCount = 0
|
||||||
kubectl.WatchLoop(w, func(e watch.Event) error {
|
intr := interrupt.New(nil, w.Stop)
|
||||||
|
intr.Run(func() error {
|
||||||
|
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
|
||||||
if !isList && first {
|
if !isList && first {
|
||||||
// drop the initial watch event in the single resource case
|
// drop the initial watch event in the single resource case
|
||||||
first = false
|
first = false
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
err := printer.PrintObj(e.Object, out)
|
err := printer.PrintObj(e.Object, out)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
filteredResourceCount++
|
filteredResourceCount++
|
||||||
cmdutil.PrintFilterCount(filteredResourceCount, mapping.Resource, errOut, filterOpts)
|
cmdutil.PrintFilterCount(filteredResourceCount, mapping.Resource, errOut, filterOpts)
|
||||||
}
|
return false, nil
|
||||||
|
})
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubectl"
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
"k8s.io/kubernetes/pkg/kubectl/resource"
|
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||||
|
"k8s.io/kubernetes/pkg/util/interrupt"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
@ -125,18 +126,21 @@ func RunStatus(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, args []str
|
|||||||
}
|
}
|
||||||
|
|
||||||
// if the rollout isn't done yet, keep watching deployment status
|
// if the rollout isn't done yet, keep watching deployment status
|
||||||
kubectl.WatchLoop(w, func(e watch.Event) error {
|
intr := interrupt.New(nil, w.Stop)
|
||||||
|
return intr.Run(func() error {
|
||||||
|
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
|
||||||
// print deployment's status
|
// print deployment's status
|
||||||
status, done, err := statusViewer.Status(cmdNamespace, info.Name)
|
status, done, err := statusViewer.Status(cmdNamespace, info.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
fmt.Fprintf(out, "%s", status)
|
fmt.Fprintf(out, "%s", status)
|
||||||
// Quit waiting if the rollout is done
|
// Quit waiting if the rollout is done
|
||||||
if done {
|
if done {
|
||||||
w.Stop()
|
return true, nil
|
||||||
}
|
}
|
||||||
return nil
|
return false, nil
|
||||||
|
})
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -28,16 +28,19 @@ import (
|
|||||||
"github.com/docker/distribution/reference"
|
"github.com/docker/distribution/reference"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
"k8s.io/kubernetes/pkg/api/meta"
|
"k8s.io/kubernetes/pkg/api/meta"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
|
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
|
||||||
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||||
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
||||||
|
conditions "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/kubectl"
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
"k8s.io/kubernetes/pkg/kubectl/resource"
|
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
uexec "k8s.io/kubernetes/pkg/util/exec"
|
uexec "k8s.io/kubernetes/pkg/util/exec"
|
||||||
|
"k8s.io/kubernetes/pkg/util/interrupt"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -372,90 +375,90 @@ func contains(resourcesList map[string]*unversioned.APIResourceList, resource un
|
|||||||
|
|
||||||
// waitForPod watches the given pod until the exitCondition is true. Each two seconds
|
// waitForPod watches the given pod until the exitCondition is true. Each two seconds
|
||||||
// the tick function is called e.g. for progress output.
|
// the tick function is called e.g. for progress output.
|
||||||
func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition func(*api.Pod) bool, tick func(*api.Pod)) (*api.Pod, error) {
|
func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition watch.ConditionFunc, tick func(*api.Pod)) (*api.Pod, error) {
|
||||||
pod, err := podClient.Pods(ns).Get(name)
|
w, err := podClient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: name}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if exitCondition(pod) {
|
|
||||||
return pod, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
|
pods := make(chan *api.Pod) // observed pods passed to the exitCondition
|
||||||
|
defer close(pods)
|
||||||
|
|
||||||
|
// wait for the first event, then start the 2 sec ticker and loop
|
||||||
|
go func() {
|
||||||
|
pod := <-pods
|
||||||
|
if pod == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
tick(pod)
|
tick(pod)
|
||||||
|
|
||||||
w, err := podClient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: pod.Name, ResourceVersion: pod.ResourceVersion}))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
t := time.NewTicker(2 * time.Second)
|
t := time.NewTicker(2 * time.Second)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
go func() {
|
|
||||||
for range t.C {
|
for {
|
||||||
|
select {
|
||||||
|
case pod = <-pods:
|
||||||
|
if pod == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case _, ok := <-t.C:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
tick(pod)
|
tick(pod)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = nil
|
intr := interrupt.New(nil, w.Stop)
|
||||||
result := pod
|
var result *api.Pod
|
||||||
kubectl.WatchLoop(w, func(ev watch.Event) error {
|
err = intr.Run(func() error {
|
||||||
switch ev.Type {
|
ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) {
|
||||||
case watch.Added, watch.Modified:
|
c, err := exitCondition(ev)
|
||||||
pod = ev.Object.(*api.Pod)
|
if c == false && err == nil {
|
||||||
if exitCondition(pod) {
|
pods <- ev.Object.(*api.Pod) // send to ticker
|
||||||
result = pod
|
|
||||||
w.Stop()
|
|
||||||
}
|
}
|
||||||
case watch.Deleted:
|
return c, err
|
||||||
w.Stop()
|
})
|
||||||
case watch.Error:
|
result = ev.Object.(*api.Pod)
|
||||||
result = nil
|
return err
|
||||||
err = fmt.Errorf("failed to watch pod %s/%s", ns, name)
|
|
||||||
w.Stop()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForPodRunning(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
|
func waitForPodRunning(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
|
||||||
exitCondition := func(pod *api.Pod) bool {
|
pod, err := waitForPod(podClient, ns, name, conditions.PodRunningAndReady, func(pod *api.Pod) {
|
||||||
switch pod.Status.Phase {
|
|
||||||
case api.PodRunning:
|
|
||||||
for _, status := range pod.Status.ContainerStatuses {
|
|
||||||
if !status.Ready {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
case api.PodSucceeded, api.PodFailed:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return waitForPod(podClient, ns, name, exitCondition, func(pod *api.Pod) {
|
|
||||||
if !quiet {
|
if !quiet {
|
||||||
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: false\n", pod.Namespace, pod.Name, pod.Status.Phase)
|
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: false\n", pod.Namespace, pod.Name, pod.Status.Phase)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// fix generic not found error with empty name in PodRunningAndReady
|
||||||
|
if err != nil && errors.IsNotFound(err) {
|
||||||
|
return nil, errors.NewNotFound(api.Resource("pods"), name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pod, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForPodTerminated(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
|
func waitForPodTerminated(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
|
||||||
exitCondition := func(pod *api.Pod) bool {
|
pod, err := waitForPod(podClient, ns, name, conditions.PodCompleted, func(pod *api.Pod) {
|
||||||
return pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed
|
|
||||||
}
|
|
||||||
return waitForPod(podClient, ns, name, exitCondition, func(pod *api.Pod) {
|
|
||||||
if !quiet {
|
if !quiet {
|
||||||
fmt.Fprintf(out, "Waiting for pod %s/%s to terminate, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
|
fmt.Fprintf(out, "Waiting for pod %s/%s to terminate, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// fix generic not found error with empty name in PodCompleted
|
||||||
|
if err != nil && errors.IsNotFound(err) {
|
||||||
|
return nil, errors.NewNotFound(api.Resource("pods"), name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pod, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleAttachPod(f *cmdutil.Factory, podClient coreclient.PodsGetter, ns, name string, opts *AttachOptions, quiet bool) error {
|
func handleAttachPod(f *cmdutil.Factory, podClient coreclient.PodsGetter, ns, name string, opts *AttachOptions, quiet bool) error {
|
||||||
pod, err := waitForPodRunning(podClient, ns, name, opts.Out, quiet)
|
pod, err := waitForPodRunning(podClient, ns, name, opts.Out, quiet)
|
||||||
if err != nil {
|
if err != nil && err != conditions.ErrPodCompleted {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ctrName, err := opts.GetContainerName(pod)
|
ctrName, err := opts.GetContainerName(pod)
|
||||||
|
@ -1,45 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2014 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package kubectl
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
|
||||||
)
|
|
||||||
|
|
||||||
// WatchLoop loops, passing events in w to fn.
|
|
||||||
// If user sends interrupt signal, shut down cleanly. Otherwise, never return.
|
|
||||||
func WatchLoop(w watch.Interface, fn func(watch.Event) error) {
|
|
||||||
signals := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(signals, os.Interrupt)
|
|
||||||
defer signal.Stop(signals)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event, ok := <-w.ResultChan():
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := fn(event); err != nil {
|
|
||||||
w.Stop()
|
|
||||||
}
|
|
||||||
case <-signals:
|
|
||||||
w.Stop()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -32,6 +32,7 @@ type ConditionFunc func(event Event) (bool, error)
|
|||||||
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
||||||
// If no event has been received, the returned event will be nil.
|
// If no event has been received, the returned event will be nil.
|
||||||
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
||||||
|
// A zero timeout means to wait forever.
|
||||||
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
|
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
|
||||||
ch := watcher.ResultChan()
|
ch := watcher.ResultChan()
|
||||||
defer watcher.Stop()
|
defer watcher.Stop()
|
||||||
@ -40,7 +41,7 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
|
|||||||
after = time.After(timeout)
|
after = time.After(timeout)
|
||||||
} else {
|
} else {
|
||||||
ch := make(chan time.Time)
|
ch := make(chan time.Time)
|
||||||
close(ch)
|
defer close(ch)
|
||||||
after = ch
|
after = ch
|
||||||
}
|
}
|
||||||
var lastEvent *Event
|
var lastEvent *Event
|
||||||
|
@ -23,7 +23,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestUntil(t *testing.T) {
|
func TestUntil(t *testing.T) {
|
||||||
@ -83,17 +82,33 @@ func TestUntilMultipleConditions(t *testing.T) {
|
|||||||
|
|
||||||
func TestUntilTimeout(t *testing.T) {
|
func TestUntilTimeout(t *testing.T) {
|
||||||
fw := NewFake()
|
fw := NewFake()
|
||||||
|
go func() {
|
||||||
|
var obj *api.Pod
|
||||||
|
fw.Add(obj)
|
||||||
|
fw.Modify(obj)
|
||||||
|
}()
|
||||||
conditions := []ConditionFunc{
|
conditions := []ConditionFunc{
|
||||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
func(event Event) (bool, error) {
|
||||||
|
return event.Type == Added, nil
|
||||||
|
},
|
||||||
|
func(event Event) (bool, error) {
|
||||||
|
return event.Type == Modified, nil
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := time.Duration(0)
|
timeout := time.Duration(0)
|
||||||
lastEvent, err := Until(timeout, fw, conditions...)
|
lastEvent, err := Until(timeout, fw, conditions...)
|
||||||
if err != wait.ErrWaitTimeout {
|
if err != nil {
|
||||||
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
|
t.Fatalf("expected nil error, got %#v", err)
|
||||||
}
|
}
|
||||||
if lastEvent != nil {
|
if lastEvent == nil {
|
||||||
t.Fatalf("expected nil event, got %#v", lastEvent)
|
t.Fatal("expected an event")
|
||||||
|
}
|
||||||
|
if lastEvent.Type != Modified {
|
||||||
|
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
||||||
|
}
|
||||||
|
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
|
||||||
|
t.Fatalf("expected a pod event, got %#v", got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user