From 4a6935b31fcc4d1498c977d90387e02b6b93288f Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 22 Jan 2016 00:11:30 -0500 Subject: [PATCH] Remaining codec change refactors --- cmd/integration/integration.go | 7 ++--- contrib/mesos/pkg/executor/executor.go | 3 ++- contrib/mesos/pkg/podutil/gzip.go | 5 ++-- contrib/mesos/pkg/podutil/io.go | 5 ++-- .../pkg/scheduler/components/binder/binder.go | 4 ++- examples/https-nginx/make_secret.go | 2 +- examples/sharing-clusters/make_secret.go | 2 +- pkg/controller/controller_utils.go | 10 +++++-- pkg/kubelet/config/common.go | 11 +++----- pkg/kubelet/dockertools/labels.go | 5 ++-- pkg/kubelet/dockertools/manager.go | 13 +++++----- pkg/kubelet/server/server.go | 20 +++++++------- pkg/storage/etcd/etcd_helper.go | 26 ++++++++++++------- pkg/storage/etcd/etcd_watcher.go | 21 ++++++++------- pkg/util/httpstream/spdy/roundtripper.go | 3 ++- pkg/util/io/io.go | 7 +++-- pkg/util/io/io_test.go | 6 +++-- pkg/watch/json/decoder.go | 2 +- pkg/watch/json/encoder.go | 4 +-- pkg/watch/json/types.go | 4 +-- plugin/cmd/kube-scheduler/app/server.go | 18 ++++++------- test/e2e/serviceloadbalancers.go | 5 ++-- test/soak/serve_hostnames/serve_hostnames.go | 3 ++- 23 files changed, 104 insertions(+), 82 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index f7d1cd80995..eb9d174dde5 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -26,7 +26,7 @@ import ( "net/http/httptest" "os" "reflect" - "runtime" + gruntime "runtime" "strconv" "strings" "sync" @@ -52,6 +52,7 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/master" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/sets" @@ -441,7 +442,7 @@ func runReplicationControllerTest(c *client.Client) { glog.Fatalf("Unexpected error: %v", err) } var controller api.ReplicationController - if err := api.Scheme.DecodeInto(data, &controller); err != nil { + if err := runtime.DecodeInto(testapi.Default.Codec(), data, &controller); err != nil { glog.Fatalf("Unexpected error: %v", err) } @@ -952,7 +953,7 @@ func addFlags(fs *pflag.FlagSet) { } func main() { - runtime.GOMAXPROCS(runtime.NumCPU()) + gruntime.GOMAXPROCS(gruntime.NumCPU()) addFlags(pflag.CommandLine) util.InitFlags() diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 5270a4ee29a..3edb92697e0 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -42,6 +42,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" + kruntime "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" ) @@ -343,7 +344,7 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta return } - obj, err := api.Codec.Decode(taskInfo.GetData()) + obj, err := kruntime.Decode(api.Codecs.UniversalDecoder(), taskInfo.GetData()) if err != nil { log.Errorf("failed to extract yaml data from the taskInfo.data %v", err) k.sendStatus(driver, newStatus(taskInfo.GetTaskId(), mesos.TaskState_TASK_FAILED, diff --git a/contrib/mesos/pkg/podutil/gzip.go b/contrib/mesos/pkg/podutil/gzip.go index 8778326d092..ba0f1cd1b72 100644 --- a/contrib/mesos/pkg/podutil/gzip.go +++ b/contrib/mesos/pkg/podutil/gzip.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api" _ "k8s.io/kubernetes/pkg/api/install" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/runtime" ) func Gzip(pods <-chan *api.Pod) ([]byte, error) { @@ -32,7 +33,7 @@ func Gzip(pods <-chan *api.Pod) ([]byte, error) { } func gzipList(list *api.PodList) ([]byte, error) { - raw, err := v1.Codec.Encode(list) + raw, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), list) if err != nil { return nil, err } @@ -68,7 +69,7 @@ func gunzipList(gzipped []byte) (*api.PodList, error) { return nil, err } - obj, err := api.Scheme.Decode(raw) + obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), raw) if err != nil { return nil, err } diff --git a/contrib/mesos/pkg/podutil/io.go b/contrib/mesos/pkg/podutil/io.go index e21c22f7ab7..4eb563a2f9c 100644 --- a/contrib/mesos/pkg/podutil/io.go +++ b/contrib/mesos/pkg/podutil/io.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/validation" + "k8s.io/kubernetes/pkg/runtime" utilyaml "k8s.io/kubernetes/pkg/util/yaml" ) @@ -41,7 +42,7 @@ func WriteToDir(pods <-chan *api.Pod, destDir string) error { log.Warningf("skipping static pod %s/%s that had no filename", p.Namespace, p.Name) continue } - raw, err := v1.Codec.Encode(p) + raw, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), p) if err != nil { log.Errorf("failed to encode static pod as v1 object: %v", err) continue @@ -105,7 +106,7 @@ func tryDecodeSinglePod(data []byte) (parsed bool, pod *api.Pod, err error) { if err != nil { return false, nil, err } - obj, err := api.Scheme.Decode(json) + obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), json) if err != nil { return false, pod, err } diff --git a/contrib/mesos/pkg/scheduler/components/binder/binder.go b/contrib/mesos/pkg/scheduler/components/binder/binder.go index 55425316376..4cb39fa529c 100644 --- a/contrib/mesos/pkg/scheduler/components/binder/binder.go +++ b/contrib/mesos/pkg/scheduler/components/binder/binder.go @@ -26,6 +26,8 @@ import ( annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/runtime" ) type Binder interface { @@ -150,7 +152,7 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod // the kubelet-executor uses this to instantiate the pod log.V(3).Infof("prepared pod spec: %+v", pod) - data, err := api.Codec.Encode(&pod) + data, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), &pod) if err != nil { log.V(2).Infof("Failed to marshal the pod spec: %v", err) return err diff --git a/examples/https-nginx/make_secret.go b/examples/https-nginx/make_secret.go index ebefa958664..ae3947fcaac 100644 --- a/examples/https-nginx/make_secret.go +++ b/examples/https-nginx/make_secret.go @@ -66,5 +66,5 @@ func main() { "nginx.key": nginxKey, }, } - fmt.Printf(runtime.EncodeOrDie(registered.GroupOrDie(api.GroupName).Codec, secret)) + fmt.Printf(runtime.EncodeOrDie(api.Codecs.LegacyCodec(registered.EnabledVersions()...), secret)) } diff --git a/examples/sharing-clusters/make_secret.go b/examples/sharing-clusters/make_secret.go index 6219948c3e6..20b35b513d8 100644 --- a/examples/sharing-clusters/make_secret.go +++ b/examples/sharing-clusters/make_secret.go @@ -59,5 +59,5 @@ func main() { "config": cfg, }, } - fmt.Printf(runtime.EncodeOrDie(registered.GroupOrDie(api.GroupName).Codec, secret)) + fmt.Printf(runtime.EncodeOrDie(api.Codecs.LegacyCodec(registered.EnabledVersions()...), secret)) } diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index b7481d76316..ef1bca9df80 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -24,8 +24,8 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/validation" - "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -259,7 +259,13 @@ func getPodsAnnotationSet(template *api.PodTemplateSpec, object runtime.Object) if err != nil { return desiredAnnotations, fmt.Errorf("unable to get controller reference: %v", err) } - createdByRefJson, err := registered.GroupOrDie(api.GroupName).Codec.Encode(&api.SerializedReference{ + + // TODO: this code was not safe previously - as soon as new code came along that switched to v2, old clients + // would be broken upon reading it. This is explicitly hardcoded to v1 to guarantee predictable deployment. + // We need to consistently handle this case of annotation versioning. + codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"}) + + createdByRefJson, err := runtime.Encode(codec, &api.SerializedReference{ Reference: *createdByRef, }) if err != nil { diff --git a/pkg/kubelet/config/common.go b/pkg/kubelet/config/common.go index 86318deb883..0838699ec26 100644 --- a/pkg/kubelet/config/common.go +++ b/pkg/kubelet/config/common.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/apimachinery/registered" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/hash" utilyaml "k8s.io/kubernetes/pkg/util/yaml" @@ -93,7 +94,7 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *a if err != nil { return false, nil, err } - obj, err := api.Scheme.Decode(json) + obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), json) if err != nil { return false, pod, err } @@ -115,17 +116,13 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *a } func tryDecodePodList(data []byte, defaultFn defaultFunc) (parsed bool, pods api.PodList, err error) { - json, err := utilyaml.ToJSON(data) - if err != nil { - return false, api.PodList{}, err - } - obj, err := api.Scheme.Decode(json) + obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), data) if err != nil { return false, pods, err } // Check whether the object could be converted to list of pods. if _, ok := obj.(*api.PodList); !ok { - err = fmt.Errorf("invalid pods list: %+v", obj) + err = fmt.Errorf("invalid pods list: %#v", obj) return false, pods, err } newPods := obj.(*api.PodList) diff --git a/pkg/kubelet/dockertools/labels.go b/pkg/kubelet/dockertools/labels.go index 15535d97522..fd7ad7c8985 100644 --- a/pkg/kubelet/dockertools/labels.go +++ b/pkg/kubelet/dockertools/labels.go @@ -22,9 +22,9 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/apimachinery/registered" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/util/format" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" ) @@ -187,8 +187,7 @@ func supplyContainerInfoWithOldLabel(labels map[string]string, containerInfo *la return } pod = &api.Pod{} - err := registered.GroupOrDie(api.GroupName).Codec.DecodeInto([]byte(data), pod) - if err != nil { + if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(data), pod); err != nil { // If the pod label can't be parsed, we should report an error logError(containerInfo, kubernetesPodLabel, err) return diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index f19a6566797..973f78facef 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -37,7 +37,6 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -48,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/qos" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -655,11 +655,12 @@ func (dm *DockerManager) runContainer( // TODO(random-liu): Remove this when we start to use new labels for KillContainerInPod if container.Lifecycle != nil && container.Lifecycle.PreStop != nil { // TODO: This is kind of hacky, we should really just encode the bits we need. - data, err := registered.GroupOrDie(api.GroupName).Codec.Encode(pod) - if err != nil { - glog.Errorf("Failed to encode pod: %s for prestop hook", pod.Name) - } else { + // TODO: This is hacky because the Kubelet should be parameterized to encode a specific version + // and needs to be able to migrate this whenever we deprecate v1. Should be a member of DockerManager. + if data, err := runtime.Encode(api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"}), pod); err == nil { labels[kubernetesPodLabel] = string(data) + } else { + glog.Errorf("Failed to encode pod: %s for prestop hook", pod.Name) } } memoryLimit := container.Resources.Limits.Memory().Value() @@ -1432,7 +1433,7 @@ func containerAndPodFromLabels(inspect *docker.Container) (pod *api.Pod, contain // the pod data may not be set if body, found := labels[kubernetesPodLabel]; found { pod = &api.Pod{} - if err = registered.GroupOrDie(api.GroupName).Codec.DecodeInto([]byte(body), pod); err == nil { + if err = runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(body), pod); err == nil { name := labels[kubernetesContainerNameLabel] for ix := range pod.Spec.Containers { if pod.Spec.Containers[ix].Name == name { diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 37c7f563d66..0f61d40b426 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/validation" - "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/auth/authenticator" "k8s.io/kubernetes/pkg/auth/authorizer" "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" @@ -48,6 +47,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/stats" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flushwriter" @@ -402,18 +402,12 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re delete(query, "tailLines") } } - // container logs on the kubelet are locked to v1 - versioned := &v1.PodLogOptions{} - if err := api.Scheme.Convert(&query, versioned); err != nil { + // container logs on the kubelet are locked to the v1 API version of PodLogOptions + logOptions := &api.PodLogOptions{} + if err := api.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil { response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`)) return } - out, err := api.Scheme.ConvertToVersion(versioned, "") - if err != nil { - response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to convert request query."}`)) - return - } - logOptions := out.(*api.PodLogOptions) logOptions.TypeMeta = unversioned.TypeMeta{} if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 { response.WriteError(apierrs.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`)) @@ -462,7 +456,11 @@ func encodePods(pods []*api.Pod) (data []byte, err error) { for _, pod := range pods { podList.Items = append(podList.Items, *pod) } - return registered.GroupOrDie(api.GroupName).Codec.Encode(podList) + // TODO: this needs to be parameterized to the kubelet, not hardcoded. Depends on Kubelet + // as API server refactor. + // TODO: Locked to v1, needs to be made generic + codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"}) + return runtime.Encode(codec, podList) } // getPods returns a list of pods bound to the Kubelet and their spec. diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 7e4fcac21f0..960c1b71207 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -148,7 +148,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob glog.Errorf("Context is nil") } key = h.prefixEtcdKey(key) - data, err := h.codec.Encode(obj) + data, err := runtime.Encode(h.codec, obj) if err != nil { return err } @@ -183,7 +183,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec glog.Errorf("Context is nil") } var response *etcd.Response - data, err := h.codec.Encode(obj) + data, err := runtime.Encode(h.codec, obj) if err != nil { return err } @@ -333,7 +333,13 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) } body = node.Value - err = h.codec.DecodeInto([]byte(body), objPtr) + out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr) + if err != nil { + return body, nil, err + } + if out != objPtr { + return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) + } if h.versioner != nil { _ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex) // being unable to set the version does not prevent the object from being extracted @@ -403,19 +409,19 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } } else { - obj := reflect.New(v.Type().Elem()) - if err := h.codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil { + obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) + if err != nil { return err } if h.versioner != nil { // being unable to set the version does not prevent the object from being extracted - _ = h.versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex) + _ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex) } - if filter(obj.Interface().(runtime.Object)) { - v.Set(reflect.Append(v, obj.Elem())) + if filter(obj) { + v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } if node.ModifiedIndex != 0 { - h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object)) + h.addToCache(node.ModifiedIndex, obj) } } } @@ -532,7 +538,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType ttl = *newTTL } - data, err := h.codec.Encode(ret) + data, err := runtime.Encode(h.codec, ret) if err != nil { return err } diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 4c807dac356..0411afe8e33 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -17,6 +17,7 @@ limitations under the License. package etcd import ( + "fmt" "net/http" "sync" "sync/atomic" @@ -210,7 +211,7 @@ func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key stri resp, err := client.Get(ctx, key, &opts) if err != nil { if !etcdutil.IsEtcdNotFound(err) { - glog.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err) + util.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err)) return resourceVersion, err } if etcdError, ok := err.(etcd.Error); ok { @@ -300,7 +301,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { return obj, nil } - obj, err := w.encoding.Decode([]byte(node.Value)) + obj, err := runtime.Decode(w.encoding, []byte(node.Value)) if err != nil { return nil, err } @@ -308,7 +309,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { // ensure resource version is set on the object we load from etcd if w.versioner != nil { if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil { - glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err) + util.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)) } } @@ -316,7 +317,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { if w.transform != nil { obj, err = w.transform(obj) if err != nil { - glog.Errorf("failure to transform api object %#v: %v", obj, err) + util.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err)) return nil, err } } @@ -329,7 +330,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { func (w *etcdWatcher) sendAdd(res *etcd.Response) { if res.Node == nil { - glog.Errorf("unexpected nil node: %#v", res) + util.HandleError(fmt.Errorf("unexpected nil node: %#v", res)) return } if w.include != nil && !w.include(res.Node.Key) { @@ -337,7 +338,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) { } obj, err := w.decodeObject(res.Node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node) + util.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node)) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. @@ -366,7 +367,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { } curObj, err := w.decodeObject(res.Node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node) + util.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node)) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. @@ -406,7 +407,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { func (w *etcdWatcher) sendDelete(res *etcd.Response) { if res.PrevNode == nil { - glog.Errorf("unexpected nil prev node: %#v", res) + util.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res)) return } if w.include != nil && !w.include(res.PrevNode.Key) { @@ -421,7 +422,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { } obj, err := w.decodeObject(&node) if err != nil { - glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.PrevNode.Value), res, res.PrevNode) + util.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node)) // TODO: expose an error through watch.Interface? // Ignore this value. If we stop the watch on a bad value, a client that uses // the resourceVersion to resume will never be able to get past a bad value. @@ -445,7 +446,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) { case EtcdDelete, EtcdExpire: w.sendDelete(res) default: - glog.Errorf("unknown action: %v", res.Action) + util.HandleError(fmt.Errorf("unknown action: %v", res.Action)) } } diff --git a/pkg/util/httpstream/spdy/roundtripper.go b/pkg/util/httpstream/spdy/roundtripper.go index 47b67307735..a054d30c233 100644 --- a/pkg/util/httpstream/spdy/roundtripper.go +++ b/pkg/util/httpstream/spdy/roundtripper.go @@ -212,7 +212,8 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connec if err != nil { responseError = "unable to read error from server response" } else { - if obj, err := api.Scheme.Decode(responseErrorBytes); err == nil { + // TODO: I don't belong here, I should be abstracted from this class + if obj, _, err := api.Codecs.UniversalDecoder().Decode(responseErrorBytes, nil, &unversioned.Status{}); err == nil { if status, ok := obj.(*unversioned.Status); ok { return nil, &apierrors.StatusError{ErrStatus: *status} } diff --git a/pkg/util/io/io.go b/pkg/util/io/io.go index 64bc074ba5f..d600c08c191 100644 --- a/pkg/util/io/io.go +++ b/pkg/util/io/io.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/runtime" ) // LoadPodFromFile will read, decode, and return a Pod from a file. @@ -39,7 +40,8 @@ func LoadPodFromFile(filePath string) (*api.Pod, error) { } pod := &api.Pod{} - if err := registered.GroupOrDie(api.GroupName).Codec.DecodeInto(podDef, pod); err != nil { + codec := api.Codecs.LegacyCodec(registered.GroupOrDie(api.GroupName).GroupVersion) + if err := runtime.DecodeInto(codec, podDef, pod); err != nil { return nil, fmt.Errorf("failed decoding file: %v", err) } return pod, nil @@ -50,7 +52,8 @@ func SavePodToFile(pod *api.Pod, filePath string, perm os.FileMode) error { if filePath == "" { return fmt.Errorf("file path not specified") } - data, err := registered.GroupOrDie(api.GroupName).Codec.Encode(pod) + codec := api.Codecs.LegacyCodec(registered.GroupOrDie(api.GroupName).GroupVersion) + data, err := runtime.Encode(codec, pod) if err != nil { return fmt.Errorf("failed encoding pod: %v", err) } diff --git a/pkg/util/io/io_test.go b/pkg/util/io/io_test.go index 993ff5fec57..466f436b8cb 100644 --- a/pkg/util/io/io_test.go +++ b/pkg/util/io/io_test.go @@ -24,6 +24,7 @@ import ( "github.com/pborman/uuid" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/volume" ) @@ -32,8 +33,9 @@ func TestSavePodToFile(t *testing.T) { pod := volume.NewPersistentVolumeRecyclerPodTemplate() // sets all default values on a pod for equality comparison after decoding from file - encoded, err := registered.GroupOrDie(api.GroupName).Codec.Encode(pod) - registered.GroupOrDie(api.GroupName).Codec.DecodeInto(encoded, pod) + codec := api.Codecs.LegacyCodec(registered.GroupOrDie(api.GroupName).GroupVersion) + encoded, err := runtime.Encode(codec, pod) + runtime.DecodeInto(codec, encoded, pod) path := fmt.Sprintf("/tmp/kube-io-test-%s", uuid.New()) defer os.Remove(path) diff --git a/pkg/watch/json/decoder.go b/pkg/watch/json/decoder.go index f6c0924dd40..d1632336f67 100644 --- a/pkg/watch/json/decoder.go +++ b/pkg/watch/json/decoder.go @@ -56,7 +56,7 @@ func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) { return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type) } - obj, err := d.codec.Decode(got.Object.RawJSON) + obj, err := runtime.Decode(d.codec, got.Object.RawJSON) if err != nil { return "", nil, fmt.Errorf("unable to decode watch event: %v", err) } diff --git a/pkg/watch/json/encoder.go b/pkg/watch/json/encoder.go index 1110e9293fa..1eaf6a8a09c 100644 --- a/pkg/watch/json/encoder.go +++ b/pkg/watch/json/encoder.go @@ -30,11 +30,11 @@ import ( type Encoder struct { w io.Writer encoder *json.Encoder - codec runtime.Codec + codec runtime.Encoder } // NewEncoder creates an Encoder for the given writer and codec -func NewEncoder(w io.Writer, codec runtime.Codec) *Encoder { +func NewEncoder(w io.Writer, codec runtime.Encoder) *Encoder { return &Encoder{ w: w, encoder: json.NewEncoder(w), diff --git a/pkg/watch/json/types.go b/pkg/watch/json/types.go index f8fbd397aa8..28ee0dec75f 100644 --- a/pkg/watch/json/types.go +++ b/pkg/watch/json/types.go @@ -40,12 +40,12 @@ type WatchEvent struct { } // Object converts a watch.Event into an appropriately serializable JSON object -func Object(codec runtime.Codec, event *watch.Event) (interface{}, error) { +func Object(encoder runtime.Encoder, event *watch.Event) (interface{}, error) { obj, ok := event.Object.(runtime.Object) if !ok { return nil, fmt.Errorf("the event object cannot be safely converted to JSON: %v", reflect.TypeOf(event.Object).Name()) } - data, err := runtime.Encode(codec, obj) + data, err := runtime.Encode(encoder, obj) if err != nil { return nil, err } diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index fde563c0f06..07d85c60118 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -32,6 +32,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/healthz" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" "k8s.io/kubernetes/plugin/pkg/scheduler" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" @@ -152,19 +153,18 @@ func Run(s *options.SchedulerServer) error { } func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) { - var policy schedulerapi.Policy - var configData []byte - if _, err := os.Stat(s.PolicyConfigFile); err == nil { - configData, err = ioutil.ReadFile(s.PolicyConfigFile) + var ( + policy schedulerapi.Policy + configData []byte + ) + configData, err := ioutil.ReadFile(s.PolicyConfigFile) if err != nil { - return nil, fmt.Errorf("Unable to read policy config: %v", err) + return nil, fmt.Errorf("unable to read policy config: %v", err) } - err = latestschedulerapi.Codec.DecodeInto(configData, &policy) - if err != nil { - return nil, fmt.Errorf("Invalid configuration: %v", err) + if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil { + return nil, fmt.Errorf("invalid configuration: %v", err) } - return configFactory.CreateFromConfig(policy) } diff --git a/test/e2e/serviceloadbalancers.go b/test/e2e/serviceloadbalancers.go index 82748cb695b..b8376e2cf3a 100644 --- a/test/e2e/serviceloadbalancers.go +++ b/test/e2e/serviceloadbalancers.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/wait" utilyaml "k8s.io/kubernetes/pkg/util/yaml" @@ -274,7 +275,7 @@ func rcFromManifest(fileName string) *api.ReplicationController { json, err := utilyaml.ToJSON(data) Expect(err).NotTo(HaveOccurred()) - Expect(api.Scheme.DecodeInto(json, &controller)).NotTo(HaveOccurred()) + Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &controller)).NotTo(HaveOccurred()) return &controller } @@ -288,6 +289,6 @@ func svcFromManifest(fileName string) *api.Service { json, err := utilyaml.ToJSON(data) Expect(err).NotTo(HaveOccurred()) - Expect(api.Scheme.DecodeInto(json, &svc)).NotTo(HaveOccurred()) + Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &svc)).NotTo(HaveOccurred()) return &svc } diff --git a/test/soak/serve_hostnames/serve_hostnames.go b/test/soak/serve_hostnames/serve_hostnames.go index 57b06661097..f19d42eb67f 100644 --- a/test/soak/serve_hostnames/serve_hostnames.go +++ b/test/soak/serve_hostnames/serve_hostnames.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/intstr" ) @@ -264,7 +265,7 @@ func main() { continue } var r unversioned.Status - if err := api.Scheme.DecodeInto(hostname, &r); err != nil { + if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), hostname, &r); err != nil { break } if r.Status == unversioned.StatusFailure {