Move watch filter into storage level

This commit is contained in:
Deyuan Deng 2014-11-28 17:28:42 -05:00
parent 2261c3e71c
commit 76552423f9
8 changed files with 250 additions and 51 deletions

View File

@ -18,13 +18,14 @@ package controller
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// Registry is an interface for things that know how to store ReplicationControllers.
type Registry interface {
ListControllers(ctx api.Context) (*api.ReplicationControllerList, error)
WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error)
WatchControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error)
CreateController(ctx api.Context, controller *api.ReplicationController) error
UpdateController(ctx api.Context, controller *api.ReplicationController) error

View File

@ -146,41 +146,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE
// Watch returns ReplicationController events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
if !field.Empty() {
return nil, fmt.Errorf("no field selector implemented for controllers")
}
incoming, err := rs.registry.WatchControllers(ctx, resourceVersion)
if err != nil {
return nil, err
}
// TODO(lavalamp): remove watch.Filter, which is broken. Implement consistent way of filtering.
// TODO(lavalamp): this watch method needs a test.
return watch.Filter(incoming, func(e watch.Event) (watch.Event, bool) {
controller, ok := e.Object.(*api.ReplicationController)
if !ok {
// must be an error event-- pass it on
return e, true
}
match := label.Matches(labels.Set(controller.Labels))
if match {
rs.fillCurrentState(ctx, controller)
}
return e, match
}), nil
}
func (rs *REST) waitForController(ctx api.Context, controller *api.ReplicationController) (runtime.Object, error) {
for {
pods, err := rs.podLister.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
if err != nil {
return controller, err
}
if len(pods.Items) == controller.Spec.Replicas {
break
}
time.Sleep(rs.pollPeriod)
}
return controller, nil
return rs.registry.WatchControllers(ctx, label, field, resourceVersion)
}
func (rs *REST) fillCurrentState(ctx api.Context, controller *api.ReplicationController) error {

View File

@ -122,20 +122,20 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool
}
// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
func (r *Registry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod")
if err != nil {
return nil, err
}
key := makePodListKey(ctx)
return r.WatchList(key, version, func(obj runtime.Object) bool {
switch t := obj.(type) {
case *api.Pod:
return filter(t)
default:
// Must be an error
podObj, ok := obj.(*api.Pod)
if !ok {
// Must be an error: return true to propagate to upper level.
return true
}
fields := pod.PodToSelectableFields(podObj)
return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields)
})
}
@ -327,13 +327,28 @@ func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerL
}
// WatchControllers begins watching for new, changed, or deleted controllers.
func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) {
func (r *Registry) WatchControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
if !field.Empty() {
return nil, fmt.Errorf("field selectors are not supported on replication controllers")
}
version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers")
if err != nil {
return nil, err
}
key := makeControllerListKey(ctx)
return r.WatchList(key, version, tools.Everything)
return r.WatchList(key, version, func(obj runtime.Object) bool {
controller, ok := obj.(*api.ReplicationController)
if !ok {
// Must be an error: return true to propagate to upper level.
return true
}
match := label.Matches(labels.Set(controller.Labels))
if match {
pods, _ := r.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
controller.Status.Replicas = len(pods.Items)
}
return match
})
}
// makeControllerListKey constructs etcd paths to controller directories enforcing namespace rules.

View File

@ -21,6 +21,7 @@ import (
"strconv"
"strings"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -705,6 +706,112 @@ func TestEtcdListPods(t *testing.T) {
}
}
func TestEtcdWatchPods(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.Everything(),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchPodsMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
podBytes, _ := latest.Codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchPodsNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
podBytes, _ := latest.Codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}
func TestEtcdListControllersNotFound(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx := api.NewDefaultContext()
@ -934,6 +1041,114 @@ func TestEtcdUpdateController(t *testing.T) {
}
}
func TestEtcdWatchController(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.Everything(),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchControllersMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet(makePodListKey(ctx))
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
controller := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
controllerBytes, _ := latest.Codec.Encode(controller)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(controllerBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
func TestEtcdWatchControllersNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet(makePodListKey(ctx))
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
controller := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
controllerBytes, _ := latest.Codec.Encode(controller)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(controllerBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}
func TestEtcdListServices(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)

View File

@ -29,7 +29,7 @@ type Registry interface {
// ListPodsPredicate obtains a list of pods for which filter returns true.
ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error)
// Watch for new/changed/deleted pods
WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error)
WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
// Get a specific pod
GetPod(ctx api.Context, podID string) (*api.Pod, error)
// Create a pod based on a specification.

View File

@ -137,7 +137,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
return pod, err
}
func (rs *REST) podToSelectableFields(pod *api.Pod) labels.Set {
func PodToSelectableFields(pod *api.Pod) labels.Set {
// TODO we are populating both Status and DesiredState because selectors are not aware of API versions
// see https://github.com/GoogleCloudPlatform/kubernetes/pull/2503
@ -158,7 +158,7 @@ func (rs *REST) podToSelectableFields(pod *api.Pod) labels.Set {
// ListPods & WatchPods.
func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool {
return func(pod *api.Pod) bool {
fields := rs.podToSelectableFields(pod)
fields := PodToSelectableFields(pod)
return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
}
}
@ -184,7 +184,8 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj
// Watch begins watching for new, changed, or deleted pods.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchPods(ctx, resourceVersion, rs.filterFunc(label, field))
// TODO: Add pod status to watch command
return rs.registry.WatchPods(ctx, label, field, resourceVersion)
}
func (*REST) New() runtime.Object {

View File

@ -18,6 +18,7 @@ package registrytest
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
@ -47,6 +48,6 @@ func (r *ControllerRegistry) DeleteController(ctx api.Context, ID string) error
return r.Err
}
func (r *ControllerRegistry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) {
func (r *ControllerRegistry) WatchControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return nil, r.Err
}

View File

@ -63,9 +63,9 @@ func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api.
})
}
func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
// TODO: wire filter down into the broadcaster; it needs access to current and previous state :(
func (r *PodRegistry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return r.broadcaster.Watch(), nil
}
func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) {