Merge pull request #4248 from smarterclayton/remove_rest_handler

Create a new unified storage object for Pods
This commit is contained in:
Brian Grant 2015-02-17 09:42:48 -08:00
commit 3e012db59a
12 changed files with 1930 additions and 1862 deletions

View File

@ -25,17 +25,17 @@ import (
func TestCheckGeneratedNameError(t *testing.T) {
expect := errors.NewNotFound("foo", "bar")
if err := CheckGeneratedNameError(Pods, expect, &api.Pod{}); err != expect {
if err := CheckGeneratedNameError(Services, expect, &api.Pod{}); err != expect {
t.Errorf("NotFoundError should be ignored: %v", err)
}
expect = errors.NewAlreadyExists("foo", "bar")
if err := CheckGeneratedNameError(Pods, expect, &api.Pod{}); err != expect {
if err := CheckGeneratedNameError(Services, expect, &api.Pod{}); err != expect {
t.Errorf("AlreadyExists should be returned when no GenerateName field: %v", err)
}
expect = errors.NewAlreadyExists("foo", "bar")
if err := CheckGeneratedNameError(Pods, expect, &api.Pod{ObjectMeta: api.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) {
if err := CheckGeneratedNameError(Services, expect, &api.Pod{ObjectMeta: api.ObjectMeta{GenerateName: "foo"}}); err == nil || !errors.IsServerTimeout(err) {
t.Errorf("expected try again later error: %v", err)
}
}

View File

@ -72,46 +72,6 @@ func (rcStrategy) Validate(obj runtime.Object) errors.ValidationErrorList {
return validation.ValidateReplicationController(controller)
}
// podStrategy implements behavior for Pods
// TODO: move to a pod specific package.
type podStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// Pods is the default logic that applies when creating and updating Pod
// objects.
var Pods = podStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped is true for pods.
func (podStrategy) NamespaceScoped() bool {
return true
}
// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation.
func (podStrategy) ResetBeforeCreate(obj runtime.Object) {
pod := obj.(*api.Pod)
pod.Status = api.PodStatus{
Phase: api.PodPending,
}
}
// Validate validates a new pod.
func (podStrategy) Validate(obj runtime.Object) errors.ValidationErrorList {
pod := obj.(*api.Pod)
return validation.ValidatePod(pod)
}
// AllowCreateOnUpdate is false for pods.
func (podStrategy) AllowCreateOnUpdate() bool {
return false
}
// ValidateUpdate is the default update validation for an end user.
func (podStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList {
return validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod))
}
// svcStrategy implements behavior for Services
// TODO: move to a service specific package.
type svcStrategy struct {

View File

@ -42,7 +42,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd"
@ -52,6 +51,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequotausage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
@ -117,20 +117,9 @@ type Config struct {
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
// "Inputs", Copied from Config
podRegistry pod.Registry
controllerRegistry controller.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
minionRegistry minion.Registry
bindingRegistry binding.Registry
eventRegistry generic.Registry
limitRangeRegistry generic.Registry
resourceQuotaRegistry resourcequota.Registry
namespaceRegistry generic.Registry
storage map[string]apiserver.RESTStorage
client *client.Client
portalNet *net.IPNet
cacheTimeout time.Duration
client *client.Client
portalNet *net.IPNet
cacheTimeout time.Duration
mux apiserver.Mux
muxHelper *apiserver.MuxHelper
@ -157,6 +146,17 @@ type Master struct {
serviceReadWritePort int
masterServices *util.Runner
// storage contains the RESTful endpoints exposed by this master
storage map[string]apiserver.RESTStorage
// registries are internal client APIs for accessing the storage layer
// TODO: define the internal typed interface in a way that clients can
// also be replaced
nodeRegistry minion.Registry
namespaceRegistry generic.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
// "Outputs"
Handler http.Handler
InsecureHandler http.Handler
@ -260,7 +260,6 @@ func setDefaults(c *Config) {
// any unhandled paths to "Handler".
func New(c *Config) *Master {
setDefaults(c)
boundPodFactory := &pod.BasicBoundPodFactory{}
if c.KubeletClient == nil {
glog.Fatalf("master.New() called with config.KubeletClient == nil")
}
@ -277,16 +276,6 @@ func New(c *Config) *Master {
glog.Infof("Setting master service IPs based on PortalNet subnet to %q (read-only) and %q (read-write).", serviceReadOnlyIP, serviceReadWriteIP)
m := &Master{
podRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
serviceRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())),
namespaceRegistry: namespace.NewEtcdRegistry(c.EtcdHelper),
minionRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
limitRangeRegistry: limitrange.NewEtcdRegistry(c.EtcdHelper),
resourceQuotaRegistry: resourcequota.NewEtcdRegistry(c.EtcdHelper),
client: c.Client,
portalNet: c.PortalNet,
rootWebService: new(restful.WebService),
@ -376,34 +365,51 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter)
// init initializes master.
func (m *Master) init(c *Config) {
nodeRESTStorage := minion.NewREST(m.minionRegistry)
boundPodFactory := &pod.BasicBoundPodFactory{}
podStorage, bindingStorage := podetcd.NewREST(c.EtcdHelper, boundPodFactory)
podRegistry := pod.NewRegistry(podStorage)
eventRegistry := event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds()))
limitRangeRegistry := limitrange.NewEtcdRegistry(c.EtcdHelper)
resourceQuotaRegistry := resourcequota.NewEtcdRegistry(c.EtcdHelper)
m.namespaceRegistry = namespace.NewEtcdRegistry(c.EtcdHelper)
// TODO: split me up into distinct storage registries
registry := etcd.NewRegistry(c.EtcdHelper, podRegistry)
m.serviceRegistry = registry
m.endpointRegistry = registry
m.nodeRegistry = registry
nodeStorage := minion.NewREST(m.nodeRegistry)
// TODO: unify the storage -> registry and storage -> client patterns
nodeStorageClient := RESTStorageToNodes(nodeStorage)
podCache := NewPodCache(
c.KubeletClient,
RESTStorageToNodes(nodeRESTStorage).Nodes(),
m.podRegistry,
nodeStorageClient.Nodes(),
podRegistry,
)
go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout)
go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)
// TODO: refactor podCache to sit on top of podStorage via status calls
podStorage = podStorage.WithPodStatus(podCache)
// TODO: Factor out the core API registration
m.storage = map[string]apiserver.RESTStorage{
"pods": pod.NewREST(&pod.RESTConfig{
PodCache: podCache,
Registry: m.podRegistry,
}),
"replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry),
"services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet),
"pods": podStorage,
"bindings": bindingStorage,
"replicationControllers": controller.NewREST(registry, podRegistry),
"services": service.NewREST(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.portalNet),
"endpoints": endpoint.NewREST(m.endpointRegistry),
"minions": nodeRESTStorage,
"nodes": nodeRESTStorage,
"events": event.NewREST(m.eventRegistry),
"minions": nodeStorage,
"nodes": nodeStorage,
"events": event.NewREST(eventRegistry),
// TODO: should appear only in scheduler API group.
"bindings": binding.NewREST(m.bindingRegistry),
"limitRanges": limitrange.NewREST(m.limitRangeRegistry),
"resourceQuotas": resourcequota.NewREST(m.resourceQuotaRegistry),
"resourceQuotaUsages": resourcequotausage.NewREST(m.resourceQuotaRegistry),
"limitRanges": limitrange.NewREST(limitRangeRegistry),
"resourceQuotas": resourcequota.NewREST(resourceQuotaRegistry),
"resourceQuotaUsages": resourcequotausage.NewREST(resourceQuotaRegistry),
"namespaces": namespace.NewREST(m.namespaceRegistry),
}
@ -542,7 +548,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
}
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/v2/keys/"}
}
nodes, err := m.minionRegistry.ListMinions(api.NewDefaultContext())
nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext())
if err != nil {
glog.Errorf("Failed to list minions: %v", err)
}

View File

@ -32,7 +32,7 @@ func TestGetServersToValidate(t *testing.T) {
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil}
master.minionRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
servers := master.getServersToValidate(&config)

View File

@ -20,10 +20,7 @@ import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/constraint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@ -34,8 +31,6 @@ import (
)
const (
// PodPath is the path to pod resources in etcd
PodPath string = "/registry/pods"
// ControllerPath is the path to controller resources in etcd
ControllerPath string = "/registry/controllers"
// ServicePath is the path to service resources in etcd
@ -53,15 +48,15 @@ const (
// MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd.
type Registry struct {
tools.EtcdHelper
boundPodFactory pod.BoundPodFactory
pods pod.Registry
}
// NewRegistry creates an etcd registry.
func NewRegistry(helper tools.EtcdHelper, boundPodFactory pod.BoundPodFactory) *Registry {
func NewRegistry(helper tools.EtcdHelper, pods pod.Registry) *Registry {
registry := &Registry{
EtcdHelper: helper,
pods: pods,
}
registry.boundPodFactory = boundPodFactory
return registry
}
@ -89,239 +84,6 @@ func MakeEtcdItemKey(ctx api.Context, prefix string, id string) (string, error)
return key, nil
}
// makePodListKey constructs etcd paths to pod directories enforcing namespace rules.
func makePodListKey(ctx api.Context) string {
return MakeEtcdListKey(ctx, PodPath)
}
// makePodKey constructs etcd paths to pod items enforcing namespace rules.
func makePodKey(ctx api.Context, id string) (string, error) {
return MakeEtcdItemKey(ctx, PodPath, id)
}
// ListPods obtains a list of pods with labels that match selector.
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
return selector.Matches(labels.Set(pod.Labels))
})
}
// ListPodsPredicate obtains a list of pods that match filter.
func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) {
allPods := api.PodList{}
key := makePodListKey(ctx)
err := r.ExtractToList(key, &allPods)
if err != nil {
return nil, err
}
filtered := []api.Pod{}
for _, pod := range allPods.Items {
if filter(&pod) {
filtered = append(filtered, pod)
}
}
allPods.Items = filtered
return &allPods, nil
}
// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod")
if err != nil {
return nil, err
}
key := makePodListKey(ctx)
return r.WatchList(key, version, func(obj runtime.Object) bool {
podObj, ok := obj.(*api.Pod)
if !ok {
// Must be an error: return true to propagate to upper level.
return true
}
fields := pod.PodToSelectableFields(podObj)
return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields)
})
}
// GetPod gets a specific pod specified by its ID.
func (r *Registry) GetPod(ctx api.Context, id string) (*api.Pod, error) {
var pod api.Pod
key, err := makePodKey(ctx, id)
if err != nil {
return nil, err
}
if err = r.ExtractObj(key, &pod, false); err != nil {
return nil, etcderr.InterpretGetError(err, "pod", id)
}
return &pod, nil
}
func makeBoundPodsKey(machine string) string {
return "/registry/nodes/" + machine + "/boundpods"
}
// CreatePod creates a pod based on a specification.
func (r *Registry) CreatePod(ctx api.Context, pod *api.Pod) error {
// Set current status to "Waiting".
pod.Status.Phase = api.PodPending
pod.Status.Host = ""
key, err := makePodKey(ctx, pod.Name)
if err != nil {
return err
}
err = r.CreateObj(key, pod, 0)
return etcderr.InterpretCreateError(err, "pod", pod.Name)
}
// ApplyBinding implements binding's registry
func (r *Registry) ApplyBinding(ctx api.Context, binding *api.Binding) error {
return etcderr.InterpretCreateError(r.assignPod(ctx, binding.PodID, binding.Host), "binding", "")
}
// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'.
// Returns the current state of the pod, or an error.
func (r *Registry) setPodHostTo(ctx api.Context, podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
podKey, err := makePodKey(ctx, podID)
if err != nil {
return nil, err
}
err = r.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
if pod.Status.Host != oldMachine {
return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.Name, pod.Status.Host)
}
pod.Status.Host = machine
finalPod = pod
return pod, nil
})
return finalPod, err
}
// assignPod assigns the given pod to the given machine.
func (r *Registry) assignPod(ctx api.Context, podID string, machine string) error {
finalPod, err := r.setPodHostTo(ctx, podID, "", machine)
if err != nil {
return err
}
boundPod, err := r.boundPodFactory.MakeBoundPod(machine, finalPod)
if err != nil {
return err
}
// Doing the constraint check this way provides atomicity guarantees.
contKey := makeBoundPodsKey(machine)
err = r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPodList := in.(*api.BoundPods)
boundPodList.Items = append(boundPodList.Items, *boundPod)
if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 {
return nil, fmt.Errorf("the assignment would cause the following constraints violation: %v", errors)
}
return boundPodList, nil
})
if err != nil {
// Put the pod's host back the way it was. This is a terrible hack, but
// can't really be helped, since there's not really a way to do atomic
// multi-object changes in etcd.
if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil {
glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2)
}
}
return err
}
func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
var podOut api.Pod
podKey, err := makePodKey(ctx, pod.Name)
if err != nil {
return err
}
err = r.EtcdHelper.ExtractObj(podKey, &podOut, false)
if err != nil {
return err
}
scheduled := podOut.Status.Host != ""
if scheduled {
pod.Status.Host = podOut.Status.Host
// If it's already been scheduled, limit the types of updates we'll accept.
errs := validation.ValidatePodUpdate(pod, &podOut)
if len(errs) != 0 {
return errors.NewInvalid("Pod", pod.Name, errs)
}
}
// There's no race with the scheduler, because either this write will fail because the host
// has been updated, or the host update will fail because this pod has been updated.
err = r.EtcdHelper.SetObj(podKey, pod, 0 /* ttl */)
if err != nil {
return err
}
if !scheduled {
// never scheduled, just update.
return nil
}
containerKey := makeBoundPodsKey(podOut.Status.Host)
return r.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPods := in.(*api.BoundPods)
for ix := range boundPods.Items {
item := &boundPods.Items[ix]
if item.Name == pod.Name && item.Namespace == pod.Namespace {
item.Spec = pod.Spec
return boundPods, nil
}
}
// This really shouldn't happen
glog.Warningf("Couldn't find: %s in %#v", pod.Name, boundPods)
return boundPods, fmt.Errorf("failed to update pod, couldn't find %s in %#v", pod.Name, boundPods)
})
}
// DeletePod deletes an existing pod specified by its ID.
func (r *Registry) DeletePod(ctx api.Context, podID string) error {
var pod api.Pod
podKey, err := makePodKey(ctx, podID)
if err != nil {
return err
}
err = r.ExtractObj(podKey, &pod, false)
if err != nil {
return etcderr.InterpretDeleteError(err, "pod", podID)
}
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
err = r.Delete(podKey, true)
if err != nil {
return etcderr.InterpretDeleteError(err, "pod", podID)
}
machine := pod.Status.Host
if machine == "" {
// Pod was never scheduled anywhere, just return.
return nil
}
// Next, remove the pod from the machine atomically.
contKey := makeBoundPodsKey(machine)
return r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
for _, item := range pods.Items {
if item.Name != pod.Name || item.Namespace != pod.Namespace {
newPods = append(newPods, item)
} else {
found = true
}
}
if !found {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
glog.Warningf("Couldn't find: %s in %#v", podID, pods)
}
pods.Items = newPods
return pods, nil
})
}
// ListControllers obtains a list of ReplicationControllers.
func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) {
controllers := &api.ReplicationControllerList{}
@ -348,7 +110,7 @@ func (r *Registry) WatchControllers(ctx api.Context, label, field labels.Selecto
}
match := label.Matches(labels.Set(controller.Labels))
if match {
pods, err := r.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
pods, err := r.pods.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
if err != nil {
glog.Warningf("Error listing pods: %v", err)
// No object that's useable so drop it on the floor

View File

@ -18,7 +18,6 @@ package etcd
import (
"strconv"
"strings"
"testing"
"time"
@ -26,7 +25,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
@ -34,791 +35,15 @@ import (
)
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicBoundPodFactory{})
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, nil)
return registry
}
// TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash
func TestEtcdGetPodDifferentNamespace(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx1 := api.NewDefaultContext()
ctx2 := api.WithNamespace(api.NewContext(), "other")
key1, _ := makePodKey(ctx1, "foo")
key2, _ := makePodKey(ctx2, "foo")
fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "foo"}}), 0)
fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "other", Name: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
pod1, err := registry.GetPod(ctx1, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod1.Name != "foo" {
t.Errorf("Unexpected pod: %#v", pod1)
}
if pod1.Namespace != "default" {
t.Errorf("Unexpected pod: %#v", pod1)
}
pod2, err := registry.GetPod(ctx2, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod2.Name != "foo" {
t.Errorf("Unexpected pod: %#v", pod2)
}
if pod2.Namespace != "other" {
t.Errorf("Unexpected pod: %#v", pod2)
}
}
func TestEtcdGetPod(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
pod, err := registry.GetPod(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v", pod)
}
}
func TestEtcdGetPodNotFound(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
_, err := registry.GetPod(ctx, "foo")
if !errors.IsNotFound(err) {
t.Errorf("Unexpected error returned: %#v", err)
}
}
func TestEtcdCreatePod(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine", ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
func TestEtcdCreatePodFailsWithoutNamespace(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(api.NewContext(), &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
// Accept "namespace" or "Namespace".
if err == nil || !strings.Contains(err.Error(), "amespace") {
t.Fatalf("expected error that namespace was missing from context, got: %v", err)
}
}
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}),
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
})
if !errors.IsAlreadyExists(err) {
t.Errorf("Unexpected error returned: %#v", err)
}
}
func TestEtcdCreatePodWithContainersError(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNodeExist, // validate that ApplyBinding is translating Create errors
}
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if !errors.IsAlreadyExists(err) {
t.Fatalf("Unexpected error returned: %#v", err)
}
existingPod, err := registry.GetPod(ctx, "foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if existingPod.Status.Host == "machine" {
t.Fatal("Pod's host changed in response to an non-apply-able binding.")
}
}
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 2 || boundPods.Items[1].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
func TestEtcdUpdatePodNotFound(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
podIn := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
}
err := registry.UpdatePod(ctx, &podIn)
if err == nil {
t.Errorf("unexpected non-error")
}
}
func TestEtcdUpdatePodNotScheduled(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}), 1)
registry := NewTestEtcdRegistry(fakeClient)
podIn := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
err := registry.UpdatePod(ctx, &podIn)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
response, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var podOut api.Pod
latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut)
if !api.Semantic.DeepEqual(podOut, podIn) {
t.Errorf("expected: %v, got: %v", podOut, podIn)
}
}
func TestEtcdUpdatePodScheduled(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.PodSpec{
// Host: "machine",
Containers: []api.Container{
{
Image: "foo:v1",
},
},
},
Status: api.PodStatus{
Host: "machine",
},
}), 1)
contKey := "/registry/nodes/machine/boundpods"
fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v1",
},
},
},
}, {
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v1",
},
},
},
},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
podIn := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v2",
ImagePullPolicy: api.PullIfNotPresent,
TerminationMessagePath: api.TerminationMessagePathDefault,
},
},
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
Status: api.PodStatus{
Host: "machine",
},
}
err := registry.UpdatePod(ctx, &podIn)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
response, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var podOut api.Pod
latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut)
if !api.Semantic.DeepEqual(podOut, podIn) {
t.Errorf("expected: %#v, got: %#v", podOut, podIn)
}
response, err = fakeClient.Get(contKey, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var list api.BoundPods
if err := latest.Codec.DecodeInto([]byte(response.Node.Value), &list); err != nil {
t.Fatalf("unexpected error decoding response: %v", err)
}
if len(list.Items) != 2 || !api.Semantic.DeepEqual(list.Items[1].Spec, podIn.Spec) {
t.Errorf("unexpected container list: %d\n items[0] - %#v\n podin.spec - %#v\n", len(list.Items), list.Items[1].Spec, podIn.Spec)
}
}
func TestEtcdDeletePod(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}), 0)
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeletePod(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
} else if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 0 {
t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value)
}
}
func TestEtcdDeletePodMultipleContainers(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}), 0)
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}},
{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeletePod(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 1 {
t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods)
}
if boundPods.Items[0].Namespace != "other" {
t.Errorf("Deleted wrong boundPod: %#v", boundPods)
}
}
func TestEtcdEmptyListPods(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx := api.NewDefaultContext()
key := makePodListKey(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
pods, err := registry.ListPods(ctx, labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(pods.Items) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestEtcdListPodsNotFound(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx := api.NewDefaultContext()
key := makePodListKey(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
pods, err := registry.ListPods(ctx, labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(pods.Items) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestEtcdListPods(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx := api.NewDefaultContext()
key := makePodListKey(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Status: api.PodStatus{Host: "machine"},
}),
},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
pods, err := registry.ListPods(ctx, labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(pods.Items) != 2 || pods.Items[0].Name != "foo" || pods.Items[1].Name != "bar" {
t.Errorf("Unexpected pod list: %#v", pods)
}
if pods.Items[0].Status.Host != "machine" ||
pods.Items[1].Status.Host != "machine" {
t.Errorf("Failed to populate host name.")
}
}
func TestEtcdWatchPods(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.Everything(),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchPodsMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
podBytes, _ := latest.Codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchPodsNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
podBytes, _ := latest.Codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
helper := tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
podStorage, _ := podetcd.NewREST(helper, nil)
registry := NewRegistry(helper, pod.NewRegistry(podStorage))
return registry
}
func TestEtcdListControllersNotFound(t *testing.T) {
@ -1081,8 +306,8 @@ func TestEtcdWatchController(t *testing.T) {
func TestEtcdWatchControllersMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet(makePodListKey(ctx))
registry := NewTestEtcdRegistry(fakeClient)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
registry := NewTestEtcdRegistryWithPods(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
@ -1122,8 +347,8 @@ func TestEtcdWatchControllersMatch(t *testing.T) {
func TestEtcdWatchControllersNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet(makePodListKey(ctx))
registry := NewTestEtcdRegistry(fakeClient)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
registry := NewTestEtcdRegistryWithPods(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),

View File

@ -43,6 +43,8 @@ import (
// ResourceVersion and semantics. The RESTCreateStrategy and
// RESTUpdateStrategy are generic across all backends, and encapsulate
// logic specific to the API.
//
// TODO: make the default exposed methods exactly match a generic RESTStorage
type Etcd struct {
// Called to make a new object, should return e.g., &api.Pod{}
NewFunc func() runtime.Object
@ -116,6 +118,8 @@ func NamespaceKeyFunc(ctx api.Context, prefix string, name string) (string, erro
}
// List returns a list of all the items matching m.
// TODO: rename this to ListPredicate, take the default predicate function on the constructor, and
// introduce a List method that uses the default predicate function
func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) {
list := e.NewListFunc()
err := e.Helper.ExtractToList(e.KeyRootFunc(ctx), list)
@ -126,6 +130,7 @@ func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error)
}
// CreateWithName inserts a new item with the provided name
// DEPRECATED: use Create instead
func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, name)
if err != nil {
@ -191,6 +196,7 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro
}
// UpdateWithName updates the item with the provided name
// DEPRECATED: use Update instead
func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, name)
if err != nil {
@ -323,6 +329,8 @@ func (e *Etcd) Delete(ctx api.Context, name string) (runtime.Object, error) {
// Watch starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
// TODO: rename this to WatchPredicate, take the default predicate function on the constructor, and
// introduce a Watch method that uses the default predicate function
func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil {

View File

@ -0,0 +1,252 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/constraint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// rest implements a RESTStorage for pods against etcd
type REST struct {
store *etcdgeneric.Etcd
}
// NewREST returns a RESTStorage object that will work against pods.
func NewREST(h tools.EtcdHelper, factory pod.BoundPodFactory) (*REST, *BindingREST) {
prefix := "/registry/pods"
bindings := &podLifecycle{h}
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
},
KeyFunc: func(ctx api.Context, name string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name)
},
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*api.Pod).Name, nil
},
EndpointName: "pods",
CreateStrategy: pod.Strategy,
UpdateStrategy: pod.Strategy,
AfterUpdate: bindings.AfterUpdate,
ReturnDeletedObject: true,
AfterDelete: bindings.AfterDelete,
Helper: h,
}
return &REST{store: store}, &BindingREST{store: store, factory: factory}
}
// WithPodStatus returns a rest object that decorates returned responses with extra
// status information.
func (r *REST) WithPodStatus(cache pod.PodStatusGetter) *REST {
store := *r.store
store.Decorator = pod.PodStatusDecorator(cache)
store.AfterDelete = rest.AllFuncs(store.AfterDelete, pod.PodStatusReset(cache))
return &REST{store: &store}
}
// New returns a new object
func (r *REST) New() runtime.Object {
return r.store.NewFunc()
}
// NewList returns a new list object
func (r *REST) NewList() runtime.Object {
return r.store.NewListFunc()
}
// List obtains a list of pods with labels that match selector.
func (r *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return r.store.List(ctx, pod.MatchPod(label, field))
}
// Watch begins watching for new, changed, or deleted pods.
func (r *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return r.store.Watch(ctx, pod.MatchPod(label, field), resourceVersion)
}
// Get gets a specific pod specified by its ID.
func (r *REST) Get(ctx api.Context, name string) (runtime.Object, error) {
return r.store.Get(ctx, name)
}
// Create creates a pod based on a specification.
func (r *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
return r.store.Create(ctx, obj)
}
// Update changes a pod specification.
func (r *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
return r.store.Update(ctx, obj)
}
// Delete deletes an existing pod specified by its ID.
func (r *REST) Delete(ctx api.Context, name string) (runtime.Object, error) {
return r.store.Delete(ctx, name)
}
// ResourceLocation returns a pods location from its HostIP
func (r *REST) ResourceLocation(ctx api.Context, name string) (string, error) {
return pod.ResourceLocation(r, ctx, name)
}
func makeBoundPodsKey(machine string) string {
return "/registry/nodes/" + machine + "/boundpods"
}
// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
type BindingREST struct {
store *etcdgeneric.Etcd
factory pod.BoundPodFactory
}
func (r *BindingREST) New() runtime.Object {
return &api.Binding{}
}
// Create ensures a pod is bound to a specific host.
func (r *BindingREST) Create(ctx api.Context, obj runtime.Object) (out runtime.Object, err error) {
binding := obj.(*api.Binding)
err = r.assignPod(ctx, binding.PodID, binding.Host)
err = etcderr.InterpretCreateError(err, "binding", "")
out = &api.Status{Status: api.StatusSuccess}
return
}
// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'.
// Returns the current state of the pod, or an error.
func (r *BindingREST) setPodHostTo(ctx api.Context, podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
podKey, err := r.store.KeyFunc(ctx, podID)
if err != nil {
return nil, err
}
err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
if pod.Status.Host != oldMachine {
return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.Name, pod.Status.Host)
}
pod.Status.Host = machine
finalPod = pod
return pod, nil
})
return finalPod, err
}
// assignPod assigns the given pod to the given machine.
func (r *BindingREST) assignPod(ctx api.Context, podID string, machine string) error {
finalPod, err := r.setPodHostTo(ctx, podID, "", machine)
if err != nil {
return err
}
boundPod, err := r.factory.MakeBoundPod(machine, finalPod)
if err != nil {
return err
}
// Doing the constraint check this way provides atomicity guarantees.
contKey := makeBoundPodsKey(machine)
err = r.store.Helper.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPodList := in.(*api.BoundPods)
boundPodList.Items = append(boundPodList.Items, *boundPod)
if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 {
return nil, fmt.Errorf("the assignment would cause the following constraints violation: %v", errors)
}
return boundPodList, nil
})
if err != nil {
// Put the pod's host back the way it was. This is a terrible hack, but
// can't really be helped, since there's not really a way to do atomic
// multi-object changes in etcd.
if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil {
glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2)
}
}
return err
}
type podLifecycle struct {
tools.EtcdHelper
}
func (h *podLifecycle) AfterUpdate(obj runtime.Object) error {
pod := obj.(*api.Pod)
if len(pod.Status.Host) == 0 {
return nil
}
containerKey := makeBoundPodsKey(pod.Status.Host)
return h.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPods := in.(*api.BoundPods)
for ix := range boundPods.Items {
if boundPods.Items[ix].Name == pod.Name && boundPods.Items[ix].Namespace == pod.Namespace {
boundPods.Items[ix].Spec = pod.Spec
return boundPods, nil
}
}
// This really shouldn't happen
glog.Warningf("Couldn't find: %s in %#v", pod.Name, boundPods)
return boundPods, fmt.Errorf("failed to update pod, couldn't find %s in %#v", pod.Name, boundPods)
})
}
func (h *podLifecycle) AfterDelete(obj runtime.Object) error {
pod := obj.(*api.Pod)
if len(pod.Status.Host) == 0 {
return nil
}
containerKey := makeBoundPodsKey(pod.Status.Host)
return h.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
for _, boundPod := range pods.Items {
if boundPod.Name != pod.Name || boundPod.Namespace != pod.Namespace {
newPods = append(newPods, boundPod)
} else {
found = true
}
}
if !found {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
glog.Warningf("Couldn't find: %s in %#v", pod.Name, pods)
}
pods.Items = newPods
return pods, nil
})
}

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,9 @@ package pod
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
@ -26,8 +28,6 @@ import (
type Registry interface {
// ListPods obtains a list of pods having labels which match selector.
ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error)
// ListPodsPredicate obtains a list of pods for which filter returns true.
ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error)
// Watch for new/changed/deleted pods
WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
// Get a specific pod
@ -39,3 +39,61 @@ type Registry interface {
// Delete an existing pod
DeletePod(ctx api.Context, podID string) error
}
// Storage is an interface for a standard REST Storage backend
// TODO: move me somewhere common
type Storage interface {
apiserver.RESTDeleter
apiserver.RESTLister
apiserver.RESTGetter
apiserver.ResourceWatcher
Create(ctx api.Context, obj runtime.Object) (runtime.Object, error)
Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error)
}
// storage puts strong typing around storage calls
type storage struct {
Storage
}
// NewRegistry returns a new Registry interface for the given Storage. Any mismatched
// types will panic.
func NewRegistry(s Storage) Registry {
return &storage{s}
}
func (s *storage) ListPods(ctx api.Context, label labels.Selector) (*api.PodList, error) {
obj, err := s.List(ctx, label, labels.Everything())
if err != nil {
return nil, err
}
return obj.(*api.PodList), nil
}
func (s *storage) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return s.Watch(ctx, label, field, resourceVersion)
}
func (s *storage) GetPod(ctx api.Context, podID string) (*api.Pod, error) {
obj, err := s.Get(ctx, podID)
if err != nil {
return nil, err
}
return obj.(*api.Pod), nil
}
func (s *storage) CreatePod(ctx api.Context, pod *api.Pod) error {
_, err := s.Create(ctx, pod)
return err
}
func (s *storage) UpdatePod(ctx api.Context, pod *api.Pod) error {
_, _, err := s.Update(ctx, pod)
return err
}
func (s *storage) DeletePod(ctx api.Context, podID string) error {
_, err := s.Delete(ctx, podID)
return err
}

