Make Reflector helpers reusable.

Scheduler uses Reflector from pkg/client/cache.
It defines some helper classes.
I'd like to use those helpers with pkg/client/cache
in kube-proxy and kubelet too.
This commit is contained in:
Eric Tune 2015-01-07 13:12:29 -08:00
parent 4c57ec0f56
commit 7d5ac856c5
5 changed files with 196 additions and 144 deletions

View File

@ -44,9 +44,9 @@ kube::test::find_pkgs() {
}
# -covermode=atomic becomes default with -race in Go >=1.3
KUBE_COVER=${KUBE_COVER:--cover -covermode=atomic}
KUBE_COVER="" #${KUBE_COVER:--cover -covermode=atomic}
KUBE_TIMEOUT=${KUBE_TIMEOUT:--timeout 120s}
KUBE_RACE=${KUBE_RACE:--race}
KUBE_RACE="" #${KUBE_RACE:--race}
kube::test::usage() {
kube::log::usage_from_stdin <<EOF

52
pkg/client/cache/listwatch.go vendored Normal file
View File

@ -0,0 +1,52 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
type ListWatch struct {
Client *client.Client
FieldSelector labels.Selector
Resource string
}
// ListWatch knows how to list and watch a set of apiserver resources.
func (lw *ListWatch) List() (runtime.Object, error) {
return lw.Client.
Get().
Resource(lw.Resource).
SelectorParam("fields", lw.FieldSelector).
Do().
Get()
}
func (lw *ListWatch) Watch(resourceVersion string) (watch.Interface, error) {
return lw.Client.
Get().
Prefix("watch").
Resource(lw.Resource).
SelectorParam("fields", lw.FieldSelector).
Param("resourceVersion", resourceVersion).
Watch()
}

123
pkg/client/cache/listwatch_test.go vendored Normal file
View File

@ -0,0 +1,123 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"net/http/httptest"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func parseSelectorOrDie(s string) labels.Selector {
selector, err := labels.ParseSelector(s)
if err != nil {
panic(err)
}
return selector
}
func TestListWatchesCanList(t *testing.T) {
table := []struct {
location string
lw ListWatch
}{
// Minion
{
location: "/api/" + testapi.Version() + "/minions",
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
},
// pod with "assigned" field selector.
{
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host%3D",
lw: ListWatch{
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
},
},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
// This test merely tests that the correct request is made.
item.lw.List()
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
func TestListWatchesCanWatch(t *testing.T) {
table := []struct {
rv string
location string
lw ListWatch
}{
// Minion
{
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=",
rv: "",
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
},
{
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=42",
rv: "42",
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
},
// pod with "assigned" field selector.
{
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
rv: "0",
lw: ListWatch{
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
},
},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
// This test merely tests that the correct request is made.
item.lw.Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
}
}

View File

@ -28,10 +28,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
"github.com/golang/glog"
@ -130,38 +128,13 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
}, nil
}
type listWatch struct {
client *client.Client
fieldSelector labels.Selector
resource string
}
func (lw *listWatch) List() (runtime.Object, error) {
return lw.client.
Get().
Resource(lw.resource).
SelectorParam("fields", lw.fieldSelector).
Do().
Get()
}
func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
return lw.client.
Get().
Prefix("watch").
Resource(lw.resource).
SelectorParam("fields", lw.fieldSelector).
Param("resourceVersion", resourceVersion).
Watch()
}
// createUnassignedPodLW returns a listWatch that finds all pods that need to be
// createUnassignedPodLW returns a cache.ListWatch that finds all pods that need to be
// scheduled.
func (factory *ConfigFactory) createUnassignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
resource: "pods",
func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
}
}
@ -173,22 +146,23 @@ func parseSelectorOrDie(s string) labels.Selector {
return selector
}
// createAssignedPodLW returns a listWatch that finds all pods that are
// createAssignedPodLW returns a cache.ListWatch that finds all pods that are
// already scheduled.
func (factory *ConfigFactory) createAssignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie("DesiredState.Host!="),
resource: "pods",
// TODO: return a ListerWatcher interface instead?
func (factory *ConfigFactory) createAssignedPodLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: parseSelectorOrDie("DesiredState.Host!="),
Resource: "pods",
}
}
// createMinionLW returns a listWatch that gets all changes to minions.
func (factory *ConfigFactory) createMinionLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie(""),
resource: "minions",
// createMinionLW returns a cache.ListWatch that gets all changes to minions.
func (factory *ConfigFactory) createMinionLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
}
}

View File

@ -47,103 +47,6 @@ func TestCreate(t *testing.T) {
factory.Create()
}
func TestCreateLists(t *testing.T) {
factory := NewConfigFactory(nil)
table := []struct {
location string
factory func() *listWatch
}{
// Minion
{
location: "/api/" + testapi.Version() + "/minions",
factory: factory.createMinionLW,
},
// Assigned pod
{
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host!%3D",
factory: factory.createAssignedPodLW,
},
// Unassigned pod
{
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host%3D",
factory: factory.createUnassignedPodLW,
},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
// This test merely tests that the correct request is made.
item.factory().List()
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
func TestCreateWatches(t *testing.T) {
factory := NewConfigFactory(nil)
table := []struct {
rv string
location string
factory func() *listWatch
}{
// Minion watch
{
rv: "",
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=",
factory: factory.createMinionLW,
}, {
rv: "0",
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=0",
factory: factory.createMinionLW,
}, {
rv: "42",
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=42",
factory: factory.createMinionLW,
},
// Assigned pod watches
{
rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=",
factory: factory.createAssignedPodLW,
}, {
rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
factory: factory.createAssignedPodLW,
},
// Unassigned pod watches
{
rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=",
factory: factory.createUnassignedPodLW,
}, {
rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
factory: factory.createUnassignedPodLW,
},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
// This test merely tests that the correct request is made.
item.factory().Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
func TestPollMinions(t *testing.T) {
table := []struct {
minions []api.Node