Remove pods from pkg/registry/etcd/etcd.go since they are in their own type

Altered the master initialization code
This commit is contained in:
Clayton Coleman 2015-02-11 19:07:54 -05:00
parent 247e467217
commit 7a93af57c0
6 changed files with 64 additions and 1067 deletions

View File

@ -117,19 +117,9 @@ type Config struct {
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
// "Inputs", Copied from Config
controllerRegistry controller.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
minionRegistry minion.Registry
eventRegistry generic.Registry
limitRangeRegistry generic.Registry
resourceQuotaRegistry resourcequota.Registry
namespaceRegistry generic.Registry
boundPodFactory pod.BoundPodFactory
storage map[string]apiserver.RESTStorage
client *client.Client
portalNet *net.IPNet
cacheTimeout time.Duration
client *client.Client
portalNet *net.IPNet
cacheTimeout time.Duration
mux apiserver.Mux
muxHelper *apiserver.MuxHelper
@ -156,6 +146,17 @@ type Master struct {
serviceReadWritePort int
masterServices *util.Runner
// storage contains the RESTful endpoints exposed by this master
storage map[string]apiserver.RESTStorage
// registries are internal client APIs for accessing the storage layer
// TODO: define the internal typed interface in a way that clients can
// also be replaced
nodeRegistry minion.Registry
namespaceRegistry generic.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
// "Outputs"
Handler http.Handler
InsecureHandler http.Handler
@ -259,7 +260,6 @@ func setDefaults(c *Config) {
// any unhandled paths to "Handler".
func New(c *Config) *Master {
setDefaults(c)
boundPodFactory := &pod.BasicBoundPodFactory{}
if c.KubeletClient == nil {
glog.Fatalf("master.New() called with config.KubeletClient == nil")
}
@ -276,15 +276,6 @@ func New(c *Config) *Master {
glog.Infof("Setting master service IPs based on PortalNet subnet to %q (read-only) and %q (read-write).", serviceReadOnlyIP, serviceReadWriteIP)
m := &Master{
controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
serviceRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())),
namespaceRegistry: namespace.NewEtcdRegistry(c.EtcdHelper),
minionRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
limitRangeRegistry: limitrange.NewEtcdRegistry(c.EtcdHelper),
resourceQuotaRegistry: resourcequota.NewEtcdRegistry(c.EtcdHelper),
boundPodFactory: boundPodFactory,
client: c.Client,
portalNet: c.PortalNet,
rootWebService: new(restful.WebService),
@ -374,13 +365,28 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter)
// init initializes master.
func (m *Master) init(c *Config) {
podStorage, bindingStorage := podetcd.NewREST(c.EtcdHelper, m.boundPodFactory)
boundPodFactory := &pod.BasicBoundPodFactory{}
podStorage, bindingStorage := podetcd.NewREST(c.EtcdHelper, boundPodFactory)
podRegistry := pod.NewRegistry(podStorage)
nodeRESTStorage := minion.NewREST(m.minionRegistry)
eventRegistry := event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds()))
limitRangeRegistry := limitrange.NewEtcdRegistry(c.EtcdHelper)
resourceQuotaRegistry := resourcequota.NewEtcdRegistry(c.EtcdHelper)
m.namespaceRegistry = namespace.NewEtcdRegistry(c.EtcdHelper)
// TODO: split me up into distinct storage registries
registry := etcd.NewRegistry(c.EtcdHelper, podRegistry)
m.serviceRegistry = registry
m.endpointRegistry = registry
m.nodeRegistry = registry
nodeStorage := minion.NewREST(m.nodeRegistry)
// TODO: unify the storage -> registry and storage -> client patterns
nodeStorageClient := RESTStorageToNodes(nodeStorage)
podCache := NewPodCache(
c.KubeletClient,
RESTStorageToNodes(nodeRESTStorage).Nodes(),
nodeStorageClient.Nodes(),
podRegistry,
)
go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout)
@ -394,16 +400,16 @@ func (m *Master) init(c *Config) {
"pods": podStorage,
"bindings": bindingStorage,
"replicationControllers": controller.NewREST(m.controllerRegistry, podRegistry),
"services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet),
"replicationControllers": controller.NewREST(registry, podRegistry),
"services": service.NewREST(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.portalNet),
"endpoints": endpoint.NewREST(m.endpointRegistry),
"minions": nodeRESTStorage,
"nodes": nodeRESTStorage,
"events": event.NewREST(m.eventRegistry),
"minions": nodeStorage,
"nodes": nodeStorage,
"events": event.NewREST(eventRegistry),
"limitRanges": limitrange.NewREST(m.limitRangeRegistry),
"resourceQuotas": resourcequota.NewREST(m.resourceQuotaRegistry),
"resourceQuotaUsages": resourcequotausage.NewREST(m.resourceQuotaRegistry),
"limitRanges": limitrange.NewREST(limitRangeRegistry),
"resourceQuotas": resourcequota.NewREST(resourceQuotaRegistry),
"resourceQuotaUsages": resourcequotausage.NewREST(resourceQuotaRegistry),
"namespaces": namespace.NewREST(m.namespaceRegistry),
}
@ -542,7 +548,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
}
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/v2/keys/"}
}
nodes, err := m.minionRegistry.ListMinions(api.NewDefaultContext())
nodes, err := m.nodeRegistry.ListMinions(api.NewDefaultContext())
if err != nil {
glog.Errorf("Failed to list minions: %v", err)
}