View File

@ -26,82 +26,100 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// podStrategy implements behavior for Pods
// TODO: move to a pod specific package.
type podStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// Strategy is the default logic that applies when creating and updating Pod
// objects via the REST API.
// TODO: Create other strategies for updating status, bindings, etc
var Strategy = podStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped is true for pods.
func (podStrategy) NamespaceScoped() bool {
return true
}
// ResetBeforeCreate clears fields that are not allowed to be set by end users on creation.
func (podStrategy) ResetBeforeCreate(obj runtime.Object) {
pod := obj.(*api.Pod)
pod.Status = api.PodStatus{
Phase: api.PodPending,
}
}
// Validate validates a new pod.
func (podStrategy) Validate(obj runtime.Object) errors.ValidationErrorList {
pod := obj.(*api.Pod)
return validation.ValidatePod(pod)
}
// AllowCreateOnUpdate is false for pods.
func (podStrategy) AllowCreateOnUpdate() bool {
return false
}
// ValidateUpdate is the default update validation for an end user.
func (podStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList {
return validation.ValidatePodUpdate(obj.(*api.Pod), old.(*api.Pod))
}
// PodStatusGetter is an interface used by Pods to fetch and retrieve status info.
type PodStatusGetter interface {
GetPodStatus(namespace, name string) (*api.PodStatus, error)
ClearPodStatus(namespace, name string)
}
// REST implements the RESTStorage interface in terms of a PodRegistry.
type REST struct {
podCache PodStatusGetter
registry Registry
}
type RESTConfig struct {
PodCache PodStatusGetter
Registry Registry
}
// NewREST returns a new REST.
func NewREST(config *RESTConfig) *REST {
return &REST{
podCache: config.PodCache,
registry: config.Registry,
}
}
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
pod := obj.(*api.Pod)
if err := rest.BeforeCreate(rest.Pods, ctx, obj); err != nil {
return nil, err
}
if err := rs.registry.CreatePod(ctx, pod); err != nil {
err = rest.CheckGeneratedNameError(rest.Pods, err, pod)
return nil, err
}
return rs.registry.GetPod(ctx, pod.Name)
}
func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
namespace, found := api.NamespaceFrom(ctx)
if !found {
return &api.Status{Status: api.StatusFailure}, nil
}
rs.podCache.ClearPodStatus(namespace, id)
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id)
}
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
pod, err := rs.registry.GetPod(ctx, id)
if err != nil {
return pod, err
}
if pod == nil {
return pod, nil
}
host := pod.Status.Host
if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
// PodStatusDecorator returns a function that updates pod.Status based
// on the provided pod cache.
func PodStatusDecorator(cache PodStatusGetter) rest.ObjectFunc {
return func(obj runtime.Object) error {
pod := obj.(*api.Pod)
host := pod.Status.Host
if status, err := cache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
}
} else {
pod.Status = *status
}
} else {
pod.Status = *status
pod.Status.Host = host
return nil
}
// Make sure not to hide a recent host with an old one from the cache.
// TODO: move host to spec
pod.Status.Host = host
return pod, err
}
func PodToSelectableFields(pod *api.Pod) labels.Set {
// PodStatusReset returns a function that clears the pod cache when the object
// is deleted.
func PodStatusReset(cache PodStatusGetter) rest.ObjectFunc {
return func(obj runtime.Object) error {
pod := obj.(*api.Pod)
cache.ClearPodStatus(pod.Namespace, pod.Name)
return nil
}
}
// MatchPod returns a generic matcher for a given label and field selector.
func MatchPod(label, field labels.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
podObj, ok := obj.(*api.Pod)
if !ok {
return false, fmt.Errorf("not a pod")
}
fields := PodToSelectableFields(podObj)
return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields), nil
})
}
// PodToSelectableFields returns a label set that represents the object
// TODO: fields are not labels, and the validation rules for them do not apply.
func PodToSelectableFields(pod *api.Pod) labels.Set {
// TODO we are populating both Status and DesiredState because selectors are not aware of API versions
// see https://github.com/GoogleCloudPlatform/kubernetes/pull/2503
@ -117,68 +135,13 @@ func PodToSelectableFields(pod *api.Pod) labels.Set {
}
}
// filterFunc returns a predicate based on label & field selectors that can be passed to registry's
// ListPods & WatchPods.
func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool {
return func(pod *api.Pod) bool {
fields := PodToSelectableFields(pod)
return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
}
}
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
pods, err := rs.registry.ListPodsPredicate(ctx, rs.filterFunc(label, field))
if err == nil {
for i := range pods.Items {
pod := &pods.Items[i]
host := pod.Status.Host
if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
}
} else {
pod.Status = *status
}
// Make sure not to hide a recent host with an old one from the cache.
// This is tested by the integration test.
// TODO: move host to spec
pod.Status.Host = host
}
}
return pods, err
}
// Watch begins watching for new, changed, or deleted pods.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
// TODO: Add pod status to watch command
return rs.registry.WatchPods(ctx, label, field, resourceVersion)
}
func (*REST) New() runtime.Object {
return &api.Pod{}
}
func (*REST) NewList() runtime.Object {
return &api.PodList{}
}
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
pod := obj.(*api.Pod)
if !api.ValidNamespace(ctx, &pod.ObjectMeta) {
return nil, false, errors.NewConflict("pod", pod.Namespace, fmt.Errorf("Pod.Namespace does not match the provided context"))
}
if errs := validation.ValidatePod(pod); len(errs) > 0 {
return nil, false, errors.NewInvalid("pod", pod.Name, errs)
}
if err := rs.registry.UpdatePod(ctx, pod); err != nil {
return nil, false, err
}
out, err := rs.registry.GetPod(ctx, pod.Name)
return out, false, err
// ResourceGetter is an interface for retrieving resources by ResourceLocation.
type ResourceGetter interface {
Get(api.Context, string) (runtime.Object, error)
}
// ResourceLocation returns a URL to which one can send traffic for the specified pod.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string, error) {
// Allow ID as "podname" or "podname:port". If port is not specified,
// try to use the first defined port on the pod.
parts := strings.Split(id, ":")
@ -192,7 +155,7 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
port = parts[1]
}
obj, err := rs.Get(ctx, name)
obj, err := getter.Get(ctx, name)
if err != nil {
return "", err
}

