Merge pull request #10464 from jiangyaoguo/reflector-in-proxy

replace Reflector in kube-proxy
This commit is contained in:
Vish Kannan 2015-07-23 18:01:29 -07:00
commit 0a715a72f1
3 changed files with 220 additions and 510 deletions

View File

@ -25,7 +25,6 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
clientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api" clientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
@ -130,8 +129,7 @@ func (s *ProxyServer) Run(_ []string) error {
} }
config.NewSourceAPI( config.NewSourceAPI(
client.Services(api.NamespaceAll), client,
client.Endpoints(api.NamespaceAll),
30*time.Second, 30*time.Second,
serviceConfig.Channel("api"), serviceConfig.Channel("api"),
endpointsConfig.Channel("api"), endpointsConfig.Channel("api"),

View File

@ -21,223 +21,41 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
) )
// TODO: to use Reflector, need to change the ServicesWatcher to a generic ListerWatcher. // NewSourceAPIserver creates config source that watches for changes to the services and endpoints.
// ServicesWatcher is capable of listing and watching for changes to services across ALL namespaces func NewSourceAPI(c *client.Client, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
type ServicesWatcher interface { servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
List(label labels.Selector) (*api.ServiceList, error) endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything())
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
newServicesSourceApiFromLW(servicesLW, period, servicesChan)
newEndpointsSourceApiFromLW(endpointsLW, period, endpointsChan)
} }
// EndpointsWatcher is capable of listing and watching for changes to endpoints across ALL namespaces func newServicesSourceApiFromLW(servicesLW cache.ListerWatcher, period time.Duration, servicesChan chan<- ServiceUpdate) {
type EndpointsWatcher interface { servicesPush := func(objs []interface{}) {
List(label labels.Selector) (*api.EndpointsList, error) var services []api.Service
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) for _, o := range objs {
} services = append(services, *(o.(*api.Service)))
// SourceAPI implements a configuration source for services and endpoints that
// uses the client watch API to efficiently detect changes.
type SourceAPI struct {
s servicesReflector
e endpointsReflector
}
type servicesReflector struct {
watcher ServicesWatcher
services chan<- ServiceUpdate
resourceVersion string
waitDuration time.Duration
reconnectDuration time.Duration
}
type endpointsReflector struct {
watcher EndpointsWatcher
endpoints chan<- EndpointsUpdate
resourceVersion string
waitDuration time.Duration
reconnectDuration time.Duration
}
// NewSourceAPI creates a config source that watches for changes to the services and endpoints.
func NewSourceAPI(servicesWatcher ServicesWatcher, endpointsWatcher EndpointsWatcher, period time.Duration, services chan<- ServiceUpdate, endpoints chan<- EndpointsUpdate) *SourceAPI {
config := &SourceAPI{
s: servicesReflector{
watcher: servicesWatcher,
services: services,
resourceVersion: "",
waitDuration: period,
// prevent hot loops if the server starts to misbehave
reconnectDuration: time.Second * 1,
},
e: endpointsReflector{
watcher: endpointsWatcher,
endpoints: endpoints,
resourceVersion: "",
waitDuration: period,
// prevent hot loops if the server starts to misbehave
reconnectDuration: time.Second * 1,
},
}
go util.Forever(func() { config.s.listAndWatch() }, period)
go util.Forever(func() { config.e.listAndWatch() }, period)
return config
}
func (r *servicesReflector) listAndWatch() {
r.run(&r.resourceVersion)
time.Sleep(wait.Jitter(r.reconnectDuration, 0.0))
}
func (r *endpointsReflector) listAndWatch() {
r.run(&r.resourceVersion)
time.Sleep(wait.Jitter(r.reconnectDuration, 0.0))
}
// run loops forever looking for changes to services.
func (s *servicesReflector) run(resourceVersion *string) {
if len(*resourceVersion) == 0 {
services, err := s.watcher.List(labels.Everything())
if err != nil {
glog.Errorf("Unable to load services: %v", err)
// TODO: reconcile with pkg/client/cache which doesn't use reflector.
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
} }
*resourceVersion = services.ResourceVersion servicesChan <- ServiceUpdate{Op: SET, Services: services}
// TODO: replace with code to update the
s.services <- ServiceUpdate{Op: SET, Services: services.Items}
} }
watcher, err := s.watcher.Watch(labels.Everything(), fields.Everything(), *resourceVersion) serviceQueue := cache.NewUndeltaStore(servicesPush, cache.MetaNamespaceKeyFunc)
if err != nil { cache.NewReflector(servicesLW, &api.Service{}, serviceQueue, period).Run()
glog.Errorf("Unable to watch for services changes: %v", err)
if !client.IsTimeout(err) {
// Reset so that we do a fresh get request
*resourceVersion = ""
}
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
defer watcher.Stop()
ch := watcher.ResultChan()
s.watchHandler(resourceVersion, ch, s.services)
} }
// watchHandler loops over an event channel and delivers config changes to an update channel. func newEndpointsSourceApiFromLW(endpointsLW cache.ListerWatcher, period time.Duration, endpointsChan chan<- EndpointsUpdate) {
func (s *servicesReflector) watchHandler(resourceVersion *string, ch <-chan watch.Event, updates chan<- ServiceUpdate) { endpointsPush := func(objs []interface{}) {
for { var endpoints []api.Endpoints
select { for _, o := range objs {
case event, ok := <-ch: endpoints = append(endpoints, *(o.(*api.Endpoints)))
if !ok {
glog.V(4).Infof("WatchServices channel closed")
return
}
if event.Object == nil {
glog.Errorf("Got nil over WatchServices channel")
return
}
var service *api.Service
switch obj := event.Object.(type) {
case *api.Service:
service = obj
case *api.Status:
glog.Warningf("Got error status on WatchServices channel: %+v", obj)
*resourceVersion = ""
return
default:
glog.Errorf("Got unexpected object over WatchServices channel: %+v", obj)
*resourceVersion = ""
return
}
*resourceVersion = service.ResourceVersion
switch event.Type {
case watch.Added, watch.Modified:
updates <- ServiceUpdate{Op: ADD, Services: []api.Service{*service}}
case watch.Deleted:
updates <- ServiceUpdate{Op: REMOVE, Services: []api.Service{*service}}
}
}
}
}
// run loops forever looking for changes to endpoints.
func (s *endpointsReflector) run(resourceVersion *string) {
if len(*resourceVersion) == 0 {
endpoints, err := s.watcher.List(labels.Everything())
if err != nil {
glog.Errorf("Unable to load endpoints: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
*resourceVersion = endpoints.ResourceVersion
s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items}
}
watcher, err := s.watcher.Watch(labels.Everything(), fields.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for endpoints changes: %v", err)
if !client.IsTimeout(err) {
// Reset so that we do a fresh get request
*resourceVersion = ""
}
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
defer watcher.Stop()
ch := watcher.ResultChan()
s.watchHandler(resourceVersion, ch, s.endpoints)
}
// watchHandler loops over an event channel and delivers config changes to an update channel.
func (s *endpointsReflector) watchHandler(resourceVersion *string, ch <-chan watch.Event, updates chan<- EndpointsUpdate) {
for {
select {
case event, ok := <-ch:
if !ok {
glog.V(4).Infof("WatchEndpoints channel closed")
return
}
if event.Object == nil {
glog.Errorf("Got nil over WatchEndpoints channel")
return
}
var endpoints *api.Endpoints
switch obj := event.Object.(type) {
case *api.Endpoints:
endpoints = obj
case *api.Status:
glog.Warningf("Got error status on WatchEndpoints channel: %+v", obj)
*resourceVersion = ""
return
default:
glog.Errorf("Got unexpected object over WatchEndpoints channel: %+v", obj)
*resourceVersion = ""
return
}
*resourceVersion = endpoints.ResourceVersion
switch event.Type {
case watch.Added, watch.Modified:
updates <- EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{*endpoints}}
case watch.Deleted:
updates <- EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{*endpoints}}
}
} }
endpointsChan <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
} }
endpointQueue := cache.NewUndeltaStore(endpointsPush, cache.MetaNamespaceKeyFunc)
cache.NewReflector(endpointsLW, &api.Endpoints{}, endpointQueue, period).Run()
} }

