diff --git a/pkg/api/rest/create_test.go b/pkg/api/rest/create_test.go index 695f8cd746a..bb2be35245c 100644 --- a/pkg/api/rest/create_test.go +++ b/pkg/api/rest/create_test.go @@ -25,17 +25,17 @@ import ( func TestCheckGeneratedNameError(t *testing.T) { expect := errors.NewNotFound("foo", "bar") - if err := CheckGeneratedNameError(Pods, expect, &api.Pod{}); err != expect { + if err := CheckGeneratedNameError(Services, expect, &api.Pod{}); err != expect { t.Errorf("NotFoundError should be ignored: %v", err) } expect = errors.NewAlreadyExists("foo", "bar") - if err := CheckGeneratedNameError(Pods, expect, &api.Pod{}); err != expect { + if err := CheckGeneratedNameError(Services, expect, &api.Pod{}); err != expect { t.Errorf("AlreadyExists should be returned when no GenerateName field: %v", err) } expect = errors.NewAlreadyExists("foo", "bar") - if err := CheckGeneratedNameError(Pods, expect, &api.Pod{ObjectMeta: api.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) { + if err := CheckGeneratedNameError(Services, expect, &api.Pod{ObjectMeta: api.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) { t.Errorf("expected try again later error: %v", err) } } diff --git a/pkg/api/rest/types.go b/pkg/api/rest/types.go index 71c710f37c8..f3b6741f82d 100644 --- a/pkg/api/rest/types.go +++ b/pkg/api/rest/types.go @@ -72,46 +72,6 @@ func (rcStrategy) Validate(obj runtime.Object) errors.ValidationErrorList { return validation.ValidateReplicationController(controller) } -// podStrategy implements behavior for Pods -// TODO: move to a pod specific package. -type podStrategy struct { - runtime.ObjectTyper - api.NameGenerator -} - -// Pods is the default logic that applies when creating and updating Pod -// objects. -var Pods = podStrategy{api.Scheme, api.SimpleNameGenerator} - -// NamespaceScoped is true for pods. -func (podStrategy) NamespaceScoped() bool { - return true -} - -// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation. -func (podStrategy) ResetBeforeCreate(obj runtime.Object) { - pod := obj.(*api.Pod) - pod.Status = api.PodStatus{ - Phase: api.PodPending, - } -} - -// Validate validates a new pod. -func (podStrategy) Validate(obj runtime.Object) errors.ValidationErrorList { - pod := obj.(*api.Pod) - return validation.ValidatePod(pod) -} - -// AllowCreateOnUpdate is false for pods. -func (podStrategy) AllowCreateOnUpdate() bool { - return false -} - -// ValidateUpdate is the default update validation for an end user. -func (podStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList { - return validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod)) -} - // svcStrategy implements behavior for Services // TODO: move to a service specific package. type svcStrategy struct { diff --git a/pkg/master/master.go b/pkg/master/master.go index 89970391395..18f22fc9fa9 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -42,7 +42,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" @@ -52,6 +51,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" + podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequotausage" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" @@ -117,20 +117,9 @@ type Config struct { // Master contains state for a Kubernetes cluster master/api server. type Master struct { // "Inputs", Copied from Config - podRegistry pod.Registry - controllerRegistry controller.Registry - serviceRegistry service.Registry - endpointRegistry endpoint.Registry - minionRegistry minion.Registry - bindingRegistry binding.Registry - eventRegistry generic.Registry - limitRangeRegistry generic.Registry - resourceQuotaRegistry resourcequota.Registry - namespaceRegistry generic.Registry - storage map[string]apiserver.RESTStorage - client *client.Client - portalNet *net.IPNet - cacheTimeout time.Duration + client *client.Client + portalNet *net.IPNet + cacheTimeout time.Duration mux apiserver.Mux muxHelper *apiserver.MuxHelper @@ -157,6 +146,17 @@ type Master struct { serviceReadWritePort int masterServices *util.Runner + // storage contains the RESTful endpoints exposed by this master + storage map[string]apiserver.RESTStorage + + // registries are internal client APIs for accessing the storage layer + // TODO: define the internal typed interface in a way that clients can + // also be replaced + nodeRegistry minion.Registry + namespaceRegistry generic.Registry + serviceRegistry service.Registry + endpointRegistry endpoint.Registry + // "Outputs" Handler http.Handler InsecureHandler http.Handler @@ -260,7 +260,6 @@ func setDefaults(c *Config) { // any unhandled paths to "Handler". func New(c *Config) *Master { setDefaults(c) - boundPodFactory := &pod.BasicBoundPodFactory{} if c.KubeletClient == nil { glog.Fatalf("master.New() called with config.KubeletClient == nil") } @@ -277,16 +276,6 @@ func New(c *Config) *Master { glog.Infof("Setting master service IPs based on PortalNet subnet to %q (read-only) and %q (read-write).", serviceReadOnlyIP, serviceReadWriteIP) m := &Master{ - podRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory), - controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil), - serviceRegistry: etcd.NewRegistry(c.EtcdHelper, nil), - endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil), - bindingRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory), - eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())), - namespaceRegistry: namespace.NewEtcdRegistry(c.EtcdHelper), - minionRegistry: etcd.NewRegistry(c.EtcdHelper, nil), - limitRangeRegistry: limitrange.NewEtcdRegistry(c.EtcdHelper), - resourceQuotaRegistry: resourcequota.NewEtcdRegistry(c.EtcdHelper), client: c.Client, portalNet: c.PortalNet, rootWebService: new(restful.WebService), @@ -376,34 +365,51 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) // init initializes master. func (m *Master) init(c *Config) { - nodeRESTStorage := minion.NewREST(m.minionRegistry) + boundPodFactory := &pod.BasicBoundPodFactory{} + podStorage, bindingStorage := podetcd.NewREST(c.EtcdHelper, boundPodFactory) + podRegistry := pod.NewRegistry(podStorage) + + eventRegistry := event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())) + limitRangeRegistry := limitrange.NewEtcdRegistry(c.EtcdHelper) + resourceQuotaRegistry := resourcequota.NewEtcdRegistry(c.EtcdHelper) + m.namespaceRegistry = namespace.NewEtcdRegistry(c.EtcdHelper) + + // TODO: split me up into distinct storage registries + registry := etcd.NewRegistry(c.EtcdHelper, podRegistry) + + m.serviceRegistry = registry + m.endpointRegistry = registry + m.nodeRegistry = registry + + nodeStorage := minion.NewREST(m.nodeRegistry) + // TODO: unify the storage -> registry and storage -> client patterns + nodeStorageClient := RESTStorageToNodes(nodeStorage) podCache := NewPodCache( c.KubeletClient, - RESTStorageToNodes(nodeRESTStorage).Nodes(), - m.podRegistry, + nodeStorageClient.Nodes(), + podRegistry, ) go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout) go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30) + // TODO: refactor podCache to sit on top of podStorage via status calls + podStorage = podStorage.WithPodStatus(podCache) + // TODO: Factor out the core API registration m.storage = map[string]apiserver.RESTStorage{ - "pods": pod.NewREST(&pod.RESTConfig{ - PodCache: podCache, - Registry: m.podRegistry, - }), - "replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry), - "services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet), + "pods": podStorage, + "bindings": bindingStorage, + + "replicationControllers": controller.NewREST(registry, podRegistry), + "services": service.NewREST(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.portalNet), "endpoints": endpoint.NewREST(m.endpointRegistry), - "minions": nodeRESTStorage, - "nodes": nodeRESTStorage, - "events": event.NewREST(m.eventRegistry), + "minions": nodeStorage, + "nodes": nodeStorage, + "events": event.NewREST(eventRegistry), - // TODO: should appear only in scheduler API group. - "bindings": binding.NewREST(m.bindingRegistry), - - "limitRanges": limitrange.NewREST(m.limitRangeRegistry), - "resourceQuotas": resourcequota.NewREST(m.resourceQuotaRegistry), - "resourceQuotaUsages": resourcequotausage.NewREST(m.resourceQuotaRegistry), + "limitRanges": limitrange.NewREST(limitRangeRegistry), + "resourceQuotas": resourcequota.NewREST(resourceQuotaRegistry), + "resourceQuotaUsages": resourcequotausage.NewREST(resourceQuotaRegistry), "namespaces": namespace.NewREST(m.namespaceRegistry), } @@ -542,7 +548,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server { } serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/v2/keys/"} } - nodes, err := m.minionRegistry.ListMinions(api.NewDefaultContext()) + nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext()) if err != nil { glog.Errorf("Failed to list minions: %v", err) } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 77492d7c616..a78e0d6ddbb 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -32,7 +32,7 @@ func TestGetServersToValidate(t *testing.T) { fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"} config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil} - master.minionRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{}) + master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{}) servers := master.getServersToValidate(&config) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 9f4315fc3de..cabf7e2f59c 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -20,10 +20,7 @@ import ( "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/constraint" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -34,8 +31,6 @@ import ( ) const ( - // PodPath is the path to pod resources in etcd - PodPath string = "/registry/pods" // ControllerPath is the path to controller resources in etcd ControllerPath string = "/registry/controllers" // ServicePath is the path to service resources in etcd @@ -53,15 +48,15 @@ const ( // MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd. type Registry struct { tools.EtcdHelper - boundPodFactory pod.BoundPodFactory + pods pod.Registry } // NewRegistry creates an etcd registry. -func NewRegistry(helper tools.EtcdHelper, boundPodFactory pod.BoundPodFactory) *Registry { +func NewRegistry(helper tools.EtcdHelper, pods pod.Registry) *Registry { registry := &Registry{ EtcdHelper: helper, + pods: pods, } - registry.boundPodFactory = boundPodFactory return registry } @@ -89,239 +84,6 @@ func MakeEtcdItemKey(ctx api.Context, prefix string, id string) (string, error) return key, nil } -// makePodListKey constructs etcd paths to pod directories enforcing namespace rules. -func makePodListKey(ctx api.Context) string { - return MakeEtcdListKey(ctx, PodPath) -} - -// makePodKey constructs etcd paths to pod items enforcing namespace rules. -func makePodKey(ctx api.Context, id string) (string, error) { - return MakeEtcdItemKey(ctx, PodPath, id) -} - -// ListPods obtains a list of pods with labels that match selector. -func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { - return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool { - return selector.Matches(labels.Set(pod.Labels)) - }) -} - -// ListPodsPredicate obtains a list of pods that match filter. -func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) { - allPods := api.PodList{} - key := makePodListKey(ctx) - err := r.ExtractToList(key, &allPods) - if err != nil { - return nil, err - } - filtered := []api.Pod{} - for _, pod := range allPods.Items { - if filter(&pod) { - filtered = append(filtered, pod) - } - } - allPods.Items = filtered - return &allPods, nil -} - -// WatchPods begins watching for new, changed, or deleted pods. -func (r *Registry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { - version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod") - if err != nil { - return nil, err - } - key := makePodListKey(ctx) - return r.WatchList(key, version, func(obj runtime.Object) bool { - podObj, ok := obj.(*api.Pod) - if !ok { - // Must be an error: return true to propagate to upper level. - return true - } - fields := pod.PodToSelectableFields(podObj) - return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields) - }) -} - -// GetPod gets a specific pod specified by its ID. -func (r *Registry) GetPod(ctx api.Context, id string) (*api.Pod, error) { - var pod api.Pod - key, err := makePodKey(ctx, id) - if err != nil { - return nil, err - } - if err = r.ExtractObj(key, &pod, false); err != nil { - return nil, etcderr.InterpretGetError(err, "pod", id) - } - return &pod, nil -} - -func makeBoundPodsKey(machine string) string { - return "/registry/nodes/" + machine + "/boundpods" -} - -// CreatePod creates a pod based on a specification. -func (r *Registry) CreatePod(ctx api.Context, pod *api.Pod) error { - // Set current status to "Waiting". - pod.Status.Phase = api.PodPending - pod.Status.Host = "" - key, err := makePodKey(ctx, pod.Name) - if err != nil { - return err - } - err = r.CreateObj(key, pod, 0) - return etcderr.InterpretCreateError(err, "pod", pod.Name) -} - -// ApplyBinding implements binding's registry -func (r *Registry) ApplyBinding(ctx api.Context, binding *api.Binding) error { - return etcderr.InterpretCreateError(r.assignPod(ctx, binding.PodID, binding.Host), "binding", "") -} - -// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'. -// Returns the current state of the pod, or an error. -func (r *Registry) setPodHostTo(ctx api.Context, podID, oldMachine, machine string) (finalPod *api.Pod, err error) { - podKey, err := makePodKey(ctx, podID) - if err != nil { - return nil, err - } - err = r.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) { - pod, ok := obj.(*api.Pod) - if !ok { - return nil, fmt.Errorf("unexpected object: %#v", obj) - } - if pod.Status.Host != oldMachine { - return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.Name, pod.Status.Host) - } - pod.Status.Host = machine - finalPod = pod - return pod, nil - }) - return finalPod, err -} - -// assignPod assigns the given pod to the given machine. -func (r *Registry) assignPod(ctx api.Context, podID string, machine string) error { - finalPod, err := r.setPodHostTo(ctx, podID, "", machine) - if err != nil { - return err - } - boundPod, err := r.boundPodFactory.MakeBoundPod(machine, finalPod) - if err != nil { - return err - } - // Doing the constraint check this way provides atomicity guarantees. - contKey := makeBoundPodsKey(machine) - err = r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { - boundPodList := in.(*api.BoundPods) - boundPodList.Items = append(boundPodList.Items, *boundPod) - if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 { - return nil, fmt.Errorf("the assignment would cause the following constraints violation: %v", errors) - } - return boundPodList, nil - }) - if err != nil { - // Put the pod's host back the way it was. This is a terrible hack, but - // can't really be helped, since there's not really a way to do atomic - // multi-object changes in etcd. - if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil { - glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2) - } - } - return err -} - -func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error { - var podOut api.Pod - podKey, err := makePodKey(ctx, pod.Name) - if err != nil { - return err - } - err = r.EtcdHelper.ExtractObj(podKey, &podOut, false) - if err != nil { - return err - } - scheduled := podOut.Status.Host != "" - if scheduled { - pod.Status.Host = podOut.Status.Host - // If it's already been scheduled, limit the types of updates we'll accept. - errs := validation.ValidatePodUpdate(pod, &podOut) - if len(errs) != 0 { - return errors.NewInvalid("Pod", pod.Name, errs) - } - } - // There's no race with the scheduler, because either this write will fail because the host - // has been updated, or the host update will fail because this pod has been updated. - err = r.EtcdHelper.SetObj(podKey, pod, 0 /* ttl */) - if err != nil { - return err - } - if !scheduled { - // never scheduled, just update. - return nil - } - - containerKey := makeBoundPodsKey(podOut.Status.Host) - return r.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { - boundPods := in.(*api.BoundPods) - for ix := range boundPods.Items { - item := &boundPods.Items[ix] - if item.Name == pod.Name && item.Namespace == pod.Namespace { - item.Spec = pod.Spec - return boundPods, nil - } - } - // This really shouldn't happen - glog.Warningf("Couldn't find: %s in %#v", pod.Name, boundPods) - return boundPods, fmt.Errorf("failed to update pod, couldn't find %s in %#v", pod.Name, boundPods) - }) -} - -// DeletePod deletes an existing pod specified by its ID. -func (r *Registry) DeletePod(ctx api.Context, podID string) error { - var pod api.Pod - podKey, err := makePodKey(ctx, podID) - if err != nil { - return err - } - err = r.ExtractObj(podKey, &pod, false) - if err != nil { - return etcderr.InterpretDeleteError(err, "pod", podID) - } - // First delete the pod, so a scheduler doesn't notice it getting removed from the - // machine and attempt to put it somewhere. - err = r.Delete(podKey, true) - if err != nil { - return etcderr.InterpretDeleteError(err, "pod", podID) - } - machine := pod.Status.Host - if machine == "" { - // Pod was never scheduled anywhere, just return. - return nil - } - // Next, remove the pod from the machine atomically. - contKey := makeBoundPodsKey(machine) - return r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { - pods := in.(*api.BoundPods) - newPods := make([]api.BoundPod, 0, len(pods.Items)) - found := false - for _, item := range pods.Items { - if item.Name != pod.Name || item.Namespace != pod.Namespace { - newPods = append(newPods, item) - } else { - found = true - } - } - if !found { - // This really shouldn't happen, it indicates something is broken, and likely - // there is a lost pod somewhere. - // However it is "deleted" so log it and move on - glog.Warningf("Couldn't find: %s in %#v", podID, pods) - } - pods.Items = newPods - return pods, nil - }) -} - // ListControllers obtains a list of ReplicationControllers. func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) { controllers := &api.ReplicationControllerList{} @@ -348,7 +110,7 @@ func (r *Registry) WatchControllers(ctx api.Context, label, field labels.Selecto } match := label.Matches(labels.Set(controller.Labels)) if match { - pods, err := r.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector()) + pods, err := r.pods.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector()) if err != nil { glog.Warningf("Error listing pods: %v", err) // No object that's useable so drop it on the floor diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index a395352738d..83f47e52b96 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -18,7 +18,6 @@ package etcd import ( "strconv" - "strings" "testing" "time" @@ -26,7 +25,9 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" + podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" @@ -34,791 +35,15 @@ import ( ) func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { - registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, - &pod.BasicBoundPodFactory{}) + registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, nil) return registry } -// TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash -func TestEtcdGetPodDifferentNamespace(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - - ctx1 := api.NewDefaultContext() - ctx2 := api.WithNamespace(api.NewContext(), "other") - - key1, _ := makePodKey(ctx1, "foo") - key2, _ := makePodKey(ctx2, "foo") - - fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "foo"}}), 0) - fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "other", Name: "foo"}}), 0) - - registry := NewTestEtcdRegistry(fakeClient) - - pod1, err := registry.GetPod(ctx1, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if pod1.Name != "foo" { - t.Errorf("Unexpected pod: %#v", pod1) - } - if pod1.Namespace != "default" { - t.Errorf("Unexpected pod: %#v", pod1) - } - - pod2, err := registry.GetPod(ctx2, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if pod2.Name != "foo" { - t.Errorf("Unexpected pod: %#v", pod2) - } - if pod2.Namespace != "other" { - t.Errorf("Unexpected pod: %#v", pod2) - } - -} - -func TestEtcdGetPod(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - key, _ := makePodKey(ctx, "foo") - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) - registry := NewTestEtcdRegistry(fakeClient) - pod, err := registry.GetPod(ctx, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if pod.Name != "foo" { - t.Errorf("Unexpected pod: %#v", pod) - } -} - -func TestEtcdGetPodNotFound(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - key, _ := makePodKey(ctx, "foo") - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNotFound, - } - registry := NewTestEtcdRegistry(fakeClient) - _, err := registry.GetPod(ctx, "foo") - if !errors.IsNotFound(err) { - t.Errorf("Unexpected error returned: %#v", err) - } -} - -func TestEtcdCreatePod(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - key, _ := makePodKey(ctx, "foo") - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNotFound, - } - fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0) - registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(ctx, &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: api.NamespaceDefault, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "foo", - }, - }, - }, - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine", ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - resp, err := fakeClient.Get(key, false, false) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - var pod api.Pod - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if pod.Name != "foo" { - t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) - } - var boundPods api.BoundPods - resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) - if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" { - t.Errorf("Unexpected boundPod list: %#v", boundPods) - } -} - -func TestEtcdCreatePodFailsWithoutNamespace(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(api.NewContext(), &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "foo", - }, - }, - }, - }) - // Accept "namespace" or "Namespace". - if err == nil || !strings.Contains(err.Error(), "amespace") { - t.Fatalf("expected error that namespace was missing from context, got: %v", err) - } -} - -func TestEtcdCreatePodAlreadyExisting(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - key, _ := makePodKey(ctx, "foo") - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), - }, - }, - E: nil, - } - registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(ctx, &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - }) - if !errors.IsAlreadyExists(err) { - t.Errorf("Unexpected error returned: %#v", err) - } -} - -func TestEtcdCreatePodWithContainersError(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - key, _ := makePodKey(ctx, "foo") - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNotFound, - } - fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNodeExist, // validate that ApplyBinding is translating Create errors - } - registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(ctx, &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: api.NamespaceDefault, - }, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"}) - if !errors.IsAlreadyExists(err) { - t.Fatalf("Unexpected error returned: %#v", err) - } - - existingPod, err := registry.GetPod(ctx, "foo") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if existingPod.Status.Host == "machine" { - t.Fatal("Pod's host changed in response to an non-apply-able binding.") - } -} - -func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - key, _ := makePodKey(ctx, "foo") - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNotFound, - } - fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNotFound, - } - registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(ctx, &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: api.NamespaceDefault, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "foo", - }, - }, - }, - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - resp, err := fakeClient.Get(key, false, false) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - var pod api.Pod - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if pod.Name != "foo" { - t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) - } - var boundPods api.BoundPods - resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) - if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" { - t.Errorf("Unexpected boundPod list: %#v", boundPods) - } -} - -func TestEtcdCreatePodWithExistingContainers(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - key, _ := makePodKey(ctx, "foo") - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: nil, - }, - E: tools.EtcdErrorNotFound, - } - fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ - Items: []api.BoundPod{ - {ObjectMeta: api.ObjectMeta{Name: "bar"}}, - }, - }), 0) - registry := NewTestEtcdRegistry(fakeClient) - err := registry.CreatePod(ctx, &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: api.NamespaceDefault, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "foo", - }, - }, - }, - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // Suddenly, a wild scheduler appears: - err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - resp, err := fakeClient.Get(key, false, false) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - var pod api.Pod - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if pod.Name != "foo" { - t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) - } - var boundPods api.BoundPods - resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) - if len(boundPods.Items) != 2 || boundPods.Items[1].Name != "foo" { - t.Errorf("Unexpected boundPod list: %#v", boundPods) - } -} - -func TestEtcdUpdatePodNotFound(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - - key, _ := makePodKey(ctx, "foo") - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{}, - E: tools.EtcdErrorNotFound, - } - - registry := NewTestEtcdRegistry(fakeClient) - podIn := api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - ResourceVersion: "1", - Labels: map[string]string{ - "foo": "bar", - }, - }, - } - err := registry.UpdatePod(ctx, &podIn) - if err == nil { - t.Errorf("unexpected non-error") - } -} - -func TestEtcdUpdatePodNotScheduled(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - - key, _ := makePodKey(ctx, "foo") - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }), 1) - - registry := NewTestEtcdRegistry(fakeClient) - podIn := api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - ResourceVersion: "1", - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - } - err := registry.UpdatePod(ctx, &podIn) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - response, err := fakeClient.Get(key, false, false) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var podOut api.Pod - latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut) - if !api.Semantic.DeepEqual(podOut, podIn) { - t.Errorf("expected: %v, got: %v", podOut, podIn) - } -} - -func TestEtcdUpdatePodScheduled(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - - key, _ := makePodKey(ctx, "foo") - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.PodSpec{ - // Host: "machine", - Containers: []api.Container{ - { - Image: "foo:v1", - }, - }, - }, - Status: api.PodStatus{ - Host: "machine", - }, - }), 1) - - contKey := "/registry/nodes/machine/boundpods" - fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ - Items: []api.BoundPod{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "foo:v1", - }, - }, - }, - }, { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "foo:v1", - }, - }, - }, - }, - }, - }), 0) - - registry := NewTestEtcdRegistry(fakeClient) - podIn := api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: api.NamespaceDefault, - ResourceVersion: "1", - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "foo:v2", - ImagePullPolicy: api.PullIfNotPresent, - TerminationMessagePath: api.TerminationMessagePathDefault, - }, - }, - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - Status: api.PodStatus{ - Host: "machine", - }, - } - err := registry.UpdatePod(ctx, &podIn) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - response, err := fakeClient.Get(key, false, false) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var podOut api.Pod - latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut) - if !api.Semantic.DeepEqual(podOut, podIn) { - t.Errorf("expected: %#v, got: %#v", podOut, podIn) - } - - response, err = fakeClient.Get(contKey, false, false) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - var list api.BoundPods - if err := latest.Codec.DecodeInto([]byte(response.Node.Value), &list); err != nil { - t.Fatalf("unexpected error decoding response: %v", err) - } - - if len(list.Items) != 2 || !api.Semantic.DeepEqual(list.Items[1].Spec, podIn.Spec) { - t.Errorf("unexpected container list: %d\n items[0] - %#v\n podin.spec - %#v\n", len(list.Items), list.Items[1].Spec, podIn.Spec) - } -} - -func TestEtcdDeletePod(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - - key, _ := makePodKey(ctx, "foo") - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Status: api.PodStatus{Host: "machine"}, - }), 0) - fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ - Items: []api.BoundPod{ - {ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}}, - }, - }), 0) - registry := NewTestEtcdRegistry(fakeClient) - err := registry.DeletePod(ctx, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if len(fakeClient.DeletedKeys) != 1 { - t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) - } else if fakeClient.DeletedKeys[0] != key { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) - } - response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - var boundPods api.BoundPods - latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods) - if len(boundPods.Items) != 0 { - t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value) - } -} - -func TestEtcdDeletePodMultipleContainers(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.TestIndex = true - key, _ := makePodKey(ctx, "foo") - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Status: api.PodStatus{Host: "machine"}, - }), 0) - fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ - Items: []api.BoundPod{ - {ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}}, - {ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}}, - }, - }), 0) - registry := NewTestEtcdRegistry(fakeClient) - err := registry.DeletePod(ctx, "foo") - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if len(fakeClient.DeletedKeys) != 1 { - t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) - } - if fakeClient.DeletedKeys[0] != key { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) - } - response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - var boundPods api.BoundPods - latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods) - if len(boundPods.Items) != 1 { - t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods) - } - if boundPods.Items[0].Namespace != "other" { - t.Errorf("Deleted wrong boundPod: %#v", boundPods) - } -} - -func TestEtcdEmptyListPods(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - ctx := api.NewDefaultContext() - key := makePodListKey(ctx) - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Nodes: []*etcd.Node{}, - }, - }, - E: nil, - } - registry := NewTestEtcdRegistry(fakeClient) - pods, err := registry.ListPods(ctx, labels.Everything()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if len(pods.Items) != 0 { - t.Errorf("Unexpected pod list: %#v", pods) - } -} - -func TestEtcdListPodsNotFound(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - ctx := api.NewDefaultContext() - key := makePodListKey(ctx) - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{}, - E: tools.EtcdErrorNotFound, - } - registry := NewTestEtcdRegistry(fakeClient) - pods, err := registry.ListPods(ctx, labels.Everything()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if len(pods.Items) != 0 { - t.Errorf("Unexpected pod list: %#v", pods) - } -} - -func TestEtcdListPods(t *testing.T) { - fakeClient := tools.NewFakeEtcdClient(t) - ctx := api.NewDefaultContext() - key := makePodListKey(ctx) - fakeClient.Data[key] = tools.EtcdResponseWithError{ - R: &etcd.Response{ - Node: &etcd.Node{ - Nodes: []*etcd.Node{ - { - Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Status: api.PodStatus{Host: "machine"}, - }), - }, - { - Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "bar"}, - Status: api.PodStatus{Host: "machine"}, - }), - }, - }, - }, - }, - E: nil, - } - registry := NewTestEtcdRegistry(fakeClient) - pods, err := registry.ListPods(ctx, labels.Everything()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if len(pods.Items) != 2 || pods.Items[0].Name != "foo" || pods.Items[1].Name != "bar" { - t.Errorf("Unexpected pod list: %#v", pods) - } - if pods.Items[0].Status.Host != "machine" || - pods.Items[1].Status.Host != "machine" { - t.Errorf("Failed to populate host name.") - } -} - -func TestEtcdWatchPods(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchPods(ctx, - labels.Everything(), - labels.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -func TestEtcdWatchPodsMatch(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchPods(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - labels.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Labels: map[string]string{ - "name": "foo", - }, - }, - } - podBytes, _ := latest.Codec.Encode(pod) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(podBytes), - }, - } - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - case <-time.After(time.Millisecond * 100): - t.Error("unexpected timeout from result channel") - } - watching.Stop() -} - -func TestEtcdWatchPodsNotMatch(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - watching, err := registry.WatchPods(ctx, - labels.SelectorFromSet(labels.Set{"name": "foo"}), - labels.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "bar", - Labels: map[string]string{ - "name": "bar", - }, - }, - } - podBytes, _ := latest.Codec.Encode(pod) - fakeClient.WatchResponse <- &etcd.Response{ - Action: "create", - Node: &etcd.Node{ - Value: string(podBytes), - }, - } - - select { - case <-watching.ResultChan(): - t.Error("unexpected result from result channel") - case <-time.After(time.Millisecond * 100): - // expected case - } +func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { + helper := tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + podStorage, _ := podetcd.NewREST(helper, nil) + registry := NewRegistry(helper, pod.NewRegistry(podStorage)) + return registry } func TestEtcdListControllersNotFound(t *testing.T) { @@ -1081,8 +306,8 @@ func TestEtcdWatchController(t *testing.T) { func TestEtcdWatchControllersMatch(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.ExpectNotFoundGet(makePodListKey(ctx)) - registry := NewTestEtcdRegistry(fakeClient) + fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) + registry := NewTestEtcdRegistryWithPods(fakeClient) watching, err := registry.WatchControllers(ctx, labels.SelectorFromSet(labels.Set{"name": "foo"}), labels.Everything(), @@ -1122,8 +347,8 @@ func TestEtcdWatchControllersMatch(t *testing.T) { func TestEtcdWatchControllersNotMatch(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - fakeClient.ExpectNotFoundGet(makePodListKey(ctx)) - registry := NewTestEtcdRegistry(fakeClient) + fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) + registry := NewTestEtcdRegistryWithPods(fakeClient) watching, err := registry.WatchControllers(ctx, labels.SelectorFromSet(labels.Set{"name": "foo"}), labels.Everything(), diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index c03fc11cb46..2b6c2b6404f 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -43,6 +43,8 @@ import ( // ResourceVersion and semantics. The RESTCreateStrategy and // RESTUpdateStrategy are generic across all backends, and encapsulate // logic specific to the API. +// +// TODO: make the default exposed methods exactly match a generic RESTStorage type Etcd struct { // Called to make a new object, should return e.g., &api.Pod{} NewFunc func() runtime.Object @@ -116,6 +118,8 @@ func NamespaceKeyFunc(ctx api.Context, prefix string, name string) (string, erro } // List returns a list of all the items matching m. +// TODO: rename this to ListPredicate, take the default predicate function on the constructor, and +// introduce a List method that uses the default predicate function func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) { list := e.NewListFunc() err := e.Helper.ExtractToList(e.KeyRootFunc(ctx), list) @@ -126,6 +130,7 @@ func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) } // CreateWithName inserts a new item with the provided name +// DEPRECATED: use Create instead func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object) error { key, err := e.KeyFunc(ctx, name) if err != nil { @@ -191,6 +196,7 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro } // UpdateWithName updates the item with the provided name +// DEPRECATED: use Update instead func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object) error { key, err := e.KeyFunc(ctx, name) if err != nil { @@ -323,6 +329,8 @@ func (e *Etcd) Delete(ctx api.Context, name string) (runtime.Object, error) { // Watch starts a watch for the items that m matches. // TODO: Detect if m references a single object instead of a list. +// TODO: rename this to WatchPredicate, take the default predicate function on the constructor, and +// introduce a Watch method that uses the default predicate function func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName) if err != nil { diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go new file mode 100644 index 00000000000..6c7f1b96e04 --- /dev/null +++ b/pkg/registry/pod/etcd/etcd.go @@ -0,0 +1,252 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/constraint" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +// rest implements a RESTStorage for pods against etcd +type REST struct { + store *etcdgeneric.Etcd +} + +// NewREST returns a RESTStorage object that will work against pods. +func NewREST(h tools.EtcdHelper, factory pod.BoundPodFactory) (*REST, *BindingREST) { + prefix := "/registry/pods" + bindings := &podLifecycle{h} + store := &etcdgeneric.Etcd{ + NewFunc: func() runtime.Object { return &api.Pod{} }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + KeyRootFunc: func(ctx api.Context) string { + return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix) + }, + KeyFunc: func(ctx api.Context, name string) (string, error) { + return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name) + }, + ObjectNameFunc: func(obj runtime.Object) (string, error) { + return obj.(*api.Pod).Name, nil + }, + EndpointName: "pods", + + CreateStrategy: pod.Strategy, + + UpdateStrategy: pod.Strategy, + AfterUpdate: bindings.AfterUpdate, + + ReturnDeletedObject: true, + AfterDelete: bindings.AfterDelete, + + Helper: h, + } + return &REST{store: store}, &BindingREST{store: store, factory: factory} +} + +// WithPodStatus returns a rest object that decorates returned responses with extra +// status information. +func (r *REST) WithPodStatus(cache pod.PodStatusGetter) *REST { + store := *r.store + store.Decorator = pod.PodStatusDecorator(cache) + store.AfterDelete = rest.AllFuncs(store.AfterDelete, pod.PodStatusReset(cache)) + return &REST{store: &store} +} + +// New returns a new object +func (r *REST) New() runtime.Object { + return r.store.NewFunc() +} + +// NewList returns a new list object +func (r *REST) NewList() runtime.Object { + return r.store.NewListFunc() +} + +// List obtains a list of pods with labels that match selector. +func (r *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { + return r.store.List(ctx, pod.MatchPod(label, field)) +} + +// Watch begins watching for new, changed, or deleted pods. +func (r *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return r.store.Watch(ctx, pod.MatchPod(label, field), resourceVersion) +} + +// Get gets a specific pod specified by its ID. +func (r *REST) Get(ctx api.Context, name string) (runtime.Object, error) { + return r.store.Get(ctx, name) +} + +// Create creates a pod based on a specification. +func (r *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { + return r.store.Create(ctx, obj) +} + +// Update changes a pod specification. +func (r *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { + return r.store.Update(ctx, obj) +} + +// Delete deletes an existing pod specified by its ID. +func (r *REST) Delete(ctx api.Context, name string) (runtime.Object, error) { + return r.store.Delete(ctx, name) +} + +// ResourceLocation returns a pods location from its HostIP +func (r *REST) ResourceLocation(ctx api.Context, name string) (string, error) { + return pod.ResourceLocation(r, ctx, name) +} + +func makeBoundPodsKey(machine string) string { + return "/registry/nodes/" + machine + "/boundpods" +} + +// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use. +type BindingREST struct { + store *etcdgeneric.Etcd + factory pod.BoundPodFactory +} + +func (r *BindingREST) New() runtime.Object { + return &api.Binding{} +} + +// Create ensures a pod is bound to a specific host. +func (r *BindingREST) Create(ctx api.Context, obj runtime.Object) (out runtime.Object, err error) { + binding := obj.(*api.Binding) + err = r.assignPod(ctx, binding.PodID, binding.Host) + err = etcderr.InterpretCreateError(err, "binding", "") + out = &api.Status{Status: api.StatusSuccess} + return +} + +// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'. +// Returns the current state of the pod, or an error. +func (r *BindingREST) setPodHostTo(ctx api.Context, podID, oldMachine, machine string) (finalPod *api.Pod, err error) { + podKey, err := r.store.KeyFunc(ctx, podID) + if err != nil { + return nil, err + } + err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) { + pod, ok := obj.(*api.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + if pod.Status.Host != oldMachine { + return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.Name, pod.Status.Host) + } + pod.Status.Host = machine + finalPod = pod + return pod, nil + }) + return finalPod, err +} + +// assignPod assigns the given pod to the given machine. +func (r *BindingREST) assignPod(ctx api.Context, podID string, machine string) error { + finalPod, err := r.setPodHostTo(ctx, podID, "", machine) + if err != nil { + return err + } + boundPod, err := r.factory.MakeBoundPod(machine, finalPod) + if err != nil { + return err + } + // Doing the constraint check this way provides atomicity guarantees. + contKey := makeBoundPodsKey(machine) + err = r.store.Helper.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { + boundPodList := in.(*api.BoundPods) + boundPodList.Items = append(boundPodList.Items, *boundPod) + if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 { + return nil, fmt.Errorf("the assignment would cause the following constraints violation: %v", errors) + } + return boundPodList, nil + }) + if err != nil { + // Put the pod's host back the way it was. This is a terrible hack, but + // can't really be helped, since there's not really a way to do atomic + // multi-object changes in etcd. + if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil { + glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2) + } + } + return err +} + +type podLifecycle struct { + tools.EtcdHelper +} + +func (h *podLifecycle) AfterUpdate(obj runtime.Object) error { + pod := obj.(*api.Pod) + if len(pod.Status.Host) == 0 { + return nil + } + containerKey := makeBoundPodsKey(pod.Status.Host) + return h.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { + boundPods := in.(*api.BoundPods) + for ix := range boundPods.Items { + if boundPods.Items[ix].Name == pod.Name && boundPods.Items[ix].Namespace == pod.Namespace { + boundPods.Items[ix].Spec = pod.Spec + return boundPods, nil + } + } + // This really shouldn't happen + glog.Warningf("Couldn't find: %s in %#v", pod.Name, boundPods) + return boundPods, fmt.Errorf("failed to update pod, couldn't find %s in %#v", pod.Name, boundPods) + }) +} + +func (h *podLifecycle) AfterDelete(obj runtime.Object) error { + pod := obj.(*api.Pod) + if len(pod.Status.Host) == 0 { + return nil + } + containerKey := makeBoundPodsKey(pod.Status.Host) + return h.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { + pods := in.(*api.BoundPods) + newPods := make([]api.BoundPod, 0, len(pods.Items)) + found := false + for _, boundPod := range pods.Items { + if boundPod.Name != pod.Name || boundPod.Namespace != pod.Namespace { + newPods = append(newPods, boundPod) + } else { + found = true + } + } + if !found { + // This really shouldn't happen, it indicates something is broken, and likely + // there is a lost pod somewhere. + // However it is "deleted" so log it and move on + glog.Warningf("Couldn't find: %s in %#v", pod.Name, pods) + } + pods.Items = newPods + return pods, nil + }) +} diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go new file mode 100644 index 00000000000..cb9d1d4b4a1 --- /dev/null +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -0,0 +1,1436 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/coreos/go-etcd/etcd" +) + +type fakeCache struct { + requestedNamespace string + requestedName string + clearedNamespace string + clearedName string + + statusToReturn *api.PodStatus + errorToReturn error +} + +func (f *fakeCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) { + f.requestedNamespace = namespace + f.requestedName = name + return f.statusToReturn, f.errorToReturn +} + +func (f *fakeCache) ClearPodStatus(namespace, name string) { + f.clearedNamespace = namespace + f.clearedName = name +} + +func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { + fakeEtcdClient := tools.NewFakeEtcdClient(t) + fakeEtcdClient.TestIndex = true + helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}} + return fakeEtcdClient, helper +} + +func newStorage(t *testing.T) (*REST, *BindingREST, *tools.FakeEtcdClient, tools.EtcdHelper) { + fakeEtcdClient, h := newHelper(t) + storage, bindingStorage := NewREST(h, &pod.BasicBoundPodFactory{}) + storage = storage.WithPodStatus(&fakeCache{statusToReturn: &api.PodStatus{}}) + return storage, bindingStorage, fakeEtcdClient, h +} + +func validNewPod() *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, + DNSPolicy: api.DNSClusterFirst, + Containers: []api.Container{ + { + Name: "foo", + Image: "test", + ImagePullPolicy: api.PullAlways, + + TerminationMessagePath: api.TerminationMessagePathDefault, + }, + }, + }, + } +} + +func validChangedPod() *api.Pod { + pod := validNewPod() + pod.ResourceVersion = "1" + pod.Labels = map[string]string{ + "foo": "bar", + } + return pod +} + +func TestStorage(t *testing.T) { + storage, _, _, _ := newStorage(t) + pod.NewRegistry(storage) +} + +func TestCreate(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{}} + storage = storage.WithPodStatus(cache) + test := resttest.New(t, storage, fakeEtcdClient.SetError) + pod := validNewPod() + pod.ObjectMeta = api.ObjectMeta{} + test.TestCreate( + // valid + pod, + // invalid + &api.Pod{ + Spec: api.PodSpec{ + Containers: []api.Container{}, + }, + }, + ) +} + +func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) { + pod, ok := out.(*api.Pod) + if !ok || pod == nil { + t.Errorf("Expected an api.Pod object, was %#v", out) + return nil, false + } + return pod, true +} + +func TestCreateRegistryError(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Err = fmt.Errorf("test error") + storage, _ := NewREST(helper, nil) + + pod := validNewPod() + _, err := storage.Create(api.NewDefaultContext(), pod) + if err != fakeEtcdClient.Err { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestCreateSetsFields(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{}} + storage = storage.WithPodStatus(cache) + pod := validNewPod() + _, err := storage.Create(api.NewDefaultContext(), pod) + if err != fakeEtcdClient.Err { + t.Fatalf("unexpected error: %v", err) + } + + actual := &api.Pod{} + if err := helper.ExtractObj("/registry/pods/default/foo", actual, false); err != nil { + t.Fatalf("unexpected extraction error: %v", err) + } + if actual.Name != pod.Name { + t.Errorf("unexpected pod: %#v", actual) + } + if len(actual.UID) == 0 { + t.Errorf("expected pod UID to be set: %#v", actual) + } +} + +func TestListError(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Err = fmt.Errorf("test error") + storage, _ := NewREST(helper, nil) + cache := &fakeCache{} + storage = storage.WithPodStatus(cache) + pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), labels.Everything()) + if err != fakeEtcdClient.Err { + t.Fatalf("Expected %#v, Got %#v", fakeEtcdClient.Err, err) + } + if pods != nil { + t.Errorf("Unexpected non-nil pod list: %#v", pods) + } +} + +func TestListCacheError(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Data["/registry/pods/default"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Status: api.PodStatus{Host: "machine"}, + }), + }, + }, + }, + }, + } + storage, _ := NewREST(helper, nil) + cache := &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable} + storage = storage.WithPodStatus(cache) + + pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), labels.Everything()) + if err != nil { + t.Fatalf("Expected no error, got %#v", err) + } + pl := pods.(*api.PodList) + if len(pl.Items) != 1 { + t.Fatalf("Unexpected 0-len pod list: %+v", pl) + } + if e, a := api.PodUnknown, pl.Items[0].Status.Phase; e != a { + t.Errorf("Expected %v, got %v", e, a) + } +} + +func TestListEmptyPodList(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.ChangeIndex = 1 + fakeEtcdClient.Data["/registry/pods"] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: fakeEtcdClient.NewError(tools.EtcdErrorCodeNotFound), + } + + storage, _ := NewREST(helper, nil) + cache := &fakeCache{} + storage = storage.WithPodStatus(cache) + pods, err := storage.List(api.NewContext(), labels.Everything(), labels.Everything()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(pods.(*api.PodList).Items) != 0 { + t.Errorf("Unexpected non-zero pod list: %#v", pods) + } + if pods.(*api.PodList).ResourceVersion != "1" { + t.Errorf("Unexpected resource version: %#v", pods) + } +} + +func TestListPodList(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Data["/registry/pods/default"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Status: api.PodStatus{Host: "machine"}, + }), + }, + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "bar"}, + Status: api.PodStatus{Host: "machine"}, + }), + }, + }, + }, + }, + } + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}} + storage = storage.WithPodStatus(cache) + + podsObj, err := storage.List(api.NewDefaultContext(), labels.Everything(), labels.Everything()) + pods := podsObj.(*api.PodList) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(pods.Items) != 2 { + t.Errorf("Unexpected pod list: %#v", pods) + } + if pods.Items[0].Name != "foo" || pods.Items[0].Status.Phase != api.PodRunning || pods.Items[0].Status.Host != "machine" { + t.Errorf("Unexpected pod: %#v", pods.Items[0]) + } + if pods.Items[1].Name != "bar" || pods.Items[1].Status.Host != "machine" { + t.Errorf("Unexpected pod: %#v", pods.Items[1]) + } +} + +func TestListPodListSelection(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Data["/registry/pods/default"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + {Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + })}, + {Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "bar"}, + Status: api.PodStatus{Host: "barhost"}, + })}, + {Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "baz"}, + Status: api.PodStatus{Phase: api.PodFailed}, + })}, + {Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "qux", + Labels: map[string]string{"label": "qux"}, + }, + })}, + {Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "zot"}, + })}, + }, + }, + }, + } + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}} + storage = storage.WithPodStatus(cache) + + ctx := api.NewDefaultContext() + + table := []struct { + label, field string + expectedIDs util.StringSet + }{ + { + expectedIDs: util.NewStringSet("foo", "bar", "baz", "qux", "zot"), + }, { + field: "name=zot", + expectedIDs: util.NewStringSet("zot"), + }, { + label: "label=qux", + expectedIDs: util.NewStringSet("qux"), + }, { + field: "Status.Phase=Failed", + expectedIDs: util.NewStringSet("baz"), + }, { + field: "Status.Host=barhost", + expectedIDs: util.NewStringSet("bar"), + }, { + field: "Status.Host=", + expectedIDs: util.NewStringSet("foo", "baz", "qux", "zot"), + }, { + field: "Status.Host!=", + expectedIDs: util.NewStringSet("bar"), + }, + } + + for index, item := range table { + label, err := labels.ParseSelector(item.label) + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + field, err := labels.ParseSelector(item.field) + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + podsObj, err := storage.List(ctx, label, field) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + pods := podsObj.(*api.PodList) + + set := util.NewStringSet() + for i := range pods.Items { + set.Insert(pods.Items[i].Name) + } + if e, a := len(item.expectedIDs), len(set); e != a { + t.Errorf("%v: Expected %v, got %v", index, item.expectedIDs, set) + } + /*for _, pod := range pods.Items { + if !item.expectedIDs.Has(pod.Name) { + t.Errorf("%v: Unexpected pod %v", index, pod.Name) + } + t.Logf("%v: Got pod Name: %v", index, pod.Name) + }*/ + } +} + +func TestPodDecode(t *testing.T) { + storage, _ := NewREST(tools.EtcdHelper{}, nil) + expected := validNewPod() + body, err := latest.Codec.Encode(expected) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + actual := storage.New() + if err := latest.Codec.DecodeInto(body, actual); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !api.Semantic.DeepEqual(expected, actual) { + t.Errorf("mismatch: %s", util.ObjectDiff(expected, actual)) + } +} + +func TestGet(t *testing.T) { + expect := validNewPod() + expect.Status.Host = "machine" + + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Data["/registry/pods/test/foo"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, expect), + }, + }, + } + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}} + storage = storage.WithPodStatus(cache) + + obj, err := storage.Get(api.WithNamespace(api.NewContext(), "test"), "foo") + pod := obj.(*api.Pod) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expect.Status.Phase = api.PodRunning + if e, a := expect, pod; !api.Semantic.DeepEqual(e, a) { + t.Errorf("Unexpected pod: %s", util.ObjectDiff(e, a)) + } +} + +func TestGetCacheError(t *testing.T) { + expect := validNewPod() + expect.Status.Host = "machine" + + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Data["/registry/pods/default/foo"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, expect), + }, + }, + } + storage, _ := NewREST(helper, nil) + cache := &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable} + storage = storage.WithPodStatus(cache) + + obj, err := storage.Get(api.NewDefaultContext(), "foo") + pod := obj.(*api.Pod) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expect.Status.Phase = api.PodUnknown + if e, a := expect, pod; !api.Semantic.DeepEqual(e, a) { + t.Errorf("unexpected object: %s", util.ObjectDiff(e, a)) + } +} + +// TODO: remove, this is covered by RESTTest.TestCreate +func TestPodStorageValidatesCreate(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Err = fmt.Errorf("test error") + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{}} + storage = storage.WithPodStatus(cache) + + pod := validNewPod() + pod.Labels = map[string]string{ + "invalid/label/to/cause/validation/failure": "bar", + } + c, err := storage.Create(api.NewDefaultContext(), pod) + if c != nil { + t.Errorf("Expected nil object") + } + if !errors.IsInvalid(err) { + t.Errorf("Expected to get an invalid resource error, got %v", err) + } +} + +// TODO: remove, this is covered by RESTTest.TestCreate +func TestCreatePod(t *testing.T) { + _, helper := newHelper(t) + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{}} + storage = storage.WithPodStatus(cache) + + pod := validNewPod() + obj, err := storage.Create(api.NewDefaultContext(), pod) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if obj == nil { + t.Fatalf("unexpected object: %#v", obj) + } + actual := &api.Pod{} + if err := helper.ExtractObj("/registry/pods/default/foo", actual, false); err != nil { + t.Fatalf("unexpected extraction error: %v", err) + } + if !api.HasObjectMetaSystemFieldValues(&actual.ObjectMeta) { + t.Errorf("Expected ObjectMeta field values were populated: %#v", actual) + } +} + +// TODO: remove, this is covered by RESTTest.TestCreate +func TestCreateWithConflictingNamespace(t *testing.T) { + _, helper := newHelper(t) + storage, _ := NewREST(helper, nil) + cache := &fakeCache{} + storage = storage.WithPodStatus(cache) + + pod := validNewPod() + pod.Namespace = "not-default" + + obj, err := storage.Create(api.NewDefaultContext(), pod) + if obj != nil { + t.Error("Expected a nil obj, but we got a value") + } + if err == nil { + t.Errorf("Expected an error, but we didn't get one") + } else if strings.Contains(err.Error(), "Controller.Namespace does not match the provided context") { + t.Errorf("Expected 'Pod.Namespace does not match the provided context' error, got '%v'", err.Error()) + } +} + +func TestUpdateWithConflictingNamespace(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Data["/registry/pods/default/foo"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "default"}, + Status: api.PodStatus{Host: "machine"}, + }), + ModifiedIndex: 1, + }, + }, + } + storage, _ := NewREST(helper, nil) + cache := &fakeCache{} + storage = storage.WithPodStatus(cache) + + pod := validChangedPod() + pod.Namespace = "not-default" + + obj, created, err := storage.Update(api.NewDefaultContext(), pod) + if obj != nil || created { + t.Error("Expected a nil channel, but we got a value or created") + } + if err == nil { + t.Errorf("Expected an error, but we didn't get one") + } else if strings.Index(err.Error(), "the namespace of the provided object does not match the namespace sent on the request") == -1 { + t.Errorf("Expected 'Pod.Namespace does not match the provided context' error, got '%v'", err.Error()) + } +} + +func TestResourceLocation(t *testing.T) { + expectedIP := "1.2.3.4" + testCases := []struct { + pod api.Pod + query string + location string + }{ + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + }, + query: "foo", + location: expectedIP, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + }, + query: "foo:12345", + location: expectedIP + ":12345", + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "ctr"}, + }, + }, + }, + query: "foo", + location: expectedIP, + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}}, + }, + }, + }, + query: "foo", + location: expectedIP + ":9376", + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}}, + }, + }, + }, + query: "foo:12345", + location: expectedIP + ":12345", + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "ctr1"}, + {Name: "ctr2", Ports: []api.Port{{ContainerPort: 9376}}}, + }, + }, + }, + query: "foo", + location: expectedIP + ":9376", + }, + { + pod: api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "ctr1", Ports: []api.Port{{ContainerPort: 9376}}}, + {Name: "ctr2", Ports: []api.Port{{ContainerPort: 1234}}}, + }, + }, + }, + query: "foo", + location: expectedIP + ":9376", + }, + } + + for _, tc := range testCases { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.Data["/registry/pods/default/foo"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, &tc.pod), + }, + }, + } + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{PodIP: expectedIP}} + storage = storage.WithPodStatus(cache) + + redirector := apiserver.Redirector(storage) + location, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if location != tc.location { + t.Errorf("Expected %v, but got %v", tc.location, location) + } + } +} + +func TestDeletePod(t *testing.T) { + fakeEtcdClient, helper := newHelper(t) + fakeEtcdClient.ChangeIndex = 1 + fakeEtcdClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "other", + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + }, + }, + }), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + } + fakeEtcdClient.Data["/registry/pods/default/foo"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{Host: "machine"}, + }), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + } + storage, _ := NewREST(helper, nil) + cache := &fakeCache{statusToReturn: &api.PodStatus{}} + storage = storage.WithPodStatus(cache) + + result, err := storage.Delete(api.NewDefaultContext(), "foo") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cache.clearedNamespace != "default" || cache.clearedName != "foo" { + t.Fatalf("Unexpected cache delete: %s %s %#v", cache.clearedName, cache.clearedNamespace, result) + } + + actual := &api.BoundPods{} + if err := helper.ExtractObj("/registry/nodes/machine/boundpods", actual, false); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // verify bound pods removes the correct namsepace + if len(actual.Items) != 1 || actual.Items[0].Namespace != "other" { + t.Errorf("bound pods should be empty: %#v", actual) + } +} + +// TestEtcdGetDifferentNamespace ensures same-name pods in different namespaces do not clash +func TestEtcdGetDifferentNamespace(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + + ctx1 := api.NewDefaultContext() + ctx2 := api.WithNamespace(api.NewContext(), "other") + + key1, _ := registry.store.KeyFunc(ctx1, "foo") + key2, _ := registry.store.KeyFunc(ctx2, "foo") + + fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "foo"}}), 0) + fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "other", Name: "foo"}}), 0) + + obj, err := registry.Get(ctx1, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + pod1 := obj.(*api.Pod) + if pod1.Name != "foo" { + t.Errorf("Unexpected pod: %#v", pod1) + } + if pod1.Namespace != "default" { + t.Errorf("Unexpected pod: %#v", pod1) + } + + obj, err = registry.Get(ctx2, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + pod2 := obj.(*api.Pod) + if pod2.Name != "foo" { + t.Errorf("Unexpected pod: %#v", pod2) + } + if pod2.Namespace != "other" { + t.Errorf("Unexpected pod: %#v", pod2) + } + +} + +func TestEtcdGet(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) + obj, err := registry.Get(ctx, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + pod := obj.(*api.Pod) + if pod.Name != "foo" { + t.Errorf("Unexpected pod: %#v", pod) + } +} + +func TestEtcdGetNotFound(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNotFound, + } + _, err := registry.Get(ctx, "foo") + if !errors.IsNotFound(err) { + t.Errorf("Unexpected error returned: %#v", err) + } +} + +func TestEtcdCreate(t *testing.T) { + registry, bindingRegistry, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNotFound, + } + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0) + _, err := registry.Create(ctx, validNewPod()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Suddenly, a wild scheduler appears: + _, err = bindingRegistry.Create(ctx, &api.Binding{PodID: "foo", Host: "machine", ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + resp, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var pod api.Pod + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if pod.Name != "foo" { + t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) + } + var boundPods api.BoundPods + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) + } +} + +func TestEtcdCreateFailsWithoutNamespace(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + fakeClient.TestIndex = true + pod := validNewPod() + pod.Namespace = "" + _, err := registry.Create(api.NewContext(), pod) + // Accept "namespace" or "Namespace". + if err == nil || !strings.Contains(err.Error(), "amespace") { + t.Fatalf("expected error that namespace was missing from context, got: %v", err) + } +} + +func TestEtcdCreateAlreadyExisting(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), + }, + }, + E: nil, + } + _, err := registry.Create(ctx, validNewPod()) + if !errors.IsAlreadyExists(err) { + t.Errorf("Unexpected error returned: %#v", err) + } +} + +func TestEtcdCreateWithContainersError(t *testing.T) { + registry, bindingRegistry, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNotFound, + } + fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNodeExist, // validate that ApplyBinding is translating Create errors + } + _, err := registry.Create(ctx, validNewPod()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Suddenly, a wild scheduler appears: + _, err = bindingRegistry.Create(ctx, &api.Binding{PodID: "foo", Host: "machine"}) + if !errors.IsAlreadyExists(err) { + t.Fatalf("Unexpected error returned: %#v", err) + } + + obj, err := registry.Get(ctx, "foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + existingPod := obj.(*api.Pod) + if existingPod.Status.Host == "machine" { + t.Fatal("Pod's host changed in response to an non-apply-able binding.") + } +} + +func TestEtcdCreateWithContainersNotFound(t *testing.T) { + registry, bindingRegistry, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNotFound, + } + fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNotFound, + } + _, err := registry.Create(ctx, validNewPod()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Suddenly, a wild scheduler appears: + _, err = bindingRegistry.Create(ctx, &api.Binding{PodID: "foo", Host: "machine"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + resp, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var pod api.Pod + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if pod.Name != "foo" { + t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) + } + var boundPods api.BoundPods + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) + } +} + +func TestEtcdCreateWithExistingContainers(t *testing.T) { + registry, bindingRegistry, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: tools.EtcdErrorNotFound, + } + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {ObjectMeta: api.ObjectMeta{Name: "bar"}}, + }, + }), 0) + _, err := registry.Create(ctx, validNewPod()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Suddenly, a wild scheduler appears: + _, err = bindingRegistry.Create(ctx, &api.Binding{PodID: "foo", Host: "machine"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + resp, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var pod api.Pod + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if pod.Name != "foo" { + t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) + } + var boundPods api.BoundPods + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 2 || boundPods.Items[1].Name != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) + } +} + +func TestEtcdUpdateNotFound(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + podIn := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + ResourceVersion: "1", + Labels: map[string]string{ + "foo": "bar", + }, + }, + } + _, _, err := registry.Update(ctx, &podIn) + if err == nil { + t.Errorf("unexpected non-error") + } +} + +func TestEtcdUpdateNotScheduled(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, validNewPod()), 1) + + podIn := validChangedPod() + _, _, err := registry.Update(ctx, podIn) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + response, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + podOut := &api.Pod{} + latest.Codec.DecodeInto([]byte(response.Node.Value), podOut) + if !api.Semantic.DeepEqual(podOut, podIn) { + t.Errorf("objects differ: %v", util.ObjectDiff(podOut, podIn)) + } +} + +func TestEtcdUpdateScheduled(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + // Host: "machine", + Containers: []api.Container{ + { + Image: "foo:v1", + }, + }, + }, + Status: api.PodStatus{ + Host: "machine", + }, + }), 1) + + contKey := "/registry/nodes/machine/boundpods" + fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: "other", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "foo:v1", + }, + }, + }, + }, { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "foo:v1", + }, + }, + }, + }, + }, + }), 0) + + podIn := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + ResourceVersion: "1", + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "foo:v2", + ImagePullPolicy: api.PullIfNotPresent, + TerminationMessagePath: api.TerminationMessagePathDefault, + }, + }, + RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, + DNSPolicy: api.DNSClusterFirst, + }, + Status: api.PodStatus{ + Host: "machine", + }, + } + _, _, err := registry.Update(ctx, &podIn) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + response, err := fakeClient.Get(key, false, false) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var podOut api.Pod + latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut) + if !api.Semantic.DeepEqual(podOut, podIn) { + t.Errorf("expected: %#v, got: %#v", podOut, podIn) + } + + response, err = fakeClient.Get(contKey, false, false) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var list api.BoundPods + if err := latest.Codec.DecodeInto([]byte(response.Node.Value), &list); err != nil { + t.Fatalf("unexpected error decoding response: %v", err) + } + + if len(list.Items) != 2 || !api.Semantic.DeepEqual(list.Items[1].Spec, podIn.Spec) { + t.Errorf("unexpected container list: %d\n items[0] - %#v\n podin.spec - %#v\n", len(list.Items), list.Items[0].Spec, podIn.Spec) + } +} + +func TestEtcdDeletePod(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Status: api.PodStatus{Host: "machine"}, + }), 0) + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {ObjectMeta: api.ObjectMeta{Name: "foo"}}, + }, + }), 0) + _, err := registry.Delete(ctx, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if len(fakeClient.DeletedKeys) != 1 { + t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) + } else if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) + } + response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var boundPods api.BoundPods + latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods) + if len(boundPods.Items) != 0 { + t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value) + } +} + +func TestEtcdDeletePodMultipleContainers(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + fakeClient.TestIndex = true + key, _ := registry.store.KeyFunc(ctx, "foo") + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Status: api.PodStatus{Host: "machine"}, + }), 0) + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {ObjectMeta: api.ObjectMeta{Name: "foo"}}, + {ObjectMeta: api.ObjectMeta{Name: "bar"}}, + }, + }), 0) + _, err := registry.Delete(ctx, "foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if len(fakeClient.DeletedKeys) != 1 { + t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) + } + if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) + } + response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var boundPods api.BoundPods + latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods) + if len(boundPods.Items) != 1 { + t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods) + } + if boundPods.Items[0].Name != "bar" { + t.Errorf("Deleted wrong boundPod: %#v", boundPods) + } +} + +func TestEtcdEmptyList(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + key := registry.store.KeyRootFunc(ctx) + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{}, + }, + }, + E: nil, + } + + obj, err := registry.List(ctx, labels.Everything(), labels.Everything()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + pods := obj.(*api.PodList) + if len(pods.Items) != 0 { + t.Errorf("Unexpected pod list: %#v", pods) + } +} + +func TestEtcdListNotFound(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + key := registry.store.KeyRootFunc(ctx) + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + obj, err := registry.List(ctx, labels.Everything(), labels.Everything()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + pods := obj.(*api.PodList) + if len(pods.Items) != 0 { + t.Errorf("Unexpected pod list: %#v", pods) + } +} + +func TestEtcdList(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + key := registry.store.KeyRootFunc(ctx) + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Status: api.PodStatus{Host: "machine"}, + }), + }, + { + Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "bar"}, + Status: api.PodStatus{Host: "machine"}, + }), + }, + }, + }, + }, + E: nil, + } + obj, err := registry.List(ctx, labels.Everything(), labels.Everything()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + pods := obj.(*api.PodList) + + if len(pods.Items) != 2 || pods.Items[0].Name != "foo" || pods.Items[1].Name != "bar" { + t.Errorf("Unexpected pod list: %#v", pods) + } + if pods.Items[0].Status.Host != "machine" || + pods.Items[1].Status.Host != "machine" { + t.Errorf("Failed to populate host name.") + } +} + +func TestEtcdWatchPods(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + watching, err := registry.Watch(ctx, + labels.Everything(), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchPodsMatch(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + watching, err := registry.Watch(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Labels: map[string]string{ + "name": "foo", + }, + }, + } + podBytes, _ := latest.Codec.Encode(pod) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(podBytes), + }, + } + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + t.Error("unexpected timeout from result channel") + } + watching.Stop() +} + +func TestEtcdWatchPodsNotMatch(t *testing.T) { + registry, _, fakeClient, _ := newStorage(t) + ctx := api.NewDefaultContext() + watching, err := registry.Watch(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + labels.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Labels: map[string]string{ + "name": "bar", + }, + }, + } + podBytes, _ := latest.Codec.Encode(pod) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(podBytes), + }, + } + + select { + case <-watching.ResultChan(): + t.Error("unexpected result from result channel") + case <-time.After(time.Millisecond * 100): + // expected case + } +} diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 4fdaafdb5b4..23b99e31925 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -18,7 +18,9 @@ package pod import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -26,8 +28,6 @@ import ( type Registry interface { // ListPods obtains a list of pods having labels which match selector. ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) - // ListPodsPredicate obtains a list of pods for which filter returns true. - ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) // Watch for new/changed/deleted pods WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) // Get a specific pod @@ -39,3 +39,61 @@ type Registry interface { // Delete an existing pod DeletePod(ctx api.Context, podID string) error } + +// Storage is an interface for a standard REST Storage backend +// TODO: move me somewhere common +type Storage interface { + apiserver.RESTDeleter + apiserver.RESTLister + apiserver.RESTGetter + apiserver.ResourceWatcher + + Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) + Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) +} + +// storage puts strong typing around storage calls +type storage struct { + Storage +} + +// NewRegistry returns a new Registry interface for the given Storage. Any mismatched +// types will panic. +func NewRegistry(s Storage) Registry { + return &storage{s} +} + +func (s *storage) ListPods(ctx api.Context, label labels.Selector) (*api.PodList, error) { + obj, err := s.List(ctx, label, labels.Everything()) + if err != nil { + return nil, err + } + return obj.(*api.PodList), nil +} + +func (s *storage) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return s.Watch(ctx, label, field, resourceVersion) +} + +func (s *storage) GetPod(ctx api.Context, podID string) (*api.Pod, error) { + obj, err := s.Get(ctx, podID) + if err != nil { + return nil, err + } + return obj.(*api.Pod), nil +} + +func (s *storage) CreatePod(ctx api.Context, pod *api.Pod) error { + _, err := s.Create(ctx, pod) + return err +} + +func (s *storage) UpdatePod(ctx api.Context, pod *api.Pod) error { + _, _, err := s.Update(ctx, pod) + return err +} + +func (s *storage) DeletePod(ctx api.Context, podID string) error { + _, err := s.Delete(ctx, podID) + return err +} diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 4e7c995ae69..53c1df1a403 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -26,82 +26,100 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +// podStrategy implements behavior for Pods +// TODO: move to a pod specific package. +type podStrategy struct { + runtime.ObjectTyper + api.NameGenerator +} + +// Strategy is the default logic that applies when creating and updating Pod +// objects via the REST API. +// TODO: Create other strategies for updating status, bindings, etc +var Strategy = podStrategy{api.Scheme, api.SimpleNameGenerator} + +// NamespaceScoped is true for pods. +func (podStrategy) NamespaceScoped() bool { + return true +} + +// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation. +func (podStrategy) ResetBeforeCreate(obj runtime.Object) { + pod := obj.(*api.Pod) + pod.Status = api.PodStatus{ + Phase: api.PodPending, + } +} + +// Validate validates a new pod. +func (podStrategy) Validate(obj runtime.Object) errors.ValidationErrorList { + pod := obj.(*api.Pod) + return validation.ValidatePod(pod) +} + +// AllowCreateOnUpdate is false for pods. +func (podStrategy) AllowCreateOnUpdate() bool { + return false +} + +// ValidateUpdate is the default update validation for an end user. +func (podStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList { + return validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod)) +} + +// PodStatusGetter is an interface used by Pods to fetch and retrieve status info. type PodStatusGetter interface { GetPodStatus(namespace, name string) (*api.PodStatus, error) ClearPodStatus(namespace, name string) } -// REST implements the RESTStorage interface in terms of a PodRegistry. -type REST struct { - podCache PodStatusGetter - registry Registry -} - -type RESTConfig struct { - PodCache PodStatusGetter - Registry Registry -} - -// NewREST returns a new REST. -func NewREST(config *RESTConfig) *REST { - return &REST{ - podCache: config.PodCache, - registry: config.Registry, - } -} - -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { - pod := obj.(*api.Pod) - - if err := rest.BeforeCreate(rest.Pods, ctx, obj); err != nil { - return nil, err - } - - if err := rs.registry.CreatePod(ctx, pod); err != nil { - err = rest.CheckGeneratedNameError(rest.Pods, err, pod) - return nil, err - } - return rs.registry.GetPod(ctx, pod.Name) -} - -func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { - namespace, found := api.NamespaceFrom(ctx) - if !found { - return &api.Status{Status: api.StatusFailure}, nil - } - rs.podCache.ClearPodStatus(namespace, id) - - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id) -} - -func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - pod, err := rs.registry.GetPod(ctx, id) - if err != nil { - return pod, err - } - if pod == nil { - return pod, nil - } - host := pod.Status.Host - if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil { - pod.Status = api.PodStatus{ - Phase: api.PodUnknown, +// PodStatusDecorator returns a function that updates pod.Status based +// on the provided pod cache. +func PodStatusDecorator(cache PodStatusGetter) rest.ObjectFunc { + return func(obj runtime.Object) error { + pod := obj.(*api.Pod) + host := pod.Status.Host + if status, err := cache.GetPodStatus(pod.Namespace, pod.Name); err != nil { + pod.Status = api.PodStatus{ + Phase: api.PodUnknown, + } + } else { + pod.Status = *status } - } else { - pod.Status = *status + pod.Status.Host = host + return nil } - // Make sure not to hide a recent host with an old one from the cache. - // TODO: move host to spec - pod.Status.Host = host - return pod, err } -func PodToSelectableFields(pod *api.Pod) labels.Set { +// PodStatusReset returns a function that clears the pod cache when the object +// is deleted. +func PodStatusReset(cache PodStatusGetter) rest.ObjectFunc { + return func(obj runtime.Object) error { + pod := obj.(*api.Pod) + cache.ClearPodStatus(pod.Namespace, pod.Name) + return nil + } +} +// MatchPod returns a generic matcher for a given label and field selector. +func MatchPod(label, field labels.Selector) generic.Matcher { + return generic.MatcherFunc(func(obj runtime.Object) (bool, error) { + podObj, ok := obj.(*api.Pod) + if !ok { + return false, fmt.Errorf("not a pod") + } + fields := PodToSelectableFields(podObj) + return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields), nil + }) +} + +// PodToSelectableFields returns a label set that represents the object +// TODO: fields are not labels, and the validation rules for them do not apply. +func PodToSelectableFields(pod *api.Pod) labels.Set { // TODO we are populating both Status and DesiredState because selectors are not aware of API versions // see https://github.com/GoogleCloudPlatform/kubernetes/pull/2503 @@ -117,68 +135,13 @@ func PodToSelectableFields(pod *api.Pod) labels.Set { } } -// filterFunc returns a predicate based on label & field selectors that can be passed to registry's -// ListPods & WatchPods. -func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool { - return func(pod *api.Pod) bool { - fields := PodToSelectableFields(pod) - return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) - } -} - -func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { - pods, err := rs.registry.ListPodsPredicate(ctx, rs.filterFunc(label, field)) - if err == nil { - for i := range pods.Items { - pod := &pods.Items[i] - host := pod.Status.Host - if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil { - pod.Status = api.PodStatus{ - Phase: api.PodUnknown, - } - } else { - pod.Status = *status - } - // Make sure not to hide a recent host with an old one from the cache. - // This is tested by the integration test. - // TODO: move host to spec - pod.Status.Host = host - } - } - return pods, err -} - -// Watch begins watching for new, changed, or deleted pods. -func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { - // TODO: Add pod status to watch command - return rs.registry.WatchPods(ctx, label, field, resourceVersion) -} - -func (*REST) New() runtime.Object { - return &api.Pod{} -} - -func (*REST) NewList() runtime.Object { - return &api.PodList{} -} - -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { - pod := obj.(*api.Pod) - if !api.ValidNamespace(ctx, &pod.ObjectMeta) { - return nil, false, errors.NewConflict("pod", pod.Namespace, fmt.Errorf("Pod.Namespace does not match the provided context")) - } - if errs := validation.ValidatePod(pod); len(errs) > 0 { - return nil, false, errors.NewInvalid("pod", pod.Name, errs) - } - if err := rs.registry.UpdatePod(ctx, pod); err != nil { - return nil, false, err - } - out, err := rs.registry.GetPod(ctx, pod.Name) - return out, false, err +// ResourceGetter is an interface for retrieving resources by ResourceLocation. +type ResourceGetter interface { + Get(api.Context, string) (runtime.Object, error) } // ResourceLocation returns a URL to which one can send traffic for the specified pod. -func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { +func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string, error) { // Allow ID as "podname" or "podname:port". If port is not specified, // try to use the first defined port on the pod. parts := strings.Split(id, ":") @@ -192,7 +155,7 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { port = parts[1] } - obj, err := rs.Get(ctx, name) + obj, err := getter.Get(ctx, name) if err != nil { return "", err } diff --git a/pkg/registry/pod/rest_test.go b/pkg/registry/pod/rest_test.go index ff133675499..e10e51c0e33 100644 --- a/pkg/registry/pod/rest_test.go +++ b/pkg/registry/pod/rest_test.go @@ -18,20 +18,9 @@ package pod import ( "fmt" - "reflect" - "strings" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) type fakeCache struct { @@ -55,626 +44,35 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) { f.clearedName = name } -func expectApiStatusError(t *testing.T, out runtime.Object, msg string) { - status, ok := out.(*api.Status) - if !ok { - t.Errorf("Expected an api.Status object, was %#v", out) - return - } - if msg != status.Message { - t.Errorf("Expected %#v, was %s", msg, status.Message) - } -} - -func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) { - pod, ok := out.(*api.Pod) - if !ok || pod == nil { - t.Errorf("Expected an api.Pod object, was %#v", out) - return nil, false - } - return pod, true -} - -func TestCreatePodRegistryError(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Err = fmt.Errorf("test error") - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - } - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - } - ctx := api.NewDefaultContext() - _, err := storage.Create(ctx, pod) - if err != podRegistry.Err { +func TestPodStatusDecorator(t *testing.T) { + cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}} + pod := &api.Pod{} + if err := PodStatusDecorator(cache)(pod); err != nil { t.Fatalf("unexpected error: %v", err) } -} - -func TestCreatePodSetsIds(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Err = fmt.Errorf("test error") - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, + if pod.Status.Phase != api.PodRunning { + t.Errorf("unexpected pod: %#v", pod) } - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - } - ctx := api.NewDefaultContext() - _, err := storage.Create(ctx, pod) - if err != podRegistry.Err { - t.Fatalf("unexpected error: %v", err) - } - - if len(podRegistry.Pod.Name) == 0 { - t.Errorf("Expected pod ID to be set, Got %#v", pod) - } - if pod.Name != podRegistry.Pod.Name { - t.Errorf("Expected manifest ID to be equal to pod ID, Got %#v", pod) - } -} - -func TestCreatePodSetsUID(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Err = fmt.Errorf("test error") - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - } - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - } - ctx := api.NewDefaultContext() - _, err := storage.Create(ctx, pod) - if err != podRegistry.Err { - t.Fatalf("unexpected error: %v", err) - } - - if len(podRegistry.Pod.UID) == 0 { - t.Errorf("Expected pod UID to be set, Got %#v", pod) - } -} - -func TestListPodsError(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Err = fmt.Errorf("test error") - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - } - ctx := api.NewContext() - pods, err := storage.List(ctx, labels.Everything(), labels.Everything()) - if err != podRegistry.Err { - t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err) - } - if pods.(*api.PodList) != nil { - t.Errorf("Unexpected non-nil pod list: %#v", pods) - } -} - -func TestListPodsCacheError(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pods = &api.PodList{ - Items: []api.Pod{ - { - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - }, - }, - } - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable}, - } - ctx := api.NewContext() - pods, err := storage.List(ctx, labels.Everything(), labels.Everything()) - if err != nil { - t.Fatalf("Expected no error, got %#v", err) - } - pl := pods.(*api.PodList) - if len(pl.Items) != 1 { - t.Fatalf("Unexpected 0-len pod list: %+v", pl) - } - if e, a := api.PodUnknown, pl.Items[0].Status.Phase; e != a { - t.Errorf("Expected %v, got %v", e, a) - } -} - -func TestListEmptyPodList(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(&api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}) - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - } - ctx := api.NewContext() - pods, err := storage.List(ctx, labels.Everything(), labels.Everything()) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if len(pods.(*api.PodList).Items) != 0 { - t.Errorf("Unexpected non-zero pod list: %#v", pods) - } - if pods.(*api.PodList).ResourceVersion != "1" { - t.Errorf("Unexpected resource version: %#v", pods) - } -} - -func TestListPodList(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pods = &api.PodList{ - Items: []api.Pod{ - { - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - }, - { - ObjectMeta: api.ObjectMeta{ - Name: "bar", - }, - }, - }, - } - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}, - } - ctx := api.NewContext() - podsObj, err := storage.List(ctx, labels.Everything(), labels.Everything()) - pods := podsObj.(*api.PodList) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if len(pods.Items) != 2 { - t.Errorf("Unexpected pod list: %#v", pods) - } - if pods.Items[0].Name != "foo" || pods.Items[0].Status.Phase != api.PodRunning { - t.Errorf("Unexpected pod: %#v", pods.Items[0]) - } - if pods.Items[1].Name != "bar" { - t.Errorf("Unexpected pod: %#v", pods.Items[1]) - } -} - -func TestListPodListSelection(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pods = &api.PodList{ - Items: []api.Pod{ - { - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }, { - ObjectMeta: api.ObjectMeta{Name: "bar"}, - Status: api.PodStatus{Host: "barhost"}, - }, { - ObjectMeta: api.ObjectMeta{Name: "baz"}, - Status: api.PodStatus{Phase: "bazstatus"}, - }, { - ObjectMeta: api.ObjectMeta{ - Name: "qux", - Labels: map[string]string{"label": "qux"}, - }, - }, { - ObjectMeta: api.ObjectMeta{Name: "zot"}, - }, - }, - } - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - } - ctx := api.NewContext() - - table := []struct { - label, field string - expectedIDs util.StringSet - }{ - { - expectedIDs: util.NewStringSet("foo", "bar", "baz", "qux", "zot"), - }, { - field: "name=zot", - expectedIDs: util.NewStringSet("zot"), - }, { - label: "label=qux", - expectedIDs: util.NewStringSet("qux"), - }, { - field: "Status.Phase=bazstatus", - expectedIDs: util.NewStringSet("baz"), - }, { - field: "Status.Host=barhost", - expectedIDs: util.NewStringSet("bar"), - }, { - field: "Status.Host=", - expectedIDs: util.NewStringSet("foo", "baz", "qux", "zot"), - }, { - field: "Status.Host!=", - expectedIDs: util.NewStringSet("bar"), - }, - } - - for index, item := range table { - label, err := labels.ParseSelector(item.label) - if err != nil { - t.Errorf("unexpected error: %v", err) - continue - } - field, err := labels.ParseSelector(item.field) - if err != nil { - t.Errorf("unexpected error: %v", err) - continue - } - podsObj, err := storage.List(ctx, label, field) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - pods := podsObj.(*api.PodList) - - if e, a := len(item.expectedIDs), len(pods.Items); e != a { - t.Errorf("%v: Expected %v, got %v", index, e, a) - } - for _, pod := range pods.Items { - if !item.expectedIDs.Has(pod.Name) { - t.Errorf("%v: Unexpected pod %v", index, pod.Name) - } - t.Logf("%v: Got pod Name: %v", index, pod.Name) - } - } -} - -func TestPodDecode(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - } - expected := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - }, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - } - body, err := latest.Codec.Encode(expected) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - actual := storage.New() - if err := latest.Codec.DecodeInto(body, actual); err != nil { - t.Fatalf("unexpected error: %v", err) - } - - if !reflect.DeepEqual(expected, actual) { - t.Errorf("Expected %#v, Got %#v", expected, actual) - } -} - -func TestGetPod(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pod = &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Status: api.PodStatus{Host: "machine"}, - } - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}, - } - ctx := api.NewContext() - obj, err := storage.Get(ctx, "foo") - pod := obj.(*api.Pod) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - expect := *podRegistry.Pod - expect.Status.Phase = api.PodRunning - // TODO: when host is moved to spec, remove this line. - expect.Status.Host = "machine" - if e, a := &expect, pod; !reflect.DeepEqual(e, a) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a) - } -} - -func TestGetPodCacheError(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pod = &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable}, - } - ctx := api.NewContext() - obj, err := storage.Get(ctx, "foo") - pod := obj.(*api.Pod) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - expect := *podRegistry.Pod - expect.Status.Phase = api.PodUnknown - if e, a := &expect, pod; !reflect.DeepEqual(e, a) { - t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a) - } -} - -// TODO: remove, this is covered by RESTTest.TestCreate -func TestPodStorageValidatesCreate(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Err = fmt.Errorf("test error") - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - } - ctx := api.NewDefaultContext() - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{ - "invalid/label/to/cause/validation/failure": "bar", - }, - }, - } - c, err := storage.Create(ctx, pod) - if c != nil { - t.Errorf("Expected nil channel") - } - if !errors.IsInvalid(err) { - t.Errorf("Expected to get an invalid resource error, got %v", err) - } -} - -// TODO: remove, this is covered by RESTTest.TestCreate -func TestCreatePod(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pod = &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + pod = &api.Pod{ Status: api.PodStatus{ - Host: "machine", + Host: "foo", }, } - storage := REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - } - pod := &api.Pod{ - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - } - pod.Name = "foo" - ctx := api.NewDefaultContext() - obj, err := storage.Create(ctx, pod) - if err != nil { + if err := PodStatusDecorator(cache)(pod); err != nil { t.Fatalf("unexpected error: %v", err) } - if obj == nil { - t.Fatalf("unexpected object: %#v", obj) - } - if !api.HasObjectMetaSystemFieldValues(&podRegistry.Pod.ObjectMeta) { - t.Errorf("Expected ObjectMeta field values were populated") + if pod.Status.Phase != api.PodRunning || pod.Status.Host != "foo" { + t.Errorf("unexpected pod: %#v", pod) } } -// TODO: remove, this is covered by RESTTest.TestCreate -func TestCreatePodWithConflictingNamespace(t *testing.T) { - storage := REST{} - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "not-default"}, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - } - - ctx := api.NewDefaultContext() - channel, err := storage.Create(ctx, pod) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") - } - if err == nil { - t.Errorf("Expected an error, but we didn't get one") - } else if strings.Contains(err.Error(), "Controller.Namespace does not match the provided context") { - t.Errorf("Expected 'Pod.Namespace does not match the provided context' error, got '%v'", err.Error()) - } -} - -func TestUpdatePodWithConflictingNamespace(t *testing.T) { - storage := REST{} - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "not-default"}, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - } - - ctx := api.NewDefaultContext() - obj, created, err := storage.Update(ctx, pod) - if obj != nil || created { - t.Error("Expected a nil channel, but we got a value or created") - } - if err == nil { - t.Errorf("Expected an error, but we didn't get one") - } else if strings.Index(err.Error(), "Pod.Namespace does not match the provided context") == -1 { - t.Errorf("Expected 'Pod.Namespace does not match the provided context' error, got '%v'", err.Error()) - } -} - -func TestResourceLocation(t *testing.T) { - expectedIP := "1.2.3.4" - testCases := []struct { - pod api.Pod - query string - location string - }{ - { - pod: api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }, - query: "foo", - location: expectedIP, - }, - { - pod: api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }, - query: "foo:12345", - location: expectedIP + ":12345", - }, - { - pod: api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "ctr"}, - }, - }, - }, - query: "foo", - location: expectedIP, - }, - { - pod: api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}}, - }, - }, - }, - query: "foo", - location: expectedIP + ":9376", - }, - { - pod: api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}}, - }, - }, - }, - query: "foo:12345", - location: expectedIP + ":12345", - }, - { - pod: api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "ctr1"}, - {Name: "ctr2", Ports: []api.Port{{ContainerPort: 9376}}}, - }, - }, - }, - query: "foo", - location: expectedIP + ":9376", - }, - { - pod: api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "ctr1", Ports: []api.Port{{ContainerPort: 9376}}}, - {Name: "ctr2", Ports: []api.Port{{ContainerPort: 1234}}}, - }, - }, - }, - query: "foo", - location: expectedIP + ":9376", - }, - } - - for _, tc := range testCases { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pod = &tc.pod - storage := &REST{ - registry: podRegistry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{PodIP: expectedIP}}, - } - - redirector := apiserver.Redirector(storage) - location, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - - if location != tc.location { - t.Errorf("Expected %v, but got %v", tc.location, location) - } - } -} - -func TestDeletePod(t *testing.T) { - podRegistry := registrytest.NewPodRegistry(nil) - podRegistry.Pod = &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Status: api.PodStatus{Host: "machine"}, - } - fakeCache := &fakeCache{} - storage := REST{ - registry: podRegistry, - podCache: fakeCache, - } - ctx := api.NewDefaultContext() - result, err := storage.Delete(ctx, "foo") - if err != nil { +func TestPodStatusDecoratorError(t *testing.T) { + cache := &fakeCache{errorToReturn: fmt.Errorf("test error")} + pod := &api.Pod{} + if err := PodStatusDecorator(cache)(pod); err != nil { t.Fatalf("unexpected error: %v", err) } - if fakeCache.clearedNamespace != "default" || fakeCache.clearedName != "foo" { - t.Errorf("Unexpeceted cache delete: %s %s %#v", fakeCache.clearedName, fakeCache.clearedNamespace, result) + if pod.Status.Phase != api.PodUnknown { + t.Errorf("unexpected pod: %#v", pod) } } - -func TestCreate(t *testing.T) { - registry := registrytest.NewPodRegistry(nil) - test := resttest.New(t, &REST{ - registry: registry, - podCache: &fakeCache{statusToReturn: &api.PodStatus{}}, - }, registry.SetError) - test.TestCreate( - // valid - &api.Pod{ - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "test1", - Image: "foo", - ImagePullPolicy: api.PullIfNotPresent, - }, - }, - RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, - DNSPolicy: api.DNSClusterFirst, - }, - }, - // invalid - &api.Pod{ - Spec: api.PodSpec{ - Containers: []api.Container{}, - }, - }, - ) -}