mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Extract helper for continue handling
This commit is contained in:
parent
f8097c6cee
commit
c56be1fa9f
@ -18,9 +18,12 @@ package resource
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
@ -90,6 +93,54 @@ func (m *Helper) List(namespace, apiVersion string, options *metav1.ListOptions)
|
|||||||
return req.Do(context.TODO()).Get()
|
return req.Do(context.TODO()).Get()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FollowContinue handles the continue parameter returned by the API server when using list
|
||||||
|
// chunking. To take advantage of this, the initial ListOptions provided by the consumer
|
||||||
|
// should include a non-zero Limit parameter.
|
||||||
|
func FollowContinue(initialOpts *metav1.ListOptions,
|
||||||
|
listFunc func(metav1.ListOptions) (runtime.Object, error)) error {
|
||||||
|
opts := initialOpts
|
||||||
|
for {
|
||||||
|
list, err := listFunc(*opts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
nextContinueToken, _ := metadataAccessor.Continue(list)
|
||||||
|
if len(nextContinueToken) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
opts.Continue = nextContinueToken
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnhanceListError augments errors typically returned by List operations with additional context,
|
||||||
|
// making sure to retain the StatusError type when applicable.
|
||||||
|
func EnhanceListError(err error, opts metav1.ListOptions, subj string) error {
|
||||||
|
if apierrors.IsResourceExpired(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if apierrors.IsBadRequest(err) || apierrors.IsNotFound(err) {
|
||||||
|
if se, ok := err.(*apierrors.StatusError); ok {
|
||||||
|
// modify the message without hiding this is an API error
|
||||||
|
if len(opts.LabelSelector) == 0 && len(opts.FieldSelector) == 0 {
|
||||||
|
se.ErrStatus.Message = fmt.Sprintf("Unable to list %q: %v", subj,
|
||||||
|
se.ErrStatus.Message)
|
||||||
|
} else {
|
||||||
|
se.ErrStatus.Message = fmt.Sprintf(
|
||||||
|
"Unable to find %q that match label selector %q, field selector %q: %v", subj,
|
||||||
|
opts.LabelSelector,
|
||||||
|
opts.FieldSelector, se.ErrStatus.Message)
|
||||||
|
}
|
||||||
|
return se
|
||||||
|
}
|
||||||
|
if len(opts.LabelSelector) == 0 && len(opts.FieldSelector) == 0 {
|
||||||
|
return fmt.Errorf("Unable to list %q: %v", subj, err)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("Unable to find %q that match label selector %q, field selector %q: %v",
|
||||||
|
subj, opts.LabelSelector, opts.FieldSelector, err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Helper) Watch(namespace, apiVersion string, options *metav1.ListOptions) (watch.Interface, error) {
|
func (m *Helper) Watch(namespace, apiVersion string, options *metav1.ListOptions) (watch.Interface, error) {
|
||||||
options.Watch = true
|
options.Watch = true
|
||||||
return m.RESTClient.Get().
|
return m.RESTClient.Get().
|
||||||
|
@ -19,6 +19,7 @@ package resource
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -26,6 +27,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
@ -628,3 +630,174 @@ func TestHelperReplace(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEnhanceListError(t *testing.T) {
|
||||||
|
podGVR := corev1.SchemeGroupVersion.WithResource(corev1.ResourcePods.String())
|
||||||
|
podSubject := podGVR.String()
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
err error
|
||||||
|
opts metav1.ListOptions
|
||||||
|
subj string
|
||||||
|
|
||||||
|
expectedErr string
|
||||||
|
expectStatusErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "leaves resource expired error as is",
|
||||||
|
err: apierrors.NewResourceExpired("resourceversion too old"),
|
||||||
|
opts: metav1.ListOptions{},
|
||||||
|
subj: podSubject,
|
||||||
|
expectedErr: "resourceversion too old",
|
||||||
|
expectStatusErr: true,
|
||||||
|
}, {
|
||||||
|
name: "leaves unrecognized error as is",
|
||||||
|
err: errors.New("something went wrong"),
|
||||||
|
opts: metav1.ListOptions{},
|
||||||
|
subj: podSubject,
|
||||||
|
expectedErr: "something went wrong",
|
||||||
|
expectStatusErr: false,
|
||||||
|
}, {
|
||||||
|
name: "bad request StatusError without selectors",
|
||||||
|
err: apierrors.NewBadRequest("request is invalid"),
|
||||||
|
opts: metav1.ListOptions{},
|
||||||
|
subj: podSubject,
|
||||||
|
expectedErr: "Unable to list \"/v1, Resource=pods\": request is invalid",
|
||||||
|
expectStatusErr: true,
|
||||||
|
}, {
|
||||||
|
name: "bad request StatusError with selectors",
|
||||||
|
err: apierrors.NewBadRequest("request is invalid"),
|
||||||
|
opts: metav1.ListOptions{
|
||||||
|
LabelSelector: "a=b",
|
||||||
|
FieldSelector: ".spec.nodeName=foo",
|
||||||
|
},
|
||||||
|
subj: podSubject,
|
||||||
|
expectedErr: "Unable to find \"/v1, Resource=pods\" that match label selector \"a=b\", field selector \".spec.nodeName=foo\": request is invalid",
|
||||||
|
expectStatusErr: true,
|
||||||
|
}, {
|
||||||
|
name: "not found without selectors",
|
||||||
|
err: apierrors.NewNotFound(podGVR.GroupResource(), "foo"),
|
||||||
|
opts: metav1.ListOptions{},
|
||||||
|
subj: podSubject,
|
||||||
|
expectedErr: "Unable to list \"/v1, Resource=pods\": pods \"foo\" not found",
|
||||||
|
expectStatusErr: true,
|
||||||
|
}, {
|
||||||
|
name: "not found StatusError with selectors",
|
||||||
|
err: apierrors.NewNotFound(podGVR.GroupResource(), "foo"),
|
||||||
|
opts: metav1.ListOptions{
|
||||||
|
LabelSelector: "a=b",
|
||||||
|
FieldSelector: ".spec.nodeName=foo",
|
||||||
|
},
|
||||||
|
subj: podSubject,
|
||||||
|
expectedErr: "Unable to find \"/v1, Resource=pods\" that match label selector \"a=b\", field selector \".spec.nodeName=foo\": pods \"foo\" not found",
|
||||||
|
expectStatusErr: true,
|
||||||
|
}, {
|
||||||
|
name: "non StatusError without selectors",
|
||||||
|
err: fmt.Errorf("extra info: %w", apierrors.NewNotFound(podGVR.GroupResource(),
|
||||||
|
"foo")),
|
||||||
|
opts: metav1.ListOptions{},
|
||||||
|
subj: podSubject,
|
||||||
|
expectedErr: "Unable to list \"/v1, Resource=pods\": extra info: pods \"foo\" not found",
|
||||||
|
expectStatusErr: false,
|
||||||
|
}, {
|
||||||
|
name: "non StatusError with selectors",
|
||||||
|
err: fmt.Errorf("extra info: %w", apierrors.NewNotFound(podGVR.GroupResource(), "foo")),
|
||||||
|
opts: metav1.ListOptions{
|
||||||
|
LabelSelector: "a=b",
|
||||||
|
FieldSelector: ".spec.nodeName=foo",
|
||||||
|
},
|
||||||
|
subj: podSubject,
|
||||||
|
expectedErr: "Unable to find \"/v1, " +
|
||||||
|
"Resource=pods\" that match label selector \"a=b\", " +
|
||||||
|
"field selector \".spec.nodeName=foo\": extra info: pods \"foo\" not found",
|
||||||
|
expectStatusErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
err := EnhanceListError(tt.err, tt.opts, tt.subj)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("EnhanceListError did not return an error")
|
||||||
|
}
|
||||||
|
if err.Error() != tt.expectedErr {
|
||||||
|
t.Errorf("EnhanceListError() error = %q, expectedErr %q", err, tt.expectedErr)
|
||||||
|
}
|
||||||
|
if tt.expectStatusErr {
|
||||||
|
if _, ok := err.(*apierrors.StatusError); !ok {
|
||||||
|
t.Errorf("EnhanceListError incorrectly returned a non-StatusError: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFollowContinue(t *testing.T) {
|
||||||
|
var continueTokens []string
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
initialOpts *metav1.ListOptions
|
||||||
|
tokensSeen []string
|
||||||
|
listFunc func(metav1.ListOptions) (runtime.Object, error)
|
||||||
|
|
||||||
|
expectedTokens []string
|
||||||
|
wantErr string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "updates list options with continue token until list finished",
|
||||||
|
initialOpts: &metav1.ListOptions{},
|
||||||
|
listFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
continueTokens = append(continueTokens, options.Continue)
|
||||||
|
obj := corev1.PodList{}
|
||||||
|
switch options.Continue {
|
||||||
|
case "":
|
||||||
|
metadataAccessor.SetContinue(&obj, "abc")
|
||||||
|
case "abc":
|
||||||
|
metadataAccessor.SetContinue(&obj, "def")
|
||||||
|
case "def":
|
||||||
|
metadataAccessor.SetKind(&obj, "ListComplete")
|
||||||
|
}
|
||||||
|
return &obj, nil
|
||||||
|
},
|
||||||
|
expectedTokens: []string{"", "abc", "def"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "stops looping if listFunc returns an error",
|
||||||
|
initialOpts: &metav1.ListOptions{},
|
||||||
|
listFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
continueTokens = append(continueTokens, options.Continue)
|
||||||
|
obj := corev1.PodList{}
|
||||||
|
switch options.Continue {
|
||||||
|
case "":
|
||||||
|
metadataAccessor.SetContinue(&obj, "abc")
|
||||||
|
case "abc":
|
||||||
|
return nil, fmt.Errorf("err from list func")
|
||||||
|
case "def":
|
||||||
|
metadataAccessor.SetKind(&obj, "ListComplete")
|
||||||
|
}
|
||||||
|
return &obj, nil
|
||||||
|
},
|
||||||
|
expectedTokens: []string{"", "abc"},
|
||||||
|
wantErr: "err from list func",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
continueTokens = []string{}
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
err := FollowContinue(tt.initialOpts, tt.listFunc)
|
||||||
|
if tt.wantErr != "" {
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("FollowContinue was expected to return an error and did not")
|
||||||
|
} else if err.Error() != tt.wantErr {
|
||||||
|
t.Fatalf("wanted error %q, got %q", tt.wantErr, err.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("FollowContinue failed: %v", tt.wantErr)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(continueTokens, tt.expectedTokens) {
|
||||||
|
t.Errorf("got token list %q, wanted %q", continueTokens, tt.expectedTokens)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -17,11 +17,9 @@ limitations under the License.
|
|||||||
package resource
|
package resource
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -49,41 +47,23 @@ func NewSelector(client RESTClient, mapping *meta.RESTMapping, namespace, labelS
|
|||||||
|
|
||||||
// Visit implements Visitor and uses request chunking by default.
|
// Visit implements Visitor and uses request chunking by default.
|
||||||
func (r *Selector) Visit(fn VisitorFunc) error {
|
func (r *Selector) Visit(fn VisitorFunc) error {
|
||||||
var continueToken string
|
helper := NewHelper(r.Client, r.Mapping)
|
||||||
for {
|
initialOpts := metav1.ListOptions{
|
||||||
list, err := NewHelper(r.Client, r.Mapping).List(
|
|
||||||
r.Namespace,
|
|
||||||
r.ResourceMapping().GroupVersionKind.GroupVersion().String(),
|
|
||||||
&metav1.ListOptions{
|
|
||||||
LabelSelector: r.LabelSelector,
|
LabelSelector: r.LabelSelector,
|
||||||
FieldSelector: r.FieldSelector,
|
FieldSelector: r.FieldSelector,
|
||||||
Limit: r.LimitChunks,
|
Limit: r.LimitChunks,
|
||||||
Continue: continueToken,
|
}
|
||||||
},
|
return FollowContinue(&initialOpts, func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
list, err := helper.List(
|
||||||
|
r.Namespace,
|
||||||
|
r.ResourceMapping().GroupVersionKind.GroupVersion().String(),
|
||||||
|
&options,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.IsResourceExpired(err) {
|
return nil, EnhanceListError(err, options, r.Mapping.Resource.String())
|
||||||
return err
|
|
||||||
}
|
|
||||||
if errors.IsBadRequest(err) || errors.IsNotFound(err) {
|
|
||||||
if se, ok := err.(*errors.StatusError); ok {
|
|
||||||
// modify the message without hiding this is an API error
|
|
||||||
if len(r.LabelSelector) == 0 && len(r.FieldSelector) == 0 {
|
|
||||||
se.ErrStatus.Message = fmt.Sprintf("Unable to list %q: %v", r.Mapping.Resource, se.ErrStatus.Message)
|
|
||||||
} else {
|
|
||||||
se.ErrStatus.Message = fmt.Sprintf("Unable to find %q that match label selector %q, field selector %q: %v", r.Mapping.Resource, r.LabelSelector, r.FieldSelector, se.ErrStatus.Message)
|
|
||||||
}
|
|
||||||
return se
|
|
||||||
}
|
|
||||||
if len(r.LabelSelector) == 0 && len(r.FieldSelector) == 0 {
|
|
||||||
return fmt.Errorf("Unable to list %q: %v", r.Mapping.Resource, err)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("Unable to find %q that match label selector %q, field selector %q: %v", r.Mapping.Resource, r.LabelSelector, r.FieldSelector, err)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
resourceVersion, _ := metadataAccessor.ResourceVersion(list)
|
resourceVersion, _ := metadataAccessor.ResourceVersion(list)
|
||||||
nextContinueToken, _ := metadataAccessor.Continue(list)
|
|
||||||
info := &Info{
|
info := &Info{
|
||||||
Client: r.Client,
|
Client: r.Client,
|
||||||
Mapping: r.Mapping,
|
Mapping: r.Mapping,
|
||||||
@ -95,13 +75,10 @@ func (r *Selector) Visit(fn VisitorFunc) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := fn(info, nil); err != nil {
|
if err := fn(info, nil); err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
|
||||||
if len(nextContinueToken) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
continueToken = nextContinueToken
|
|
||||||
}
|
}
|
||||||
|
return list, nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Selector) Watch(resourceVersion string) (watch.Interface, error) {
|
func (r *Selector) Watch(resourceVersion string) (watch.Interface, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user