diff --git a/hack/e2e.go b/hack/e2e.go index 0736cefffcb..0f7dcd5f304 100644 --- a/hack/e2e.go +++ b/hack/e2e.go @@ -216,13 +216,22 @@ func finishRunning(stepName string, cmd *exec.Cmd) bool { // returns either "", or a list of args intended for appending with the // kubecfg or kubectl commands (begining with a space). -func kubeClientArgs() string { +func kubecfgArgs() string { if *checkVersionSkew { return " -expect_version_match" } return "" } +// returns either "", or a list of args intended for appending with the +// kubectl command (begining with a space). +func kubectlArgs() string { + if *checkVersionSkew { + return " --match-server-version" + } + return "" +} + func bashWrap(cmd string) string { return ` set -o errexit @@ -233,8 +242,8 @@ export KUBE_CONFIG_FILE="config-test.sh" # TODO(jbeda): This will break on usage if there is a space in # ${KUBE_ROOT}. Covert to an array? Or an exported function? -export KUBECFG="` + *root + `/cluster/kubecfg.sh` + kubeClientArgs() + `" -export KUBECTL="` + *root + `/cluster/kubectl.sh` + kubeClientArgs() + `" +export KUBECFG="` + *root + `/cluster/kubecfg.sh` + kubecfgArgs() + `" +export KUBECTL="` + *root + `/cluster/kubectl.sh` + kubectlArgs() + `" source "` + *root + `/cluster/kube-env.sh" source "` + *root + `/cluster/${KUBERNETES_PROVIDER}/util.sh" diff --git a/pkg/api/errors/etcd/etcd.go b/pkg/api/errors/etcd/etcd.go index ac819d4db0a..dee022828f5 100644 --- a/pkg/api/errors/etcd/etcd.go +++ b/pkg/api/errors/etcd/etcd.go @@ -64,10 +64,3 @@ func InterpretDeleteError(err error, kind, name string) error { return err } } - -// InterpretResourceVersionError returns the appropriate api error -// for a failure to convert the resource version of an object sent -// to the API to an etcd uint64 index. -func InterpretResourceVersionError(err error, kind, value string) error { - return errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", value)}) -} diff --git a/pkg/client/request.go b/pkg/client/request.go index c43dcc47314..84fe5a62487 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -278,7 +278,11 @@ func (r *Request) Watch() (watch.Interface, error) { return nil, err } if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("Got status: %v", resp.StatusCode) + var body []byte + if resp.Body != nil { + body, _ = ioutil.ReadAll(resp.Body) + } + return nil, fmt.Errorf("For request '%v', got status: %v\nbody: %v", req.URL, resp.StatusCode, string(body)) } return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.codec)), nil } diff --git a/pkg/kubectl/cmd/cmd.go b/pkg/kubectl/cmd/cmd.go index 60c41b0f2b6..d33f1ee0e3a 100644 --- a/pkg/kubectl/cmd/cmd.go +++ b/pkg/kubectl/cmd/cmd.go @@ -106,11 +106,9 @@ Find more information at https://github.com/GoogleCloudPlatform/kubernetes.`, } } -// TODO: remove this function and references to it-- errors it prints are -// very unhelpful because file/line number are wrong. func checkErr(err error) { if err != nil { - glog.Fatalf("%v", err) + glog.FatalDepth(1, err) } } diff --git a/pkg/kubectl/cmd/get.go b/pkg/kubectl/cmd/get.go index fd0af00c6a8..45e7859d8ca 100644 --- a/pkg/kubectl/cmd/get.go +++ b/pkg/kubectl/cmd/get.go @@ -20,6 +20,7 @@ import ( "fmt" "io" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/spf13/cobra" @@ -50,7 +51,7 @@ Examples: mapping, namespace, name := ResourceOrTypeFromArgs(cmd, args, f.Mapper) selector := GetFlagString(cmd, "selector") - labels, err := labels.ParseSelector(selector) + labelSelector, err := labels.ParseSelector(selector) checkErr(err) client, err := f.Client(cmd, mapping) @@ -68,12 +69,26 @@ Examples: printer, err := kubectl.GetPrinter(outputVersion, outputFormat, templateFile, defaultPrinter) checkErr(err) - obj, err := kubectl.NewRESTHelper(client, mapping).Get(namespace, name, labels) + restHelper := kubectl.NewRESTHelper(client, mapping) + obj, err := restHelper.Get(namespace, name, labelSelector) checkErr(err) if err := printer.PrintObj(obj, out); err != nil { checkErr(fmt.Errorf("Unable to output the provided object: %v", err)) } + + if GetFlagBool(cmd, "watch") { + vi, err := latest.InterfacesFor(outputVersion) + checkErr(err) + + rv, err := vi.MetadataAccessor.ResourceVersion(obj) + checkErr(err) + + w, err := restHelper.Watch(namespace, rv, labelSelector, labels.Set{}.AsSelector()) + checkErr(err) + + kubectl.WatchLoop(w, printer, out) + } }, } cmd.Flags().StringP("output", "o", "", "Output format: json|yaml|template|templatefile") @@ -81,5 +96,6 @@ Examples: cmd.Flags().Bool("no-headers", false, "When using the default output, don't print headers") cmd.Flags().StringP("template", "t", "", "Template string or path to template file to use when --output=template or --output=templatefile") cmd.Flags().StringP("selector", "l", "", "Selector (label query) to filter on") + cmd.Flags().BoolP("watch", "w", false, "After listing/getting the requested object, watch for changes.") return cmd } diff --git a/pkg/kubectl/resource_printer.go b/pkg/kubectl/resource_printer.go index 399a7395c39..a39fa555458 100644 --- a/pkg/kubectl/resource_printer.go +++ b/pkg/kubectl/resource_printer.go @@ -154,10 +154,14 @@ type handlerEntry struct { printFunc reflect.Value } -// HumanReadablePrinter is an implementation of ResourcePrinter which attempts to provide more elegant output. +// HumanReadablePrinter is an implementation of ResourcePrinter which attempts to provide +// more elegant output. It is not threadsafe, but you may call PrintObj repeatedly; headers +// will only be printed if the object type changes. This makes it useful for printing items +// recieved from watches. type HumanReadablePrinter struct { handlerMap map[reflect.Type]*handlerEntry noHeaders bool + lastType reflect.Type } // IsVersioned returns false-- human readable printers do not make versioned output. @@ -348,9 +352,11 @@ func printEventList(list *api.EventList, w io.Writer) error { func (h *HumanReadablePrinter) PrintObj(obj runtime.Object, output io.Writer) error { w := tabwriter.NewWriter(output, 20, 5, 3, ' ', 0) defer w.Flush() - if handler := h.handlerMap[reflect.TypeOf(obj)]; handler != nil { - if !h.noHeaders { + t := reflect.TypeOf(obj) + if handler := h.handlerMap[t]; handler != nil { + if !h.noHeaders && t != h.lastType { h.printHeader(handler.columns, w) + h.lastType = t } args := []reflect.Value{reflect.ValueOf(obj), reflect.ValueOf(w)} resultValue := handler.printFunc.Call(args)[0] diff --git a/pkg/kubectl/resthelper.go b/pkg/kubectl/resthelper.go index 48708263fa6..eb36daa2ad7 100644 --- a/pkg/kubectl/resthelper.go +++ b/pkg/kubectl/resthelper.go @@ -20,6 +20,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // RESTHelper provides methods for retrieving or mutating a RESTful @@ -49,6 +50,17 @@ func (m *RESTHelper) Get(namespace, name string, selector labels.Selector) (runt return m.RESTClient.Get().Path(m.Resource).Namespace(namespace).Path(name).SelectorParam("labels", selector).Do().Get() } +func (m *RESTHelper) Watch(namespace, resourceVersion string, labelSelector, fieldSelector labels.Selector) (watch.Interface, error) { + return m.RESTClient.Get(). + Path("watch"). + Path(m.Resource). + Namespace(namespace). + Param("resourceVersion", resourceVersion). + SelectorParam("labels", labelSelector). + SelectorParam("fields", fieldSelector). + Watch() +} + func (m *RESTHelper) Delete(namespace, name string) error { return m.RESTClient.Delete().Path(m.Resource).Namespace(namespace).Path(name).Do().Error() } diff --git a/pkg/kubectl/watchloop.go b/pkg/kubectl/watchloop.go new file mode 100644 index 00000000000..2e6d3b9f88b --- /dev/null +++ b/pkg/kubectl/watchloop.go @@ -0,0 +1,47 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubectl + +import ( + "io" + "os" + "os/signal" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// WatchLoop loops, writing objects in the events from w to printer. +// If user sends interrupt signal, shut down cleanly. Otherwise, never return. +func WatchLoop(w watch.Interface, printer ResourcePrinter, out io.Writer) { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + defer signal.Stop(signals) + for { + select { + case event, ok := <-w.ResultChan(): + if !ok { + return + } + // TODO: need to print out added/modified/deleted! + if err := printer.PrintObj(event.Object, out); err != nil { + w.Stop() + } + case <-signals: + w.Stop() + } + } +} diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 359866a6570..c190b5b9c5f 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -18,7 +18,6 @@ package etcd import ( "fmt" - "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -97,21 +96,6 @@ func makePodKey(ctx api.Context, id string) (string, error) { return MakeEtcdItemKey(ctx, PodPath, id) } -// ParseWatchResourceVersion takes a resource version argument and converts it to -// the etcd version we should pass to helper.Watch(). Because resourceVersion is -// an opaque value, the default watch behavior for non-zero watch is to watch -// the next value (if you pass "1", you will see updates from "2" onwards). -func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { - if resourceVersion == "" || resourceVersion == "0" { - return 0, nil - } - version, err := strconv.ParseUint(resourceVersion, 10, 64) - if err != nil { - return 0, etcderr.InterpretResourceVersionError(err, kind, resourceVersion) - } - return version + 1, nil -} - // ListPods obtains a list of pods with labels that match selector. func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool { @@ -143,7 +127,7 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool // WatchPods begins watching for new, changed, or deleted pods. func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, "pod") + version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod") if err != nil { return nil, err } @@ -354,7 +338,7 @@ func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerL // WatchControllers begins watching for new, changed, or deleted controllers. func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, "replicationControllers") + version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers") if err != nil { return nil, err } @@ -516,7 +500,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error { // WatchServices begins watching for new, changed, or deleted service configurations. func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, "service") + version, err := tools.ParseWatchResourceVersion(resourceVersion, "service") if err != nil { return nil, err } @@ -561,7 +545,7 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er // WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, "endpoints") + version, err := tools.ParseWatchResourceVersion(resourceVersion, "endpoints") if err != nil { return nil, err } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 3836b7e0f6e..a69a9bb5b5f 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -42,41 +42,6 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { return registry } -func TestEtcdParseWatchResourceVersion(t *testing.T) { - testCases := []struct { - Version string - Kind string - ExpectVersion uint64 - Err bool - }{ - {Version: "", ExpectVersion: 0}, - {Version: "a", Err: true}, - {Version: " ", Err: true}, - {Version: "1", ExpectVersion: 2}, - {Version: "10", ExpectVersion: 11}, - } - for _, testCase := range testCases { - version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind) - switch { - case testCase.Err: - if err == nil { - t.Errorf("%s: unexpected non-error", testCase.Version) - continue - } - if !errors.IsInvalid(err) { - t.Errorf("%s: unexpected error: %v", testCase.Version, err) - continue - } - case !testCase.Err && err != nil: - t.Errorf("%s: unexpected error: %v", testCase.Version, err) - continue - } - if version != testCase.ExpectVersion { - t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version) - } - } -} - // TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash func TestEtcdGetPodDifferentNamespace(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) diff --git a/pkg/registry/event/rest.go b/pkg/registry/event/rest.go index cb4b530b8ed..26f4948b6bd 100644 --- a/pkg/registry/event/rest.go +++ b/pkg/registry/event/rest.go @@ -109,7 +109,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj // Watch returns Events events via a watch.Interface. // It implements apiserver.ResourceWatcher. -func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) } diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go index 888cbd0a440..ae8e68e7cbb 100644 --- a/pkg/registry/event/rest_test.go +++ b/pkg/registry/event/rest_test.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -50,6 +51,8 @@ func TestRESTCreate(t *testing.T) { if e, a := eventA, (<-c).Object; !reflect.DeepEqual(e, a) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } + // Ensure we implement the interface + _ = apiserver.ResourceWatcher(rest) } func TestRESTDelete(t *testing.T) { @@ -216,7 +219,7 @@ func TestRESTWatch(t *testing.T) { Reason: "forTesting", } reg, rest := NewTestREST() - wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), 0) + wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), "0") if err != nil { t.Fatalf("Unexpected error %v", err) } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 90ca035b707..0d27246e685 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -91,8 +91,12 @@ func (e *Etcd) Delete(ctx api.Context, id string) error { // Watch starts a watch for the items that m matches. // TODO: Detect if m references a single object instead of a list. -func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { - return e.Helper.WatchList(e.KeyRoot, resourceVersion, func(obj runtime.Object) bool { +func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { + version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName) + if err != nil { + return nil, err + } + return e.Helper.WatchList(e.KeyRoot, version, func(obj runtime.Object) bool { matches, err := m.Matches(obj) return err == nil && matches }) diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 6f1ed30ed2f..dd76590263a 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -417,7 +417,7 @@ func TestEtcdWatch(t *testing.T) { } fakeClient, registry := NewTestGenericEtcdRegistry(t) - wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, 1) + wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, "1") if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/registry/generic/registry.go b/pkg/registry/generic/registry.go index d368905cd8e..2f80ab81029 100644 --- a/pkg/registry/generic/registry.go +++ b/pkg/registry/generic/registry.go @@ -76,7 +76,7 @@ type Registry interface { Update(ctx api.Context, id string, obj runtime.Object) error Get(ctx api.Context, id string) (runtime.Object, error) Delete(ctx api.Context, id string) error - Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error) + Watch(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error) } // FilterList filters any list object that conforms to the api conventions, diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go index 01b5d0293b3..6142f90f7eb 100644 --- a/pkg/registry/registrytest/generic.go +++ b/pkg/registry/registrytest/generic.go @@ -52,7 +52,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje return generic.FilterList(r.ObjectList, m) } -func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { +func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { // TODO: wire filter down into the mux; it needs access to current and previous state :( return r.Mux.Watch(), nil } diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index 18c57b012ff..8921b921462 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -17,13 +17,16 @@ limitations under the License. package tools import ( + "strconv" "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) @@ -37,6 +40,21 @@ func Everything(runtime.Object) bool { return true } +// ParseWatchResourceVersion takes a resource version argument and converts it to +// the etcd version we should pass to helper.Watch(). Because resourceVersion is +// an opaque value, the default watch behavior for non-zero watch is to watch +// the next value (if you pass "1", you will see updates from "2" onwards). +func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { + if resourceVersion == "" || resourceVersion == "0" { + return 0, nil + } + version, err := strconv.ParseUint(resourceVersion, 10, 64) + if err != nil { + return 0, errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", resourceVersion)}) + } + return version + 1, nil +} + // WatchList begins watching the specified key's items. Items are decoded into // API objects, and any items passing 'filter' are sent down the returned // watch.Interface. resourceVersion may be used to specify what version to begin diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 0274ff6607c..ad5bd071634 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -601,3 +602,38 @@ func TestWatchPurposefulShutdown(t *testing.T) { t.Errorf("An injected error did not cause a graceful shutdown") } } + +func TestEtcdParseWatchResourceVersion(t *testing.T) { + testCases := []struct { + Version string + Kind string + ExpectVersion uint64 + Err bool + }{ + {Version: "", ExpectVersion: 0}, + {Version: "a", Err: true}, + {Version: " ", Err: true}, + {Version: "1", ExpectVersion: 2}, + {Version: "10", ExpectVersion: 11}, + } + for _, testCase := range testCases { + version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind) + switch { + case testCase.Err: + if err == nil { + t.Errorf("%s: unexpected non-error", testCase.Version) + continue + } + if !errors.IsInvalid(err) { + t.Errorf("%s: unexpected error: %v", testCase.Version, err) + continue + } + case !testCase.Err && err != nil: + t.Errorf("%s: unexpected error: %v", testCase.Version, err) + continue + } + if version != testCase.ExpectVersion { + t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version) + } + } +}