Collapse pod REST+Registry to a single object using generic/etcd

This commit is contained in:
Clayton Coleman 2015-02-11 18:37:12 -05:00
parent 64678b71f3
commit 247e467217
7 changed files with 1828 additions and 759 deletions

View File

@ -42,7 +42,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd"
@ -52,6 +51,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequotausage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
@ -117,16 +117,15 @@ type Config struct {
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
// "Inputs", Copied from Config
podRegistry pod.Registry
controllerRegistry controller.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
minionRegistry minion.Registry
bindingRegistry binding.Registry
eventRegistry generic.Registry
limitRangeRegistry generic.Registry
resourceQuotaRegistry resourcequota.Registry
namespaceRegistry generic.Registry
boundPodFactory pod.BoundPodFactory
storage map[string]apiserver.RESTStorage
client *client.Client
portalNet *net.IPNet
@ -277,16 +276,15 @@ func New(c *Config) *Master {
glog.Infof("Setting master service IPs based on PortalNet subnet to %q (read-only) and %q (read-write).", serviceReadOnlyIP, serviceReadWriteIP)
m := &Master{
podRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
serviceRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())),
namespaceRegistry: namespace.NewEtcdRegistry(c.EtcdHelper),
minionRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
limitRangeRegistry: limitrange.NewEtcdRegistry(c.EtcdHelper),
resourceQuotaRegistry: resourcequota.NewEtcdRegistry(c.EtcdHelper),
boundPodFactory: boundPodFactory,
client: c.Client,
portalNet: c.PortalNet,
rootWebService: new(restful.WebService),
@ -376,31 +374,33 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter)
// init initializes master.
func (m *Master) init(c *Config) {
podStorage, bindingStorage := podetcd.NewREST(c.EtcdHelper, m.boundPodFactory)
podRegistry := pod.NewRegistry(podStorage)
nodeRESTStorage := minion.NewREST(m.minionRegistry)
podCache := NewPodCache(
c.KubeletClient,
RESTStorageToNodes(nodeRESTStorage).Nodes(),
m.podRegistry,
podRegistry,
)
go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout)
go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)
// TODO: refactor podCache to sit on top of podStorage via status calls
podStorage = podStorage.WithPodStatus(podCache)
// TODO: Factor out the core API registration
m.storage = map[string]apiserver.RESTStorage{
"pods": pod.NewREST(&pod.RESTConfig{
PodCache: podCache,
Registry: m.podRegistry,
}),
"replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry),
"pods": podStorage,
"bindings": bindingStorage,
"replicationControllers": controller.NewREST(m.controllerRegistry, podRegistry),
"services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet),
"endpoints": endpoint.NewREST(m.endpointRegistry),
"minions": nodeRESTStorage,
"nodes": nodeRESTStorage,
"events": event.NewREST(m.eventRegistry),
// TODO: should appear only in scheduler API group.
"bindings": binding.NewREST(m.bindingRegistry),
"limitRanges": limitrange.NewREST(m.limitRangeRegistry),
"resourceQuotas": resourcequota.NewREST(m.resourceQuotaRegistry),
"resourceQuotaUsages": resourcequotausage.NewREST(m.resourceQuotaRegistry),

View File

@ -43,6 +43,8 @@ import (
// ResourceVersion and semantics. The RESTCreateStrategy and
// RESTUpdateStrategy are generic across all backends, and encapsulate
// logic specific to the API.
//
// TODO: make the default exposed methods exactly match a generic RESTStorage
type Etcd struct {
// Called to make a new object, should return e.g., &api.Pod{}
NewFunc func() runtime.Object
@ -116,6 +118,8 @@ func NamespaceKeyFunc(ctx api.Context, prefix string, name string) (string, erro
}
// List returns a list of all the items matching m.
// TODO: rename this to ListPredicate, take the default predicate function on the constructor, and
// introduce a List method that uses the default predicate function
func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) {
list := e.NewListFunc()
err := e.Helper.ExtractToList(e.KeyRootFunc(ctx), list)
@ -126,6 +130,7 @@ func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error)
}
// CreateWithName inserts a new item with the provided name
// DEPRECATED: use Create instead
func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, name)
if err != nil {
@ -191,6 +196,7 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro
}
// UpdateWithName updates the item with the provided name
// DEPRECATED: use Update instead
func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object) error {
key, err := e.KeyFunc(ctx, name)
if err != nil {
@ -323,6 +329,8 @@ func (e *Etcd) Delete(ctx api.Context, name string) (runtime.Object, error) {
// Watch starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
// TODO: rename this to WatchPredicate, take the default predicate function on the constructor, and
// introduce a Watch method that uses the default predicate function
func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil {

View File

@ -0,0 +1,252 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/constraint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// rest implements a RESTStorage for pods against etcd
type REST struct {
store *etcdgeneric.Etcd
}
// NewREST returns a RESTStorage object that will work against pods.
func NewREST(h tools.EtcdHelper, factory pod.BoundPodFactory) (*REST, *BindingREST) {
prefix := "/registry/pods"
bindings := &podLifecycle{h}
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
},
KeyFunc: func(ctx api.Context, name string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name)
},
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*api.Pod).Name, nil
},
EndpointName: "pods",
CreateStrategy: rest.Pods,
UpdateStrategy: rest.Pods,
AfterUpdate: bindings.AfterUpdate,
ReturnDeletedObject: true,
AfterDelete: bindings.AfterDelete,
Helper: h,
}
return &REST{store: store}, &BindingREST{store: store, factory: factory}
}
// WithPodStatus returns a rest object that decorates returned responses with extra
// status information.
func (r *REST) WithPodStatus(cache pod.PodStatusGetter) *REST {
store := *r.store
store.Decorator = pod.PodStatusDecorator(cache)
store.AfterDelete = rest.AllFuncs(store.AfterDelete, pod.PodStatusReset(cache))
return &REST{store: &store}
}
// New returns a new object
func (r *REST) New() runtime.Object {
return r.store.NewFunc()
}
// NewList returns a new list object
func (r *REST) NewList() runtime.Object {
return r.store.NewListFunc()
}
// List obtains a list of pods with labels that match selector.
func (r *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return r.store.List(ctx, pod.MatchPod(label, field))
}
// Watch begins watching for new, changed, or deleted pods.
func (r *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return r.store.Watch(ctx, pod.MatchPod(label, field), resourceVersion)
}
// Get gets a specific pod specified by its ID.
func (r *REST) Get(ctx api.Context, name string) (runtime.Object, error) {
return r.store.Get(ctx, name)
}
// Create creates a pod based on a specification.
func (r *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
return r.store.Create(ctx, obj)
}
// Update changes a pod specification.
func (r *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
return r.store.Update(ctx, obj)
}
// Delete deletes an existing pod specified by its ID.
func (r *REST) Delete(ctx api.Context, name string) (runtime.Object, error) {
return r.store.Delete(ctx, name)
}
// ResourceLocation returns a pods location from its HostIP
func (r *REST) ResourceLocation(ctx api.Context, name string) (string, error) {
return pod.ResourceLocation(r, ctx, name)
}
func makeBoundPodsKey(machine string) string {
return "/registry/nodes/" + machine + "/boundpods"
}
// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
type BindingREST struct {
store *etcdgeneric.Etcd
factory pod.BoundPodFactory
}
func (r *BindingREST) New() runtime.Object {
return &api.Binding{}
}
// Create ensures a pod is bound to a specific host.
func (r *BindingREST) Create(ctx api.Context, obj runtime.Object) (out runtime.Object, err error) {
binding := obj.(*api.Binding)
err = r.assignPod(ctx, binding.PodID, binding.Host)
err = etcderr.InterpretCreateError(err, "binding", "")
out = &api.Status{Status: api.StatusSuccess}
return
}
// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'.
// Returns the current state of the pod, or an error.
func (r *BindingREST) setPodHostTo(ctx api.Context, podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
podKey, err := r.store.KeyFunc(ctx, podID)
if err != nil {
return nil, err
}
err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
if pod.Status.Host != oldMachine {
return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.Name, pod.Status.Host)
}
pod.Status.Host = machine
finalPod = pod
return pod, nil
})
return finalPod, err
}
// assignPod assigns the given pod to the given machine.
func (r *BindingREST) assignPod(ctx api.Context, podID string, machine string) error {
finalPod, err := r.setPodHostTo(ctx, podID, "", machine)
if err != nil {
return err
}
boundPod, err := r.factory.MakeBoundPod(machine, finalPod)
if err != nil {
return err
}
// Doing the constraint check this way provides atomicity guarantees.
contKey := makeBoundPodsKey(machine)
err = r.store.Helper.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPodList := in.(*api.BoundPods)
boundPodList.Items = append(boundPodList.Items, *boundPod)
if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 {
return nil, fmt.Errorf("the assignment would cause the following constraints violation: %v", errors)
}
return boundPodList, nil
})
if err != nil {
// Put the pod's host back the way it was. This is a terrible hack, but
// can't really be helped, since there's not really a way to do atomic
// multi-object changes in etcd.
if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil {
glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2)
}
}
return err
}
type podLifecycle struct {
tools.EtcdHelper
}
func (h *podLifecycle) AfterUpdate(obj runtime.Object) error {
pod := obj.(*api.Pod)
if len(pod.Status.Host) == 0 {
return nil
}
containerKey := makeBoundPodsKey(pod.Status.Host)
return h.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPods := in.(*api.BoundPods)
for ix := range boundPods.Items {
if boundPods.Items[ix].Name == pod.Name && boundPods.Items[ix].Namespace == pod.Namespace {
boundPods.Items[ix].Spec = pod.Spec
return boundPods, nil
}
}
// This really shouldn't happen
glog.Warningf("Couldn't find: %s in %#v", pod.Name, boundPods)
return boundPods, fmt.Errorf("failed to update pod, couldn't find %s in %#v", pod.Name, boundPods)
})
}
func (h *podLifecycle) AfterDelete(obj runtime.Object) error {
pod := obj.(*api.Pod)
if len(pod.Status.Host) == 0 {
return nil
}
containerKey := makeBoundPodsKey(pod.Status.Host)
return h.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
for _, boundPod := range pods.Items {
if boundPod.Name != pod.Name || boundPod.Namespace != pod.Namespace {
newPods = append(newPods, boundPod)
} else {
found = true
}
}
if !found {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
glog.Warningf("Couldn't find: %s in %#v", pod.Name, pods)
}
pods.Items = newPods
return pods, nil
})
}

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,9 @@ package pod
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
@ -26,8 +28,6 @@ import (
type Registry interface {
// ListPods obtains a list of pods having labels which match selector.
ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error)
// ListPodsPredicate obtains a list of pods for which filter returns true.
ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error)
// Watch for new/changed/deleted pods
WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
// Get a specific pod
@ -39,3 +39,56 @@ type Registry interface {
// Delete an existing pod
DeletePod(ctx api.Context, podID string) error
}
type Storage interface {
apiserver.RESTDeleter
apiserver.RESTLister
apiserver.RESTGetter
apiserver.ResourceWatcher
Create(ctx api.Context, obj runtime.Object) (runtime.Object, error)
Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error)
}
type storage struct {
Storage
}
func NewRegistry(s Storage) Registry {
return &storage{s}
}
func (s *storage) ListPods(ctx api.Context, label labels.Selector) (*api.PodList, error) {
obj, err := s.List(ctx, label, labels.Everything())
if err != nil {
return nil, err
}
return obj.(*api.PodList), nil
}
func (s *storage) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return s.Watch(ctx, label, field, resourceVersion)
}
func (s *storage) GetPod(ctx api.Context, podID string) (*api.Pod, error) {
obj, err := s.Get(ctx, podID)
if err != nil {
return nil, err
}
return obj.(*api.Pod), nil
}
func (s *storage) CreatePod(ctx api.Context, pod *api.Pod) error {
_, err := s.Create(ctx, pod)
return err
}
func (s *storage) UpdatePod(ctx api.Context, pod *api.Pod) error {
_, _, err := s.Update(ctx, pod)
return err
}
func (s *storage) DeletePod(ctx api.Context, podID string) error {
_, err := s.Delete(ctx, podID)
return err
}

