Remaining codec change refactors

This commit is contained in:
Clayton Coleman 2016-01-22 00:11:30 -05:00
parent 33085c0cf2
commit 4a6935b31f
23 changed files with 104 additions and 82 deletions

View File

@ -26,7 +26,7 @@ import (
"net/http/httptest" "net/http/httptest"
"os" "os"
"reflect" "reflect"
"runtime" gruntime "runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -52,6 +52,7 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
@ -441,7 +442,7 @@ func runReplicationControllerTest(c *client.Client) {
glog.Fatalf("Unexpected error: %v", err) glog.Fatalf("Unexpected error: %v", err)
} }
var controller api.ReplicationController var controller api.ReplicationController
if err := api.Scheme.DecodeInto(data, &controller); err != nil { if err := runtime.DecodeInto(testapi.Default.Codec(), data, &controller); err != nil {
glog.Fatalf("Unexpected error: %v", err) glog.Fatalf("Unexpected error: %v", err)
} }
@ -952,7 +953,7 @@ func addFlags(fs *pflag.FlagSet) {
} }
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) gruntime.GOMAXPROCS(gruntime.NumCPU())
addFlags(pflag.CommandLine) addFlags(pflag.CommandLine)
util.InitFlags() util.InitFlags()

View File

@ -42,6 +42,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
kruntime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
) )
@ -343,7 +344,7 @@ func (k *Executor) LaunchTask(driver bindings.ExecutorDriver, taskInfo *mesos.Ta
return return
} }
obj, err := api.Codec.Decode(taskInfo.GetData()) obj, err := kruntime.Decode(api.Codecs.UniversalDecoder(), taskInfo.GetData())
if err != nil { if err != nil {
log.Errorf("failed to extract yaml data from the taskInfo.data %v", err) log.Errorf("failed to extract yaml data from the taskInfo.data %v", err)
k.sendStatus(driver, newStatus(taskInfo.GetTaskId(), mesos.TaskState_TASK_FAILED, k.sendStatus(driver, newStatus(taskInfo.GetTaskId(), mesos.TaskState_TASK_FAILED,

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
_ "k8s.io/kubernetes/pkg/api/install" _ "k8s.io/kubernetes/pkg/api/install"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
) )
func Gzip(pods <-chan *api.Pod) ([]byte, error) { func Gzip(pods <-chan *api.Pod) ([]byte, error) {
@ -32,7 +33,7 @@ func Gzip(pods <-chan *api.Pod) ([]byte, error) {
} }
func gzipList(list *api.PodList) ([]byte, error) { func gzipList(list *api.PodList) ([]byte, error) {
raw, err := v1.Codec.Encode(list) raw, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), list)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -68,7 +69,7 @@ func gunzipList(gzipped []byte) (*api.PodList, error) {
return nil, err return nil, err
} }
obj, err := api.Scheme.Decode(raw) obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), raw)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/runtime"
utilyaml "k8s.io/kubernetes/pkg/util/yaml" utilyaml "k8s.io/kubernetes/pkg/util/yaml"
) )
@ -41,7 +42,7 @@ func WriteToDir(pods <-chan *api.Pod, destDir string) error {
log.Warningf("skipping static pod %s/%s that had no filename", p.Namespace, p.Name) log.Warningf("skipping static pod %s/%s that had no filename", p.Namespace, p.Name)
continue continue
} }
raw, err := v1.Codec.Encode(p) raw, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), p)
if err != nil { if err != nil {
log.Errorf("failed to encode static pod as v1 object: %v", err) log.Errorf("failed to encode static pod as v1 object: %v", err)
continue continue
@ -105,7 +106,7 @@ func tryDecodeSinglePod(data []byte) (parsed bool, pod *api.Pod, err error) {
if err != nil { if err != nil {
return false, nil, err return false, nil, err
} }
obj, err := api.Scheme.Decode(json) obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), json)
if err != nil { if err != nil {
return false, pod, err return false, pod, err
} }

View File

@ -26,6 +26,8 @@ import (
annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
) )
type Binder interface { type Binder interface {
@ -150,7 +152,7 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
// the kubelet-executor uses this to instantiate the pod // the kubelet-executor uses this to instantiate the pod
log.V(3).Infof("prepared pod spec: %+v", pod) log.V(3).Infof("prepared pod spec: %+v", pod)
data, err := api.Codec.Encode(&pod) data, err := runtime.Encode(api.Codecs.LegacyCodec(v1.SchemeGroupVersion), &pod)
if err != nil { if err != nil {
log.V(2).Infof("Failed to marshal the pod spec: %v", err) log.V(2).Infof("Failed to marshal the pod spec: %v", err)
return err return err

View File

@ -66,5 +66,5 @@ func main() {
"nginx.key": nginxKey, "nginx.key": nginxKey,
}, },
} }
fmt.Printf(runtime.EncodeOrDie(registered.GroupOrDie(api.GroupName).Codec, secret)) fmt.Printf(runtime.EncodeOrDie(api.Codecs.LegacyCodec(registered.EnabledVersions()...), secret))
} }

