Merge pull request #2337 from lavalamp/fix3

Add watch, multiple fixes
This commit is contained in:
Eric Tune 2014-11-12 15:23:31 -08:00
commit e0e686896e
18 changed files with 176 additions and 81 deletions

View File

@ -216,13 +216,22 @@ func finishRunning(stepName string, cmd *exec.Cmd) bool {
// returns either "", or a list of args intended for appending with the
// kubecfg or kubectl commands (begining with a space).
func kubeClientArgs() string {
func kubecfgArgs() string {
if *checkVersionSkew {
return " -expect_version_match"
}
return ""
}
// returns either "", or a list of args intended for appending with the
// kubectl command (begining with a space).
func kubectlArgs() string {
if *checkVersionSkew {
return " --match-server-version"
}
return ""
}
func bashWrap(cmd string) string {
return `
set -o errexit
@ -233,8 +242,8 @@ export KUBE_CONFIG_FILE="config-test.sh"
# TODO(jbeda): This will break on usage if there is a space in
# ${KUBE_ROOT}. Covert to an array? Or an exported function?
export KUBECFG="` + *root + `/cluster/kubecfg.sh` + kubeClientArgs() + `"
export KUBECTL="` + *root + `/cluster/kubectl.sh` + kubeClientArgs() + `"
export KUBECFG="` + *root + `/cluster/kubecfg.sh` + kubecfgArgs() + `"
export KUBECTL="` + *root + `/cluster/kubectl.sh` + kubectlArgs() + `"
source "` + *root + `/cluster/kube-env.sh"
source "` + *root + `/cluster/${KUBERNETES_PROVIDER}/util.sh"

View File

@ -64,10 +64,3 @@ func InterpretDeleteError(err error, kind, name string) error {
return err
}
}
// InterpretResourceVersionError returns the appropriate api error
// for a failure to convert the resource version of an object sent
// to the API to an etcd uint64 index.
func InterpretResourceVersionError(err error, kind, value string) error {
return errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", value)})
}

View File

@ -278,7 +278,11 @@ func (r *Request) Watch() (watch.Interface, error) {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Got status: %v", resp.StatusCode)
var body []byte
if resp.Body != nil {
body, _ = ioutil.ReadAll(resp.Body)
}
return nil, fmt.Errorf("For request '%v', got status: %v\nbody: %v", req.URL, resp.StatusCode, string(body))
}
return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.codec)), nil
}

View File

@ -106,11 +106,9 @@ Find more information at https://github.com/GoogleCloudPlatform/kubernetes.`,
}
}
// TODO: remove this function and references to it-- errors it prints are
// very unhelpful because file/line number are wrong.
func checkErr(err error) {
if err != nil {
glog.Fatalf("%v", err)
glog.FatalDepth(1, err)
}
}

View File

@ -20,6 +20,7 @@ import (
"fmt"
"io"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/spf13/cobra"
@ -50,7 +51,7 @@ Examples:
mapping, namespace, name := ResourceOrTypeFromArgs(cmd, args, f.Mapper)
selector := GetFlagString(cmd, "selector")
labels, err := labels.ParseSelector(selector)
labelSelector, err := labels.ParseSelector(selector)
checkErr(err)
client, err := f.Client(cmd, mapping)
@ -68,12 +69,26 @@ Examples:
printer, err := kubectl.GetPrinter(outputVersion, outputFormat, templateFile, defaultPrinter)
checkErr(err)
obj, err := kubectl.NewRESTHelper(client, mapping).Get(namespace, name, labels)
restHelper := kubectl.NewRESTHelper(client, mapping)
obj, err := restHelper.Get(namespace, name, labelSelector)
checkErr(err)
if err := printer.PrintObj(obj, out); err != nil {
checkErr(fmt.Errorf("Unable to output the provided object: %v", err))
}
if GetFlagBool(cmd, "watch") {
vi, err := latest.InterfacesFor(outputVersion)
checkErr(err)
rv, err := vi.MetadataAccessor.ResourceVersion(obj)
checkErr(err)
w, err := restHelper.Watch(namespace, rv, labelSelector, labels.Set{}.AsSelector())
checkErr(err)
kubectl.WatchLoop(w, printer, out)
}
},
}
cmd.Flags().StringP("output", "o", "", "Output format: json|yaml|template|templatefile")
@ -81,5 +96,6 @@ Examples:
cmd.Flags().Bool("no-headers", false, "When using the default output, don't print headers")
cmd.Flags().StringP("template", "t", "", "Template string or path to template file to use when --output=template or --output=templatefile")
cmd.Flags().StringP("selector", "l", "", "Selector (label query) to filter on")
cmd.Flags().BoolP("watch", "w", false, "After listing/getting the requested object, watch for changes.")
return cmd
}

View File