View File

@ -24,84 +24,60 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// PodStatusGetter is an interface used by Pods to fetch and retrieve status info.
type PodStatusGetter interface {
GetPodStatus(namespace, name string) (*api.PodStatus, error)
ClearPodStatus(namespace, name string)
}
// REST implements the RESTStorage interface in terms of a PodRegistry.
type REST struct {
podCache PodStatusGetter
registry Registry
}
type RESTConfig struct {
PodCache PodStatusGetter
Registry Registry
}
// NewREST returns a new REST.
func NewREST(config *RESTConfig) *REST {
return &REST{
podCache: config.PodCache,
registry: config.Registry,
}
}
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
pod := obj.(*api.Pod)
if err := rest.BeforeCreate(rest.Pods, ctx, obj); err != nil {
return nil, err
}
if err := rs.registry.CreatePod(ctx, pod); err != nil {
err = rest.CheckGeneratedNameError(rest.Pods, err, pod)
return nil, err
}
return rs.registry.GetPod(ctx, pod.Name)
}
func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
namespace, found := api.NamespaceFrom(ctx)
if !found {
return &api.Status{Status: api.StatusFailure}, nil
}
rs.podCache.ClearPodStatus(namespace, id)
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id)
}
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
pod, err := rs.registry.GetPod(ctx, id)
if err != nil {
return pod, err
}
if pod == nil {
return pod, nil
}
host := pod.Status.Host
if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
// PodStatusDecorator returns a function that updates pod.Status based
// on the provided pod cache.
func PodStatusDecorator(cache PodStatusGetter) rest.ObjectFunc {
return func(obj runtime.Object) error {
pod := obj.(*api.Pod)
host := pod.Status.Host
if status, err := cache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
}
} else {
pod.Status = *status
}
} else {
pod.Status = *status
pod.Status.Host = host
return nil
}
// Make sure not to hide a recent host with an old one from the cache.
// TODO: move host to spec
pod.Status.Host = host
return pod, err
}
func PodToSelectableFields(pod *api.Pod) labels.Set {
// PodStatusReset returns a function that clears the pod cache when the object
// is deleted.
func PodStatusReset(cache PodStatusGetter) rest.ObjectFunc {
return func(obj runtime.Object) error {
pod := obj.(*api.Pod)
cache.ClearPodStatus(pod.Namespace, pod.Name)
return nil
}
}
// MatchPod returns a generic matcher for a given label and field selector.
func MatchPod(label, field labels.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
podObj, ok := obj.(*api.Pod)
if !ok {
return false, fmt.Errorf("not a pod")
}
fields := PodToSelectableFields(podObj)
return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields), nil
})
}
// PodToSelectableFields returns a label set that represents the object
// TODO: fields are not labels, and the validation rules for them do not apply.
func PodToSelectableFields(pod *api.Pod) labels.Set {
// TODO we are populating both Status and DesiredState because selectors are not aware of API versions
// see https://github.com/GoogleCloudPlatform/kubernetes/pull/2503
@ -117,68 +93,13 @@ func PodToSelectableFields(pod *api.Pod) labels.Set {
}
}
// filterFunc returns a predicate based on label & field selectors that can be passed to registry's
// ListPods & WatchPods.
func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool {
return func(pod *api.Pod) bool {
fields := PodToSelectableFields(pod)
return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
}
}
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
pods, err := rs.registry.ListPodsPredicate(ctx, rs.filterFunc(label, field))
if err == nil {
for i := range pods.Items {
pod := &pods.Items[i]
host := pod.Status.Host
if status, err := rs.podCache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
}
} else {
pod.Status = *status
}
// Make sure not to hide a recent host with an old one from the cache.
// This is tested by the integration test.
// TODO: move host to spec
pod.Status.Host = host
}
}
return pods, err
}
// Watch begins watching for new, changed, or deleted pods.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
// TODO: Add pod status to watch command
return rs.registry.WatchPods(ctx, label, field, resourceVersion)
}
func (*REST) New() runtime.Object {
return &api.Pod{}
}
func (*REST) NewList() runtime.Object {
return &api.PodList{}
}
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
pod := obj.(*api.Pod)
if !api.ValidNamespace(ctx, &pod.ObjectMeta) {
return nil, false, errors.NewConflict("pod", pod.Namespace, fmt.Errorf("Pod.Namespace does not match the provided context"))
}
if errs := validation.ValidatePod(pod); len(errs) > 0 {
return nil, false, errors.NewInvalid("pod", pod.Name, errs)
}
if err := rs.registry.UpdatePod(ctx, pod); err != nil {
return nil, false, err
}
out, err := rs.registry.GetPod(ctx, pod.Name)
return out, false, err
// ResourceGetter is an interface for retrieving resources by ResourceLocation.
type ResourceGetter interface {
Get(api.Context, string) (runtime.Object, error)
}
// ResourceLocation returns a URL to which one can send traffic for the specified pod.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string, error) {
// Allow ID as "podname" or "podname:port". If port is not specified,
// try to use the first defined port on the pod.
parts := strings.Split(id, ":")
@ -192,7 +113,7 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
port = parts[1]
}
obj, err := rs.Get(ctx, name)
obj, err := getter.Get(ctx, name)
if err != nil {
return "", err
}

