mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Make a change to visitor to allow it to accept an error, like Go's path walker
This commit is contained in:
parent
b5a4a548df
commit
29dc7f6ec2
@ -166,16 +166,19 @@ func (o AnnotateOptions) RunAnnotate() error {
|
||||
if err := r.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return r.Visit(func(info *resource.Info) error {
|
||||
_, err := cmdutil.UpdateObject(info, func(obj runtime.Object) error {
|
||||
return r.Visit(func(info *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, uErr := cmdutil.UpdateObject(info, func(obj runtime.Object) error {
|
||||
err := o.updateAnnotations(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
if uErr != nil {
|
||||
return uErr
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
@ -68,7 +68,10 @@ func RunClusterInfo(factory *cmdutil.Factory, out io.Writer, cmd *cobra.Command)
|
||||
SelectorParam("kubernetes.io/cluster-service=true").
|
||||
ResourceTypeOrNameArgs(false, []string{"services"}...).
|
||||
Latest()
|
||||
b.Do().Visit(func(r *resource.Info) error {
|
||||
b.Do().Visit(func(r *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
services := r.Object.(*api.ServiceList).Items
|
||||
for _, service := range services {
|
||||
var link string
|
||||
|
@ -96,7 +96,10 @@ func RunCreate(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer) error {
|
||||
}
|
||||
|
||||
count := 0
|
||||
err = r.Visit(func(info *resource.Info) error {
|
||||
err = r.Visit(func(info *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := info.Mapping.Codec.Encode(info.Object)
|
||||
if err != nil {
|
||||
return cmdutil.AddSourceToErr("creating", info.Source, err)
|
||||
|
@ -132,7 +132,10 @@ func ReapResult(r *resource.Result, f *cmdutil.Factory, out io.Writer, isDefault
|
||||
if ignoreNotFound {
|
||||
r = r.IgnoreErrors(errors.IsNotFound)
|
||||
}
|
||||
err := r.Visit(func(info *resource.Info) error {
|
||||
err := r.Visit(func(info *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
found++
|
||||
reaper, err := f.Reaper(info.Mapping)
|
||||
if err != nil {
|
||||
@ -166,7 +169,10 @@ func DeleteResult(r *resource.Result, out io.Writer, ignoreNotFound bool, shortO
|
||||
if ignoreNotFound {
|
||||
r = r.IgnoreErrors(errors.IsNotFound)
|
||||
}
|
||||
err := r.Visit(func(info *resource.Info) error {
|
||||
err := r.Visit(func(info *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
found++
|
||||
return deleteResource(info, out, shortOutput, mapper)
|
||||
})
|
||||
|
@ -205,7 +205,10 @@ func RunGet(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string
|
||||
}
|
||||
|
||||
// use the default printer for each object
|
||||
return b.Do().Visit(func(r *resource.Info) error {
|
||||
return b.Do().Visit(func(r *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
printer, err := f.PrinterForMapping(cmd, r.Mapping, allNamespaces)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -199,7 +199,10 @@ func RunLabel(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []stri
|
||||
}
|
||||
|
||||
// TODO: support bulk generic output a la Get
|
||||
return r.Visit(func(info *resource.Info) error {
|
||||
return r.Visit(func(info *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
obj, err := cmdutil.UpdateObject(info, func(obj runtime.Object) error {
|
||||
err := labelFunc(obj, overwrite, resourceVersion, labels, remove)
|
||||
if err != nil {
|
||||
|
@ -115,7 +115,10 @@ func RunReplace(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []st
|
||||
return err
|
||||
}
|
||||
|
||||
return r.Visit(func(info *resource.Info) error {
|
||||
return r.Visit(func(info *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := info.Mapping.Codec.Encode(info.Object)
|
||||
if err != nil {
|
||||
return cmdutil.AddSourceToErr("replacing", info.Source, err)
|
||||
@ -196,7 +199,10 @@ func forceReplace(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []
|
||||
}
|
||||
|
||||
count := 0
|
||||
err = r.Visit(func(info *resource.Info) error {
|
||||
err = r.Visit(func(info *resource.Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := info.Mapping.Codec.Encode(info.Object)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -133,7 +133,7 @@ func (b *Builder) URL(urls ...*url.URL) *Builder {
|
||||
// will be ignored (but logged at V(2)).
|
||||
func (b *Builder) Stdin() *Builder {
|
||||
b.stream = true
|
||||
b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.continueOnError, b.schema))
|
||||
b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.schema))
|
||||
return b
|
||||
}
|
||||
|
||||
@ -143,7 +143,7 @@ func (b *Builder) Stdin() *Builder {
|
||||
// will be ignored (but logged at V(2)).
|
||||
func (b *Builder) Stream(r io.Reader, name string) *Builder {
|
||||
b.stream = true
|
||||
b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, name, b.continueOnError, b.schema))
|
||||
b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, name, b.schema))
|
||||
return b
|
||||
}
|
||||
|
||||
@ -164,7 +164,7 @@ func (b *Builder) Path(paths ...string) *Builder {
|
||||
continue
|
||||
}
|
||||
|
||||
visitors, err := ExpandPathsToFileVisitors(b.mapper, p, false, []string{".json", ".stdin", ".yaml", ".yml"}, b.continueOnError, b.schema)
|
||||
visitors, err := ExpandPathsToFileVisitors(b.mapper, p, false, []string{".json", ".stdin", ".yaml", ".yml"}, b.schema)
|
||||
if err != nil {
|
||||
b.errs = append(b.errs, fmt.Errorf("error reading %q: %v", p, err))
|
||||
}
|
||||
|
@ -162,7 +162,10 @@ type testVisitor struct {
|
||||
Infos []*Info
|
||||
}
|
||||
|
||||
func (v *testVisitor) Handle(info *Info) error {
|
||||
func (v *testVisitor) Handle(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v.Infos = append(v.Infos, info)
|
||||
return v.InjectErr
|
||||
}
|
||||
@ -649,7 +652,7 @@ func TestContinueOnErrorVisitor(t *testing.T) {
|
||||
Do()
|
||||
count := 0
|
||||
testErr := fmt.Errorf("test error")
|
||||
err := req.Visit(func(_ *Info) error {
|
||||
err := req.Visit(func(_ *Info, _ error) error {
|
||||
count++
|
||||
if count > 1 {
|
||||
return testErr
|
||||
@ -872,40 +875,6 @@ func TestLatest(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIgnoreStreamErrors(t *testing.T) {
|
||||
pods, svc := testData()
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
defer w.Close()
|
||||
w.Write([]byte(`{}`))
|
||||
w.Write([]byte(runtime.EncodeOrDie(latest.Codec, &pods.Items[0])))
|
||||
}()
|
||||
|
||||
r2, w2 := io.Pipe()
|
||||
go func() {
|
||||
defer w2.Close()
|
||||
w2.Write([]byte(`{}`))
|
||||
w2.Write([]byte(runtime.EncodeOrDie(latest.Codec, &svc.Items[0])))
|
||||
}()
|
||||
|
||||
b := NewBuilder(latest.RESTMapper, api.Scheme, fakeClient()).
|
||||
ContinueOnError(). // TODO: order seems bad, but allows clients to determine what they want...
|
||||
Stream(r, "1").Stream(r2, "2")
|
||||
|
||||
test := &testVisitor{}
|
||||
singular := false
|
||||
|
||||
err := b.Do().IntoSingular(&singular).Visit(test.Handle)
|
||||
if err != nil || singular || len(test.Infos) != 2 {
|
||||
t.Fatalf("unexpected response: %v %t %#v", err, singular, test.Infos)
|
||||
}
|
||||
|
||||
if !api.Semantic.DeepDerivative([]runtime.Object{&pods.Items[0], &svc.Items[0]}, test.Objects()) {
|
||||
t.Errorf("unexpected visited objects: %#v", test.Objects())
|
||||
}
|
||||
}
|
||||
|
||||
func TestReceiveMultipleErrors(t *testing.T) {
|
||||
pods, svc := testData()
|
||||
|
||||
@ -931,7 +900,7 @@ func TestReceiveMultipleErrors(t *testing.T) {
|
||||
singular := false
|
||||
|
||||
err := b.Do().IntoSingular(&singular).Visit(test.Handle)
|
||||
if err == nil || singular || len(test.Infos) != 0 {
|
||||
if err == nil || singular || len(test.Infos) != 2 {
|
||||
t.Fatalf("unexpected response: %v %t %#v", err, singular, test.Infos)
|
||||
}
|
||||
|
||||
|
@ -98,7 +98,10 @@ func (r *Result) Infos() ([]*Info, error) {
|
||||
}
|
||||
|
||||
infos := []*Info{}
|
||||
err := r.visitor.Visit(func(info *Info) error {
|
||||
err := r.visitor.Visit(func(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
infos = append(infos, info)
|
||||
return nil
|
||||
})
|
||||
|
@ -68,7 +68,7 @@ func (r *Selector) Visit(fn VisitorFunc) error {
|
||||
Object: list,
|
||||
ResourceVersion: resourceVersion,
|
||||
}
|
||||
return fn(info)
|
||||
return fn(info, nil)
|
||||
}
|
||||
|
||||
func (r *Selector) Watch(resourceVersion string) (watch.Interface, error) {
|
||||
|
@ -26,8 +26,6 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
@ -39,14 +37,16 @@ import (
|
||||
const constSTDINstr string = "STDIN"
|
||||
|
||||
// Visitor lets clients walk a list of resources.
|
||||
// TODO: we should rethink how we handle errors in the visit loop
|
||||
// (See http://pr.k8s.io/9357#issuecomment-109600305)
|
||||
type Visitor interface {
|
||||
Visit(VisitorFunc) error
|
||||
}
|
||||
|
||||
// VisitorFunc implements the Visitor interface for a matching function
|
||||
type VisitorFunc func(*Info) error
|
||||
// VisitorFunc implements the Visitor interface for a matching function.
|
||||
// If there was a problem walking a list of resources, the incoming error
|
||||
// will describe the problem and the function can decide how to handle that error.
|
||||
// A nil returned indicates to accept an error to continue loops even when errors happen.
|
||||
// This is useful for ignoring certain kinds of errors or aggregating errors in some way.
|
||||
type VisitorFunc func(*Info, error) error
|
||||
|
||||
// Watchable describes a resource that can be watched for changes that occur on the server,
|
||||
// beginning after the provided resource version.
|
||||
@ -96,7 +96,7 @@ func NewInfo(client RESTClient, mapping *meta.RESTMapping, namespace, name strin
|
||||
|
||||
// Visit implements Visitor
|
||||
func (i *Info) Visit(fn VisitorFunc) error {
|
||||
return fn(i)
|
||||
return fn(i, nil)
|
||||
}
|
||||
|
||||
// Get retrieves the object from the Namespace and Name fields
|
||||
@ -180,8 +180,12 @@ type EagerVisitorList []Visitor
|
||||
func (l EagerVisitorList) Visit(fn VisitorFunc) error {
|
||||
errs := []error(nil)
|
||||
for i := range l {
|
||||
if err := l[i].Visit(func(info *Info) error {
|
||||
if err := fn(info); err != nil {
|
||||
if err := l[i].Visit(func(info *Info, err error) error {
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return nil
|
||||
}
|
||||
if err := fn(info, nil); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
return nil
|
||||
@ -234,7 +238,7 @@ func (v *URLVisitor) Visit(fn VisitorFunc) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fn(info)
|
||||
return fn(info, nil)
|
||||
}
|
||||
|
||||
// DecoratedVisitor will invoke the decorators in order prior to invoking the visitor function
|
||||
@ -256,13 +260,16 @@ func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc) Visitor {
|
||||
|
||||
// Visit implements Visitor
|
||||
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
|
||||
return v.visitor.Visit(func(info *Info) error {
|
||||
return v.visitor.Visit(func(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range v.decorators {
|
||||
if err := v.decorators[i](info); err != nil {
|
||||
if err := v.decorators[i](info, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return fn(info)
|
||||
return fn(info, nil)
|
||||
})
|
||||
}
|
||||
|
||||
@ -281,8 +288,12 @@ type ContinueOnErrorVisitor struct {
|
||||
// not being visited.
|
||||
func (v ContinueOnErrorVisitor) Visit(fn VisitorFunc) error {
|
||||
errs := []error{}
|
||||
err := v.Visitor.Visit(func(info *Info) error {
|
||||
if err := fn(info); err != nil {
|
||||
err := v.Visitor.Visit(func(info *Info, err error) error {
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return nil
|
||||
}
|
||||
if err := fn(info, nil); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
return nil
|
||||
@ -314,13 +325,16 @@ func NewFlattenListVisitor(v Visitor, mapper *Mapper) Visitor {
|
||||
}
|
||||
|
||||
func (v FlattenListVisitor) Visit(fn VisitorFunc) error {
|
||||
return v.Visitor.Visit(func(info *Info) error {
|
||||
return v.Visitor.Visit(func(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.Object == nil {
|
||||
return fn(info)
|
||||
return fn(info, nil)
|
||||
}
|
||||
items, err := runtime.ExtractList(info.Object)
|
||||
if err != nil {
|
||||
return fn(info)
|
||||
return fn(info, nil)
|
||||
}
|
||||
if errs := runtime.DecodeList(items, struct {
|
||||
runtime.ObjectTyper
|
||||
@ -336,7 +350,7 @@ func (v FlattenListVisitor) Visit(fn VisitorFunc) error {
|
||||
if len(info.ResourceVersion) != 0 {
|
||||
item.ResourceVersion = info.ResourceVersion
|
||||
}
|
||||
if err := fn(item); err != nil {
|
||||
if err := fn(item, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -358,17 +372,17 @@ func ignoreFile(path string, extensions []string) bool {
|
||||
}
|
||||
|
||||
// FileVisitorForSTDIN return a special FileVisitor just for STDIN
|
||||
func FileVisitorForSTDIN(mapper *Mapper, ignoreErrors bool, schema validation.Schema) Visitor {
|
||||
func FileVisitorForSTDIN(mapper *Mapper, schema validation.Schema) Visitor {
|
||||
return &FileVisitor{
|
||||
Path: constSTDINstr,
|
||||
StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, ignoreErrors, schema),
|
||||
StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, schema),
|
||||
}
|
||||
}
|
||||
|
||||
// ExpandPathsToFileVisitors will return a slice of FileVisitors that will handle files from the provided path.
|
||||
// After FileVisitors open the files, they will pass a io.Reader to a StreamVisitor to do the reading. (stdin
|
||||
// is also taken care of). Paths argument also accepts a single file, and will return a single visitor
|
||||
func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, extensions []string, ignoreErrors bool, schema validation.Schema) ([]Visitor, error) {
|
||||
func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, extensions []string, schema validation.Schema) ([]Visitor, error) {
|
||||
var visitors []Visitor
|
||||
err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
@ -388,7 +402,7 @@ func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, ext
|
||||
|
||||
visitor := &FileVisitor{
|
||||
Path: path,
|
||||
StreamVisitor: NewStreamVisitor(nil, mapper, path, ignoreErrors, schema),
|
||||
StreamVisitor: NewStreamVisitor(nil, mapper, path, schema),
|
||||
}
|
||||
|
||||
visitors = append(visitors, visitor)
|
||||
@ -432,19 +446,17 @@ type StreamVisitor struct {
|
||||
io.Reader
|
||||
*Mapper
|
||||
|
||||
Source string
|
||||
IgnoreErrors bool
|
||||
Schema validation.Schema
|
||||
Source string
|
||||
Schema validation.Schema
|
||||
}
|
||||
|
||||
// NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same.
|
||||
func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, ignoreErrors bool, schema validation.Schema) *StreamVisitor {
|
||||
func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, schema validation.Schema) *StreamVisitor {
|
||||
return &StreamVisitor{
|
||||
Reader: r,
|
||||
Mapper: mapper,
|
||||
Source: source,
|
||||
IgnoreErrors: ignoreErrors,
|
||||
Schema: schema,
|
||||
Reader: r,
|
||||
Mapper: mapper,
|
||||
Source: source,
|
||||
Schema: schema,
|
||||
}
|
||||
}
|
||||
|
||||
@ -468,20 +480,21 @@ func (v *StreamVisitor) Visit(fn VisitorFunc) error {
|
||||
}
|
||||
info, err := v.InfoForData(ext.RawJSON, v.Source)
|
||||
if err != nil {
|
||||
if v.IgnoreErrors {
|
||||
fmt.Fprintf(os.Stderr, "error: could not read an encoded object: %v\n", err)
|
||||
glog.V(4).Infof("Unreadable: %s", string(ext.RawJSON))
|
||||
continue
|
||||
if fnErr := fn(info, err); fnErr != nil {
|
||||
return fnErr
|
||||
}
|
||||
return err
|
||||
continue
|
||||
}
|
||||
if err := fn(info); err != nil {
|
||||
if err := fn(info, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateObjectNamespace(info *Info) error {
|
||||
func UpdateObjectNamespace(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.Object != nil {
|
||||
return info.Mapping.MetadataAccessor.SetNamespace(info.Object, info.Namespace)
|
||||
}
|
||||
@ -489,10 +502,13 @@ func UpdateObjectNamespace(info *Info) error {
|
||||
}
|
||||
|
||||
// FilterNamespace omits the namespace if the object is not namespace scoped
|
||||
func FilterNamespace(info *Info) error {
|
||||
func FilterNamespace(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.Namespaced() {
|
||||
info.Namespace = ""
|
||||
UpdateObjectNamespace(info)
|
||||
UpdateObjectNamespace(info, nil)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -500,13 +516,16 @@ func FilterNamespace(info *Info) error {
|
||||
// SetNamespace ensures that every Info object visited will have a namespace
|
||||
// set. If info.Object is set, it will be mutated as well.
|
||||
func SetNamespace(namespace string) VisitorFunc {
|
||||
return func(info *Info) error {
|
||||
return func(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.Namespaced() {
|
||||
return nil
|
||||
}
|
||||
if len(info.Namespace) == 0 {
|
||||
info.Namespace = namespace
|
||||
UpdateObjectNamespace(info)
|
||||
UpdateObjectNamespace(info, nil)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -517,13 +536,16 @@ func SetNamespace(namespace string) VisitorFunc {
|
||||
// value, returns an error. This is intended to guard against administrators
|
||||
// accidentally operating on resources outside their namespace.
|
||||
func RequireNamespace(namespace string) VisitorFunc {
|
||||
return func(info *Info) error {
|
||||
return func(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.Namespaced() {
|
||||
return nil
|
||||
}
|
||||
if len(info.Namespace) == 0 {
|
||||
info.Namespace = namespace
|
||||
UpdateObjectNamespace(info)
|
||||
UpdateObjectNamespace(info, nil)
|
||||
return nil
|
||||
}
|
||||
if info.Namespace != namespace {
|
||||
@ -535,7 +557,10 @@ func RequireNamespace(namespace string) VisitorFunc {
|
||||
|
||||
// RetrieveLatest updates the Object on each Info by invoking a standard client
|
||||
// Get.
|
||||
func RetrieveLatest(info *Info) error {
|
||||
func RetrieveLatest(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(info.Name) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -552,7 +577,10 @@ func RetrieveLatest(info *Info) error {
|
||||
}
|
||||
|
||||
// RetrieveLazy updates the object if it has not been loaded yet.
|
||||
func RetrieveLazy(info *Info) error {
|
||||
func RetrieveLazy(info *Info, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.Object == nil {
|
||||
return info.Get()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user