@ -154,10 +154,14 @@ type handlerEntry struct {
printFunc reflect.Value
}
// HumanReadablePrinter is an implementation of ResourcePrinter which attempts to provide more elegant output.
// HumanReadablePrinter is an implementation of ResourcePrinter which attempts to provide
// more elegant output. It is not threadsafe, but you may call PrintObj repeatedly; headers
// will only be printed if the object type changes. This makes it useful for printing items
// recieved from watches.
type HumanReadablePrinter struct {
handlerMap map[reflect.Type]*handlerEntry
noHeaders bool
lastType reflect.Type
}
// IsVersioned returns false-- human readable printers do not make versioned output.
@ -348,9 +352,11 @@ func printEventList(list *api.EventList, w io.Writer) error {
func (h *HumanReadablePrinter) PrintObj(obj runtime.Object, output io.Writer) error {
w := tabwriter.NewWriter(output, 20, 5, 3, ' ', 0)
defer w.Flush()
if handler := h.handlerMap[reflect.TypeOf(obj)]; handler != nil {
if !h.noHeaders {
t := reflect.TypeOf(obj)
if handler := h.handlerMap[t]; handler != nil {
if !h.noHeaders && t != h.lastType {
h.printHeader(handler.columns, w)
h.lastType = t
}
args := []reflect.Value{reflect.ValueOf(obj), reflect.ValueOf(w)}
resultValue := handler.printFunc.Call(args)[0]

View File

@ -20,6 +20,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// RESTHelper provides methods for retrieving or mutating a RESTful
@ -49,6 +50,17 @@ func (m *RESTHelper) Get(namespace, name string, selector labels.Selector) (runt
return m.RESTClient.Get().Path(m.Resource).Namespace(namespace).Path(name).SelectorParam("labels", selector).Do().Get()
}
func (m *RESTHelper) Watch(namespace, resourceVersion string, labelSelector, fieldSelector labels.Selector) (watch.Interface, error) {
return m.RESTClient.Get().
Path("watch").
Path(m.Resource).
Namespace(namespace).
Param("resourceVersion", resourceVersion).
SelectorParam("labels", labelSelector).
SelectorParam("fields", fieldSelector).
Watch()
}
func (m *RESTHelper) Delete(namespace, name string) error {
return m.RESTClient.Delete().Path(m.Resource).Namespace(namespace).Path(name).Do().Error()
}

47
pkg/kubectl/watchloop.go Normal file
View File

@ -0,0 +1,47 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubectl
import (
"io"
"os"
"os/signal"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// WatchLoop loops, writing objects in the events from w to printer.
// If user sends interrupt signal, shut down cleanly. Otherwise, never return.
func WatchLoop(w watch.Interface, printer ResourcePrinter, out io.Writer) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
defer signal.Stop(signals)
for {
select {
case event, ok := <-w.ResultChan():
if !ok {
return
}
// TODO: need to print out added/modified/deleted!
if err := printer.PrintObj(event.Object, out); err != nil {
w.Stop()
}
case <-signals:
w.Stop()
}
}
}

View File

@ -18,7 +18,6 @@ package etcd
import (
"fmt"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -97,21 +96,6 @@ func makePodKey(ctx api.Context, id string) (string, error) {
return MakeEtcdItemKey(ctx, PodPath, id)
}
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, etcderr.InterpretResourceVersionError(err, kind, resourceVersion)
}
return version + 1, nil
}
// ListPods obtains a list of pods with labels that match selector.
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
@ -143,7 +127,7 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool
// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "pod")
version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod")
if err != nil {
return nil, err
}
@ -354,7 +338,7 @@ func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerL
// WatchControllers begins watching for new, changed, or deleted controllers.
func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "replicationControllers")
version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers")
if err != nil {
return nil, err
}
@ -516,7 +500,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {
// WatchServices begins watching for new, changed, or deleted service configurations.
func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "service")
version, err := tools.ParseWatchResourceVersion(resourceVersion, "service")
if err != nil {
return nil, err
}
@ -561,7 +545,7 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "endpoints")
version, err := tools.ParseWatchResourceVersion(resourceVersion, "endpoints")
if err != nil {
return nil, err
}

View File

@ -42,41 +42,6 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
return registry
}
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
Kind string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 2},
{Version: "10", ExpectVersion: 11},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !errors.IsInvalid(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}
// TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash
func TestEtcdGetPodDifferentNamespace(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)

View File

@ -109,7 +109,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
// Watch returns Events events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -50,6 +51,8 @@ func TestRESTCreate(t *testing.T) {
if e, a := eventA, (<-c).Object; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
// Ensure we implement the interface
_ = apiserver.ResourceWatcher(rest)
}
func TestRESTDelete(t *testing.T) {
@ -216,7 +219,7 @@ func TestRESTWatch(t *testing.T) {
Reason: "forTesting",
}
reg, rest := NewTestREST()
wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), 0)
wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), "0")
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

View File

@ -91,8 +91,12 @@ func (e *Etcd) Delete(ctx api.Context, id string) error {
// Watch starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) {
return e.Helper.WatchList(e.KeyRoot, resourceVersion, func(obj runtime.Object) bool {
func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil {
return nil, err
}
return e.Helper.WatchList(e.KeyRoot, version, func(obj runtime.Object) bool {
matches, err := m.Matches(obj)
return err == nil && matches
})

View File

@ -417,7 +417,7 @@ func TestEtcdWatch(t *testing.T) {
}
fakeClient, registry := NewTestGenericEtcdRegistry(t)
wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, 1)
wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, "1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -76,7 +76,7 @@ type Registry interface {
Update(ctx api.Context, id string, obj runtime.Object) error
Get(ctx api.Context, id string) (runtime.Object, error)
Delete(ctx api.Context, id string) error
Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error)
Watch(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error)
}
// FilterList filters any list object that conforms to the api conventions,

View File

@ -52,7 +52,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje
return generic.FilterList(r.ObjectList, m)
}
func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) {
func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.Mux.Watch(), nil
}

View File

@ -17,13 +17,16 @@ limitations under the License.
package tools
import (
"strconv"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
@ -37,6 +40,21 @@ func Everything(runtime.Object) bool {
return true
}
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", resourceVersion)})
}
return version + 1, nil
}
// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin

View File

@ -23,6 +23,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -601,3 +602,38 @@ func TestWatchPurposefulShutdown(t *testing.T) {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
Kind string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 2},
{Version: "10", ExpectVersion: 11},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !errors.IsInvalid(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}