mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-05 23:47:50 +00:00
Merge pull request #69389 from smarterclayton/wait_error
kubectl wait must handle errors returned by watch
This commit is contained in:
@@ -45,6 +45,7 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -145,7 +146,7 @@ func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conditionFn, err := conditionFuncFor(flags.ForCondition)
|
||||
conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -168,7 +169,7 @@ func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func conditionFuncFor(condition string) (ConditionFunc, error) {
|
||||
func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) {
|
||||
if strings.ToLower(condition) == "delete" {
|
||||
return IsDeleted, nil
|
||||
}
|
||||
@@ -183,6 +184,7 @@ func conditionFuncFor(condition string) (ConditionFunc, error) {
|
||||
return ConditionalWait{
|
||||
conditionName: conditionName,
|
||||
conditionStatus: conditionValue,
|
||||
errOut: errOut,
|
||||
}.IsConditionMet, nil
|
||||
}
|
||||
|
||||
@@ -281,7 +283,7 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
|
||||
}
|
||||
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
|
||||
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, isDeleted)
|
||||
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, Wait{errOut: o.ErrOut}.IsDeleted)
|
||||
cancel()
|
||||
switch {
|
||||
case err == nil:
|
||||
@@ -299,14 +301,33 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
|
||||
}
|
||||
}
|
||||
|
||||
func isDeleted(event watch.Event) (bool, error) {
|
||||
return event.Type == watch.Deleted, nil
|
||||
// Wait has helper methods for handling watches, including error handling.
|
||||
type Wait struct {
|
||||
errOut io.Writer
|
||||
}
|
||||
|
||||
// IsDeleted returns true if the object is deleted. It prints any errors it encounters.
|
||||
func (w Wait) IsDeleted(event watch.Event) (bool, error) {
|
||||
switch event.Type {
|
||||
case watch.Error:
|
||||
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||
// the server if the error is unrecoverable.
|
||||
err := apierrors.FromObject(event.Object)
|
||||
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
|
||||
return false, nil
|
||||
case watch.Deleted:
|
||||
return true, nil
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// ConditionalWait hold information to check an API status condition
|
||||
type ConditionalWait struct {
|
||||
conditionName string
|
||||
conditionStatus string
|
||||
// errOut is written to if an error occurs
|
||||
errOut io.Writer
|
||||
}
|
||||
|
||||
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
|
||||
@@ -389,6 +410,13 @@ func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, e
|
||||
}
|
||||
|
||||
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
|
||||
if event.Type == watch.Error {
|
||||
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||
// the server
|
||||
err := apierrors.FromObject(event.Object)
|
||||
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
// this will chain back out, result in another get and an return false back up the chain
|
||||
return false, nil
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package wait
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"time"
|
||||
@@ -26,6 +27,7 @@ import (
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@@ -53,6 +55,16 @@ func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Uns
|
||||
}
|
||||
}
|
||||
|
||||
func newUnstructuredStatus(status *metav1.Status) runtime.Unstructured {
|
||||
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(status)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &unstructured.Unstructured{
|
||||
Object: obj,
|
||||
}
|
||||
}
|
||||
|
||||
func addCondition(in *unstructured.Unstructured, name, status string) *unstructured.Unstructured {
|
||||
conditions, _, _ := unstructured.NestedSlice(in.Object, "status", "conditions")
|
||||
conditions = append(conditions, map[string]interface{}{
|
||||
@@ -286,6 +298,61 @@ func TestWaitForDeletion(t *testing.T) {
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ignores watch error",
|
||||
infos: []*resource.Info{
|
||||
{
|
||||
Mapping: &meta.RESTMapping{
|
||||
Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "theresource"},
|
||||
},
|
||||
Name: "name-foo",
|
||||
Namespace: "ns-foo",
|
||||
},
|
||||
},
|
||||
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
|
||||
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
|
||||
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
|
||||
})
|
||||
count := 0
|
||||
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||
fakeWatch := watch.NewRaceFreeFake()
|
||||
if count == 0 {
|
||||
fakeWatch.Error(newUnstructuredStatus(&metav1.Status{
|
||||
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
|
||||
Status: "Failure",
|
||||
Code: 500,
|
||||
Message: "Bad",
|
||||
}))
|
||||
fakeWatch.Stop()
|
||||
} else {
|
||||
fakeWatch.Action(watch.Deleted, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"))
|
||||
}
|
||||
count++
|
||||
return true, fakeWatch, nil
|
||||
})
|
||||
return fakeClient
|
||||
},
|
||||
timeout: 10 * time.Second,
|
||||
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) != 4 {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
|
||||
t.Error(spew.Sdump(actions))
|
||||
}
|
||||
if !actions[1].Matches("watch", "theresource") {
|
||||
t.Error(spew.Sdump(actions))
|
||||
}
|
||||
if !actions[2].Matches("get", "theresource") || actions[2].(clienttesting.GetAction).GetName() != "name-foo" {
|
||||
t.Error(spew.Sdump(actions))
|
||||
}
|
||||
if !actions[3].Matches("watch", "theresource") {
|
||||
t.Error(spew.Sdump(actions))
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
@@ -544,6 +611,64 @@ func TestWaitForCondition(t *testing.T) {
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ignores watch error",
|
||||
infos: []*resource.Info{
|
||||
{
|
||||
Mapping: &meta.RESTMapping{
|
||||
Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "theresource"},
|
||||
},
|
||||
Name: "name-foo",
|
||||
Namespace: "ns-foo",
|
||||
},
|
||||
},
|
||||
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
|
||||
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
|
||||
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
|
||||
})
|
||||
count := 0
|
||||
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||
fakeWatch := watch.NewRaceFreeFake()
|
||||
if count == 0 {
|
||||
fakeWatch.Error(newUnstructuredStatus(&metav1.Status{
|
||||
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
|
||||
Status: "Failure",
|
||||
Code: 500,
|
||||
Message: "Bad",
|
||||
}))
|
||||
fakeWatch.Stop()
|
||||
} else {
|
||||
fakeWatch.Action(watch.Modified, addCondition(
|
||||
newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"),
|
||||
"the-condition", "status-value",
|
||||
))
|
||||
}
|
||||
count++
|
||||
return true, fakeWatch, nil
|
||||
})
|
||||
return fakeClient
|
||||
},
|
||||
timeout: 10 * time.Second,
|
||||
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) != 4 {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
|
||||
t.Error(spew.Sdump(actions))
|
||||
}
|
||||
if !actions[1].Matches("watch", "theresource") {
|
||||
t.Error(spew.Sdump(actions))
|
||||
}
|
||||
if !actions[2].Matches("get", "theresource") || actions[2].(clienttesting.GetAction).GetName() != "name-foo" {
|
||||
t.Error(spew.Sdump(actions))
|
||||
}
|
||||
if !actions[3].Matches("watch", "theresource") {
|
||||
t.Error(spew.Sdump(actions))
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
@@ -555,7 +680,7 @@ func TestWaitForCondition(t *testing.T) {
|
||||
Timeout: test.timeout,
|
||||
|
||||
Printer: printers.NewDiscardingPrinter(),
|
||||
ConditionFn: ConditionalWait{conditionName: "the-condition", conditionStatus: "status-value"}.IsConditionMet,
|
||||
ConditionFn: ConditionalWait{conditionName: "the-condition", conditionStatus: "status-value", errOut: ioutil.Discard}.IsConditionMet,
|
||||
IOStreams: genericclioptions.NewTestIOStreamsDiscard(),
|
||||
}
|
||||
err := o.RunWait()
|
||||
|
||||
Reference in New Issue
Block a user