View File

@ -59,5 +59,5 @@ func main() {
"config": cfg, "config": cfg,
}, },
} }
fmt.Printf(runtime.EncodeOrDie(registered.GroupOrDie(api.GroupName).Codec, secret)) fmt.Printf(runtime.EncodeOrDie(api.Codecs.LegacyCodec(registered.EnabledVersions()...), secret))
} }

View File

@ -24,8 +24,8 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
@ -259,7 +259,13 @@ func getPodsAnnotationSet(template *api.PodTemplateSpec, object runtime.Object)
if err != nil { if err != nil {
return desiredAnnotations, fmt.Errorf("unable to get controller reference: %v", err) return desiredAnnotations, fmt.Errorf("unable to get controller reference: %v", err)
} }
createdByRefJson, err := registered.GroupOrDie(api.GroupName).Codec.Encode(&api.SerializedReference{
// TODO: this code was not safe previously - as soon as new code came along that switched to v2, old clients
// would be broken upon reading it. This is explicitly hardcoded to v1 to guarantee predictable deployment.
// We need to consistently handle this case of annotation versioning.
codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"})
createdByRefJson, err := runtime.Encode(codec, &api.SerializedReference{
Reference: *createdByRef, Reference: *createdByRef,
}) })
if err != nil { if err != nil {

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apimachinery/registered"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/hash" "k8s.io/kubernetes/pkg/util/hash"
utilyaml "k8s.io/kubernetes/pkg/util/yaml" utilyaml "k8s.io/kubernetes/pkg/util/yaml"
@ -93,7 +94,7 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *a
if err != nil { if err != nil {
return false, nil, err return false, nil, err
} }
obj, err := api.Scheme.Decode(json) obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), json)
if err != nil { if err != nil {
return false, pod, err return false, pod, err
} }
@ -115,17 +116,13 @@ func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *a
} }
func tryDecodePodList(data []byte, defaultFn defaultFunc) (parsed bool, pods api.PodList, err error) { func tryDecodePodList(data []byte, defaultFn defaultFunc) (parsed bool, pods api.PodList, err error) {
json, err := utilyaml.ToJSON(data) obj, err := runtime.Decode(api.Codecs.UniversalDecoder(), data)
if err != nil {
return false, api.PodList{}, err
}
obj, err := api.Scheme.Decode(json)
if err != nil { if err != nil {
return false, pods, err return false, pods, err
} }
// Check whether the object could be converted to list of pods. // Check whether the object could be converted to list of pods.
if _, ok := obj.(*api.PodList); !ok { if _, ok := obj.(*api.PodList); !ok {
err = fmt.Errorf("invalid pods list: %+v", obj) err = fmt.Errorf("invalid pods list: %#v", obj)
return false, pods, err return false, pods, err
} }
newPods := obj.(*api.PodList) newPods := obj.(*api.PodList)

View File

@ -22,9 +22,9 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apimachinery/registered"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
) )
@ -187,8 +187,7 @@ func supplyContainerInfoWithOldLabel(labels map[string]string, containerInfo *la
return return
} }
pod = &api.Pod{} pod = &api.Pod{}
err := registered.GroupOrDie(api.GroupName).Codec.DecodeInto([]byte(data), pod) if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(data), pod); err != nil {
if err != nil {
// If the pod label can't be parsed, we should report an error // If the pod label can't be parsed, we should report an error
logError(containerInfo, kubernetesPodLabel, err) logError(containerInfo, kubernetesPodLabel, err)
return return

View File

@ -37,7 +37,6 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -48,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
@ -655,11 +655,12 @@ func (dm *DockerManager) runContainer(
// TODO(random-liu): Remove this when we start to use new labels for KillContainerInPod // TODO(random-liu): Remove this when we start to use new labels for KillContainerInPod
if container.Lifecycle != nil && container.Lifecycle.PreStop != nil { if container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
// TODO: This is kind of hacky, we should really just encode the bits we need. // TODO: This is kind of hacky, we should really just encode the bits we need.
data, err := registered.GroupOrDie(api.GroupName).Codec.Encode(pod) // TODO: This is hacky because the Kubelet should be parameterized to encode a specific version
if err != nil { // and needs to be able to migrate this whenever we deprecate v1. Should be a member of DockerManager.
glog.Errorf("Failed to encode pod: %s for prestop hook", pod.Name) if data, err := runtime.Encode(api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"}), pod); err == nil {
} else {
labels[kubernetesPodLabel] = string(data) labels[kubernetesPodLabel] = string(data)
} else {
glog.Errorf("Failed to encode pod: %s for prestop hook", pod.Name)
} }
} }
memoryLimit := container.Resources.Limits.Memory().Value() memoryLimit := container.Resources.Limits.Memory().Value()
@ -1432,7 +1433,7 @@ func containerAndPodFromLabels(inspect *docker.Container) (pod *api.Pod, contain
// the pod data may not be set // the pod data may not be set
if body, found := labels[kubernetesPodLabel]; found { if body, found := labels[kubernetesPodLabel]; found {
pod = &api.Pod{} pod = &api.Pod{}
if err = registered.GroupOrDie(api.GroupName).Codec.DecodeInto([]byte(body), pod); err == nil { if err = runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(body), pod); err == nil {
name := labels[kubernetesContainerNameLabel] name := labels[kubernetesContainerNameLabel]
for ix := range pod.Spec.Containers { for ix := range pod.Spec.Containers {
if pod.Spec.Containers[ix].Name == name { if pod.Spec.Containers[ix].Name == name {

View File

@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/auth/authenticator" "k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/authorizer" "k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand" "k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
@ -48,6 +47,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flushwriter" "k8s.io/kubernetes/pkg/util/flushwriter"
@ -402,18 +402,12 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
delete(query, "tailLines") delete(query, "tailLines")
} }
} }
// container logs on the kubelet are locked to v1 // container logs on the kubelet are locked to the v1 API version of PodLogOptions
versioned := &v1.PodLogOptions{} logOptions := &api.PodLogOptions{}
if err := api.Scheme.Convert(&query, versioned); err != nil { if err := api.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil {
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`)) response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
return return
} }
out, err := api.Scheme.ConvertToVersion(versioned, "")
if err != nil {
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to convert request query."}`))
return
}
logOptions := out.(*api.PodLogOptions)
logOptions.TypeMeta = unversioned.TypeMeta{} logOptions.TypeMeta = unversioned.TypeMeta{}
if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 { if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
response.WriteError(apierrs.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`)) response.WriteError(apierrs.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
@ -462,7 +456,11 @@ func encodePods(pods []*api.Pod) (data []byte, err error) {
for _, pod := range pods { for _, pod := range pods {
podList.Items = append(podList.Items, *pod) podList.Items = append(podList.Items, *pod)
} }
return registered.GroupOrDie(api.GroupName).Codec.Encode(podList) // TODO: this needs to be parameterized to the kubelet, not hardcoded. Depends on Kubelet
// as API server refactor.
// TODO: Locked to v1, needs to be made generic
codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"})
return runtime.Encode(codec, podList)
} }
// getPods returns a list of pods bound to the Kubelet and their spec. // getPods returns a list of pods bound to the Kubelet and their spec.

View File

@ -148,7 +148,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
key = h.prefixEtcdKey(key) key = h.prefixEtcdKey(key)
data, err := h.codec.Encode(obj) data, err := runtime.Encode(h.codec, obj)
if err != nil { if err != nil {
return err return err
} }
@ -183,7 +183,7 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
var response *etcd.Response var response *etcd.Response
data, err := h.codec.Encode(obj) data, err := runtime.Encode(h.codec, obj)
if err != nil { if err != nil {
return err return err
} }
@ -333,7 +333,13 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
} }
body = node.Value body = node.Value
err = h.codec.DecodeInto([]byte(body), objPtr) out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
if err != nil {
return body, nil, err
}
if out != objPtr {
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
}
if h.versioner != nil { if h.versioner != nil {
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex) _ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
@ -403,19 +409,19 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} }
} else { } else {
obj := reflect.New(v.Type().Elem()) obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err := h.codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil { if err != nil {
return err return err
} }
if h.versioner != nil { if h.versioner != nil {
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex) _ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex)
} }
if filter(obj.Interface().(runtime.Object)) { if filter(obj) {
v.Set(reflect.Append(v, obj.Elem())) v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} }
if node.ModifiedIndex != 0 { if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object)) h.addToCache(node.ModifiedIndex, obj)
} }
} }
} }
@ -532,7 +538,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
ttl = *newTTL ttl = *newTTL
} }
data, err := h.codec.Encode(ret) data, err := runtime.Encode(h.codec, ret)
if err != nil { if err != nil {
return err return err
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd package etcd
import ( import (
"fmt"
"net/http" "net/http"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -210,7 +211,7 @@ func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key stri
resp, err := client.Get(ctx, key, &opts) resp, err := client.Get(ctx, key, &opts)
if err != nil { if err != nil {
if !etcdutil.IsEtcdNotFound(err) { if !etcdutil.IsEtcdNotFound(err) {
glog.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err) util.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
return resourceVersion, err return resourceVersion, err
} }
if etcdError, ok := err.(etcd.Error); ok { if etcdError, ok := err.(etcd.Error); ok {
@ -300,7 +301,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
return obj, nil return obj, nil
} }
obj, err := w.encoding.Decode([]byte(node.Value)) obj, err := runtime.Decode(w.encoding, []byte(node.Value))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -308,7 +309,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
// ensure resource version is set on the object we load from etcd // ensure resource version is set on the object we load from etcd
if w.versioner != nil { if w.versioner != nil {
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil { if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err) util.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
} }
} }
@ -316,7 +317,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if w.transform != nil { if w.transform != nil {
obj, err = w.transform(obj) obj, err = w.transform(obj)
if err != nil { if err != nil {
glog.Errorf("failure to transform api object %#v: %v", obj, err) util.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
return nil, err return nil, err
} }
} }
@ -329,7 +330,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
func (w *etcdWatcher) sendAdd(res *etcd.Response) { func (w *etcdWatcher) sendAdd(res *etcd.Response) {
if res.Node == nil { if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res) util.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
return return
} }
if w.include != nil && !w.include(res.Node.Key) { if w.include != nil && !w.include(res.Node.Key) {
@ -337,7 +338,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
} }
obj, err := w.decodeObject(res.Node) obj, err := w.decodeObject(res.Node)
if err != nil { if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node) util.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
// TODO: expose an error through watch.Interface? // TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses // Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
@ -366,7 +367,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
} }
curObj, err := w.decodeObject(res.Node) curObj, err := w.decodeObject(res.Node)
if err != nil { if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node) util.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
// TODO: expose an error through watch.Interface? // TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses // Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
@ -406,7 +407,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
func (w *etcdWatcher) sendDelete(res *etcd.Response) { func (w *etcdWatcher) sendDelete(res *etcd.Response) {
if res.PrevNode == nil { if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res) util.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
return return
} }
if w.include != nil && !w.include(res.PrevNode.Key) { if w.include != nil && !w.include(res.PrevNode.Key) {
@ -421,7 +422,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
} }
obj, err := w.decodeObject(&node) obj, err := w.decodeObject(&node)
if err != nil { if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.PrevNode.Value), res, res.PrevNode) util.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
// TODO: expose an error through watch.Interface? // TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses // Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
@ -445,7 +446,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
case EtcdDelete, EtcdExpire: case EtcdDelete, EtcdExpire:
w.sendDelete(res) w.sendDelete(res)
default: default:
glog.Errorf("unknown action: %v", res.Action) util.HandleError(fmt.Errorf("unknown action: %v", res.Action))
} }
} }

View File

@ -212,7 +212,8 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connec
if err != nil { if err != nil {
responseError = "unable to read error from server response" responseError = "unable to read error from server response"
} else { } else {
if obj, err := api.Scheme.Decode(responseErrorBytes); err == nil { // TODO: I don't belong here, I should be abstracted from this class
if obj, _, err := api.Codecs.UniversalDecoder().Decode(responseErrorBytes, nil, &unversioned.Status{}); err == nil {
if status, ok := obj.(*unversioned.Status); ok { if status, ok := obj.(*unversioned.Status); ok {
return nil, &apierrors.StatusError{ErrStatus: *status} return nil, &apierrors.StatusError{ErrStatus: *status}
} }

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/runtime"
) )
// LoadPodFromFile will read, decode, and return a Pod from a file. // LoadPodFromFile will read, decode, and return a Pod from a file.
@ -39,7 +40,8 @@ func LoadPodFromFile(filePath string) (*api.Pod, error) {
} }
pod := &api.Pod{} pod := &api.Pod{}
if err := registered.GroupOrDie(api.GroupName).Codec.DecodeInto(podDef, pod); err != nil { codec := api.Codecs.LegacyCodec(registered.GroupOrDie(api.GroupName).GroupVersion)
if err := runtime.DecodeInto(codec, podDef, pod); err != nil {
return nil, fmt.Errorf("failed decoding file: %v", err) return nil, fmt.Errorf("failed decoding file: %v", err)
} }
return pod, nil return pod, nil
@ -50,7 +52,8 @@ func SavePodToFile(pod *api.Pod, filePath string, perm os.FileMode) error {
if filePath == "" { if filePath == "" {
return fmt.Errorf("file path not specified") return fmt.Errorf("file path not specified")
} }
data, err := registered.GroupOrDie(api.GroupName).Codec.Encode(pod) codec := api.Codecs.LegacyCodec(registered.GroupOrDie(api.GroupName).GroupVersion)
data, err := runtime.Encode(codec, pod)
if err != nil { if err != nil {
return fmt.Errorf("failed encoding pod: %v", err) return fmt.Errorf("failed encoding pod: %v", err)
} }

View File

@ -24,6 +24,7 @@ import (
"github.com/pborman/uuid" "github.com/pborman/uuid"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
@ -32,8 +33,9 @@ func TestSavePodToFile(t *testing.T) {
pod := volume.NewPersistentVolumeRecyclerPodTemplate() pod := volume.NewPersistentVolumeRecyclerPodTemplate()
// sets all default values on a pod for equality comparison after decoding from file // sets all default values on a pod for equality comparison after decoding from file
encoded, err := registered.GroupOrDie(api.GroupName).Codec.Encode(pod) codec := api.Codecs.LegacyCodec(registered.GroupOrDie(api.GroupName).GroupVersion)
registered.GroupOrDie(api.GroupName).Codec.DecodeInto(encoded, pod) encoded, err := runtime.Encode(codec, pod)
runtime.DecodeInto(codec, encoded, pod)
path := fmt.Sprintf("/tmp/kube-io-test-%s", uuid.New()) path := fmt.Sprintf("/tmp/kube-io-test-%s", uuid.New())
defer os.Remove(path) defer os.Remove(path)

View File

@ -56,7 +56,7 @@ func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type) return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
} }
obj, err := d.codec.Decode(got.Object.RawJSON) obj, err := runtime.Decode(d.codec, got.Object.RawJSON)
if err != nil { if err != nil {
return "", nil, fmt.Errorf("unable to decode watch event: %v", err) return "", nil, fmt.Errorf("unable to decode watch event: %v", err)
} }

View File

@ -30,11 +30,11 @@ import (
type Encoder struct { type Encoder struct {
w io.Writer w io.Writer
encoder *json.Encoder encoder *json.Encoder
codec runtime.Codec codec runtime.Encoder
} }
// NewEncoder creates an Encoder for the given writer and codec // NewEncoder creates an Encoder for the given writer and codec
func NewEncoder(w io.Writer, codec runtime.Codec) *Encoder { func NewEncoder(w io.Writer, codec runtime.Encoder) *Encoder {
return &Encoder{ return &Encoder{
w: w, w: w,
encoder: json.NewEncoder(w), encoder: json.NewEncoder(w),

View File

@ -40,12 +40,12 @@ type WatchEvent struct {
} }
// Object converts a watch.Event into an appropriately serializable JSON object // Object converts a watch.Event into an appropriately serializable JSON object
func Object(codec runtime.Codec, event *watch.Event) (interface{}, error) { func Object(encoder runtime.Encoder, event *watch.Event) (interface{}, error) {
obj, ok := event.Object.(runtime.Object) obj, ok := event.Object.(runtime.Object)
if !ok { if !ok {
return nil, fmt.Errorf("the event object cannot be safely converted to JSON: %v", reflect.TypeOf(event.Object).Name()) return nil, fmt.Errorf("the event object cannot be safely converted to JSON: %v", reflect.TypeOf(event.Object).Name())
} }
data, err := runtime.Encode(codec, obj) data, err := runtime.Encode(encoder, obj)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -32,6 +32,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
"k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
@ -152,19 +153,18 @@ func Run(s *options.SchedulerServer) error {
} }
func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) { func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) {
var policy schedulerapi.Policy
var configData []byte
if _, err := os.Stat(s.PolicyConfigFile); err == nil { if _, err := os.Stat(s.PolicyConfigFile); err == nil {
configData, err = ioutil.ReadFile(s.PolicyConfigFile) var (
policy schedulerapi.Policy
configData []byte
)
configData, err := ioutil.ReadFile(s.PolicyConfigFile)
if err != nil { if err != nil {
return nil, fmt.Errorf("Unable to read policy config: %v", err) return nil, fmt.Errorf("unable to read policy config: %v", err)
} }
err = latestschedulerapi.Codec.DecodeInto(configData, &policy) if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
if err != nil { return nil, fmt.Errorf("invalid configuration: %v", err)
return nil, fmt.Errorf("Invalid configuration: %v", err)
} }
return configFactory.CreateFromConfig(policy) return configFactory.CreateFromConfig(policy)
} }

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
utilyaml "k8s.io/kubernetes/pkg/util/yaml" utilyaml "k8s.io/kubernetes/pkg/util/yaml"
@ -274,7 +275,7 @@ func rcFromManifest(fileName string) *api.ReplicationController {
json, err := utilyaml.ToJSON(data) json, err := utilyaml.ToJSON(data)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(api.Scheme.DecodeInto(json, &controller)).NotTo(HaveOccurred()) Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &controller)).NotTo(HaveOccurred())
return &controller return &controller
} }
@ -288,6 +289,6 @@ func svcFromManifest(fileName string) *api.Service {
json, err := utilyaml.ToJSON(data) json, err := utilyaml.ToJSON(data)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(api.Scheme.DecodeInto(json, &svc)).NotTo(HaveOccurred()) Expect(runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &svc)).NotTo(HaveOccurred())
return &svc return &svc
} }

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
) )
@ -264,7 +265,7 @@ func main() {
continue continue
} }
var r unversioned.Status var r unversioned.Status
if err := api.Scheme.DecodeInto(hostname, &r); err != nil { if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), hostname, &r); err != nil {
break break
} }
if r.Status == unversioned.StatusFailure { if r.Status == unversioned.StatusFailure {