View File

@ -18,20 +18,9 @@ package pod
import (
"fmt"
"reflect"
"strings"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type fakeCache struct {
@ -55,626 +44,35 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) {
f.clearedName = name
}
func expectApiStatusError(t *testing.T, out runtime.Object, msg string) {
status, ok := out.(*api.Status)
if !ok {
t.Errorf("Expected an api.Status object, was %#v", out)
return
}
if msg != status.Message {
t.Errorf("Expected %#v, was %s", msg, status.Message)
}
}
func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) {
pod, ok := out.(*api.Pod)
if !ok || pod == nil {
t.Errorf("Expected an api.Pod object, was %#v", out)
return nil, false
}
return pod, true
}
func TestCreatePodRegistryError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
_, err := storage.Create(ctx, pod)
if err != podRegistry.Err {
func TestPodStatusDecorator(t *testing.T) {
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
pod := &api.Pod{}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestCreatePodSetsIds(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
if pod.Status.Phase != api.PodRunning {
t.Errorf("unexpected pod: %#v", pod)
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
_, err := storage.Create(ctx, pod)
if err != podRegistry.Err {
t.Fatalf("unexpected error: %v", err)
}
if len(podRegistry.Pod.Name) == 0 {
t.Errorf("Expected pod ID to be set, Got %#v", pod)
}
if pod.Name != podRegistry.Pod.Name {
t.Errorf("Expected manifest ID to be equal to pod ID, Got %#v", pod)
}
}
func TestCreatePodSetsUID(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
_, err := storage.Create(ctx, pod)
if err != podRegistry.Err {
t.Fatalf("unexpected error: %v", err)
}
if len(podRegistry.Pod.UID) == 0 {
t.Errorf("Expected pod UID to be set, Got %#v", pod)
}
}
func TestListPodsError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
if err != podRegistry.Err {
t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err)
}
if pods.(*api.PodList) != nil {
t.Errorf("Unexpected non-nil pod list: %#v", pods)
}
}
func TestListPodsCacheError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
},
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
if err != nil {
t.Fatalf("Expected no error, got %#v", err)
}
pl := pods.(*api.PodList)
if len(pl.Items) != 1 {
t.Fatalf("Unexpected 0-len pod list: %+v", pl)
}
if e, a := api.PodUnknown, pl.Items[0].Status.Phase; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestListEmptyPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(&api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}})
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(pods.(*api.PodList).Items) != 0 {
t.Errorf("Unexpected non-zero pod list: %#v", pods)
}
if pods.(*api.PodList).ResourceVersion != "1" {
t.Errorf("Unexpected resource version: %#v", pods)
}
}
func TestListPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
},
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}},
}
ctx := api.NewContext()
podsObj, err := storage.List(ctx, labels.Everything(), labels.Everything())
pods := podsObj.(*api.PodList)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(pods.Items) != 2 {
t.Errorf("Unexpected pod list: %#v", pods)
}
if pods.Items[0].Name != "foo" || pods.Items[0].Status.Phase != api.PodRunning {
t.Errorf("Unexpected pod: %#v", pods.Items[0])
}
if pods.Items[1].Name != "bar" {
t.Errorf("Unexpected pod: %#v", pods.Items[1])
}
}
func TestListPodListSelection(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}, {
ObjectMeta: api.ObjectMeta{Name: "bar"},
Status: api.PodStatus{Host: "barhost"},
}, {
ObjectMeta: api.ObjectMeta{Name: "baz"},
Status: api.PodStatus{Phase: "bazstatus"},
}, {
ObjectMeta: api.ObjectMeta{
Name: "qux",
Labels: map[string]string{"label": "qux"},
},
}, {
ObjectMeta: api.ObjectMeta{Name: "zot"},
},
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
table := []struct {
label, field string
expectedIDs util.StringSet
}{
{
expectedIDs: util.NewStringSet("foo", "bar", "baz", "qux", "zot"),
}, {
field: "name=zot",
expectedIDs: util.NewStringSet("zot"),
}, {
label: "label=qux",
expectedIDs: util.NewStringSet("qux"),
}, {
field: "Status.Phase=bazstatus",
expectedIDs: util.NewStringSet("baz"),
}, {
field: "Status.Host=barhost",
expectedIDs: util.NewStringSet("bar"),
}, {
field: "Status.Host=",
expectedIDs: util.NewStringSet("foo", "baz", "qux", "zot"),
}, {
field: "Status.Host!=",
expectedIDs: util.NewStringSet("bar"),
},
}
for index, item := range table {
label, err := labels.ParseSelector(item.label)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
}
field, err := labels.ParseSelector(item.field)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
}
podsObj, err := storage.List(ctx, label, field)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
pods := podsObj.(*api.PodList)
if e, a := len(item.expectedIDs), len(pods.Items); e != a {
t.Errorf("%v: Expected %v, got %v", index, e, a)
}
for _, pod := range pods.Items {
if !item.expectedIDs.Has(pod.Name) {
t.Errorf("%v: Unexpected pod %v", index, pod.Name)
}
t.Logf("%v: Got pod Name: %v", index, pod.Name)
}
}
}
func TestPodDecode(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
expected := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
body, err := latest.Codec.Encode(expected)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
actual := storage.New()
if err := latest.Codec.DecodeInto(body, actual); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
}
func TestGetPod(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}},
}
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
pod := obj.(*api.Pod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expect := *podRegistry.Pod
expect.Status.Phase = api.PodRunning
// TODO: when host is moved to spec, remove this line.
expect.Status.Host = "machine"
if e, a := &expect, pod; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a)
}
}
func TestGetPodCacheError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable},
}
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
pod := obj.(*api.Pod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expect := *podRegistry.Pod
expect.Status.Phase = api.PodUnknown
if e, a := &expect, pod; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a)
}
}
// TODO: remove, this is covered by RESTTest.TestCreate
func TestPodStorageValidatesCreate(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewDefaultContext()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"invalid/label/to/cause/validation/failure": "bar",
},
},
}
c, err := storage.Create(ctx, pod)
if c != nil {
t.Errorf("Expected nil channel")
}
if !errors.IsInvalid(err) {
t.Errorf("Expected to get an invalid resource error, got %v", err)
}
}
// TODO: remove, this is covered by RESTTest.TestCreate
func TestCreatePod(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
pod = &api.Pod{
Status: api.PodStatus{
Host: "machine",
Host: "foo",
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
pod.Name = "foo"
ctx := api.NewDefaultContext()
obj, err := storage.Create(ctx, pod)
if err != nil {
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj == nil {
t.Fatalf("unexpected object: %#v", obj)
}
if !api.HasObjectMetaSystemFieldValues(&podRegistry.Pod.ObjectMeta) {
t.Errorf("Expected ObjectMeta field values were populated")
if pod.Status.Phase != api.PodRunning || pod.Status.Host != "foo" {
t.Errorf("unexpected pod: %#v", pod)
}
}
// TODO: remove, this is covered by RESTTest.TestCreate
func TestCreatePodWithConflictingNamespace(t *testing.T) {
storage := REST{}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "not-default"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
channel, err := storage.Create(ctx, pod)
if channel != nil {
t.Error("Expected a nil channel, but we got a value")
}
if err == nil {
t.Errorf("Expected an error, but we didn't get one")
} else if strings.Contains(err.Error(), "Controller.Namespace does not match the provided context") {
t.Errorf("Expected 'Pod.Namespace does not match the provided context' error, got '%v'", err.Error())
}
}
func TestUpdatePodWithConflictingNamespace(t *testing.T) {
storage := REST{}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "not-default"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
obj, created, err := storage.Update(ctx, pod)
if obj != nil || created {
t.Error("Expected a nil channel, but we got a value or created")
}
if err == nil {
t.Errorf("Expected an error, but we didn't get one")
} else if strings.Index(err.Error(), "Pod.Namespace does not match the provided context") == -1 {
t.Errorf("Expected 'Pod.Namespace does not match the provided context' error, got '%v'", err.Error())
}
}
func TestResourceLocation(t *testing.T) {
expectedIP := "1.2.3.4"
testCases := []struct {
pod api.Pod
query string
location string
}{
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
},
query: "foo",
location: expectedIP,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
},
query: "foo:12345",
location: expectedIP + ":12345",
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr"},
},
},
},
query: "foo",
location: expectedIP,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
query: "foo",
location: expectedIP + ":9376",
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
query: "foo:12345",
location: expectedIP + ":12345",
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr1"},
{Name: "ctr2", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
query: "foo",
location: expectedIP + ":9376",
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr1", Ports: []api.Port{{ContainerPort: 9376}}},
{Name: "ctr2", Ports: []api.Port{{ContainerPort: 1234}}},
},
},
},
query: "foo",
location: expectedIP + ":9376",
},
}
for _, tc := range testCases {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &tc.pod
storage := &REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{PodIP: expectedIP}},
}
redirector := apiserver.Redirector(storage)
location, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if location != tc.location {
t.Errorf("Expected %v, but got %v", tc.location, location)
}
}
}
func TestDeletePod(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}
fakeCache := &fakeCache{}
storage := REST{
registry: podRegistry,
podCache: fakeCache,
}
ctx := api.NewDefaultContext()
result, err := storage.Delete(ctx, "foo")
if err != nil {
func TestPodStatusDecoratorError(t *testing.T) {
cache := &fakeCache{errorToReturn: fmt.Errorf("test error")}
pod := &api.Pod{}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if fakeCache.clearedNamespace != "default" || fakeCache.clearedName != "foo" {
t.Errorf("Unexpeceted cache delete: %s %s %#v", fakeCache.clearedName, fakeCache.clearedNamespace, result)
if pod.Status.Phase != api.PodUnknown {
t.Errorf("unexpected pod: %#v", pod)
}
}
func TestCreate(t *testing.T) {
registry := registrytest.NewPodRegistry(nil)
test := resttest.New(t, &REST{
registry: registry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}, registry.SetError)
test.TestCreate(
// valid
&api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "test1",
Image: "foo",
ImagePullPolicy: api.PullIfNotPresent,
},
},
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
},
// invalid
&api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{},
},
},
)
}