View File

@ -32,7 +32,7 @@ func TestGetServersToValidate(t *testing.T) {
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil}
master.minionRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
servers := master.getServersToValidate(&config)

View File

@ -20,10 +20,7 @@ import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/constraint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@ -34,8 +31,6 @@ import (
)
const (
// PodPath is the path to pod resources in etcd
PodPath string = "/registry/pods"
// ControllerPath is the path to controller resources in etcd
ControllerPath string = "/registry/controllers"
// ServicePath is the path to service resources in etcd
@ -53,15 +48,15 @@ const (
// MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd.
type Registry struct {
tools.EtcdHelper
boundPodFactory pod.BoundPodFactory
pods pod.Registry
}
// NewRegistry creates an etcd registry.
func NewRegistry(helper tools.EtcdHelper, boundPodFactory pod.BoundPodFactory) *Registry {
func NewRegistry(helper tools.EtcdHelper, pods pod.Registry) *Registry {
registry := &Registry{
EtcdHelper: helper,
pods: pods,
}
registry.boundPodFactory = boundPodFactory
return registry
}
@ -89,239 +84,6 @@ func MakeEtcdItemKey(ctx api.Context, prefix string, id string) (string, error)
return key, nil
}
// makePodListKey constructs etcd paths to pod directories enforcing namespace rules.
func makePodListKey(ctx api.Context) string {
return MakeEtcdListKey(ctx, PodPath)
}
// makePodKey constructs etcd paths to pod items enforcing namespace rules.
func makePodKey(ctx api.Context, id string) (string, error) {
return MakeEtcdItemKey(ctx, PodPath, id)
}
// ListPods obtains a list of pods with labels that match selector.
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
return selector.Matches(labels.Set(pod.Labels))
})
}
// ListPodsPredicate obtains a list of pods that match filter.
func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) {
allPods := api.PodList{}
key := makePodListKey(ctx)
err := r.ExtractToList(key, &allPods)
if err != nil {
return nil, err
}
filtered := []api.Pod{}
for _, pod := range allPods.Items {
if filter(&pod) {
filtered = append(filtered, pod)
}
}
allPods.Items = filtered
return &allPods, nil
}
// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod")
if err != nil {
return nil, err
}
key := makePodListKey(ctx)
return r.WatchList(key, version, func(obj runtime.Object) bool {
podObj, ok := obj.(*api.Pod)
if !ok {
// Must be an error: return true to propagate to upper level.
return true
}
fields := pod.PodToSelectableFields(podObj)
return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields)
})
}
// GetPod gets a specific pod specified by its ID.
func (r *Registry) GetPod(ctx api.Context, id string) (*api.Pod, error) {
var pod api.Pod
key, err := makePodKey(ctx, id)
if err != nil {
return nil, err
}
if err = r.ExtractObj(key, &pod, false); err != nil {
return nil, etcderr.InterpretGetError(err, "pod", id)
}
return &pod, nil
}
func makeBoundPodsKey(machine string) string {
return "/registry/nodes/" + machine + "/boundpods"
}
// CreatePod creates a pod based on a specification.
func (r *Registry) CreatePod(ctx api.Context, pod *api.Pod) error {
// Set current status to "Waiting".
pod.Status.Phase = api.PodPending
pod.Status.Host = ""
key, err := makePodKey(ctx, pod.Name)
if err != nil {
return err
}
err = r.CreateObj(key, pod, 0)
return etcderr.InterpretCreateError(err, "pod", pod.Name)
}
// ApplyBinding implements binding's registry
func (r *Registry) ApplyBinding(ctx api.Context, binding *api.Binding) error {
return etcderr.InterpretCreateError(r.assignPod(ctx, binding.PodID, binding.Host), "binding", "")
}
// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'.
// Returns the current state of the pod, or an error.
func (r *Registry) setPodHostTo(ctx api.Context, podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
podKey, err := makePodKey(ctx, podID)
if err != nil {
return nil, err
}
err = r.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
if pod.Status.Host != oldMachine {
return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.Name, pod.Status.Host)
}
pod.Status.Host = machine
finalPod = pod
return pod, nil
})
return finalPod, err
}
// assignPod assigns the given pod to the given machine.
func (r *Registry) assignPod(ctx api.Context, podID string, machine string) error {
finalPod, err := r.setPodHostTo(ctx, podID, "", machine)
if err != nil {
return err
}
boundPod, err := r.boundPodFactory.MakeBoundPod(machine, finalPod)
if err != nil {
return err
}
// Doing the constraint check this way provides atomicity guarantees.
contKey := makeBoundPodsKey(machine)
err = r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPodList := in.(*api.BoundPods)
boundPodList.Items = append(boundPodList.Items, *boundPod)
if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 {
return nil, fmt.Errorf("the assignment would cause the following constraints violation: %v", errors)
}
return boundPodList, nil
})
if err != nil {
// Put the pod's host back the way it was. This is a terrible hack, but
// can't really be helped, since there's not really a way to do atomic
// multi-object changes in etcd.
if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil {
glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2)
}
}
return err
}
func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
var podOut api.Pod
podKey, err := makePodKey(ctx, pod.Name)
if err != nil {
return err
}
err = r.EtcdHelper.ExtractObj(podKey, &podOut, false)
if err != nil {
return err
}
scheduled := podOut.Status.Host != ""
if scheduled {
pod.Status.Host = podOut.Status.Host
// If it's already been scheduled, limit the types of updates we'll accept.
errs := validation.ValidatePodUpdate(pod, &podOut)
if len(errs) != 0 {
return errors.NewInvalid("Pod", pod.Name, errs)
}
}
// There's no race with the scheduler, because either this write will fail because the host
// has been updated, or the host update will fail because this pod has been updated.
err = r.EtcdHelper.SetObj(podKey, pod, 0 /* ttl */)
if err != nil {
return err
}
if !scheduled {
// never scheduled, just update.
return nil
}
containerKey := makeBoundPodsKey(podOut.Status.Host)
return r.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPods := in.(*api.BoundPods)
for ix := range boundPods.Items {
item := &boundPods.Items[ix]
if item.Name == pod.Name && item.Namespace == pod.Namespace {
item.Spec = pod.Spec
return boundPods, nil
}
}
// This really shouldn't happen
glog.Warningf("Couldn't find: %s in %#v", pod.Name, boundPods)
return boundPods, fmt.Errorf("failed to update pod, couldn't find %s in %#v", pod.Name, boundPods)
})
}
// DeletePod deletes an existing pod specified by its ID.
func (r *Registry) DeletePod(ctx api.Context, podID string) error {
var pod api.Pod
podKey, err := makePodKey(ctx, podID)
if err != nil {
return err
}
err = r.ExtractObj(podKey, &pod, false)
if err != nil {
return etcderr.InterpretDeleteError(err, "pod", podID)
}
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
err = r.Delete(podKey, true)
if err != nil {
return etcderr.InterpretDeleteError(err, "pod", podID)
}
machine := pod.Status.Host
if machine == "" {
// Pod was never scheduled anywhere, just return.
return nil
}
// Next, remove the pod from the machine atomically.
contKey := makeBoundPodsKey(machine)
return r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
for _, item := range pods.Items {
if item.Name != pod.Name || item.Namespace != pod.Namespace {
newPods = append(newPods, item)
} else {
found = true
}
}
if !found {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
glog.Warningf("Couldn't find: %s in %#v", podID, pods)
}
pods.Items = newPods
return pods, nil
})
}
// ListControllers obtains a list of ReplicationControllers.
func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) {
controllers := &api.ReplicationControllerList{}
@ -348,7 +110,7 @@ func (r *Registry) WatchControllers(ctx api.Context, label, field labels.Selecto
}
match := label.Matches(labels.Set(controller.Labels))
if match {
pods, err := r.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
pods, err := r.pods.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
if err != nil {
glog.Warningf("Error listing pods: %v", err)
// No object that's useable so drop it on the floor

View File

@ -18,7 +18,6 @@ package etcd
import (
"strconv"
"strings"
"testing"
"time"
@ -26,7 +25,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
@ -34,791 +35,15 @@ import (
)
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicBoundPodFactory{})
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, nil)
return registry
}
// TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash
func TestEtcdGetPodDifferentNamespace(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx1 := api.NewDefaultContext()
ctx2 := api.WithNamespace(api.NewContext(), "other")
key1, _ := makePodKey(ctx1, "foo")
key2, _ := makePodKey(ctx2, "foo")
fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "foo"}}), 0)
fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "other", Name: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
pod1, err := registry.GetPod(ctx1, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod1.Name != "foo" {
t.Errorf("Unexpected pod: %#v", pod1)
}
if pod1.Namespace != "default" {
t.Errorf("Unexpected pod: %#v", pod1)
}
pod2, err := registry.GetPod(ctx2, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod2.Name != "foo" {
t.Errorf("Unexpected pod: %#v", pod2)
}
if pod2.Namespace != "other" {
t.Errorf("Unexpected pod: %#v", pod2)
}
}
func TestEtcdGetPod(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
pod, err := registry.GetPod(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v", pod)
}
}
func TestEtcdGetPodNotFound(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
_, err := registry.GetPod(ctx, "foo")
if !errors.IsNotFound(err) {
t.Errorf("Unexpected error returned: %#v", err)
}
}
func TestEtcdCreatePod(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine", ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
func TestEtcdCreatePodFailsWithoutNamespace(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(api.NewContext(), &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
// Accept "namespace" or "Namespace".
if err == nil || !strings.Contains(err.Error(), "amespace") {
t.Fatalf("expected error that namespace was missing from context, got: %v", err)
}
}
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}),
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
})
if !errors.IsAlreadyExists(err) {
t.Errorf("Unexpected error returned: %#v", err)
}
}
func TestEtcdCreatePodWithContainersError(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNodeExist, // validate that ApplyBinding is translating Create errors
}
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if !errors.IsAlreadyExists(err) {
t.Fatalf("Unexpected error returned: %#v", err)
}
existingPod, err := registry.GetPod(ctx, "foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if existingPod.Status.Host == "machine" {
t.Fatal("Pod's host changed in response to an non-apply-able binding.")
}
}
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Suddenly, a wild scheduler appears:
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
resp, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var pod api.Pod
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if pod.Name != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 2 || boundPods.Items[1].Name != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
func TestEtcdUpdatePodNotFound(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
podIn := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
}
err := registry.UpdatePod(ctx, &podIn)
if err == nil {
t.Errorf("unexpected non-error")
}
}
func TestEtcdUpdatePodNotScheduled(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}), 1)
registry := NewTestEtcdRegistry(fakeClient)
podIn := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
err := registry.UpdatePod(ctx, &podIn)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
response, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var podOut api.Pod
latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut)
if !api.Semantic.DeepEqual(podOut, podIn) {
t.Errorf("expected: %v, got: %v", podOut, podIn)
}
}
func TestEtcdUpdatePodScheduled(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.PodSpec{
// Host: "machine",
Containers: []api.Container{
{
Image: "foo:v1",
},
},
},
Status: api.PodStatus{
Host: "machine",
},
}), 1)
contKey := "/registry/nodes/machine/boundpods"
fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v1",
},
},
},
}, {
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v1",
},
},
},
},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
podIn := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v2",
ImagePullPolicy: api.PullIfNotPresent,
TerminationMessagePath: api.TerminationMessagePathDefault,
},
},
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
Status: api.PodStatus{
Host: "machine",
},
}
err := registry.UpdatePod(ctx, &podIn)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
response, err := fakeClient.Get(key, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var podOut api.Pod
latest.Codec.DecodeInto([]byte(response.Node.Value), &podOut)
if !api.Semantic.DeepEqual(podOut, podIn) {
t.Errorf("expected: %#v, got: %#v", podOut, podIn)
}
response, err = fakeClient.Get(contKey, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var list api.BoundPods
if err := latest.Codec.DecodeInto([]byte(response.Node.Value), &list); err != nil {
t.Fatalf("unexpected error decoding response: %v", err)
}
if len(list.Items) != 2 || !api.Semantic.DeepEqual(list.Items[1].Spec, podIn.Spec) {
t.Errorf("unexpected container list: %d\n items[0] - %#v\n podin.spec - %#v\n", len(list.Items), list.Items[1].Spec, podIn.Spec)
}
}
func TestEtcdDeletePod(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}), 0)
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeletePod(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
} else if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 0 {
t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value)
}
}
func TestEtcdDeletePodMultipleContainers(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
key, _ := makePodKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}), 0)
fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}},
{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeletePod(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 1 {
t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods)
}
if boundPods.Items[0].Namespace != "other" {
t.Errorf("Deleted wrong boundPod: %#v", boundPods)
}
}
func TestEtcdEmptyListPods(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx := api.NewDefaultContext()
key := makePodListKey(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
pods, err := registry.ListPods(ctx, labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(pods.Items) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestEtcdListPodsNotFound(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx := api.NewDefaultContext()
key := makePodListKey(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
pods, err := registry.ListPods(ctx, labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(pods.Items) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestEtcdListPods(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx := api.NewDefaultContext()
key := makePodListKey(ctx)
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Status: api.PodStatus{Host: "machine"},
}),
},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
pods, err := registry.ListPods(ctx, labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(pods.Items) != 2 || pods.Items[0].Name != "foo" || pods.Items[1].Name != "bar" {
t.Errorf("Unexpected pod list: %#v", pods)
}
if pods.Items[0].Status.Host != "machine" ||
pods.Items[1].Status.Host != "machine" {
t.Errorf("Failed to populate host name.")
}
}
func TestEtcdWatchPods(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.Everything(),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchPodsMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
podBytes, _ := latest.Codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchPodsNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
podBytes, _ := latest.Codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
helper := tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
podStorage, _ := podetcd.NewREST(helper, nil)
registry := NewRegistry(helper, pod.NewRegistry(podStorage))
return registry
}
func TestEtcdListControllersNotFound(t *testing.T) {
@ -1081,8 +306,8 @@ func TestEtcdWatchController(t *testing.T) {
func TestEtcdWatchControllersMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet(makePodListKey(ctx))
registry := NewTestEtcdRegistry(fakeClient)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
registry := NewTestEtcdRegistryWithPods(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
@ -1122,8 +347,8 @@ func TestEtcdWatchControllersMatch(t *testing.T) {
func TestEtcdWatchControllersNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet(makePodListKey(ctx))
registry := NewTestEtcdRegistry(fakeClient)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
registry := NewTestEtcdRegistryWithPods(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),

View File

@ -30,7 +30,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -68,7 +67,7 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
func newStorage(t *testing.T) (*REST, *BindingREST, *tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient, h := newHelper(t)
storage, bindingStorage := NewREST(h, &pod.BasicBoundPodFactory{ServiceRegistry: &registrytest.ServiceRegistry{}})
storage, bindingStorage := NewREST(h, &pod.BasicBoundPodFactory{})
storage = storage.WithPodStatus(&fakeCache{statusToReturn: &api.PodStatus{}})
return storage, bindingStorage, fakeEtcdClient, h
}

View File

@ -40,6 +40,8 @@ type Registry interface {
DeletePod(ctx api.Context, podID string) error
}
// Storage is an interface for a standard REST Storage backend
// TODO: move me somewhere common
type Storage interface {
apiserver.RESTDeleter
apiserver.RESTLister
@ -50,10 +52,13 @@ type Storage interface {
Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error)
}
// storage puts strong typing around storage calls
type storage struct {
Storage
}
// NewRegistry returns a new Registry interface for the given Storage. Any mismatched
// types will panic.
func NewRegistry(s Storage) Registry {
return &storage{s}
}