View File

@ -18,20 +18,9 @@ package pod
import (
"fmt"
"reflect"
"strings"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type fakeCache struct {
@ -55,626 +44,35 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) {
f.clearedName = name
}
func expectApiStatusError(t *testing.T, out runtime.Object, msg string) {
status, ok := out.(*api.Status)
if !ok {
t.Errorf("Expected an api.Status object, was %#v", out)
return
}
if msg != status.Message {
t.Errorf("Expected %#v, was %s", msg, status.Message)
}
}
func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) {
pod, ok := out.(*api.Pod)
if !ok || pod == nil {
t.Errorf("Expected an api.Pod object, was %#v", out)
return nil, false
}
return pod, true
}
func TestCreatePodRegistryError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
_, err := storage.Create(ctx, pod)
if err != podRegistry.Err {
func TestPodStatusDecorator(t *testing.T) {
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
pod := &api.Pod{}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestCreatePodSetsIds(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
if pod.Status.Phase != api.PodRunning {
t.Errorf("unexpected pod: %#v", pod)
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
_, err := storage.Create(ctx, pod)
if err != podRegistry.Err {
t.Fatalf("unexpected error: %v", err)
}
if len(podRegistry.Pod.Name) == 0 {
t.Errorf("Expected pod ID to be set, Got %#v", pod)
}
if pod.Name != podRegistry.Pod.Name {
t.Errorf("Expected manifest ID to be equal to pod ID, Got %#v", pod)
}
}
func TestCreatePodSetsUID(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
_, err := storage.Create(ctx, pod)
if err != podRegistry.Err {
t.Fatalf("unexpected error: %v", err)
}
if len(podRegistry.Pod.UID) == 0 {
t.Errorf("Expected pod UID to be set, Got %#v", pod)
}
}
func TestListPodsError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
if err != podRegistry.Err {
t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err)
}
if pods.(*api.PodList) != nil {
t.Errorf("Unexpected non-nil pod list: %#v", pods)
}
}
func TestListPodsCacheError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
},
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
if err != nil {
t.Fatalf("Expected no error, got %#v", err)
}
pl := pods.(*api.PodList)
if len(pl.Items) != 1 {
t.Fatalf("Unexpected 0-len pod list: %+v", pl)
}
if e, a := api.PodUnknown, pl.Items[0].Status.Phase; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestListEmptyPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(&api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}})
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
pods, err := storage.List(ctx, labels.Everything(), labels.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(pods.(*api.PodList).Items) != 0 {
t.Errorf("Unexpected non-zero pod list: %#v", pods)
}
if pods.(*api.PodList).ResourceVersion != "1" {
t.Errorf("Unexpected resource version: %#v", pods)
}
}
func TestListPodList(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
},
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}},
}
ctx := api.NewContext()
podsObj, err := storage.List(ctx, labels.Everything(), labels.Everything())
pods := podsObj.(*api.PodList)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(pods.Items) != 2 {
t.Errorf("Unexpected pod list: %#v", pods)
}
if pods.Items[0].Name != "foo" || pods.Items[0].Status.Phase != api.PodRunning {
t.Errorf("Unexpected pod: %#v", pods.Items[0])
}
if pods.Items[1].Name != "bar" {
t.Errorf("Unexpected pod: %#v", pods.Items[1])
}
}
func TestListPodListSelection(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pods = &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
}, {
ObjectMeta: api.ObjectMeta{Name: "bar"},
Status: api.PodStatus{Host: "barhost"},
}, {
ObjectMeta: api.ObjectMeta{Name: "baz"},
Status: api.PodStatus{Phase: "bazstatus"},
}, {
ObjectMeta: api.ObjectMeta{
Name: "qux",
Labels: map[string]string{"label": "qux"},
},
}, {
ObjectMeta: api.ObjectMeta{Name: "zot"},
},
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewContext()
table := []struct {
label, field string
expectedIDs util.StringSet
}{
{
expectedIDs: util.NewStringSet("foo", "bar", "baz", "qux", "zot"),
}, {
field: "name=zot",
expectedIDs: util.NewStringSet("zot"),
}, {
label: "label=qux",
expectedIDs: util.NewStringSet("qux"),
}, {
field: "Status.Phase=bazstatus",
expectedIDs: util.NewStringSet("baz"),
}, {
field: "Status.Host=barhost",
expectedIDs: util.NewStringSet("bar"),
}, {
field: "Status.Host=",
expectedIDs: util.NewStringSet("foo", "baz", "qux", "zot"),
}, {
field: "Status.Host!=",
expectedIDs: util.NewStringSet("bar"),
},
}
for index, item := range table {
label, err := labels.ParseSelector(item.label)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
}
field, err := labels.ParseSelector(item.field)
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
}
podsObj, err := storage.List(ctx, label, field)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
pods := podsObj.(*api.PodList)
if e, a := len(item.expectedIDs), len(pods.Items); e != a {
t.Errorf("%v: Expected %v, got %v", index, e, a)
}
for _, pod := range pods.Items {
if !item.expectedIDs.Has(pod.Name) {
t.Errorf("%v: Unexpected pod %v", index, pod.Name)
}
t.Logf("%v: Got pod Name: %v", index, pod.Name)
}
}
}
func TestPodDecode(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
expected := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
body, err := latest.Codec.Encode(expected)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
actual := storage.New()
if err := latest.Codec.DecodeInto(body, actual); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
}
func TestGetPod(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}},
}
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
pod := obj.(*api.Pod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expect := *podRegistry.Pod
expect.Status.Phase = api.PodRunning
// TODO: when host is moved to spec, remove this line.
expect.Status.Host = "machine"
if e, a := &expect, pod; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a)
}
}
func TestGetPodCacheError(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable},
}
ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo")
pod := obj.(*api.Pod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expect := *podRegistry.Pod
expect.Status.Phase = api.PodUnknown
if e, a := &expect, pod; !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", e, a)
}
}
// TODO: remove, this is covered by RESTTest.TestCreate
func TestPodStorageValidatesCreate(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Err = fmt.Errorf("test error")
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
ctx := api.NewDefaultContext()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"invalid/label/to/cause/validation/failure": "bar",
},
},
}
c, err := storage.Create(ctx, pod)
if c != nil {
t.Errorf("Expected nil channel")
}
if !errors.IsInvalid(err) {
t.Errorf("Expected to get an invalid resource error, got %v", err)
}
}
// TODO: remove, this is covered by RESTTest.TestCreate
func TestCreatePod(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
pod = &api.Pod{
Status: api.PodStatus{
Host: "machine",
Host: "foo",
},
}
storage := REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}
pod := &api.Pod{
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
pod.Name = "foo"
ctx := api.NewDefaultContext()
obj, err := storage.Create(ctx, pod)
if err != nil {
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj == nil {
t.Fatalf("unexpected object: %#v", obj)
}
if !api.HasObjectMetaSystemFieldValues(&podRegistry.Pod.ObjectMeta) {
t.Errorf("Expected ObjectMeta field values were populated")
if pod.Status.Phase != api.PodRunning || pod.Status.Host != "foo" {
t.Errorf("unexpected pod: %#v", pod)
}
}
// TODO: remove, this is covered by RESTTest.TestCreate
func TestCreatePodWithConflictingNamespace(t *testing.T) {
storage := REST{}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "not-default"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
channel, err := storage.Create(ctx, pod)
if channel != nil {
t.Error("Expected a nil channel, but we got a value")
}
if err == nil {
t.Errorf("Expected an error, but we didn't get one")
} else if strings.Contains(err.Error(), "Controller.Namespace does not match the provided context") {
t.Errorf("Expected 'Pod.Namespace does not match the provided context' error, got '%v'", err.Error())
}
}
func TestUpdatePodWithConflictingNamespace(t *testing.T) {
storage := REST{}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "not-default"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
}
ctx := api.NewDefaultContext()
obj, created, err := storage.Update(ctx, pod)
if obj != nil || created {
t.Error("Expected a nil channel, but we got a value or created")
}
if err == nil {
t.Errorf("Expected an error, but we didn't get one")
} else if strings.Index(err.Error(), "Pod.Namespace does not match the provided context") == -1 {
t.Errorf("Expected 'Pod.Namespace does not match the provided context' error, got '%v'", err.Error())
}
}
func TestResourceLocation(t *testing.T) {
expectedIP := "1.2.3.4"
testCases := []struct {
pod api.Pod
query string
location string
}{
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
},
query: "foo",
location: expectedIP,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
},
query: "foo:12345",
location: expectedIP + ":12345",
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr"},
},
},
},
query: "foo",
location: expectedIP,
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
query: "foo",
location: expectedIP + ":9376",
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
query: "foo:12345",
location: expectedIP + ":12345",
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr1"},
{Name: "ctr2", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
query: "foo",
location: expectedIP + ":9376",
},
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr1", Ports: []api.Port{{ContainerPort: 9376}}},
{Name: "ctr2", Ports: []api.Port{{ContainerPort: 1234}}},
},
},
},
query: "foo",
location: expectedIP + ":9376",
},
}
for _, tc := range testCases {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &tc.pod
storage := &REST{
registry: podRegistry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{PodIP: expectedIP}},
}
redirector := apiserver.Redirector(storage)
location, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if location != tc.location {
t.Errorf("Expected %v, but got %v", tc.location, location)
}
}
}
func TestDeletePod(t *testing.T) {
podRegistry := registrytest.NewPodRegistry(nil)
podRegistry.Pod = &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}
fakeCache := &fakeCache{}
storage := REST{
registry: podRegistry,
podCache: fakeCache,
}
ctx := api.NewDefaultContext()
result, err := storage.Delete(ctx, "foo")
if err != nil {
func TestPodStatusDecoratorError(t *testing.T) {
cache := &fakeCache{errorToReturn: fmt.Errorf("test error")}
pod := &api.Pod{}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if fakeCache.clearedNamespace != "default" || fakeCache.clearedName != "foo" {
t.Errorf("Unexpeceted cache delete: %s %s %#v", fakeCache.clearedName, fakeCache.clearedNamespace, result)
if pod.Status.Phase != api.PodUnknown {
t.Errorf("unexpected pod: %#v", pod)
}
}
func TestCreate(t *testing.T) {
registry := registrytest.NewPodRegistry(nil)
test := resttest.New(t, &REST{
registry: registry,
podCache: &fakeCache{statusToReturn: &api.PodStatus{}},
}, registry.SetError)
test.TestCreate(
// valid
&api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "test1",
Image: "foo",
ImagePullPolicy: api.PullIfNotPresent,
},
},
RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}},
DNSPolicy: api.DNSClusterFirst,
},
},
// invalid
&api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{},
},
},
)
}