From b63974bd21cb9b9d1e9ebbdfc318ad163f60dc68 Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Fri, 3 Oct 2014 11:23:25 -0400 Subject: [PATCH 1/2] Extract list must flatten nodes across directories --- pkg/tools/etcd_tools.go | 28 +++++++---- pkg/tools/etcd_tools_test.go | 98 ++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 9 deletions(-) diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index b3fcb1facae..8f7ff6618a6 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -164,6 +164,12 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersi if err != nil { return err } + h.decodeNodeList(nodes, slicePtr) + return nil +} + +// decodeNodeList walks the tree of each node in the list and decodes into the specified object +func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error { pv := reflect.ValueOf(slicePtr) if pv.Type().Kind() != reflect.Ptr || pv.Type().Elem().Kind() != reflect.Slice { // This should not happen at runtime. @@ -171,16 +177,20 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersi } v := pv.Elem() for _, node := range nodes { - obj := reflect.New(v.Type().Elem()) - err = h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)) - if h.ResourceVersioner != nil { - _ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex) - // being unable to set the version does not prevent the object from being extracted + if node.Dir { + h.decodeNodeList(node.Nodes, slicePtr) + } else { + obj := reflect.New(v.Type().Elem()) + err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)) + if h.ResourceVersioner != nil { + _ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex) + // being unable to set the version does not prevent the object from being extracted + } + if err != nil { + return err + } + v.Set(reflect.Append(v, obj.Elem())) } - if err != nil { - return err - } - v.Set(reflect.Append(v, obj.Elem())) } return nil } diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 54c6432c05f..40918825690 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -108,6 +108,104 @@ func TestExtractToList(t *testing.T) { } } +// TestExtractToListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query +func TestExtractToListAcrossDirectories(t *testing.T) { + fakeClient := NewFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + EtcdIndex: 10, + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: `{"id": "directory1"}`, + Dir: true, + Nodes: []*etcd.Node{ + { + Value: `{"id":"foo"}`, + ModifiedIndex: 1, + }, + }, + }, + { + Value: `{"id": "directory2"}`, + Dir: true, + Nodes: []*etcd.Node{ + { + Value: `{"id":"bar"}`, + ModifiedIndex: 2, + }, + }, + }, + }, + }, + }, + } + expect := api.PodList{ + JSONBase: api.JSONBase{ResourceVersion: 10}, + Items: []api.Pod{ + {JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}}, + {JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 2}}, + }, + } + + var got api.PodList + helper := EtcdHelper{fakeClient, latest.Codec, versioner} + err := helper.ExtractToList("/some/key", &got) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +} + +func TestExtractToListExcludesDirectories(t *testing.T) { + fakeClient := NewFakeEtcdClient(t) + fakeClient.Data["/some/key"] = EtcdResponseWithError{ + R: &etcd.Response{ + EtcdIndex: 10, + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: `{"id":"foo"}`, + ModifiedIndex: 1, + }, + { + Value: `{"id":"bar"}`, + ModifiedIndex: 2, + }, + { + Value: `{"id":"baz"}`, + ModifiedIndex: 3, + }, + { + Value: `{"id": "directory"}`, + Dir: true, + }, + }, + }, + }, + } + expect := api.PodList{ + JSONBase: api.JSONBase{ResourceVersion: 10}, + Items: []api.Pod{ + {JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}}, + {JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 2}}, + {JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 3}}, + }, + } + + var got api.PodList + helper := EtcdHelper{fakeClient, latest.Codec, versioner} + err := helper.ExtractToList("/some/key", &got) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +} + func TestExtractObj(t *testing.T) { fakeClient := NewFakeEtcdClient(t) expect := api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}} From 085ca40291dc5d40f8e9fccb83352fc6226fdf4f Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Fri, 3 Oct 2014 11:44:06 -0400 Subject: [PATCH 2/2] Enforce unique constraint at namespace boundary in etcd, make client and server namespace aware --- cmd/kubecfg/kubecfg.go | 51 ++- pkg/api/context.go | 15 + pkg/api/context_test.go | 6 + pkg/apiserver/resthandler.go | 19 +- pkg/client/client.go | 40 ++- pkg/client/request.go | 9 + pkg/controller/replication_controller_test.go | 2 +- pkg/kubecfg/kubecfg.go | 33 ++ pkg/kubecfg/kubecfg_test.go | 52 +++ pkg/registry/etcd/etcd.go | 227 +++++++++--- pkg/registry/etcd/etcd_test.go | 327 +++++++++++++----- pkg/tools/etcd_tools.go | 22 +- pkg/tools/etcd_tools_test.go | 14 +- plugin/pkg/scheduler/factory/factory.go | 6 +- plugin/pkg/scheduler/scheduler.go | 5 +- 15 files changed, 642 insertions(+), 186 deletions(-) diff --git a/cmd/kubecfg/kubecfg.go b/cmd/kubecfg/kubecfg.go index b947802948e..38272e4de0b 100644 --- a/cmd/kubecfg/kubecfg.go +++ b/cmd/kubecfg/kubecfg.go @@ -61,6 +61,8 @@ var ( imageName = flag.String("image", "", "Image used when updating a replicationController. Will apply to the first container in the pod template.") clientConfig = &client.Config{} openBrowser = flag.Bool("open_browser", true, "If true, and -proxy is specified, open a browser pointed at the Kubernetes UX. Default true.") + ns = flag.String("ns", "", "If present, the namespace scope for this request.") + nsFile = flag.String("ns_file", os.Getenv("HOME")+"/.kubernetes_ns", "Path to the namespace file") ) func init() { @@ -97,6 +99,9 @@ on the given image: kubecfg [OPTIONS] [-p ] run +Manage namespace: + kubecfg [OPTIONS] ns [] + Options: `, prettyWireStorage()) flag.PrintDefaults() @@ -178,8 +183,18 @@ func main() { clientConfig.Host = os.Getenv("KUBERNETES_MASTER") } - // TODO: get the namespace context when kubecfg ns is completed - ctx := api.NewContext() + // Load namespace information for requests + // Check if the namespace was overriden by the -ns argument + ctx := api.NewDefaultContext() + if len(*ns) > 0 { + ctx = api.WithNamespace(ctx, *ns) + } else { + nsInfo, err := kubecfg.LoadNamespaceInfo(*nsFile) + if err != nil { + glog.Fatalf("Error loading current namespace: %v", err) + } + ctx = api.WithNamespace(ctx, nsInfo.Namespace) + } if clientConfig.Host == "" { // TODO: eventually apiserver should start on 443 and be secure by default @@ -255,7 +270,7 @@ func main() { } method := flag.Arg(0) - matchFound := executeAPIRequest(ctx, method, kubeClient) || executeControllerRequest(ctx, method, kubeClient) + matchFound := executeAPIRequest(ctx, method, kubeClient) || executeControllerRequest(ctx, method, kubeClient) || executeNamespaceRequest(method, kubeClient) if matchFound == false { glog.Fatalf("Unknown command %s", method) } @@ -347,7 +362,7 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool { glog.Fatalf("usage: kubecfg [OPTIONS] %s <%s>", method, prettyWireStorage()) } case "update": - obj, err := c.Verb("GET").Path(path).Do().Get() + obj, err := c.Verb("GET").Namespace(api.Namespace(ctx)).Path(path).Do().Get() if err != nil { glog.Fatalf("error obtaining resource version for update: %v", err) } @@ -373,7 +388,7 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool { return false } - r := c.Verb(verb).Path(path) + r := c.Verb(verb).Namespace(api.Namespace(ctx)).Path(path) if len(*selector) > 0 { r.ParseSelectorParam("labels", *selector) } @@ -464,6 +479,32 @@ func executeControllerRequest(ctx api.Context, method string, c *client.Client) return true } +// executeNamespaceRequest handles client operations for namespaces +func executeNamespaceRequest(method string, c *client.Client) bool { + var err error + var ns *kubecfg.NamespaceInfo + switch method { + case "ns": + args := flag.Args() + switch len(args) { + case 1: + ns, err = kubecfg.LoadNamespaceInfo(*nsFile) + case 2: + ns = &kubecfg.NamespaceInfo{Namespace: args[1]} + err = kubecfg.SaveNamespaceInfo(*nsFile, ns) + default: + glog.Fatalf("usage: kubecfg ns []") + } + default: + return false + } + if err != nil { + glog.Fatalf("Error: %v", err) + } + fmt.Printf("Using namespace %s\n", ns.Namespace) + return true +} + func humanReadablePrinter() *kubecfg.HumanReadablePrinter { printer := kubecfg.NewHumanReadablePrinter() // Add Handler calls here to support additional types diff --git a/pkg/api/context.go b/pkg/api/context.go index aab1d2d28d5..5c74020c47e 100644 --- a/pkg/api/context.go +++ b/pkg/api/context.go @@ -63,6 +63,12 @@ func NamespaceFrom(ctx Context) (string, bool) { return namespace, ok } +// Namespace returns the value of the namespace key on the ctx, or the empty string if none +func Namespace(ctx Context) string { + namespace, _ := NamespaceFrom(ctx) + return namespace +} + // ValidNamespace returns false if the namespace on the context differs from the resource. If the resource has no namespace, it is set to the value in the context. func ValidNamespace(ctx Context, resource *TypeMeta) bool { ns, ok := NamespaceFrom(ctx) @@ -71,3 +77,12 @@ func ValidNamespace(ctx Context, resource *TypeMeta) bool { } return ns == resource.Namespace && ok } + +// WithNamespaceDefaultIfNone returns a context whose namespace is the default if and only if the parent context has no namespace value +func WithNamespaceDefaultIfNone(parent Context) Context { + namespace, ok := NamespaceFrom(parent) + if !ok || len(namespace) == 0 { + return WithNamespace(parent, NamespaceDefault) + } + return parent +} diff --git a/pkg/api/context_test.go b/pkg/api/context_test.go index beaa405e909..7baad8f9105 100644 --- a/pkg/api/context_test.go +++ b/pkg/api/context_test.go @@ -59,4 +59,10 @@ func TestValidNamespace(t *testing.T) { if api.ValidNamespace(ctx, &resource.TypeMeta) { t.Errorf("Expected error that resource and context errors do not match since context has no namespace") } + + ctx = api.NewContext() + ns := api.Namespace(ctx) + if ns != "" { + t.Errorf("Expected the empty string") + } } diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index e2c7457d3c7..0f368305ec1 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -100,10 +100,17 @@ func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func( // timeout= Timeout for synchronous requests, only applies if sync=true // labels= Used for filtering list operations func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) { - // TODO for now, we perform all operations in the default namespace - ctx := api.NewDefaultContext() + ctx := api.NewContext() sync := req.URL.Query().Get("sync") == "true" timeout := parseTimeout(req.URL.Query().Get("timeout")) + // TODO for now, we pull namespace from query parameter, but according to spec, it must go in resource path in future PR + // if a namespace if specified, it's always used. + // for list/watch operations, a namespace is not required if omitted. + // for all other operations, if namespace is omitted, we will default to default namespace. + namespace := req.URL.Query().Get("namespace") + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } switch req.Method { case "GET": switch len(parts) { @@ -129,7 +136,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt } writeJSON(http.StatusOK, h.codec, list, w) case 2: - item, err := storage.Get(ctx, parts[1]) + item, err := storage.Get(api.WithNamespaceDefaultIfNone(ctx), parts[1]) if err != nil { errorJSON(err, h.codec, w) return @@ -159,7 +166,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt errorJSON(err, h.codec, w) return } - out, err := storage.Create(ctx, obj) + out, err := storage.Create(api.WithNamespaceDefaultIfNone(ctx), obj) if err != nil { errorJSON(err, h.codec, w) return @@ -172,7 +179,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt notFound(w, req) return } - out, err := storage.Delete(ctx, parts[1]) + out, err := storage.Delete(api.WithNamespaceDefaultIfNone(ctx), parts[1]) if err != nil { errorJSON(err, h.codec, w) return @@ -196,7 +203,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt errorJSON(err, h.codec, w) return } - out, err := storage.Update(ctx, obj) + out, err := storage.Update(api.WithNamespaceDefaultIfNone(ctx), obj) if err != nil { errorJSON(err, h.codec, w) return diff --git a/pkg/client/client.go b/pkg/client/client.go index af47ea41714..d3e2e0121e0 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -104,26 +104,26 @@ type Client struct { // ListPods takes a selector, and returns the list of pods that match that selector. func (c *Client) ListPods(ctx api.Context, selector labels.Selector) (result *api.PodList, err error) { result = &api.PodList{} - err = c.Get().Path("pods").SelectorParam("labels", selector).Do().Into(result) + err = c.Get().Namespace(api.Namespace(ctx)).Path("pods").SelectorParam("labels", selector).Do().Into(result) return } // GetPod takes the id of the pod, and returns the corresponding Pod object, and an error if it occurs func (c *Client) GetPod(ctx api.Context, id string) (result *api.Pod, err error) { result = &api.Pod{} - err = c.Get().Path("pods").Path(id).Do().Into(result) + err = c.Get().Namespace(api.Namespace(ctx)).Path("pods").Path(id).Do().Into(result) return } // DeletePod takes the id of the pod, and returns an error if one occurs func (c *Client) DeletePod(ctx api.Context, id string) error { - return c.Delete().Path("pods").Path(id).Do().Error() + return c.Delete().Namespace(api.Namespace(ctx)).Path("pods").Path(id).Do().Error() } // CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs. func (c *Client) CreatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err error) { result = &api.Pod{} - err = c.Post().Path("pods").Body(pod).Do().Into(result) + err = c.Post().Namespace(api.Namespace(ctx)).Path("pods").Body(pod).Do().Into(result) return } @@ -134,28 +134,28 @@ func (c *Client) UpdatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err err = fmt.Errorf("invalid update object, missing resource version: %v", pod) return } - err = c.Put().Path("pods").Path(pod.ID).Body(pod).Do().Into(result) + err = c.Put().Namespace(api.Namespace(ctx)).Path("pods").Path(pod.ID).Body(pod).Do().Into(result) return } // ListReplicationControllers takes a selector, and returns the list of replication controllers that match that selector. func (c *Client) ListReplicationControllers(ctx api.Context, selector labels.Selector) (result *api.ReplicationControllerList, err error) { result = &api.ReplicationControllerList{} - err = c.Get().Path("replicationControllers").SelectorParam("labels", selector).Do().Into(result) + err = c.Get().Namespace(api.Namespace(ctx)).Path("replicationControllers").SelectorParam("labels", selector).Do().Into(result) return } // GetReplicationController returns information about a particular replication controller. func (c *Client) GetReplicationController(ctx api.Context, id string) (result *api.ReplicationController, err error) { result = &api.ReplicationController{} - err = c.Get().Path("replicationControllers").Path(id).Do().Into(result) + err = c.Get().Namespace(api.Namespace(ctx)).Path("replicationControllers").Path(id).Do().Into(result) return } // CreateReplicationController creates a new replication controller. func (c *Client) CreateReplicationController(ctx api.Context, controller *api.ReplicationController) (result *api.ReplicationController, err error) { result = &api.ReplicationController{} - err = c.Post().Path("replicationControllers").Body(controller).Do().Into(result) + err = c.Post().Namespace(api.Namespace(ctx)).Path("replicationControllers").Body(controller).Do().Into(result) return } @@ -166,18 +166,19 @@ func (c *Client) UpdateReplicationController(ctx api.Context, controller *api.Re err = fmt.Errorf("invalid update object, missing resource version: %v", controller) return } - err = c.Put().Path("replicationControllers").Path(controller.ID).Body(controller).Do().Into(result) + err = c.Put().Namespace(api.Namespace(ctx)).Path("replicationControllers").Path(controller.ID).Body(controller).Do().Into(result) return } // DeleteReplicationController deletes an existing replication controller. func (c *Client) DeleteReplicationController(ctx api.Context, id string) error { - return c.Delete().Path("replicationControllers").Path(id).Do().Error() + return c.Delete().Namespace(api.Namespace(ctx)).Path("replicationControllers").Path(id).Do().Error() } // WatchReplicationControllers returns a watch.Interface that watches the requested controllers. func (c *Client) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return c.Get(). + Namespace(api.Namespace(ctx)). Path("watch"). Path("replicationControllers"). Param("resourceVersion", resourceVersion). @@ -189,21 +190,21 @@ func (c *Client) WatchReplicationControllers(ctx api.Context, label, field label // ListServices takes a selector, and returns the list of services that match that selector func (c *Client) ListServices(ctx api.Context, selector labels.Selector) (result *api.ServiceList, err error) { result = &api.ServiceList{} - err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(result) + err = c.Get().Namespace(api.Namespace(ctx)).Path("services").SelectorParam("labels", selector).Do().Into(result) return } // GetService returns information about a particular service. func (c *Client) GetService(ctx api.Context, id string) (result *api.Service, err error) { result = &api.Service{} - err = c.Get().Path("services").Path(id).Do().Into(result) + err = c.Get().Namespace(api.Namespace(ctx)).Path("services").Path(id).Do().Into(result) return } // CreateService creates a new service. func (c *Client) CreateService(ctx api.Context, svc *api.Service) (result *api.Service, err error) { result = &api.Service{} - err = c.Post().Path("services").Body(svc).Do().Into(result) + err = c.Post().Namespace(api.Namespace(ctx)).Path("services").Body(svc).Do().Into(result) return } @@ -214,18 +215,19 @@ func (c *Client) UpdateService(ctx api.Context, svc *api.Service) (result *api.S err = fmt.Errorf("invalid update object, missing resource version: %v", svc) return } - err = c.Put().Path("services").Path(svc.ID).Body(svc).Do().Into(result) + err = c.Put().Namespace(api.Namespace(ctx)).Path("services").Path(svc.ID).Body(svc).Do().Into(result) return } // DeleteService deletes an existing service. func (c *Client) DeleteService(ctx api.Context, id string) error { - return c.Delete().Path("services").Path(id).Do().Error() + return c.Delete().Namespace(api.Namespace(ctx)).Path("services").Path(id).Do().Error() } // WatchServices returns a watch.Interface that watches the requested services. func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return c.Get(). + Namespace(api.Namespace(ctx)). Path("watch"). Path("services"). Param("resourceVersion", resourceVersion). @@ -237,20 +239,21 @@ func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, re // ListEndpoints takes a selector, and returns the list of endpoints that match that selector func (c *Client) ListEndpoints(ctx api.Context, selector labels.Selector) (result *api.EndpointsList, err error) { result = &api.EndpointsList{} - err = c.Get().Path("endpoints").SelectorParam("labels", selector).Do().Into(result) + err = c.Get().Namespace(api.Namespace(ctx)).Path("endpoints").SelectorParam("labels", selector).Do().Into(result) return } // GetEndpoints returns information about the endpoints for a particular service. func (c *Client) GetEndpoints(ctx api.Context, id string) (result *api.Endpoints, err error) { result = &api.Endpoints{} - err = c.Get().Path("endpoints").Path(id).Do().Into(result) + err = c.Get().Namespace(api.Namespace(ctx)).Path("endpoints").Path(id).Do().Into(result) return } // WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service. func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { return c.Get(). + Namespace(api.Namespace(ctx)). Path("watch"). Path("endpoints"). Param("resourceVersion", resourceVersion). @@ -261,7 +264,7 @@ func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, r func (c *Client) CreateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*api.Endpoints, error) { result := &api.Endpoints{} - err := c.Post().Path("endpoints").Body(endpoints).Do().Into(result) + err := c.Post().Namespace(api.Namespace(ctx)).Path("endpoints").Body(endpoints).Do().Into(result) return result, err } @@ -271,6 +274,7 @@ func (c *Client) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*ap return nil, fmt.Errorf("invalid update object, missing resource version: %v", endpoints) } err := c.Put(). + Namespace(api.Namespace(ctx)). Path("endpoints"). Path(endpoints.ID). Body(endpoints). diff --git a/pkg/client/request.go b/pkg/client/request.go index 0a3fba29a74..1678ef3ed3b 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -74,6 +74,14 @@ func (r *Request) Sync(sync bool) *Request { return r } +// Namespace applies the namespace scope to a request +func (r *Request) Namespace(namespace string) *Request { + if len(namespace) > 0 { + return r.setParam("namespace", namespace) + } + return r +} + // AbsPath overwrites an existing path with the path parameter. func (r *Request) AbsPath(path string) *Request { if r.err != nil { @@ -196,6 +204,7 @@ func (r *Request) finalURL() string { for key, value := range r.params { query.Add(key, value) } + // sync and timeout are handled specially here, to allow setting them // in any order. if r.sync { diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 4b81737c6cb..3e51c38fdf7 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -215,7 +215,7 @@ func TestCreateReplica(t *testing.T) { Labels: controllerSpec.DesiredState.PodTemplate.Labels, DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState, } - fakeHandler.ValidateRequest(t, makeURL("/pods"), "POST", nil) + fakeHandler.ValidateRequest(t, makeURL("/pods?namespace=default"), "POST", nil) actualPod := api.Pod{} if err := json.Unmarshal([]byte(fakeHandler.RequestBody), &actualPod); err != nil { t.Errorf("Unexpected error: %#v", err) diff --git a/pkg/kubecfg/kubecfg.go b/pkg/kubecfg/kubecfg.go index 9b7805eb3dd..680ba59fe13 100644 --- a/pkg/kubecfg/kubecfg.go +++ b/pkg/kubecfg/kubecfg.go @@ -60,6 +60,10 @@ type AuthInfo struct { Insecure *bool } +type NamespaceInfo struct { + Namespace string +} + // LoadAuthInfo parses an AuthInfo object from a file path. It prompts user and creates file if it doesn't exist. func LoadAuthInfo(path string, r io.Reader) (*AuthInfo, error) { var auth AuthInfo @@ -84,6 +88,35 @@ func LoadAuthInfo(path string, r io.Reader) (*AuthInfo, error) { return &auth, err } +// LoadNamespaceInfo parses a NamespaceInfo object from a file path. It creates a file at the specified path if it doesn't exist with the default namespace. +func LoadNamespaceInfo(path string) (*NamespaceInfo, error) { + var ns NamespaceInfo + if _, err := os.Stat(path); os.IsNotExist(err) { + ns.Namespace = api.NamespaceDefault + err = SaveNamespaceInfo(path, &ns) + return &ns, err + } + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + err = json.Unmarshal(data, &ns) + if err != nil { + return nil, err + } + return &ns, err +} + +// SaveNamespaceInfo saves a NamespaceInfo object at the specified file path. +func SaveNamespaceInfo(path string, ns *NamespaceInfo) error { + if !util.IsDNSLabel(ns.Namespace) { + return fmt.Errorf("Namespace %s is not a valid DNS Label", ns.Namespace) + } + data, err := json.Marshal(ns) + err = ioutil.WriteFile(path, data, 0600) + return err +} + // Update performs a rolling update of a collection of pods. // 'name' points to a replication controller. // 'client' is used for updating pods. diff --git a/pkg/kubecfg/kubecfg_test.go b/pkg/kubecfg/kubecfg_test.go index 5534a0855fe..4082f0f1d02 100644 --- a/pkg/kubecfg/kubecfg_test.go +++ b/pkg/kubecfg/kubecfg_test.go @@ -240,6 +240,58 @@ func TestCloudCfgDeleteControllerWithReplicas(t *testing.T) { } } +func TestLoadNamespaceInfo(t *testing.T) { + loadNamespaceInfoTests := []struct { + nsData string + nsInfo *NamespaceInfo + }{ + { + `{"Namespace":"test"}`, + &NamespaceInfo{Namespace: "test"}, + }, + { + "", nil, + }, + { + "missing", + &NamespaceInfo{Namespace: "default"}, + }, + } + for _, loadNamespaceInfoTest := range loadNamespaceInfoTests { + tt := loadNamespaceInfoTest + nsfile, err := ioutil.TempFile("", "testNamespaceInfo") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if tt.nsData != "missing" { + defer os.Remove(nsfile.Name()) + defer nsfile.Close() + _, err := nsfile.WriteString(tt.nsData) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + } else { + nsfile.Close() + os.Remove(nsfile.Name()) + } + nsInfo, err := LoadNamespaceInfo(nsfile.Name()) + if len(tt.nsData) == 0 && tt.nsData != "missing" { + if err == nil { + t.Error("LoadNamespaceInfo didn't fail on an empty file") + } + continue + } + if tt.nsData != "missing" { + if err != nil { + t.Errorf("Unexpected error: %v, %v", tt.nsData, err) + } + if !reflect.DeepEqual(nsInfo, tt.nsInfo) { + t.Errorf("Expected %v, got %v", tt.nsInfo, nsInfo) + } + } + } +} + func TestLoadAuthInfo(t *testing.T) { loadAuthInfoTests := []struct { authData string diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 9d0d4860b88..540a052ac45 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -34,6 +34,17 @@ import ( "github.com/golang/glog" ) +const ( + // PodPath is the path to pod resources in etcd + PodPath string = "/registry/pods" + // ControllerPath is the path to controller resources in etcd + ControllerPath string = "/registry/controllers" + // ServicePath is the path to service resources in etcd + ServicePath string = "/registry/services/specs" + // ServiceEndpointPath is the path to service endpoints resources in etcd + ServiceEndpointPath string = "/registry/services/endpoints" +) + // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // kubelet (and vice versa) @@ -52,8 +63,38 @@ func NewRegistry(helper tools.EtcdHelper, manifestFactory pod.ManifestFactory) * return registry } -func makePodKey(podID string) string { - return "/registry/pods/" + podID +// MakeEtcdListKey constructs etcd paths to resource directories enforcing namespace rules +func MakeEtcdListKey(ctx api.Context, prefix string) string { + key := prefix + ns, ok := api.NamespaceFrom(ctx) + if ok && len(ns) > 0 { + key = key + "/" + ns + } + return key +} + +// MakeEtcdItemKey constructs etcd paths to a resource relative to prefix enforcing namespace rules. If no namespace is on context, it errors. +func MakeEtcdItemKey(ctx api.Context, prefix string, id string) (string, error) { + key := MakeEtcdListKey(ctx, prefix) + ns, ok := api.NamespaceFrom(ctx) + if !ok || len(ns) == 0 { + return "", fmt.Errorf("Invalid request. Unable to address and item without a namespace on context") + } + if len(id) == 0 { + return "", fmt.Errorf("Invalid request. Id parameter required") + } + key = key + "/" + id + return key, nil +} + +// makePodListKey constructs etcd paths to pod directories enforcing namespace rules. +func makePodListKey(ctx api.Context) string { + return MakeEtcdListKey(ctx, PodPath) +} + +// makePodKey constructs etcd paths to pod items enforcing namespace rules. +func makePodKey(ctx api.Context, id string) (string, error) { + return MakeEtcdItemKey(ctx, PodPath, id) } // parseWatchResourceVersion takes a resource version argument and converts it to @@ -81,7 +122,8 @@ func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.Pod // ListPodsPredicate obtains a list of pods that match filter. func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) { allPods := api.PodList{} - err := r.ExtractToList("/registry/pods", &allPods) + key := makePodListKey(ctx) + err := r.ExtractToList(key, &allPods) if err != nil { return nil, err } @@ -105,7 +147,8 @@ func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter fun if err != nil { return nil, err } - return r.WatchList("/registry/pods", version, func(obj runtime.Object) bool { + key := makePodListKey(ctx) + return r.WatchList(key, version, func(obj runtime.Object) bool { switch t := obj.(type) { case *api.Pod: return filter(t) @@ -117,10 +160,14 @@ func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter fun } // GetPod gets a specific pod specified by its ID. -func (r *Registry) GetPod(ctx api.Context, podID string) (*api.Pod, error) { +func (r *Registry) GetPod(ctx api.Context, id string) (*api.Pod, error) { var pod api.Pod - if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil { - return nil, etcderr.InterpretGetError(err, "pod", podID) + key, err := makePodKey(ctx, id) + if err != nil { + return nil, err + } + if err = r.ExtractObj(key, &pod, false); err != nil { + return nil, etcderr.InterpretGetError(err, "pod", id) } // TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets // the CurrentState.Host and Status fields. Here we pretend that reality perfectly @@ -141,19 +188,26 @@ func (r *Registry) CreatePod(ctx api.Context, pod *api.Pod) error { // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. pod.DesiredState.Status = api.PodRunning pod.DesiredState.Host = "" - err := r.CreateObj(makePodKey(pod.ID), pod, 0) + key, err := makePodKey(ctx, pod.ID) + if err != nil { + return err + } + err = r.CreateObj(key, pod, 0) return etcderr.InterpretCreateError(err, "pod", pod.ID) } // ApplyBinding implements binding's registry func (r *Registry) ApplyBinding(ctx api.Context, binding *api.Binding) error { - return etcderr.InterpretCreateError(r.assignPod(binding.PodID, binding.Host), "binding", "") + return etcderr.InterpretCreateError(r.assignPod(ctx, binding.PodID, binding.Host), "binding", "") } // setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'. // Returns the current state of the pod, or an error. -func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) { - podKey := makePodKey(podID) +func (r *Registry) setPodHostTo(ctx api.Context, podID, oldMachine, machine string) (finalPod *api.Pod, err error) { + podKey, err := makePodKey(ctx, podID) + if err != nil { + return nil, err + } err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) if !ok { @@ -170,8 +224,8 @@ func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *ap } // assignPod assigns the given pod to the given machine. -func (r *Registry) assignPod(podID string, machine string) error { - finalPod, err := r.setPodHostTo(podID, "", machine) +func (r *Registry) assignPod(ctx api.Context, podID string, machine string) error { + finalPod, err := r.setPodHostTo(ctx, podID, "", machine) if err != nil { return err } @@ -192,7 +246,7 @@ func (r *Registry) assignPod(podID string, machine string) error { if err != nil { // Put the pod's host back the way it was. This is a terrible hack that // won't be needed if we convert this to a rectification loop. - if _, err2 := r.setPodHostTo(podID, machine, ""); err2 != nil { + if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil { glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2) } } @@ -201,8 +255,11 @@ func (r *Registry) assignPod(podID string, machine string) error { func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error { var podOut api.Pod - podKey := makePodKey(pod.ID) - err := r.EtcdHelper.ExtractObj(podKey, &podOut, false) + podKey, err := makePodKey(ctx, pod.ID) + if err != nil { + return err + } + err = r.EtcdHelper.ExtractObj(podKey, &podOut, false) if err != nil { return err } @@ -243,8 +300,11 @@ func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error { // DeletePod deletes an existing pod specified by its ID. func (r *Registry) DeletePod(ctx api.Context, podID string) error { var pod api.Pod - podKey := makePodKey(podID) - err := r.ExtractObj(podKey, &pod, false) + podKey, err := makePodKey(ctx, podID) + if err != nil { + return err + } + err = r.ExtractObj(podKey, &pod, false) if err != nil { return etcderr.InterpretDeleteError(err, "pod", podID) } @@ -286,7 +346,8 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error { // ListControllers obtains a list of ReplicationControllers. func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) { controllers := &api.ReplicationControllerList{} - err := r.ExtractToList("/registry/controllers", controllers) + key := makeControllerListKey(ctx) + err := r.ExtractToList(key, controllers) return controllers, err } @@ -296,18 +357,28 @@ func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (wa if err != nil { return nil, err } - return r.WatchList("/registry/controllers", version, tools.Everything) + key := makeControllerListKey(ctx) + return r.WatchList(key, version, tools.Everything) } -func makeControllerKey(id string) string { - return "/registry/controllers/" + id +// makeControllerListKey constructs etcd paths to controller directories enforcing namespace rules. +func makeControllerListKey(ctx api.Context) string { + return MakeEtcdListKey(ctx, ControllerPath) +} + +// makeControllerKey constructs etcd paths to controller items enforcing namespace rules. +func makeControllerKey(ctx api.Context, id string) (string, error) { + return MakeEtcdItemKey(ctx, ControllerPath, id) } // GetController gets a specific ReplicationController specified by its ID. func (r *Registry) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController - key := makeControllerKey(controllerID) - err := r.ExtractObj(key, &controller, false) + key, err := makeControllerKey(ctx, controllerID) + if err != nil { + return nil, err + } + err = r.ExtractObj(key, &controller, false) if err != nil { return nil, etcderr.InterpretGetError(err, "replicationController", controllerID) } @@ -316,45 +387,69 @@ func (r *Registry) GetController(ctx api.Context, controllerID string) (*api.Rep // CreateController creates a new ReplicationController. func (r *Registry) CreateController(ctx api.Context, controller *api.ReplicationController) error { - err := r.CreateObj(makeControllerKey(controller.ID), controller, 0) + key, err := makeControllerKey(ctx, controller.ID) + if err != nil { + return err + } + err = r.CreateObj(key, controller, 0) return etcderr.InterpretCreateError(err, "replicationController", controller.ID) } // UpdateController replaces an existing ReplicationController. func (r *Registry) UpdateController(ctx api.Context, controller *api.ReplicationController) error { - err := r.SetObj(makeControllerKey(controller.ID), controller) + key, err := makeControllerKey(ctx, controller.ID) + if err != nil { + return err + } + err = r.SetObj(key, controller) return etcderr.InterpretUpdateError(err, "replicationController", controller.ID) } // DeleteController deletes a ReplicationController specified by its ID. func (r *Registry) DeleteController(ctx api.Context, controllerID string) error { - key := makeControllerKey(controllerID) - err := r.Delete(key, false) + key, err := makeControllerKey(ctx, controllerID) + if err != nil { + return err + } + err = r.Delete(key, false) return etcderr.InterpretDeleteError(err, "replicationController", controllerID) } -func makeServiceKey(name string) string { - return "/registry/services/specs/" + name +// makePodListKey constructs etcd paths to service directories enforcing namespace rules. +func makeServiceListKey(ctx api.Context) string { + return MakeEtcdListKey(ctx, ServicePath) +} + +// makePodKey constructs etcd paths to service items enforcing namespace rules. +func makeServiceKey(ctx api.Context, name string) (string, error) { + return MakeEtcdItemKey(ctx, ServicePath, name) } // ListServices obtains a list of Services. func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) { list := &api.ServiceList{} - err := r.ExtractToList("/registry/services/specs", list) + err := r.ExtractToList(makeServiceListKey(ctx), list) return list, err } // CreateService creates a new Service. func (r *Registry) CreateService(ctx api.Context, svc *api.Service) error { - err := r.CreateObj(makeServiceKey(svc.ID), svc, 0) + key, err := makeServiceKey(ctx, svc.ID) + if err != nil { + return err + } + err = r.CreateObj(key, svc, 0) return etcderr.InterpretCreateError(err, "service", svc.ID) } // GetService obtains a Service specified by its name. func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error) { - key := makeServiceKey(name) + key, err := makeServiceKey(ctx, name) + if err != nil { + return nil, err + } var svc api.Service - err := r.ExtractObj(key, &svc, false) + err = r.ExtractObj(key, &svc, false) if err != nil { return nil, etcderr.InterpretGetError(err, "service", name) } @@ -363,30 +458,45 @@ func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error // GetEndpoints obtains the endpoints for the service identified by 'name'. func (r *Registry) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) { - key := makeServiceEndpointsKey(name) var endpoints api.Endpoints - err := r.ExtractObj(key, &endpoints, false) + key, err := makeServiceEndpointsKey(ctx, name) + if err != nil { + return nil, err + } + err = r.ExtractObj(key, &endpoints, false) if err != nil { return nil, etcderr.InterpretGetError(err, "endpoints", name) } return &endpoints, nil } -func makeServiceEndpointsKey(name string) string { - return "/registry/services/endpoints/" + name +// makeServiceEndpointsListKey constructs etcd paths to service endpoint directories enforcing namespace rules. +func makeServiceEndpointsListKey(ctx api.Context) string { + return MakeEtcdListKey(ctx, ServiceEndpointPath) +} + +// makeServiceEndpointsListKey constructs etcd paths to service endpoint items enforcing namespace rules. +func makeServiceEndpointsKey(ctx api.Context, name string) (string, error) { + return MakeEtcdItemKey(ctx, ServiceEndpointPath, name) } // DeleteService deletes a Service specified by its name. func (r *Registry) DeleteService(ctx api.Context, name string) error { - key := makeServiceKey(name) - err := r.Delete(key, true) + key, err := makeServiceKey(ctx, name) + if err != nil { + return err + } + err = r.Delete(key, true) if err != nil { return etcderr.InterpretDeleteError(err, "service", name) } // TODO: can leave dangling endpoints, and potentially return incorrect // endpoints if a new service is created with the same name - key = makeServiceEndpointsKey(name) + key, err = makeServiceEndpointsKey(ctx, name) + if err != nil { + return err + } if err := r.Delete(key, true); err != nil && !tools.IsEtcdNotFound(err) { return etcderr.InterpretDeleteError(err, "endpoints", name) } @@ -395,7 +505,11 @@ func (r *Registry) DeleteService(ctx api.Context, name string) error { // UpdateService replaces an existing Service. func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error { - err := r.SetObj(makeServiceKey(svc.ID), svc) + key, err := makeServiceKey(ctx, svc.ID) + if err != nil { + return err + } + err = r.SetObj(key, svc) return etcderr.InterpretUpdateError(err, "service", svc.ID) } @@ -409,10 +523,14 @@ func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, return nil, fmt.Errorf("label selectors are not supported on services") } if value, found := field.RequiresExactMatch("ID"); found { - return r.Watch(makeServiceKey(value), version), nil + key, err := makeServiceKey(ctx, value) + if err != nil { + return nil, err + } + return r.Watch(key, version), nil } if field.Empty() { - return r.WatchList("/registry/services/specs", version, tools.Everything) + return r.WatchList(makeServiceListKey(ctx), version, tools.Everything) } return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") } @@ -420,14 +538,19 @@ func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, // ListEndpoints obtains a list of Services. func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { list := &api.EndpointsList{} - err := r.ExtractToList("/registry/services/endpoints", list) + key := makeServiceEndpointsListKey(ctx) + err := r.ExtractToList(key, list) return list, err } // UpdateEndpoints update Endpoints of a Service. func (r *Registry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error { + key, err := makeServiceEndpointsKey(ctx, e.ID) + if err != nil { + return err + } // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. - err := r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, + err = r.AtomicUpdate(key, &api.Endpoints{}, func(input runtime.Object) (runtime.Object, error) { // TODO: racy - label query is returning different results for two simultaneous updaters return e, nil @@ -445,10 +568,18 @@ func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, return nil, fmt.Errorf("label selectors are not supported on endpoints") } if value, found := field.RequiresExactMatch("ID"); found { - return r.Watch(makeServiceEndpointsKey(value), version), nil + key, err := makeServiceEndpointsKey(ctx, value) + if err != nil { + return nil, err + } + return r.Watch(key, version), nil } if field.Empty() { - return r.WatchList("/registry/services/endpoints", version, tools.Everything) + key, err := makeServiceEndpointsKey(ctx, "") + if err != nil { + return nil, err + } + return r.WatchList(key, version, tools.Everything) } return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index dbd964ffe8f..f738a734aab 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -19,6 +19,7 @@ package etcd import ( "reflect" "strconv" + "strings" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -76,10 +77,50 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) { } } -func TestEtcdGetPod(t *testing.T) { - ctx := api.NewContext() +// TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash +func TestEtcdGetPodDifferentNamespace(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Set("/registry/pods/foo", runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) + + ctx1 := api.NewDefaultContext() + ctx2 := api.WithNamespace(api.NewContext(), "other") + + key1, _ := makePodKey(ctx1, "foo") + key2, _ := makePodKey(ctx2, "foo") + + fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{Namespace: "default", ID: "foo"}}), 0) + fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{Namespace: "other", ID: "foo"}}), 0) + + registry := NewTestEtcdRegistry(fakeClient) + + pod1, err := registry.GetPod(ctx1, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if pod1.ID != "foo" { + t.Errorf("Unexpected pod: %#v", pod1) + } + if pod1.Namespace != "default" { + t.Errorf("Unexpected pod: %#v", pod1) + } + + pod2, err := registry.GetPod(ctx2, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if pod2.ID != "foo" { + t.Errorf("Unexpected pod: %#v", pod2) + } + if pod2.Namespace != "other" { + t.Errorf("Unexpected pod: %#v", pod2) + } + +} + +func TestEtcdGetPod(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + key, _ := makePodKey(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) pod, err := registry.GetPod(ctx, "foo") if err != nil { @@ -92,9 +133,10 @@ func TestEtcdGetPod(t *testing.T) { } func TestEtcdGetPodNotFound(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ + key, _ := makePodKey(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -108,10 +150,11 @@ func TestEtcdGetPodNotFound(t *testing.T) { } func TestEtcdCreatePod(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ + key, _ := makePodKey(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -138,12 +181,12 @@ func TestEtcdCreatePod(t *testing.T) { } // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"}) + err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine", TypeMeta: api.TypeMeta{Namespace: api.NamespaceDefault}}) if err != nil { t.Fatalf("unexpected error: %v", err) } - resp, err := fakeClient.Get("/registry/pods/foo", false, false) + resp, err := fakeClient.Get(key, false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -168,10 +211,34 @@ func TestEtcdCreatePod(t *testing.T) { } } -func TestEtcdCreatePodAlreadyExisting(t *testing.T) { - ctx := api.NewContext() +func TestEtcdCreatePodFailsWithoutNamespace(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ + fakeClient.TestIndex = true + registry := NewTestEtcdRegistry(fakeClient) + err := registry.CreatePod(api.NewContext(), &api.Pod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + }, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Name: "foo", + }, + }, + }, + }, + }) + if err == nil || !strings.Contains(err.Error(), "namespace") { + t.Fatalf("expected error that namespace was missing from context") + } +} + +func TestEtcdCreatePodAlreadyExisting(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + key, _ := makePodKey(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}), @@ -191,10 +258,11 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { } func TestEtcdCreatePodWithContainersError(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ + key, _ := makePodKey(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -232,10 +300,11 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { } func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ + key, _ := makePodKey(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -273,7 +342,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - resp, err := fakeClient.Get("/registry/pods/foo", false, false) + resp, err := fakeClient.Get(key, false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -299,10 +368,11 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { } func TestEtcdCreatePodWithExistingContainers(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ + key, _ := makePodKey(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -339,7 +409,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - resp, err := fakeClient.Get("/registry/pods/foo", false, false) + resp, err := fakeClient.Get(key, false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -365,11 +435,11 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { } func TestEtcdUpdatePodNotFound(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - key := "/registry/pods/foo" + key, _ := makePodKey(ctx, "foo") fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: tools.EtcdErrorNotFound, @@ -389,11 +459,11 @@ func TestEtcdUpdatePodNotFound(t *testing.T) { } func TestEtcdUpdatePodNotScheduled(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - key := "/registry/pods/foo" + key, _ := makePodKey(ctx, "foo") fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ TypeMeta: api.TypeMeta{ID: "foo"}, }), 1) @@ -421,11 +491,11 @@ func TestEtcdUpdatePodNotScheduled(t *testing.T) { } func TestEtcdUpdatePodScheduled(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - key := "/registry/pods/foo" + key, _ := makePodKey(ctx, "foo") fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{ @@ -507,11 +577,11 @@ func TestEtcdUpdatePodScheduled(t *testing.T) { } func TestEtcdDeletePod(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - key := "/registry/pods/foo" + key, _ := makePodKey(ctx, "foo") fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{Host: "machine"}, @@ -544,11 +614,10 @@ func TestEtcdDeletePod(t *testing.T) { } func TestEtcdDeletePodMultipleContainers(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - - key := "/registry/pods/foo" + key, _ := makePodKey(ctx, "foo") fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{Host: "machine"}, @@ -587,7 +656,8 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { func TestEtcdEmptyListPods(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/pods" + ctx := api.NewDefaultContext() + key := makePodListKey(ctx) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -597,12 +667,10 @@ func TestEtcdEmptyListPods(t *testing.T) { E: nil, } registry := NewTestEtcdRegistry(fakeClient) - ctx := api.NewContext() pods, err := registry.ListPods(ctx, labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } - if len(pods.Items) != 0 { t.Errorf("Unexpected pod list: %#v", pods) } @@ -610,13 +678,13 @@ func TestEtcdEmptyListPods(t *testing.T) { func TestEtcdListPodsNotFound(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/pods" + ctx := api.NewDefaultContext() + key := makePodListKey(ctx) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient) - ctx := api.NewContext() pods, err := registry.ListPods(ctx, labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) @@ -629,7 +697,8 @@ func TestEtcdListPodsNotFound(t *testing.T) { func TestEtcdListPods(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/pods" + ctx := api.NewDefaultContext() + key := makePodListKey(ctx) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -652,7 +721,6 @@ func TestEtcdListPods(t *testing.T) { E: nil, } registry := NewTestEtcdRegistry(fakeClient) - ctx := api.NewContext() pods, err := registry.ListPods(ctx, labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) @@ -668,9 +736,9 @@ func TestEtcdListPods(t *testing.T) { } func TestEtcdListControllersNotFound(t *testing.T) { - ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/controllers" + ctx := api.NewDefaultContext() + key := makeControllerListKey(ctx) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: tools.EtcdErrorNotFound, @@ -687,9 +755,9 @@ func TestEtcdListControllersNotFound(t *testing.T) { } func TestEtcdListServicesNotFound(t *testing.T) { - ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/services/specs" + ctx := api.NewDefaultContext() + key := makeServiceListKey(ctx) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: tools.EtcdErrorNotFound, @@ -706,9 +774,9 @@ func TestEtcdListServicesNotFound(t *testing.T) { } func TestEtcdListControllers(t *testing.T) { - ctx := api.NewContext() fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/controllers" + ctx := api.NewDefaultContext() + key := makeControllerListKey(ctx) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -735,10 +803,50 @@ func TestEtcdListControllers(t *testing.T) { } } -func TestEtcdGetController(t *testing.T) { - ctx := api.NewContext() +// TestEtcdGetControllerDifferentNamespace ensures same-name controllers in different namespaces do not clash +func TestEtcdGetControllerDifferentNamespace(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) + + ctx1 := api.NewDefaultContext() + ctx2 := api.WithNamespace(api.NewContext(), "other") + + key1, _ := makeControllerKey(ctx1, "foo") + key2, _ := makeControllerKey(ctx2, "foo") + + fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{Namespace: "default", ID: "foo"}}), 0) + fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{Namespace: "other", ID: "foo"}}), 0) + + registry := NewTestEtcdRegistry(fakeClient) + + ctrl1, err := registry.GetController(ctx1, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if ctrl1.ID != "foo" { + t.Errorf("Unexpected controller: %#v", ctrl1) + } + if ctrl1.Namespace != "default" { + t.Errorf("Unexpected controller: %#v", ctrl1) + } + + ctrl2, err := registry.GetController(ctx2, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if ctrl2.ID != "foo" { + t.Errorf("Unexpected controller: %#v", ctrl2) + } + if ctrl2.Namespace != "other" { + t.Errorf("Unexpected controller: %#v", ctrl2) + } + +} + +func TestEtcdGetController(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + key, _ := makeControllerKey(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) ctrl, err := registry.GetController(ctx, "foo") if err != nil { @@ -751,9 +859,10 @@ func TestEtcdGetController(t *testing.T) { } func TestEtcdGetControllerNotFound(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Data["/registry/controllers/foo"] = tools.EtcdResponseWithError{ + key, _ := makeControllerKey(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -770,9 +879,10 @@ func TestEtcdGetControllerNotFound(t *testing.T) { } func TestEtcdDeleteController(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) + key, _ := makeControllerKey(ctx, "foo") err := registry.DeleteController(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) @@ -781,16 +891,16 @@ func TestEtcdDeleteController(t *testing.T) { if len(fakeClient.DeletedKeys) != 1 { t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) } - key := "/registry/controllers/foo" if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } } func TestEtcdCreateController(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) + key, _ := makeControllerKey(ctx, "foo") err := registry.CreateController(ctx, &api.ReplicationController{ TypeMeta: api.TypeMeta{ ID: "foo", @@ -799,8 +909,7 @@ func TestEtcdCreateController(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - - resp, err := fakeClient.Get("/registry/controllers/foo", false, false) + resp, err := fakeClient.Get(key, false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -816,9 +925,10 @@ func TestEtcdCreateController(t *testing.T) { } func TestEtcdCreateControllerAlreadyExisting(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) + key, _ := makeControllerKey(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) err := registry.CreateController(ctx, &api.ReplicationController{ @@ -832,11 +942,11 @@ func TestEtcdCreateControllerAlreadyExisting(t *testing.T) { } func TestEtcdUpdateController(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - - resp, _ := fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) + key, _ := makeControllerKey(ctx, "foo") + resp, _ := fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) err := registry.UpdateController(ctx, &api.ReplicationController{ TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)}, @@ -855,9 +965,9 @@ func TestEtcdUpdateController(t *testing.T) { } func TestEtcdListServices(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/services/specs" + key := makeServiceListKey(ctx) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -885,7 +995,7 @@ func TestEtcdListServices(t *testing.T) { } func TestEtcdCreateService(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) err := registry.CreateService(ctx, &api.Service{ @@ -895,7 +1005,8 @@ func TestEtcdCreateService(t *testing.T) { t.Errorf("unexpected error: %v", err) } - resp, err := fakeClient.Get("/registry/services/specs/foo", false, false) + key, _ := makeServiceKey(ctx, "foo") + resp, err := fakeClient.Get(key, false, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -912,9 +1023,10 @@ func TestEtcdCreateService(t *testing.T) { } func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) + key, _ := makeServiceKey(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) err := registry.CreateService(ctx, &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, @@ -924,10 +1036,50 @@ func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { } } -func TestEtcdGetService(t *testing.T) { - ctx := api.NewContext() +// TestEtcdGetServiceDifferentNamespace ensures same-name services in different namespaces do not clash +func TestEtcdGetServiceDifferentNamespace(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) + + ctx1 := api.NewDefaultContext() + ctx2 := api.WithNamespace(api.NewContext(), "other") + + key1, _ := makeServiceKey(ctx1, "foo") + key2, _ := makeServiceKey(ctx2, "foo") + + fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{Namespace: "default", ID: "foo"}}), 0) + fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{Namespace: "other", ID: "foo"}}), 0) + + registry := NewTestEtcdRegistry(fakeClient) + + service1, err := registry.GetService(ctx1, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if service1.ID != "foo" { + t.Errorf("Unexpected service: %#v", service1) + } + if service1.Namespace != "default" { + t.Errorf("Unexpected service: %#v", service1) + } + + service2, err := registry.GetService(ctx2, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if service2.ID != "foo" { + t.Errorf("Unexpected service: %#v", service2) + } + if service2.Namespace != "other" { + t.Errorf("Unexpected service: %#v", service2) + } + +} + +func TestEtcdGetService(t *testing.T) { + ctx := api.NewDefaultContext() + fakeClient := tools.NewFakeEtcdClient(t) + key, _ := makeServiceKey(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) service, err := registry.GetService(ctx, "foo") if err != nil { @@ -940,9 +1092,10 @@ func TestEtcdGetService(t *testing.T) { } func TestEtcdGetServiceNotFound(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{ + key, _ := makeServiceKey(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -956,7 +1109,7 @@ func TestEtcdGetServiceNotFound(t *testing.T) { } func TestEtcdDeleteService(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) err := registry.DeleteService(ctx, "foo") @@ -967,22 +1120,22 @@ func TestEtcdDeleteService(t *testing.T) { if len(fakeClient.DeletedKeys) != 2 { t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys) } - key := "/registry/services/specs/foo" + key, _ := makeServiceKey(ctx, "foo") if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - key = "/registry/services/endpoints/foo" + key, _ = makeServiceEndpointsKey(ctx, "foo") if fakeClient.DeletedKeys[1] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], key) } } func TestEtcdUpdateService(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - - resp, _ := fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) + key, _ := makeServiceKey(ctx, "foo") + resp, _ := fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0) registry := NewTestEtcdRegistry(fakeClient) testService := api.Service{ TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)}, @@ -1012,9 +1165,9 @@ func TestEtcdUpdateService(t *testing.T) { } func TestEtcdListEndpoints(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - key := "/registry/services/endpoints" + key := makeServiceEndpointsListKey(ctx) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -1042,7 +1195,7 @@ func TestEtcdListEndpoints(t *testing.T) { } func TestEtcdGetEndpoints(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) endpoints := &api.Endpoints{ @@ -1050,7 +1203,8 @@ func TestEtcdGetEndpoints(t *testing.T) { Endpoints: []string{"127.0.0.1:34855"}, } - fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, endpoints), 0) + key, _ := makeServiceEndpointsKey(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, endpoints), 0) got, err := registry.GetEndpoints(ctx, "foo") if err != nil { @@ -1063,7 +1217,7 @@ func TestEtcdGetEndpoints(t *testing.T) { } func TestEtcdUpdateEndpoints(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true registry := NewTestEtcdRegistry(fakeClient) @@ -1072,14 +1226,15 @@ func TestEtcdUpdateEndpoints(t *testing.T) { Endpoints: []string{"baz", "bar"}, } - fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0) + key, _ := makeServiceEndpointsKey(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0) err := registry.UpdateEndpoints(ctx, &endpoints) if err != nil { t.Errorf("unexpected error: %v", err) } - response, err := fakeClient.Get("/registry/services/endpoints/foo", false, false) + response, err := fakeClient.Get(key, false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -1091,7 +1246,7 @@ func TestEtcdUpdateEndpoints(t *testing.T) { } func TestEtcdWatchServices(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) watching, err := registry.WatchServices(ctx, @@ -1119,7 +1274,7 @@ func TestEtcdWatchServices(t *testing.T) { } func TestEtcdWatchServicesBadSelector(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) _, err := registry.WatchServices( @@ -1144,7 +1299,7 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) { } func TestEtcdWatchEndpoints(t *testing.T) { - ctx := api.NewContext() + ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) registry := NewTestEtcdRegistry(fakeClient) watching, err := registry.WatchEndpoints( diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 8f7ff6618a6..7044313788c 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -179,18 +179,18 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er for _, node := range nodes { if node.Dir { h.decodeNodeList(node.Nodes, slicePtr) - } else { - obj := reflect.New(v.Type().Elem()) - err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)) - if h.ResourceVersioner != nil { - _ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex) - // being unable to set the version does not prevent the object from being extracted - } - if err != nil { - return err - } - v.Set(reflect.Append(v, obj.Elem())) + continue } + obj := reflect.New(v.Type().Elem()) + err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)) + if h.ResourceVersioner != nil { + _ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex) + // being unable to set the version does not prevent the object from being extracted + } + if err != nil { + return err + } + v.Set(reflect.Append(v, obj.Elem())) } return nil } diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 40918825690..1cfee63db54 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -141,10 +141,10 @@ func TestExtractToListAcrossDirectories(t *testing.T) { }, } expect := api.PodList{ - JSONBase: api.JSONBase{ResourceVersion: 10}, + TypeMeta: api.TypeMeta{ResourceVersion: "10"}, Items: []api.Pod{ - {JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}}, - {JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 2}}, + {TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}}, + {TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}}, }, } @@ -187,11 +187,11 @@ func TestExtractToListExcludesDirectories(t *testing.T) { }, } expect := api.PodList{ - JSONBase: api.JSONBase{ResourceVersion: 10}, + TypeMeta: api.TypeMeta{ResourceVersion: "10"}, Items: []api.Pod{ - {JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}}, - {JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 2}}, - {JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 3}}, + {TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}}, + {TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}}, + {TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: "3"}}, }, } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 5a57a069c16..9b1abc17f02 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -184,7 +184,8 @@ func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue backoff.wait(podID) // Get the pod again; it may have changed/been scheduled already. pod = &api.Pod{} - err := factory.Client.Get().Path("pods").Path(podID).Do().Into(pod) + ctx := api.WithNamespace(api.NewContext(), pod.Namespace) + err := factory.Client.Get().Namespace(api.Namespace(ctx)).Path("pods").Path(podID).Do().Into(pod) if err != nil { glog.Errorf("Error getting pod %v for retry: %v; abandoning", podID, err) return @@ -256,7 +257,8 @@ type binder struct { // Bind just does a POST binding RPC. func (b *binder) Bind(binding *api.Binding) error { glog.V(2).Infof("Attempting to bind %v to %v", binding.PodID, binding.Host) - return b.Post().Path("bindings").Body(binding).Do().Error() + ctx := api.WithNamespace(api.NewContext(), binding.Namespace) + return b.Post().Namespace(api.Namespace(ctx)).Path("bindings").Body(binding).Do().Error() } type clock interface { diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index c1527600e13..d0d3e41c18f 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -73,8 +73,9 @@ func (s *Scheduler) scheduleOne() { return } b := &api.Binding{ - PodID: pod.ID, - Host: dest, + TypeMeta: api.TypeMeta{Namespace: pod.Namespace}, + PodID: pod.ID, + Host: dest, } if err := s.config.Binder.Bind(b); err != nil { record.Eventf(pod, "", string(api.PodWaiting), "failedScheduling", "Binding rejected: %v", err)