From a2953fdc7ef5a9705f9e595a8c332eab055af4a2 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 16 Apr 2015 16:18:02 -0700 Subject: [PATCH] Make endpoint controller use framework --- cmd/integration/integration.go | 4 +- .../app/controllermanager.go | 5 +- cmd/kubernetes/kubernetes.go | 2 +- pkg/client/cache/store.go | 24 + pkg/service/endpoints_controller.go | 418 +++++++++++++----- pkg/service/endpoints_controller_test.go | 403 +++++++---------- pkg/util/set.go | 18 + 7 files changed, 516 insertions(+), 358 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index c5976cfde9e..676deee188f 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -208,7 +208,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st endpoints := service.NewEndpointController(cl) // ensure the service endpoints are sync'd several times within the window that the integration tests wait - go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*4) + go endpoints.Run(3, util.NeverStop) controllerManager := replicationControllerPkg.NewReplicationManager(cl) @@ -285,7 +285,7 @@ func endpointsSet(c *client.Client, serviceNamespace, serviceID string, endpoint return func() (bool, error) { endpoints, err := c.Endpoints(serviceNamespace).Get(serviceID) if err != nil { - glog.Infof("Error on creating endpoints: %v", err) + glog.Infof("Error getting endpoints: %v", err) return false, nil } count := 0 diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 060c26239c0..8423f761a25 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -50,6 +50,7 @@ type CMServer struct { ClientConfig client.Config CloudProvider string CloudConfigFile string + ConcurrentEndpointSyncs int MinionRegexp string NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -79,6 +80,7 @@ func NewCMServer() *CMServer { s := CMServer{ Port: ports.ControllerManagerPort, Address: util.IP(net.ParseIP("127.0.0.1")), + ConcurrentEndpointSyncs: 5, NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, NamespaceSyncPeriod: 5 * time.Minute, @@ -101,6 +103,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { client.BindClientConfigFlags(fs, &s.ClientConfig) fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") + fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent_endpoint_syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.") fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+ "The period for syncing nodes from cloudprovider. Longer periods will result in "+ @@ -171,7 +174,7 @@ func (s *CMServer) Run(_ []string) error { }() endpoints := service.NewEndpointController(kubeClient) - go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) + go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient) controllerManager.Run(replicationControllerPkg.DefaultSyncPeriod) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 8ec247d9b30..65cd939b8c2 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -139,7 +139,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, } endpoints := service.NewEndpointController(cl) - go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) + go endpoints.Run(5, util.NeverStop) controllerManager := controller.NewReplicationManager(cl) controllerManager.Run(controller.DefaultSyncPeriod) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index b6031d43ca3..40aceec467d 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -18,6 +18,8 @@ package cache import ( "fmt" + "strings" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" ) @@ -67,6 +69,9 @@ type ExplicitKey string // keys for API objects which implement meta.Interface. // The key uses the format / unless is empty, then // it's just . +// +// TODO: replace key-as-string with a key-as-struct so that this +// packing/unpacking won't be necessary. func MetaNamespaceKeyFunc(obj interface{}) (string, error) { if key, ok := obj.(ExplicitKey); ok { return string(key), nil @@ -81,6 +86,25 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) { return meta.Name(), nil } +// SplitMetaNamespaceKey returns the namespace and name that +// MetaNamespaceKeyFunc encoded into key. +// +// TODO: replace key-as-string with a key-as-struct so that this +// packing/unpacking won't be necessary. +func SplitMetaNamespaceKey(key string) (namespace, name string, err error) { + parts := strings.Split(key, "/") + switch len(parts) { + case 1: + // name only, no namespace + return "", parts[0], nil + case 2: + // name and namespace + return parts[0], parts[1], nil + } + + return "", "", fmt.Errorf("unexpected key format: %q", key) +} + // cache responsibilities are limited to: // 1. Computing keys for objects via keyFunc // 2. Invoking methods of a ThreadSafeStorage interface diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 12f12557981..1be624dc97f 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -19,6 +19,7 @@ package service import ( "fmt" "reflect" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints" @@ -26,135 +27,354 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) -// EndpointController manages selector-based service endpoints. -type EndpointController struct { - client *client.Client -} +const ( + // We'll attempt to recompute EVERY service's endpoints at least this + // often. Higher numbers = lower CPU/network load; lower numbers = + // shorter amount of time before a mistaken endpoint is corrected. + FullServiceResyncPeriod = 30 * time.Second + + // We'll keep pod watches open up to this long. In the unlikely case + // that a watch misdelivers info about a pod, it'll take this long for + // that mistake to be rectified. + PodRelistPeriod = 5 * time.Minute +) + +var ( + keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc +) // NewEndpointController returns a new *EndpointController. func NewEndpointController(client *client.Client) *EndpointController { - return &EndpointController{ + e := &EndpointController{ client: client, + queue: workqueue.New(), + } + + e.serviceStore.Store, e.serviceController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return e.client.Services(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Service{}, + FullServiceResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: e.enqueueService, + UpdateFunc: func(old, cur interface{}) { + e.enqueueService(cur) + }, + DeleteFunc: e.enqueueService, + }, + ) + + e.podStore.Store, e.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return e.client.Pods(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Pod{}, + PodRelistPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: e.addPod, + UpdateFunc: e.updatePod, + DeleteFunc: e.deletePod, + }, + ) + + return e +} + +// EndpointController manages selector-based service endpoints. +type EndpointController struct { + client *client.Client + + serviceStore cache.StoreToServiceLister + podStore cache.StoreToPodLister + + // Services that need to be updated. A channel is inappropriate here, + // because it allows services with lots of pods to be serviced much + // more often than services with few pods; it also would cause a + // service that's inserted multiple times to be processed more than + // necessary. + queue *workqueue.Type + + // Since we join two objects, we'll watch both of them with + // controllers. + serviceController *framework.Controller + podController *framework.Controller +} + +// Runs e; will not return until stopCh is closed. workers determines how many +// endpoints will be handled in parallel. +func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { + defer util.HandleCrash() + go e.serviceController.Run(stopCh) + go e.podController.Run(stopCh) + for i := 0; i < workers; i++ { + go util.Until(e.worker, time.Second, stopCh) + } + <-stopCh + e.queue.ShutDown() +} + +func (e *EndpointController) getPodServiceMemberships(pod *api.Pod) (util.StringSet, error) { + set := util.StringSet{} + services, err := e.serviceStore.GetPodServices(pod) + if err != nil { + // don't log this error because this function makes pointless + // errors when no services match. + return set, nil + } + for i := range services { + key, err := keyFunc(&services[i]) + if err != nil { + return nil, err + } + set.Insert(key) + } + return set, nil +} + +// When a pod is added, figure out what services it will be a member of and +// enqueue them. obj must have *api.Pod type. +func (e *EndpointController) addPod(obj interface{}) { + pod := obj.(*api.Pod) + services, err := e.getPodServiceMemberships(pod) + if err != nil { + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err) + return + } + for key := range services { + e.queue.Add(key) } } -// SyncServiceEndpoints syncs endpoints for services with selectors. -func (e *EndpointController) SyncServiceEndpoints() error { - services, err := e.client.Services(api.NamespaceAll).List(labels.Everything()) - if err != nil { - glog.Errorf("Failed to list services: %v", err) - return err +// When a pod is updated, figure out what services it used to be a member of +// and what services it will be a member of, and enqueue the union of these. +// old and cur must be *api.Pod types. +func (e *EndpointController) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + return + } + newPod := old.(*api.Pod) + services, err := e.getPodServiceMemberships(newPod) + if err != nil { + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err) + return } - var resultErr error - for i := range services.Items { - service := &services.Items[i] - if service.Spec.Selector == nil { - // services without a selector receive no endpoints from this controller; - // these services will receive the endpoints that are created out-of-band via the REST API. - continue - } - - glog.V(5).Infof("About to update endpoints for service %s/%s", service.Namespace, service.Name) - pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector()) + oldPod := cur.(*api.Pod) + // Only need to get the old services if the labels changed. + if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) { + oldServices, err := e.getPodServiceMemberships(oldPod) if err != nil { - glog.Errorf("Error syncing service: %s/%s, skipping", service.Namespace, service.Name) - resultErr = err - continue + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err) + return } + services = services.Union(oldServices) + } + for key := range services { + e.queue.Add(key) + } +} - subsets := []api.EndpointSubset{} - for i := range pods.Items { - pod := &pods.Items[i] +// When a pod is deleted, enqueue the services the pod used to be a member of. +// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. +func (e *EndpointController) deletePod(obj interface{}) { + if _, ok := obj.(*api.Pod); ok { + // Enqueue all the services that the pod used to be a member + // of. This happens to be exactly the same thing we do when a + // pod is added. + e.addPod(obj) + return + } + podKey, err := keyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + } + glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod) - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] + // TODO: keep a map of pods to services to handle this condition. +} - // TODO: Once v1beta1 and v1beta2 are EOL'ed, - // this can safely assume that TargetPort is - // populated, and findPort() can be removed. - _ = v1beta1.Dependency - _ = v1beta2.Dependency +// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. +func (e *EndpointController) enqueueService(obj interface{}) { + key, err := keyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + } - portName := servicePort.Name - portProto := servicePort.Protocol - portNum, err := findPort(pod, servicePort) - if err != nil { - glog.Errorf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) - continue - } - if len(pod.Status.PodIP) == 0 { - glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) - continue - } + e.queue.Add(key) +} - inService := false - for _, c := range pod.Status.Conditions { - if c.Type == api.PodReady && c.Status == api.ConditionTrue { - inService = true - break - } - } - if !inService { - glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) - continue - } - - epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto} - epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{ - Kind: "Pod", - Namespace: pod.ObjectMeta.Namespace, - Name: pod.ObjectMeta.Name, - UID: pod.ObjectMeta.UID, - ResourceVersion: pod.ObjectMeta.ResourceVersion, - }} - subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) +// worker runs a worker thread that just dequeues items, processes them, and +// marks them done. You may run as many of these in parallel as you wish; the +// workqueue guarantees that they will not end up processing the same service +// at the same time. +func (e *EndpointController) worker() { + for { + func() { + key, quit := e.queue.Get() + if quit { + return } - } - subsets = endpoints.RepackSubsets(subsets) + // Use defer: in the unlikely event that there's a + // panic, we'd still like this to get marked done-- + // otherwise the controller will not be able to sync + // this service again until it is restarted. + defer e.queue.Done(key) + e.syncService(key.(string)) + }() + } +} - // See if there's actually an update here. - currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) +func (e *EndpointController) syncService(key string) { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime)) + }() + obj, exists, err := e.serviceStore.Store.GetByKey(key) + if err != nil || !exists { + // Delete the corresponding endpoint, as the service has been deleted. + // TODO: Please note that this will delete an endpoint when a + // service is deleted. However, if we're down at the time when + // the service is deleted, we will miss that deletion, so this + // doesn't completely solve the problem. See #6877. + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - if errors.IsNotFound(err) { - currentEndpoints = &api.Endpoints{ - ObjectMeta: api.ObjectMeta{ - Name: service.Name, - Labels: service.Labels, - }, - } - } else { - glog.Errorf("Error getting endpoints: %v", err) + glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err) + // Don't retry, as the key isn't going to magically become understandable. + return + } + err = e.client.Endpoints(namespace).Delete(name) + if err != nil && !errors.IsNotFound(err) { + glog.Errorf("Error deleting endpoint %q: %v", key, err) + e.queue.Add(key) // Retry + } + return + } + + service := obj.(*api.Service) + if service.Spec.Selector == nil { + // services without a selector receive no endpoints from this controller; + // these services will receive the endpoints that are created out-of-band via the REST API. + return + } + + glog.V(5).Infof("About to update endpoints for service %q", key) + pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector()) + if err != nil { + // Since we're getting stuff from a local cache, it is + // basically impossible to get this error. + glog.Errorf("Error syncing service %q: %v", key, err) + e.queue.Add(key) // Retry + return + } + + subsets := []api.EndpointSubset{} + for i := range pods.Items { + pod := &pods.Items[i] + + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + + // TODO: Once v1beta1 and v1beta2 are EOL'ed, + // this can safely assume that TargetPort is + // populated, and findPort() can be removed. + _ = v1beta1.Dependency + _ = v1beta2.Dependency + + portName := servicePort.Name + portProto := servicePort.Protocol + portNum, err := findPort(pod, servicePort) + if err != nil { + glog.Errorf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) + continue + } + if len(pod.Status.PodIP) == 0 { + glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) continue } - } - if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) { - glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) - continue - } - newEndpoints := currentEndpoints - newEndpoints.Subsets = subsets - newEndpoints.Labels = service.Labels - if len(currentEndpoints.ResourceVersion) == 0 { - // No previous endpoints, create them - _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) - } else { - // Pre-existing - _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) - } - if err != nil { - glog.Errorf("Error updating endpoints: %v", err) - continue + inService := false + for _, c := range pod.Status.Conditions { + if c.Type == api.PodReady && c.Status == api.ConditionTrue { + inService = true + break + } + } + if !inService { + glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) + continue + } + + epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto} + epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{ + Kind: "Pod", + Namespace: pod.ObjectMeta.Namespace, + Name: pod.ObjectMeta.Name, + UID: pod.ObjectMeta.UID, + ResourceVersion: pod.ObjectMeta.ResourceVersion, + }} + subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) } } - return resultErr + subsets = endpoints.RepackSubsets(subsets) + + // See if there's actually an update here. + currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) + if err != nil { + if errors.IsNotFound(err) { + currentEndpoints = &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: service.Name, + Labels: service.Labels, + }, + } + } else { + glog.Errorf("Error getting endpoints: %v", err) + e.queue.Add(key) // Retry + return + } + } + if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) { + glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) + return + } + newEndpoints := currentEndpoints + newEndpoints.Subsets = subsets + newEndpoints.Labels = service.Labels + + if len(currentEndpoints.ResourceVersion) == 0 { + // No previous endpoints, create them + _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) + } else { + // Pre-existing + _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) + } + if err != nil { + glog.Errorf("Error updating endpoints: %v", err) + e.queue.Add(key) // Retry + } } func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int { diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 0794b1b56e5..ade1ce34c79 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -27,16 +27,20 @@ import ( _ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func newPodList(nPods int, nPorts int) *api.PodList { - pods := []api.Pod{} +func addPods(store cache.Store, namespace string, nPods int, nPorts int) { for i := 0; i < nPods; i++ { - p := api.Pod{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, - ObjectMeta: api.ObjectMeta{Name: fmt.Sprintf("pod%d", i)}, + p := &api.Pod{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, + ObjectMeta: api.ObjectMeta{ + Namespace: namespace, + Name: fmt.Sprintf("pod%d", i), + Labels: map[string]string{"foo": "bar"}, + }, Spec: api.PodSpec{ Containers: []api.Container{{Ports: []api.ContainerPort{}}}, }, @@ -54,11 +58,7 @@ func newPodList(nPods int, nPorts int) *api.PodList { p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: 8080 + j}) } - pods = append(pods, p) - } - return &api.PodList{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version(), Kind: "PodList"}, - Items: pods, + store.Add(p) } } @@ -222,22 +222,12 @@ type serverResponse struct { obj interface{} } -func makeTestServer(t *testing.T, namespace string, podResponse, serviceResponse, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) { - fakePodHandler := util.FakeHandler{ - StatusCode: podResponse.statusCode, - ResponseBody: runtime.EncodeOrDie(testapi.Codec(), podResponse.obj.(runtime.Object)), - } - fakeServiceHandler := util.FakeHandler{ - StatusCode: serviceResponse.statusCode, - ResponseBody: runtime.EncodeOrDie(testapi.Codec(), serviceResponse.obj.(runtime.Object)), - } +func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) { fakeEndpointsHandler := util.FakeHandler{ StatusCode: endpointsResponse.statusCode, ResponseBody: runtime.EncodeOrDie(testapi.Codec(), endpointsResponse.obj.(runtime.Object)), } mux := http.NewServeMux() - mux.Handle(testapi.ResourcePath("pods", namespace, ""), &fakePodHandler) - mux.Handle(testapi.ResourcePath("services", "", ""), &fakeServiceHandler) mux.Handle(testapi.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler) mux.Handle(testapi.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler) mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { @@ -247,47 +237,13 @@ func makeTestServer(t *testing.T, namespace string, podResponse, serviceResponse return httptest.NewServer(mux), &fakeEndpointsHandler } -func TestSyncEndpointsEmpty(t *testing.T) { - testServer, _ := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusOK, &api.ServiceList{}}, - serverResponse{http.StatusOK, &api.Endpoints{}}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } -} - -func TestSyncEndpointsError(t *testing.T) { - testServer, _ := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusInternalServerError, &api.ServiceList{}}, - serverResponse{http.StatusOK, &api.Endpoints{}}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err == nil { - t.Errorf("unexpected non-error") - } -} - func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}}, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusOK, &serviceList}, + ns := api.NamespaceDefault + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ @@ -298,30 +254,21 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}}, + }) + endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsProtocolTCP(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{}, - Ports: []api.ServicePort{{Port: 80}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ @@ -332,30 +279,24 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Ports: []api.ServicePort{{Port: 80}}, + }, + }) + endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsProtocolUDP(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{}, - Ports: []api.ServicePort{{Port: 80}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ @@ -366,30 +307,24 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Ports: []api.ServicePort{{Port: 80}}, + }, + }) + endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(1, 1)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{}, @@ -397,40 +332,36 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 1, 1) + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", "foo"), "PUT", &data) + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data) } func TestSyncEndpointsItemsPreexisting(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "bar", - serverResponse{http.StatusOK, newPodList(1, 1)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "bar" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ @@ -441,85 +372,83 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 1, 1) + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "bar", "foo"), "PUT", &data) + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data) } func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } + ns := api.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusOK, newPodList(1, 1)}, - serverResponse{http.StatusOK, &serviceList}, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", + Name: "foo", + Namespace: ns, }, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1) + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", api.NamespaceDefault, "foo"), "GET", nil) } func TestSyncEndpointsItems(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{ - {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, - {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)}, - }, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(3, 2)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{}}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 3, 2) + addPods(endpoints.podStore.Store, "blah", 5, 2) // make sure these aren't found! + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{ + {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)}, + }, + }, + }) + endpoints.syncService("other/foo") expectedSubsets := []api.EndpointSubset{{ Addresses: []api.EndpointAddress{ - {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}, - {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1"}}, - {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2"}}, + {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, + {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}, + {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}}, }, Ports: []api.EndpointPort{ {Name: "port0", Port: 8080, Protocol: "TCP"}, @@ -534,69 +463,38 @@ func TestSyncEndpointsItems(t *testing.T) { }) // endpointsHandler should get 2 requests - one for "GET" and the next for "POST". endpointsHandler.ValidateRequestCount(t, 2) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", ""), "POST", &data) -} - -func TestSyncEndpointsPodError(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } - testServer, _ := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusInternalServerError, &api.PodList{}}, - serverResponse{http.StatusOK, &serviceList}, - serverResponse{http.StatusOK, &api.Endpoints{}}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err == nil { - t.Error("Unexpected non-error") - } + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, ""), "POST", &data) } func TestSyncEndpointsItemsWithLabels(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: "other", - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{ - {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, - {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)}, - }, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(3, 2)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{}}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 3, 2) + serviceLabels := map[string]string{"foo": "bar"} + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + Labels: serviceLabels, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{ + {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)}, + }, + }, + }) + endpoints.syncService(ns + "/foo") expectedSubsets := []api.EndpointSubset{{ Addresses: []api.EndpointAddress{ - {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}, - {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1"}}, - {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2"}}, + {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, + {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}, + {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}}, }, Ports: []api.EndpointPort{ {Name: "port0", Port: 8080, Protocol: "TCP"}, @@ -606,39 +504,22 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ ObjectMeta: api.ObjectMeta{ ResourceVersion: "", - Labels: serviceList.Items[0].Labels, + Labels: serviceLabels, }, Subsets: endptspkg.SortSubsets(expectedSubsets), }) // endpointsHandler should get 2 requests - one for "GET" and the next for "POST". endpointsHandler.ValidateRequestCount(t, 2) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", ""), "POST", &data) + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, ""), "POST", &data) } func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: "bar", - Labels: map[string]string{ - "baz": "blah", - }, - }, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "bar", - serverResponse{http.StatusOK, newPodList(1, 1)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "bar" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ "foo": "bar", @@ -652,19 +533,31 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 1, 1) + serviceLabels := map[string]string{"baz": "blah"} + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + Labels: serviceLabels, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", - Labels: serviceList.Items[0].Labels, + Labels: serviceLabels, }, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "bar", "foo"), "PUT", &data) + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data) } diff --git a/pkg/util/set.go b/pkg/util/set.go index 67e807a0fd4..5141cdba106 100644 --- a/pkg/util/set.go +++ b/pkg/util/set.go @@ -91,6 +91,24 @@ func (s StringSet) Difference(s2 StringSet) StringSet { return result } +// Union returns a new set which includes items in either s1 or s2. +// vof objects that are not in s2 +// For example: +// s1 = {1, 2} +// s2 = {3, 4} +// s1.Union(s2) = {1, 2, 3, 4} +// s2.Union(s1) = {1, 2, 3, 4} +func (s1 StringSet) Union(s2 StringSet) StringSet { + result := NewStringSet() + for key := range s1 { + result.Insert(key) + } + for key := range s2 { + result.Insert(key) + } + return result +} + // IsSuperset returns true iff s1 is a superset of s2. func (s1 StringSet) IsSuperset(s2 StringSet) bool { for item := range s2 {