diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index f926b85b2f9..862944439b2 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -88,7 +88,7 @@ type SimpleRESTStorage struct { injectedFunction func(obj runtime.Object) (returnObj runtime.Object, err error) } -func (storage *SimpleRESTStorage) List(labels.Selector) (runtime.Object, error) { +func (storage *SimpleRESTStorage) List(label, field labels.Selector) (runtime.Object, error) { result := &SimpleList{ Items: storage.list, } diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index c2a4f2c5751..0c1fb2cce6e 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -30,8 +30,7 @@ type RESTStorage interface { New() runtime.Object // List selects resources in the storage which match to the selector. - // TODO: add field selector in addition to label selector. - List(labels.Selector) (runtime.Object, error) + List(label, field labels.Selector) (runtime.Object, error) // Get finds a resource in the storage by id and returns it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 4b6977b9e8d..03208acb143 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -70,12 +70,17 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt case "GET": switch len(parts) { case 1: - selector, err := labels.ParseSelector(req.URL.Query().Get("labels")) + label, err := labels.ParseSelector(req.URL.Query().Get("labels")) if err != nil { errorJSON(err, h.codec, w) return } - list, err := storage.List(selector) + field, err := labels.ParseSelector(req.URL.Query().Get("fields")) + if err != nil { + errorJSON(err, h.codec, w) + return + } + list, err := storage.List(label, field) if err != nil { errorJSON(err, h.codec, w) return diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 07463e72c35..40261651974 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -116,6 +116,23 @@ func (f *FIFO) Pop() interface{} { } } +// Replace will delete the contents of 'f', using instead the given map. +// 'f' takes ownersip of the map, you should not reference the map again +// after calling this function. f's queue is reset, too; upon return, it +// will contain the items in the map, in no particular order. +func (f *FIFO) Replace(idToObj map[string]interface{}) { + f.lock.Lock() + defer f.lock.Unlock() + f.items = idToObj + f.queue = f.queue[:0] + for id := range idToObj { + f.queue = append(f.queue, id) + } + if len(f.queue) > 0 { + f.cond.Broadcast() + } +} + // NewFIFO returns a Store which can be used to queue up items to // process. func NewFIFO() *FIFO { diff --git a/pkg/client/cache/fifo_test.go b/pkg/client/cache/fifo_test.go index f86360aa327..e4a4a4f88d7 100644 --- a/pkg/client/cache/fifo_test.go +++ b/pkg/client/cache/fifo_test.go @@ -81,3 +81,29 @@ func TestFIFO_addUpdate(t *testing.T) { t.Errorf("item did not get removed") } } + +func TestFIFO_addReplace(t *testing.T) { + f := NewFIFO() + f.Add("foo", 10) + f.Replace(map[string]interface{}{"foo": 15}) + got := make(chan int, 2) + go func() { + for { + got <- f.Pop().(int) + } + }() + + first := <-got + if e, a := 15, first; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected) + case <-time.After(50 * time.Millisecond): + } + _, exists := f.Get("foo") + if exists { + t.Errorf("item did not get removed") + } +} diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 31a774410f9..1e309975048 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "fmt" "reflect" "time" @@ -26,58 +27,106 @@ import ( "github.com/golang/glog" ) +// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. +type ListerWatcher interface { + // List should return a list type object; the Items field will be extracted, and the + // ResourceVersion field will be used to start the watch in the right place. + List() (runtime.Object, error) + // Watch should begin a watch at the specified version. + Watch(resourceVersion uint64) (watch.Interface, error) +} + // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // The type of object we expect to place in the store. expectedType reflect.Type // The destination to sync up with the watch source store Store - // watchFactory is called to initiate watches. - watchFactory WatchFactory + // listerWatcher is used to perform lists and watches. + listerWatcher ListerWatcher // period controls timing between one watch ending and // the beginning of the next one. period time.Duration } -// WatchFactory should begin a watch at the specified version. -type WatchFactory func(resourceVersion uint64) (watch.Interface, error) - // NewReflector creates a new Reflector object which will keep the given store up to // date with the server's contents for the given resource. Reflector promises to // only put things in the store that have the type of expectedType. -func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Store) *Reflector { - gc := &Reflector{ - watchFactory: watchFactory, - store: store, - expectedType: reflect.TypeOf(expectedType), - period: time.Second, +func NewReflector(lw ListerWatcher, expectedType interface{}, store Store) *Reflector { + r := &Reflector{ + listerWatcher: lw, + store: store, + expectedType: reflect.TypeOf(expectedType), + period: time.Second, } - return gc + return r } // Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run starts a goroutine and returns immediately. -func (gc *Reflector) Run() { +func (r *Reflector) Run() { + go util.Forever(func() { r.listAndWatch() }, r.period) +} + +func (r *Reflector) listAndWatch() { var resourceVersion uint64 - go util.Forever(func() { - w, err := gc.watchFactory(resourceVersion) + + list, err := r.listerWatcher.List() + if err != nil { + glog.Errorf("Failed to list %v: %v", r.expectedType, err) + return + } + jsonBase, err := runtime.FindJSONBase(list) + if err != nil { + glog.Errorf("Unable to understand list result %#v", list) + return + } + resourceVersion = jsonBase.ResourceVersion() + items, err := runtime.ExtractList(list) + if err != nil { + glog.Errorf("Unable to understand list result %#v (%v)", list, err) + return + } + err = r.syncWith(items) + if err != nil { + glog.Errorf("Unable to sync list result: %v", err) + return + } + + for { + w, err := r.listerWatcher.Watch(resourceVersion) if err != nil { - glog.Errorf("failed to watch %v: %v", gc.expectedType, err) + glog.Errorf("failed to watch %v: %v", r.expectedType, err) return } - gc.watchHandler(w, &resourceVersion) - }, gc.period) + r.watchHandler(w, &resourceVersion) + } +} + +// syncWith replaces the store's items with the given list. +func (r *Reflector) syncWith(items []runtime.Object) error { + found := map[string]interface{}{} + for _, item := range items { + jsonBase, err := runtime.FindJSONBase(item) + if err != nil { + return fmt.Errorf("unexpected item in list: %v", err) + } + found[jsonBase.ID()] = item + } + + r.store.Replace(found) + return nil } // watchHandler watches w and keeps *resourceVersion up to date. -func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { +func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { for { event, ok := <-w.ResultChan() if !ok { glog.Errorf("unexpected watch close") return } - if e, a := gc.expectedType, reflect.TypeOf(event.Object); e != a { + if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { glog.Errorf("expected type %v, but watch event object had type %v", e, a) continue } @@ -88,14 +137,14 @@ func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { } switch event.Type { case watch.Added: - gc.store.Add(jsonBase.ID(), event.Object) + r.store.Add(jsonBase.ID(), event.Object) case watch.Modified: - gc.store.Update(jsonBase.ID(), event.Object) + r.store.Update(jsonBase.ID(), event.Object) case watch.Deleted: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. - gc.store.Delete(jsonBase.ID()) + r.store.Delete(jsonBase.ID()) default: glog.Errorf("unable to understand watch event %#v", event) } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 89e5b7386da..60963d793af 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -17,15 +17,27 @@ limitations under the License. package cache import ( + "fmt" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +type testLW struct { + ListFunc func() (runtime.Object, error) + WatchFunc func(resourceVersion uint64) (watch.Interface, error) +} + +func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() } +func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) { + return t.WatchFunc(resourceVersion) +} + func TestReflector_watchHandler(t *testing.T) { s := NewStore() - g := NewReflector(nil, &api.Pod{}, s) + g := NewReflector(&testLW{}, &api.Pod{}, s) fw := watch.NewFake() s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}) @@ -68,27 +80,32 @@ func TestReflector_watchHandler(t *testing.T) { } } -func TestReflector_Run(t *testing.T) { +func TestReflector_listAndWatch(t *testing.T) { createdFakes := make(chan *watch.FakeWatcher) - // Expect our starter to get called at the beginning of the watch with 0, and again with 3 when we - // inject an error at 2. - expectedRVs := []uint64{0, 3} - watchStarter := func(rv uint64) (watch.Interface, error) { - fw := watch.NewFake() - if e, a := expectedRVs[0], rv; e != a { - t.Errorf("Expected rv %v, but got %v", e, a) - } - expectedRVs = expectedRVs[1:] - // channel is not buffered because the for loop below needs to block. But - // we don't want to block here, so report the new fake via a go routine. - go func() { createdFakes <- fw }() - return fw, nil + // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc + // to get called at the beginning of the watch with 1, and again with 4 when we + // inject an error at 3. + expectedRVs := []uint64{1, 4} + lw := &testLW{ + WatchFunc: func(rv uint64) (watch.Interface, error) { + fw := watch.NewFake() + if e, a := expectedRVs[0], rv; e != a { + t.Errorf("Expected rv %v, but got %v", e, a) + } + expectedRVs = expectedRVs[1:] + // channel is not buffered because the for loop below needs to block. But + // we don't want to block here, so report the new fake via a go routine. + go func() { createdFakes <- fw }() + return fw, nil + }, + ListFunc: func() (runtime.Object, error) { + return &api.PodList{JSONBase: api.JSONBase{ResourceVersion: 1}}, nil + }, } s := NewFIFO() - r := NewReflector(watchStarter, &api.Pod{}, s) - r.period = 0 - r.Run() + r := NewReflector(lw, &api.Pod{}, s) + go r.listAndWatch() ids := []string{"foo", "bar", "baz", "qux", "zoo"} var fw *watch.FakeWatcher @@ -96,9 +113,9 @@ func TestReflector_Run(t *testing.T) { if fw == nil { fw = <-createdFakes } - sendingRV := uint64(i + 1) + sendingRV := uint64(i + 2) fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: sendingRV}}) - if sendingRV == 2 { + if sendingRV == 3 { // Inject a failure. fw.Stop() fw = nil @@ -111,7 +128,7 @@ func TestReflector_Run(t *testing.T) { if e, a := id, pod.ID; e != a { t.Errorf("%v: Expected %v, got %v", i, e, a) } - if e, a := uint64(i+1), pod.ResourceVersion; e != a { + if e, a := uint64(i+2), pod.ResourceVersion; e != a { t.Errorf("%v: Expected %v, got %v", i, e, a) } } @@ -120,3 +137,91 @@ func TestReflector_Run(t *testing.T) { t.Error("called watchStarter an unexpected number of times") } } + +func TestReflector_listAndWatchWithErrors(t *testing.T) { + mkPod := func(id string, rv uint64) *api.Pod { + return &api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: rv}} + } + mkList := func(rv uint64, pods ...*api.Pod) *api.PodList { + list := &api.PodList{JSONBase: api.JSONBase{ResourceVersion: rv}} + for _, pod := range pods { + list.Items = append(list.Items, *pod) + } + return list + } + table := []struct { + list *api.PodList + listErr error + events []watch.Event + watchErr error + }{ + { + list: mkList(1), + events: []watch.Event{ + {watch.Added, mkPod("foo", 2)}, + {watch.Added, mkPod("bar", 3)}, + }, + }, { + list: mkList(3, mkPod("foo", 2), mkPod("bar", 3)), + events: []watch.Event{ + {watch.Deleted, mkPod("foo", 4)}, + {watch.Added, mkPod("qux", 5)}, + }, + }, { + listErr: fmt.Errorf("a list error"), + }, { + list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)), + watchErr: fmt.Errorf("a watch error"), + }, { + list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)), + events: []watch.Event{ + {watch.Added, mkPod("baz", 6)}, + }, + }, { + list: mkList(6, mkPod("bar", 3), mkPod("qux", 5), mkPod("baz", 6)), + }, + } + + s := NewFIFO() + for line, item := range table { + if item.list != nil { + // Test that the list is what currently exists in the store. + current := s.List() + checkMap := map[string]uint64{} + for _, item := range current { + pod := item.(*api.Pod) + checkMap[pod.ID] = pod.ResourceVersion + } + for _, pod := range item.list.Items { + if e, a := pod.ResourceVersion, checkMap[pod.ID]; e != a { + t.Errorf("%v: expected %v, got %v for pod %v", line, e, a, pod.ID) + } + } + if e, a := len(item.list.Items), len(checkMap); e != a { + t.Errorf("%v: expected %v, got %v", line, e, a) + } + } + watchRet, watchErr := item.events, item.watchErr + lw := &testLW{ + WatchFunc: func(rv uint64) (watch.Interface, error) { + if watchErr != nil { + return nil, watchErr + } + watchErr = fmt.Errorf("second watch") + fw := watch.NewFake() + go func() { + for _, e := range watchRet { + fw.Action(e.Type, e.Object) + } + fw.Stop() + }() + return fw, nil + }, + ListFunc: func() (runtime.Object, error) { + return item.list, item.listErr + }, + } + r := NewReflector(lw, &api.Pod{}, s) + r.listAndWatch() + } +} diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 4b27aa29099..ebf0a2ce953 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -33,6 +33,11 @@ type Store interface { List() []interface{} Contains() util.StringSet Get(id string) (item interface{}, exists bool) + + // Replace will delete the contents of the store, using instead the + // given map. Store takes ownership of the map, you should not reference + // it after calling this function. + Replace(idToObj map[string]interface{}) } type cache struct { @@ -95,6 +100,15 @@ func (c *cache) Get(id string) (item interface{}, exists bool) { return item, exists } +// Replace will delete the contents of 'c', using instead the given map. +// 'c' takes ownership of the map, you should not reference the map again +// after calling this function. +func (c *cache) Replace(idToObj map[string]interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.items = idToObj +} + // NewStore returns a Store implemented simply with a map and a lock. func NewStore() Store { return &cache{items: map[string]interface{}{}} diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index 4220d284f4a..7e9af6eef91 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -45,28 +45,58 @@ func doTestStore(t *testing.T, store Store) { t.Errorf("found deleted item??") } - // Test List + // Test List. store.Add("a", "b") store.Add("c", "d") store.Add("e", "e") - found := util.StringSet{} - for _, item := range store.List() { - found.Insert(item.(string)) - } - if !found.HasAll("b", "d", "e") { - t.Errorf("missing items") - } - if len(found) != 3 { - t.Errorf("extra items") + { + found := util.StringSet{} + for _, item := range store.List() { + found.Insert(item.(string)) + } + if !found.HasAll("b", "d", "e") { + t.Errorf("missing items") + } + if len(found) != 3 { + t.Errorf("extra items") + } + + // Check that ID list is correct. + ids := store.Contains() + if !ids.HasAll("a", "c", "e") { + t.Errorf("missing items") + } + if len(ids) != 3 { + t.Errorf("extra items") + } } - // Check that ID list is correct. - ids := store.Contains() - if !ids.HasAll("a", "c", "e") { - t.Errorf("missing items") - } - if len(ids) != 3 { - t.Errorf("extra items") + // Test Replace. + store.Replace(map[string]interface{}{ + "foo": "foo", + "bar": "bar", + }) + + { + found := util.StringSet{} + for _, item := range store.List() { + found.Insert(item.(string)) + } + if !found.HasAll("foo", "bar") { + t.Errorf("missing items") + } + if len(found) != 2 { + t.Errorf("extra items") + } + + // Check that ID list is correct. + ids := store.Contains() + if !ids.HasAll("foo", "bar") { + t.Errorf("missing items") + } + if len(ids) != 2 { + t.Errorf("extra items") + } } } diff --git a/pkg/registry/binding/rest.go b/pkg/registry/binding/rest.go index 980ecf4a8ab..bfc7b6e150f 100644 --- a/pkg/registry/binding/rest.go +++ b/pkg/registry/binding/rest.go @@ -41,7 +41,7 @@ func NewREST(bindingRegistry Registry) *REST { } // List returns an error because bindings are write-only objects. -func (*REST) List(selector labels.Selector) (runtime.Object, error) { +func (*REST) List(label, field labels.Selector) (runtime.Object, error) { return nil, errors.NewNotFound("binding", "list") } diff --git a/pkg/registry/binding/rest_test.go b/pkg/registry/binding/rest_test.go index 3bbc216f287..4a1b8f30077 100644 --- a/pkg/registry/binding/rest_test.go +++ b/pkg/registry/binding/rest_test.go @@ -65,7 +65,7 @@ func TestRESTUnsupported(t *testing.T) { if _, err := b.Get("binding id"); err == nil { t.Errorf("unexpected non-error") } - if _, err := b.List(labels.Set{"name": "foo"}.AsSelector()); err == nil { + if _, err := b.List(labels.Set{"name": "foo"}.AsSelector(), labels.Everything()); err == nil { t.Errorf("unexpected non-error") } // Try sending wrong object just to get 100% coverage diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index d494c7791af..cfd39bcbcf7 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -97,14 +97,17 @@ func (rs *REST) Get(id string) (runtime.Object, error) { } // List obtains a list of ReplicationControllers that match selector. -func (rs *REST) List(selector labels.Selector) (runtime.Object, error) { +func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { + if !field.Empty() { + return nil, fmt.Errorf("field selector not supported yet") + } controllers, err := rs.registry.ListControllers() if err != nil { return nil, err } filtered := []api.ReplicationController{} for _, controller := range controllers.Items { - if selector.Matches(labels.Set(controller.Labels)) { + if label.Matches(labels.Set(controller.Labels)) { rs.fillCurrentState(&controller) filtered = append(filtered, controller) } diff --git a/pkg/registry/controller/rest_test.go b/pkg/registry/controller/rest_test.go index 8768d7847b6..ee8f298777d 100644 --- a/pkg/registry/controller/rest_test.go +++ b/pkg/registry/controller/rest_test.go @@ -39,7 +39,7 @@ func TestListControllersError(t *testing.T) { storage := REST{ registry: &mockRegistry, } - controllers, err := storage.List(nil) + controllers, err := storage.List(labels.Everything(), labels.Everything()) if err != mockRegistry.Err { t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err) } @@ -53,7 +53,7 @@ func TestListEmptyControllerList(t *testing.T) { storage := REST{ registry: &mockRegistry, } - controllers, err := storage.List(labels.Everything()) + controllers, err := storage.List(labels.Everything(), labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -86,7 +86,7 @@ func TestListControllerList(t *testing.T) { storage := REST{ registry: &mockRegistry, } - controllersObj, err := storage.List(labels.Everything()) + controllersObj, err := storage.List(labels.Everything(), labels.Everything()) controllers := controllersObj.(*api.ReplicationControllerList) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index 554dadd21eb..7e620dfcde0 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -43,9 +43,9 @@ func (rs *REST) Get(id string) (runtime.Object, error) { } // List satisfies the RESTStorage interface. -func (rs *REST) List(selector labels.Selector) (runtime.Object, error) { - if !selector.Empty() { - return nil, errors.New("label selectors are not supported on endpoints") +func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { + if !label.Empty() || !field.Empty() { + return nil, errors.New("label/field selectors are not supported on endpoints") } return rs.registry.ListEndpoints() } diff --git a/pkg/registry/endpoint/rest_test.go b/pkg/registry/endpoint/rest_test.go index e14e9aaa120..978babb0e9f 100644 --- a/pkg/registry/endpoint/rest_test.go +++ b/pkg/registry/endpoint/rest_test.go @@ -79,7 +79,7 @@ func TestEndpointsRegistryList(t *testing.T) { {JSONBase: api.JSONBase{ID: "bar"}}, }, } - s, _ := storage.List(labels.Everything()) + s, _ := storage.List(labels.Everything(), labels.Everything()) sl := s.(*api.EndpointsList) if len(sl.Items) != 2 { t.Fatalf("Expected 2 endpoints, but got %v", len(sl.Items)) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index c74b78a9fa5..e95c0e11abd 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -60,8 +60,15 @@ func makePodKey(podID string) string { return "/registry/pods/" + podID } -// ListPods obtains a list of pods that match selector. +// ListPods obtains a list of pods with labels that match selector. func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) { + return r.ListPodsPredicate(func(pod *api.Pod) bool { + return selector.Matches(labels.Set(pod.Labels)) + }) +} + +// ListPodsPredicate obtains a list of pods that match filter. +func (r *Registry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) { allPods := api.PodList{} err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion) if err != nil { @@ -69,7 +76,7 @@ func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) { } filtered := []api.Pod{} for _, pod := range allPods.Items { - if selector.Matches(labels.Set(pod.Labels)) { + if filter(&pod) { // TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets // the CurrentState.Host and Status fields. Here we pretend that reality perfectly // matches our desires. diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index e66a0f9b6e1..7b512f68d6e 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -86,7 +86,7 @@ func (rs *REST) Get(id string) (runtime.Object, error) { return rs.toApiMinion(id), err } -func (rs *REST) List(selector labels.Selector) (runtime.Object, error) { +func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { nameList, err := rs.registry.List() if err != nil { return nil, err diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go index d1a71c883e9..3b4c79265c8 100644 --- a/pkg/registry/minion/rest_test.go +++ b/pkg/registry/minion/rest_test.go @@ -67,7 +67,7 @@ func TestMinionREST(t *testing.T) { t.Errorf("delete returned wrong error") } - list, err := ms.List(labels.Everything()) + list, err := ms.List(labels.Everything(), labels.Everything()) if err != nil { t.Errorf("got error calling List") } diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index ef12477e060..876e9118208 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -24,8 +24,10 @@ import ( // Registry is an interface implemented by things that know how to store Pod objects. type Registry interface { - // ListPods obtains a list of pods that match selector. + // ListPods obtains a list of pods having labels which match selector. ListPods(selector labels.Selector) (*api.PodList, error) + // ListPodsPredicate obtains a list of pods for which filter returns true. + ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) // Watch for new/changed/deleted pods WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) // Get a specific pod diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index d3493f73374..1d74c810dc1 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -114,8 +114,25 @@ func (rs *REST) Get(id string) (runtime.Object, error) { return pod, err } -func (rs *REST) List(selector labels.Selector) (runtime.Object, error) { - pods, err := rs.registry.ListPods(selector) +func (rs *REST) podToSelectableFields(pod *api.Pod) labels.Set { + return labels.Set{ + "ID": pod.ID, + "DesiredState.Status": string(pod.DesiredState.Status), + "DesiredState.Host": pod.DesiredState.Host, + } +} + +// filterFunc returns a predicate based on label & field selectors that can be passed to registry's +// ListPods & WatchPods. +func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool { + return func(pod *api.Pod) bool { + fields := rs.podToSelectableFields(pod) + return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + } +} + +func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { + pods, err := rs.registry.ListPodsPredicate(rs.filterFunc(label, field)) if err == nil { for i := range pods.Items { pod := &pods.Items[i] @@ -133,14 +150,7 @@ func (rs *REST) List(selector labels.Selector) (runtime.Object, error) { // Watch begins watching for new, changed, or deleted pods. func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - return rs.registry.WatchPods(resourceVersion, func(pod *api.Pod) bool { - fields := labels.Set{ - "ID": pod.ID, - "DesiredState.Status": string(pod.DesiredState.Status), - "DesiredState.Host": pod.DesiredState.Host, - } - return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) - }) + return rs.registry.WatchPods(resourceVersion, rs.filterFunc(label, field)) } func (*REST) New() runtime.Object { diff --git a/pkg/registry/pod/rest_test.go b/pkg/registry/pod/rest_test.go index 3e3c5571833..43a3cf89281 100644 --- a/pkg/registry/pod/rest_test.go +++ b/pkg/registry/pod/rest_test.go @@ -30,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/fsouza/go-dockerclient" ) @@ -130,7 +131,7 @@ func TestListPodsError(t *testing.T) { storage := REST{ registry: podRegistry, } - pods, err := storage.List(labels.Everything()) + pods, err := storage.List(labels.Everything(), labels.Everything()) if err != podRegistry.Err { t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err) } @@ -144,7 +145,7 @@ func TestListEmptyPodList(t *testing.T) { storage := REST{ registry: podRegistry, } - pods, err := storage.List(labels.Everything()) + pods, err := storage.List(labels.Everything(), labels.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -176,7 +177,7 @@ func TestListPodList(t *testing.T) { storage := REST{ registry: podRegistry, } - podsObj, err := storage.List(labels.Everything()) + podsObj, err := storage.List(labels.Everything(), labels.Everything()) pods := podsObj.(*api.PodList) if err != nil { t.Errorf("unexpected error: %v", err) @@ -193,6 +194,86 @@ func TestListPodList(t *testing.T) { } } +func TestListPodListSelection(t *testing.T) { + podRegistry := registrytest.NewPodRegistry(nil) + podRegistry.Pods = &api.PodList{ + Items: []api.Pod{ + { + JSONBase: api.JSONBase{ID: "foo"}, + }, { + JSONBase: api.JSONBase{ID: "bar"}, + DesiredState: api.PodState{Host: "barhost"}, + }, { + JSONBase: api.JSONBase{ID: "baz"}, + DesiredState: api.PodState{Status: "bazstatus"}, + }, { + JSONBase: api.JSONBase{ID: "qux"}, + Labels: map[string]string{"label": "qux"}, + }, { + JSONBase: api.JSONBase{ID: "zot"}, + }, + }, + } + storage := REST{ + registry: podRegistry, + } + + table := []struct { + label, field string + expectedIDs util.StringSet + }{ + { + expectedIDs: util.NewStringSet("foo", "bar", "baz", "qux", "zot"), + }, { + field: "ID=zot", + expectedIDs: util.NewStringSet("zot"), + }, { + label: "label=qux", + expectedIDs: util.NewStringSet("qux"), + }, { + field: "DesiredState.Status=bazstatus", + expectedIDs: util.NewStringSet("baz"), + }, { + field: "DesiredState.Host=barhost", + expectedIDs: util.NewStringSet("bar"), + }, { + field: "DesiredState.Host=", + expectedIDs: util.NewStringSet("foo", "baz", "qux", "zot"), + }, { + field: "DesiredState.Host!=", + expectedIDs: util.NewStringSet("bar"), + }, + } + + for index, item := range table { + label, err := labels.ParseSelector(item.label) + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + field, err := labels.ParseSelector(item.field) + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + podsObj, err := storage.List(label, field) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + pods := podsObj.(*api.PodList) + + if e, a := len(item.expectedIDs), len(pods.Items); e != a { + t.Errorf("%v: Expected %v, got %v", index, e, a) + } + for _, pod := range pods.Items { + if !item.expectedIDs.Has(pod.ID) { + t.Errorf("%v: Unexpected pod %v", index, pod.ID) + } + t.Logf("%v: Got pod ID: %v", index, pod.ID) + } + } +} + func TestPodDecode(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) storage := REST{ diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 27f33ff74c6..0634ac1a770 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -40,7 +40,7 @@ func NewPodRegistry(pods *api.PodList) *PodRegistry { } } -func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) { +func (r *PodRegistry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) { r.Lock() defer r.Unlock() if r.Err != nil { @@ -48,7 +48,7 @@ func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) { } var filtered []api.Pod for _, pod := range r.Pods.Items { - if selector.Matches(labels.Set(pod.Labels)) { + if filter(&pod) { filtered = append(filtered, pod) } } @@ -57,6 +57,12 @@ func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) { return &pods, nil } +func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) { + return r.ListPodsPredicate(func(pod *api.Pod) bool { + return selector.Matches(labels.Set(pod.Labels)) + }) +} + func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { // TODO: wire filter down into the mux; it needs access to current and previous state :( return r.mux.Watch(), nil diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 14740cec876..b0315af43ac 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -113,14 +113,15 @@ func (rs *REST) Get(id string) (runtime.Object, error) { return s, err } -func (rs *REST) List(selector labels.Selector) (runtime.Object, error) { +// TODO: implement field selector? +func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) { list, err := rs.registry.ListServices() if err != nil { return nil, err } var filtered []api.Service for _, service := range list.Items { - if selector.Matches(labels.Set(service.Labels)) { + if label.Matches(labels.Set(service.Labels)) { filtered = append(filtered, service) } } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 9ec77bcb4eb..b9d3d99585e 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -319,7 +319,7 @@ func TestServiceRegistryList(t *testing.T) { Selector: map[string]string{"bar2": "baz2"}, }) registry.List.ResourceVersion = 1 - s, _ := storage.List(labels.Everything()) + s, _ := storage.List(labels.Everything(), labels.Everything()) sl := s.(*api.ServiceList) if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index e63be7713fb..c67aa863785 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -43,19 +44,19 @@ type ConfigFactory struct { func (factory *ConfigFactory) Create() *scheduler.Config { // Watch and queue pods that need scheduling. podQueue := cache.NewFIFO() - cache.NewReflector(factory.createUnassignedPodWatch, &api.Pod{}, podQueue).Run() + cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run() // Watch and cache all running pods. Scheduler needs to find all pods // so it knows where it's safe to place a pod. Cache this locally. podCache := cache.NewStore() - cache.NewReflector(factory.createAssignedPodWatch, &api.Pod{}, podCache).Run() + cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run() // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. minionCache := cache.NewStore() if false { // Disable this code until minions support watches. - cache.NewReflector(factory.createMinionWatch, &api.Minion{}, minionCache).Run() + cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run() } else { cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run() } @@ -82,38 +83,66 @@ func (factory *ConfigFactory) Create() *scheduler.Config { } } -// createUnassignedPodWatch starts a watch that finds all pods that need to be +type listWatch struct { + client *client.Client + fieldSelector labels.Selector + resource string +} + +func (lw *listWatch) List() (runtime.Object, error) { + return lw.client. + Get(). + Path(lw.resource). + SelectorParam("fields", lw.fieldSelector). + Do(). + Get() +} + +func (lw *listWatch) Watch(resourceVersion uint64) (watch.Interface, error) { + return lw.client. + Get(). + Path("watch"). + Path(lw.resource). + SelectorParam("fields", lw.fieldSelector). + UintParam("resourceVersion", resourceVersion). + Watch() +} + +// createUnassignedPodLW returns a listWatch that finds all pods that need to be // scheduled. -func (factory *ConfigFactory) createUnassignedPodWatch(resourceVersion uint64) (watch.Interface, error) { - return factory.Client. - Get(). - Path("watch"). - Path("pods"). - SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). - UintParam("resourceVersion", resourceVersion). - Watch() +func (factory *ConfigFactory) createUnassignedPodLW() *listWatch { + return &listWatch{ + client: factory.Client, + fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), + resource: "pods", + } } -// createUnassignedPodWatch starts a watch that finds all pods that are +func parseSelectorOrDie(s string) labels.Selector { + selector, err := labels.ParseSelector(s) + if err != nil { + panic(err) + } + return selector +} + +// createUnassignedPodLW returns a listWatch that finds all pods that are // already scheduled. -func (factory *ConfigFactory) createAssignedPodWatch(resourceVersion uint64) (watch.Interface, error) { - return factory.Client. - Get(). - Path("watch"). - Path("pods"). - ParseSelectorParam("fields", "DesiredState.Host!="). - UintParam("resourceVersion", resourceVersion). - Watch() +func (factory *ConfigFactory) createAssignedPodLW() *listWatch { + return &listWatch{ + client: factory.Client, + fieldSelector: parseSelectorOrDie("DesiredState.Host!="), + resource: "pods", + } } -// createMinionWatch starts a watch that gets all changes to minions. -func (factory *ConfigFactory) createMinionWatch(resourceVersion uint64) (watch.Interface, error) { - return factory.Client. - Get(). - Path("watch"). - Path("minions"). - UintParam("resourceVersion", resourceVersion). - Watch() +// createMinionLW returns a listWatch that gets all changes to minions. +func (factory *ConfigFactory) createMinionLW() *listWatch { + return &listWatch{ + client: factory.Client, + fieldSelector: parseSelectorOrDie(""), + resource: "minions", + } } // pollMinions lists all minions and returns an enumerator for cache.Poller. diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 4ad09db1862..1845d41c363 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -30,7 +30,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func TestCreate(t *testing.T) { @@ -45,42 +44,26 @@ func TestCreate(t *testing.T) { factory.Create() } -func TestCreateWatches(t *testing.T) { +func TestCreateLists(t *testing.T) { factory := ConfigFactory{nil} table := []struct { - rv uint64 - location string - watchFactory func(rv uint64) (watch.Interface, error) + location string + factory func() *listWatch }{ - // Minion watch + // Minion { - rv: 0, - location: "/api/v1beta1/watch/minions?resourceVersion=0", - watchFactory: factory.createMinionWatch, - }, { - rv: 42, - location: "/api/v1beta1/watch/minions?resourceVersion=42", - watchFactory: factory.createMinionWatch, + location: "/api/v1beta1/minions?fields=", + factory: factory.createMinionLW, }, - // Assigned pod watches + // Assigned pod { - rv: 0, - location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0", - watchFactory: factory.createAssignedPodWatch, - }, { - rv: 42, - location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", - watchFactory: factory.createAssignedPodWatch, + location: "/api/v1beta1/pods?fields=DesiredState.Host!%3D", + factory: factory.createAssignedPodLW, }, - // Unassigned pod watches + // Unassigned pod { - rv: 0, - location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0", - watchFactory: factory.createUnassignedPodWatch, - }, { - rv: 42, - location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", - watchFactory: factory.createUnassignedPodWatch, + location: "/api/v1beta1/pods?fields=DesiredState.Host%3D", + factory: factory.createUnassignedPodLW, }, } @@ -93,7 +76,60 @@ func TestCreateWatches(t *testing.T) { server := httptest.NewServer(&handler) factory.Client = client.NewOrDie(server.URL, nil) // This test merely tests that the correct request is made. - item.watchFactory(item.rv) + item.factory().List() + handler.ValidateRequest(t, item.location, "GET", nil) + } +} + +func TestCreateWatches(t *testing.T) { + factory := ConfigFactory{nil} + table := []struct { + rv uint64 + location string + factory func() *listWatch + }{ + // Minion watch + { + rv: 0, + location: "/api/v1beta1/watch/minions?fields=&resourceVersion=0", + factory: factory.createMinionLW, + }, { + rv: 42, + location: "/api/v1beta1/watch/minions?fields=&resourceVersion=42", + factory: factory.createMinionLW, + }, + // Assigned pod watches + { + rv: 0, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0", + factory: factory.createAssignedPodLW, + }, { + rv: 42, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", + factory: factory.createAssignedPodLW, + }, + // Unassigned pod watches + { + rv: 0, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0", + factory: factory.createUnassignedPodLW, + }, { + rv: 42, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", + factory: factory.createUnassignedPodLW, + }, + } + + for _, item := range table { + handler := util.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + factory.Client = client.NewOrDie(server.URL, nil) + // This test merely tests that the correct request is made. + item.factory().Watch(item.rv) handler.ValidateRequest(t, item.location, "GET", nil) } }