View File

@ -17,335 +17,229 @@ limitations under the License.
package config package config
import ( import (
"errors"
"reflect"
"testing" "testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
func TestServices(t *testing.T) { type fakeLW struct {
service := api.Service{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}} listResp runtime.Object
watchResp watch.Interface
}
func (lw fakeLW) List() (runtime.Object, error) {
return lw.listResp, nil
}
func (lw fakeLW) Watch(resourceVersion string) (watch.Interface, error) {
return lw.watchResp, nil
}
var _ cache.ListerWatcher = fakeLW{}
func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
service1v1 := &api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}}
service1v2 := &api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}}
service2 := &api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "s2"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 30}}}}
// Setup fake api client.
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
fakeClient := &testclient.Fake{Watch: fakeWatch} lw := fakeLW{
services := make(chan ServiceUpdate) listResp: &api.ServiceList{Items: []api.Service{}},
source := SourceAPI{ watchResp: fakeWatch,
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
go func() {
// called twice
source.s.run(&resourceVersion)
source.s.run(&resourceVersion)
}()
// test adding a service to the watch
fakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-services", "1"}}) {
t.Errorf("expected call to watch-services, got %#v", fakeClient)
} }
actual := <-services ch := make(chan ServiceUpdate)
expected := ServiceUpdate{Op: ADD, Services: []api.Service{service}}
if !reflect.DeepEqual(expected, actual) { newServicesSourceApiFromLW(lw, 30*time.Second, ch)
t.Errorf("expected %#v, got %#v", expected, actual)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := ServiceUpdate{Op: SET, Services: []api.Service{}}
if !api.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
} }
// verify that a delete results in a config change // Add the first service
fakeWatch.Delete(&service) fakeWatch.Add(service1v1)
actual = <-services got, ok = <-ch
expected = ServiceUpdate{Op: REMOVE, Services: []api.Service{service}} if !ok {
if !reflect.DeepEqual(expected, actual) { t.Errorf("Unable to read from channel when expected")
t.Errorf("expected %#v, got %#v", expected, actual) }
expected = ServiceUpdate{Op: SET, Services: []api.Service{*service1v1}}
if !api.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
} }
// verify that closing the channel results in a new call to WatchServices with a higher resource version // Add another service
newFakeWatch := watch.NewFake() fakeWatch.Add(service2)
fakeClient.Watch = newFakeWatch got, ok = <-ch
fakeWatch.Stop() if !ok {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expectedA := ServiceUpdate{Op: SET, Services: []api.Service{*service1v1, *service2}}
expectedB := ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v1}}
newFakeWatch.Add(&service) if !api.Semantic.DeepEqual(expectedA, got) && !api.Semantic.DeepEqual(expectedB, got) {
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-services", "1"}, {"watch-services", "2"}}) { t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) }
// Modify service1
fakeWatch.Modify(service1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expectedA = ServiceUpdate{Op: SET, Services: []api.Service{*service1v2, *service2}}
expectedB = ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v2}}
if !api.Semantic.DeepEqual(expectedA, got) && !api.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
}
// Delete service1
fakeWatch.Delete(service1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: SET, Services: []api.Service{*service2}}
if !api.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
// Delete service2
fakeWatch.Delete(service2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: SET, Services: []api.Service{}}
if !api.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
} }
} }
func TestServicesFromZero(t *testing.T) { func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
service := api.Service{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}} endpoints1v1 := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
fakeWatch := watch.NewFake()
fakeWatch.Stop()
fakeClient := testclient.NewSimpleFake(&api.ServiceList{
ListMeta: api.ListMeta{ResourceVersion: "2"},
Items: []api.Service{
service,
},
})
fakeClient.Watch = fakeWatch
services := make(chan ServiceUpdate)
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.s.run(&resourceVersion)
close(ch)
}()
// should get services SET
actual := <-services
expected := ServiceUpdate{Op: SET, Services: []api.Service{service}}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// should have listed, then watched
<-ch
if resourceVersion != "2" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"list-services", nil}, {"watch-services", "2"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestServicesError(t *testing.T) {
fakeClient := &testclient.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.s.run(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != "" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-services", "1"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestServicesErrorTimeout(t *testing.T) {
fakeClient := &testclient.Fake{Err: errors.New("use of closed network connection")}
services := make(chan ServiceUpdate)
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.s.run(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != "1" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-services", "1"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestServicesFromZeroError(t *testing.T) {
fakeClient := &testclient.Fake{Err: errors.New("test")}
services := make(chan ServiceUpdate)
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll), services: services},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll)}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.s.run(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != "" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"list-services", nil}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Addresses: []api.EndpointAddress{
Ports: []api.EndpointPort{{Port: 9000}}, {IP: "1.2.3.4"},
},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
}
endpoints1v2 := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.1"},
},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
}
endpoints2 := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "e2"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{
{IP: "5.6.7.8"},
},
Ports: []api.EndpointPort{{Port: 80, Protocol: "TCP"}},
}}, }},
} }
// Setup fake api client.
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
fakeClient := &testclient.Fake{Watch: fakeWatch} lw := fakeLW{
endpoints := make(chan EndpointsUpdate) listResp: &api.EndpointsList{Items: []api.Endpoints{}},
source := SourceAPI{ watchResp: fakeWatch,
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
go func() {
// called twice
source.e.run(&resourceVersion)
source.e.run(&resourceVersion)
}()
// test adding an endpoint to the watch
fakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-endpoints", "1"}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
} }
actual := <-endpoints ch := make(chan EndpointsUpdate)
expected := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoint}}
if !reflect.DeepEqual(expected, actual) { newEndpointsSourceApiFromLW(lw, 30*time.Second, ch)
t.Errorf("expected %#v, got %#v", expected, actual)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}}
if !api.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
} }
// verify that a delete results in a config change // Add the first endpoints
fakeWatch.Delete(&endpoint) fakeWatch.Add(endpoints1v1)
actual = <-endpoints got, ok = <-ch
expected = EndpointsUpdate{Op: REMOVE, Endpoints: []api.Endpoints{endpoint}} if !ok {
if !reflect.DeepEqual(expected, actual) { t.Errorf("Unable to read from channel when expected")
t.Errorf("expected %#v, got %#v", expected, actual) }
expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1}}
if !api.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
} }
// verify that closing the channel results in a new call to WatchEndpoints with a higher resource version // Add another endpoints
newFakeWatch := watch.NewFake() fakeWatch.Add(endpoints2)
fakeClient.Watch = newFakeWatch got, ok = <-ch
fakeWatch.Stop() if !ok {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expectedA := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1, *endpoints2}}
expectedB := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v1}}
newFakeWatch.Add(&endpoint) if !api.Semantic.DeepEqual(expectedA, got) && !api.Semantic.DeepEqual(expectedB, got) {
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-endpoints", "1"}, {"watch-endpoints", "2"}}) { t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) }
}
} // Modify endpoints1
fakeWatch.Modify(endpoints1v2)
func TestEndpointsFromZero(t *testing.T) { got, ok = <-ch
endpoint := api.Endpoints{ if !ok {
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, t.Errorf("Unable to read from channel when expected")
Subsets: []api.EndpointSubset{{ }
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, expectedA = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v2, *endpoints2}}
Ports: []api.EndpointPort{{Port: 9000}}, expectedB = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v2}}
}},
} if !api.Semantic.DeepEqual(expectedA, got) && !api.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
fakeWatch := watch.NewFake() }
fakeWatch.Stop()
fakeClient := testclient.NewSimpleFake(&api.EndpointsList{ // Delete endpoints1
ListMeta: api.ListMeta{ResourceVersion: "2"}, fakeWatch.Delete(endpoints1v2)
Items: []api.Endpoints{ got, ok = <-ch
endpoint, if !ok {
}, t.Errorf("Unable to read from channel when expected")
}) }
fakeClient.Watch = fakeWatch expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2}}
endpoints := make(chan EndpointsUpdate) if !api.Semantic.DeepEqual(expected, got) {
source := SourceAPI{ t.Errorf("Expected %#v, Got %#v", expected, got)
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)}, }
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "" // Delete endpoints2
ch := make(chan struct{}) fakeWatch.Delete(endpoints2)
go func() { got, ok = <-ch
source.e.run(&resourceVersion) if !ok {
close(ch) t.Errorf("Unable to read from channel when expected")
}() }
expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}}
// should get endpoints SET if !api.Semantic.DeepEqual(expected, got) {
actual := <-endpoints t.Errorf("Expected %#v, Got %#v", expected, got)
expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// should have listed, then watched
<-ch
if resourceVersion != "2" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"list-endpoints", nil}, {"watch-endpoints", "2"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestEndpointsError(t *testing.T) {
fakeClient := &testclient.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.e.run(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != "" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-endpoints", "1"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestEndpointsErrorTimeout(t *testing.T) {
fakeClient := &testclient.Fake{Err: errors.New("use of closed network connection")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := "1"
ch := make(chan struct{})
go func() {
source.e.run(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != "1" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"watch-endpoints", "1"}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
}
}
func TestEndpointsFromZeroError(t *testing.T) {
fakeClient := &testclient.Fake{Err: errors.New("test")}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{
s: servicesReflector{watcher: fakeClient.Services(api.NamespaceAll)},
e: endpointsReflector{watcher: fakeClient.Endpoints(api.NamespaceAll), endpoints: endpoints}}
resourceVersion := ""
ch := make(chan struct{})
go func() {
source.e.run(&resourceVersion)
close(ch)
}()
// should have listed only
<-ch
if resourceVersion != "" {
t.Errorf("unexpected resource version, got %#v", resourceVersion)
}
if !reflect.DeepEqual(fakeClient.Actions, []testclient.FakeAction{{"list-endpoints", nil}}) {
t.Errorf("unexpected actions, got %#v", fakeClient)
} }
} }