Merge pull request #5854 from fgrzadkowski/delete_pod_cache

Delete pod_cache and rely on updating pod status by kublet.
This commit is contained in:
Saad Ali
2015-03-25 09:17:09 -07:00
11 changed files with 44 additions and 1165 deletions

View File

@@ -85,15 +85,6 @@ func (r *REST) ResourceLocation(ctx api.Context, name string) (*url.URL, http.Ro
return pod.ResourceLocation(r, ctx, name)
}
// WithPodStatus returns a rest object that decorates returned responses with extra
// status information.
func (r *REST) WithPodStatus(cache pod.PodStatusGetter) *REST {
store := *r
store.Decorator = pod.PodStatusDecorator(cache)
store.AfterDelete = rest.AllFuncs(store.AfterDelete, pod.PodStatusReset(cache))
return &store
}
// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
type BindingREST struct {
store *etcdgeneric.Etcd

View File

@@ -28,7 +28,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@@ -39,27 +38,6 @@ import (
"github.com/coreos/go-etcd/etcd"
)
type fakeCache struct {
requestedNamespace string
requestedName string
clearedNamespace string
clearedName string
statusToReturn *api.PodStatus
errorToReturn error
}
func (f *fakeCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
f.requestedNamespace = namespace
f.requestedName = name
return f.statusToReturn, f.errorToReturn
}
func (f *fakeCache) ClearPodStatus(namespace, name string) {
f.clearedNamespace = namespace
f.clearedName = name
}
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true
@@ -70,7 +48,6 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient, h := newHelper(t)
storage, bindingStorage, statusStorage := NewStorage(h)
storage = storage.WithPodStatus(&fakeCache{statusToReturn: &api.PodStatus{}})
return storage, bindingStorage, statusStorage, fakeEtcdClient, h
}
@@ -113,8 +90,6 @@ func TestStorage(t *testing.T) {
func TestCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
pod := validNewPod()
pod.ObjectMeta = api.ObjectMeta{}
@@ -133,8 +108,6 @@ func TestCreate(t *testing.T) {
func TestDelete(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
test := resttest.New(t, storage, fakeEtcdClient.SetError)
createFn := func() runtime.Object {
@@ -182,8 +155,6 @@ func TestCreateRegistryError(t *testing.T) {
func TestCreateSetsFields(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
pod := validNewPod()
_, err := storage.Create(api.NewDefaultContext(), pod)
if err != fakeEtcdClient.Err {
@@ -206,8 +177,6 @@ func TestListError(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage, _, _ := NewStorage(helper)
cache := &fakeCache{}
storage = storage.WithPodStatus(cache)
pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != fakeEtcdClient.Err {
t.Fatalf("Expected %#v, Got %#v", fakeEtcdClient.Err, err)
@@ -217,39 +186,6 @@ func TestListError(t *testing.T) {
}
}
func TestListCacheError(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Data["/registry/pods/default"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
}),
},
},
},
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable}
storage = storage.WithPodStatus(cache)
pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("Expected no error, got %#v", err)
}
pl := pods.(*api.PodList)
if len(pl.Items) != 1 {
t.Fatalf("Unexpected 0-len pod list: %+v", pl)
}
if e, a := api.PodUnknown, pl.Items[0].Status.Phase; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestListEmptyPodList(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.ChangeIndex = 1
@@ -259,8 +195,6 @@ func TestListEmptyPodList(t *testing.T) {
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{}
storage = storage.WithPodStatus(cache)
pods, err := storage.List(api.NewContext(), labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -283,7 +217,7 @@ func TestListPodList(t *testing.T) {
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
Status: api.PodStatus{Phase: api.PodRunning, Host: "machine"},
}),
},
{
@@ -297,8 +231,6 @@ func TestListPodList(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
storage = storage.WithPodStatus(cache)
podsObj, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
pods := podsObj.(*api.PodList)
@@ -348,8 +280,6 @@ func TestListPodListSelection(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
storage = storage.WithPodStatus(cache)
ctx := api.NewDefaultContext()
@@ -433,6 +363,7 @@ func TestPodDecode(t *testing.T) {
func TestGet(t *testing.T) {
expect := validNewPod()
expect.Status.Phase = api.PodRunning
expect.Status.Host = "machine"
fakeEtcdClient, helper := newHelper(t)
@@ -444,8 +375,6 @@ func TestGet(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
storage = storage.WithPodStatus(cache)
obj, err := storage.Get(api.WithNamespace(api.NewContext(), "test"), "foo")
pod := obj.(*api.Pod)
@@ -453,47 +382,16 @@ func TestGet(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
expect.Status.Phase = api.PodRunning
if e, a := expect, pod; !api.Semantic.DeepEqual(e, a) {
t.Errorf("Unexpected pod: %s", util.ObjectDiff(e, a))
}
}
func TestGetCacheError(t *testing.T) {
expect := validNewPod()
expect.Status.Host = "machine"
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Data["/registry/pods/default/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(latest.Codec, expect),
},
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{errorToReturn: client.ErrPodInfoNotAvailable}
storage = storage.WithPodStatus(cache)
obj, err := storage.Get(api.NewDefaultContext(), "foo")
pod := obj.(*api.Pod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expect.Status.Phase = api.PodUnknown
if e, a := expect, pod; !api.Semantic.DeepEqual(e, a) {
t.Errorf("unexpected object: %s", util.ObjectDiff(e, a))
}
}
// TODO: remove, this is covered by RESTTest.TestCreate
func TestPodStorageValidatesCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
pod := validNewPod()
pod.Labels = map[string]string{
@@ -512,8 +410,6 @@ func TestPodStorageValidatesCreate(t *testing.T) {
func TestCreatePod(t *testing.T) {
_, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
pod := validNewPod()
obj, err := storage.Create(api.NewDefaultContext(), pod)
@@ -536,8 +432,6 @@ func TestCreatePod(t *testing.T) {
func TestCreateWithConflictingNamespace(t *testing.T) {
_, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
cache := &fakeCache{}
storage = storage.WithPodStatus(cache)
pod := validNewPod()
pod.Namespace = "not-default"
@@ -567,8 +461,6 @@ func TestUpdateWithConflictingNamespace(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{}
storage = storage.WithPodStatus(cache)
pod := validChangedPod()
pod.Namespace = "not-default"
@@ -594,6 +486,7 @@ func TestResourceLocation(t *testing.T) {
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP,
@@ -601,6 +494,7 @@ func TestResourceLocation(t *testing.T) {
{
pod: api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo:12345",
location: expectedIP + ":12345",
@@ -613,6 +507,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr"},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP,
@@ -625,6 +520,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP + ":9376",
@@ -637,6 +533,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo:12345",
location: expectedIP + ":12345",
@@ -650,6 +547,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP + ":9376",
@@ -663,6 +561,7 @@ func TestResourceLocation(t *testing.T) {
{Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 1234}}},
},
},
Status: api.PodStatus{PodIP: expectedIP},
},
query: "foo",
location: expectedIP + ":9376",
@@ -679,8 +578,6 @@ func TestResourceLocation(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{PodIP: expectedIP}}
storage = storage.WithPodStatus(cache)
redirector := rest.Redirector(storage)
location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
@@ -719,16 +616,11 @@ func TestDeletePod(t *testing.T) {
},
}
storage, _, _ := NewStorage(helper)
cache := &fakeCache{statusToReturn: &api.PodStatus{}}
storage = storage.WithPodStatus(cache)
result, err := storage.Delete(api.NewDefaultContext(), "foo", nil)
_, err := storage.Delete(api.NewDefaultContext(), "foo", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if cache.clearedNamespace != "default" || cache.clearedName != "foo" {
t.Fatalf("Unexpected cache delete: %s %s %#v", cache.clearedName, cache.clearedNamespace, result)
}
}
// TestEtcdGetDifferentNamespace ensures same-name pods in different namespaces do not clash

View File

@@ -25,7 +25,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@@ -92,40 +91,6 @@ func (podStatusStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.Val
return validation.ValidatePodStatusUpdate(obj.(*api.Pod), old.(*api.Pod))
}
// PodStatusGetter is an interface used by Pods to fetch and retrieve status info.
type PodStatusGetter interface {
GetPodStatus(namespace, name string) (*api.PodStatus, error)
ClearPodStatus(namespace, name string)
}
// PodStatusDecorator returns a function that updates pod.Status based
// on the provided pod cache.
func PodStatusDecorator(cache PodStatusGetter) rest.ObjectFunc {
return func(obj runtime.Object) error {
pod := obj.(*api.Pod)
host := pod.Status.Host
if status, err := cache.GetPodStatus(pod.Namespace, pod.Name); err != nil {
pod.Status = api.PodStatus{
Phase: api.PodUnknown,
}
} else {
pod.Status = *status
}
pod.Status.Host = host
return nil
}
}
// PodStatusReset returns a function that clears the pod cache when the object
// is deleted.
func PodStatusReset(cache PodStatusGetter) rest.ObjectFunc {
return func(obj runtime.Object) error {
pod := obj.(*api.Pod)
cache.ClearPodStatus(pod.Namespace, pod.Name)
return nil
}
}
// MatchPod returns a generic matcher for a given label and field selector.
func MatchPod(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {

View File

@@ -1,78 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"fmt"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
type fakeCache struct {
requestedNamespace string
requestedName string
clearedNamespace string
clearedName string
statusToReturn *api.PodStatus
errorToReturn error
}
func (f *fakeCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
f.requestedNamespace = namespace
f.requestedName = name
return f.statusToReturn, f.errorToReturn
}
func (f *fakeCache) ClearPodStatus(namespace, name string) {
f.clearedNamespace = namespace
f.clearedName = name
}
func TestPodStatusDecorator(t *testing.T) {
cache := &fakeCache{statusToReturn: &api.PodStatus{Phase: api.PodRunning}}
pod := &api.Pod{}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if pod.Status.Phase != api.PodRunning {
t.Errorf("unexpected pod: %#v", pod)
}
pod = &api.Pod{
Status: api.PodStatus{
Host: "foo",
},
}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if pod.Status.Phase != api.PodRunning || pod.Status.Host != "foo" {
t.Errorf("unexpected pod: %#v", pod)
}
}
func TestPodStatusDecoratorError(t *testing.T) {
cache := &fakeCache{errorToReturn: fmt.Errorf("test error")}
pod := &api.Pod{}
if err := PodStatusDecorator(cache)(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if pod.Status.Phase != api.PodUnknown {
t.Errorf("unexpected pod: %#v", pod)
}
}