mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Make endpoint controller use framework
This commit is contained in:
parent
24a8cceb5c
commit
a2953fdc7e
@ -208,7 +208,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
|
||||
|
||||
endpoints := service.NewEndpointController(cl)
|
||||
// ensure the service endpoints are sync'd several times within the window that the integration tests wait
|
||||
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*4)
|
||||
go endpoints.Run(3, util.NeverStop)
|
||||
|
||||
controllerManager := replicationControllerPkg.NewReplicationManager(cl)
|
||||
|
||||
@ -285,7 +285,7 @@ func endpointsSet(c *client.Client, serviceNamespace, serviceID string, endpoint
|
||||
return func() (bool, error) {
|
||||
endpoints, err := c.Endpoints(serviceNamespace).Get(serviceID)
|
||||
if err != nil {
|
||||
glog.Infof("Error on creating endpoints: %v", err)
|
||||
glog.Infof("Error getting endpoints: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
count := 0
|
||||
|
@ -50,6 +50,7 @@ type CMServer struct {
|
||||
ClientConfig client.Config
|
||||
CloudProvider string
|
||||
CloudConfigFile string
|
||||
ConcurrentEndpointSyncs int
|
||||
MinionRegexp string
|
||||
NodeSyncPeriod time.Duration
|
||||
ResourceQuotaSyncPeriod time.Duration
|
||||
@ -79,6 +80,7 @@ func NewCMServer() *CMServer {
|
||||
s := CMServer{
|
||||
Port: ports.ControllerManagerPort,
|
||||
Address: util.IP(net.ParseIP("127.0.0.1")),
|
||||
ConcurrentEndpointSyncs: 5,
|
||||
NodeSyncPeriod: 10 * time.Second,
|
||||
ResourceQuotaSyncPeriod: 10 * time.Second,
|
||||
NamespaceSyncPeriod: 5 * time.Minute,
|
||||
@ -101,6 +103,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
||||
client.BindClientConfigFlags(fs, &s.ClientConfig)
|
||||
fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
|
||||
fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
|
||||
fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent_endpoint_syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
|
||||
fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.")
|
||||
fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+
|
||||
"The period for syncing nodes from cloudprovider. Longer periods will result in "+
|
||||
@ -171,7 +174,7 @@ func (s *CMServer) Run(_ []string) error {
|
||||
}()
|
||||
|
||||
endpoints := service.NewEndpointController(kubeClient)
|
||||
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
|
||||
go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
|
||||
|
||||
controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
|
||||
controllerManager.Run(replicationControllerPkg.DefaultSyncPeriod)
|
||||
|
@ -139,7 +139,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
|
||||
}
|
||||
|
||||
endpoints := service.NewEndpointController(cl)
|
||||
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
|
||||
go endpoints.Run(5, util.NeverStop)
|
||||
|
||||
controllerManager := controller.NewReplicationManager(cl)
|
||||
controllerManager.Run(controller.DefaultSyncPeriod)
|
||||
|
24
pkg/client/cache/store.go
vendored
24
pkg/client/cache/store.go
vendored
@ -18,6 +18,8 @@ package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
|
||||
)
|
||||
|
||||
@ -67,6 +69,9 @@ type ExplicitKey string
|
||||
// keys for API objects which implement meta.Interface.
|
||||
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
|
||||
// it's just <name>.
|
||||
//
|
||||
// TODO: replace key-as-string with a key-as-struct so that this
|
||||
// packing/unpacking won't be necessary.
|
||||
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
|
||||
if key, ok := obj.(ExplicitKey); ok {
|
||||
return string(key), nil
|
||||
@ -81,6 +86,25 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
|
||||
return meta.Name(), nil
|
||||
}
|
||||
|
||||
// SplitMetaNamespaceKey returns the namespace and name that
|
||||
// MetaNamespaceKeyFunc encoded into key.
|
||||
//
|
||||
// TODO: replace key-as-string with a key-as-struct so that this
|
||||
// packing/unpacking won't be necessary.
|
||||
func SplitMetaNamespaceKey(key string) (namespace, name string, err error) {
|
||||
parts := strings.Split(key, "/")
|
||||
switch len(parts) {
|
||||
case 1:
|
||||
// name only, no namespace
|
||||
return "", parts[0], nil
|
||||
case 2:
|
||||
// name and namespace
|
||||
return parts[0], parts[1], nil
|
||||
}
|
||||
|
||||
return "", "", fmt.Errorf("unexpected key format: %q", key)
|
||||
}
|
||||
|
||||
// cache responsibilities are limited to:
|
||||
// 1. Computing keys for objects via keyFunc
|
||||
// 2. Invoking methods of a ThreadSafeStorage interface
|
||||
|
@ -19,6 +19,7 @@ package service
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints"
|
||||
@ -26,135 +27,354 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// EndpointController manages selector-based service endpoints.
|
||||
type EndpointController struct {
|
||||
client *client.Client
|
||||
}
|
||||
const (
|
||||
// We'll attempt to recompute EVERY service's endpoints at least this
|
||||
// often. Higher numbers = lower CPU/network load; lower numbers =
|
||||
// shorter amount of time before a mistaken endpoint is corrected.
|
||||
FullServiceResyncPeriod = 30 * time.Second
|
||||
|
||||
// We'll keep pod watches open up to this long. In the unlikely case
|
||||
// that a watch misdelivers info about a pod, it'll take this long for
|
||||
// that mistake to be rectified.
|
||||
PodRelistPeriod = 5 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
|
||||
)
|
||||
|
||||
// NewEndpointController returns a new *EndpointController.
|
||||
func NewEndpointController(client *client.Client) *EndpointController {
|
||||
return &EndpointController{
|
||||
e := &EndpointController{
|
||||
client: client,
|
||||
queue: workqueue.New(),
|
||||
}
|
||||
|
||||
e.serviceStore.Store, e.serviceController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return e.client.Services(api.NamespaceAll).List(labels.Everything())
|
||||
},
|
||||
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||
return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
|
||||
},
|
||||
},
|
||||
&api.Service{},
|
||||
FullServiceResyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: e.enqueueService,
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
e.enqueueService(cur)
|
||||
},
|
||||
DeleteFunc: e.enqueueService,
|
||||
},
|
||||
)
|
||||
|
||||
e.podStore.Store, e.podController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return e.client.Pods(api.NamespaceAll).List(labels.Everything())
|
||||
},
|
||||
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||
return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
|
||||
},
|
||||
},
|
||||
&api.Pod{},
|
||||
PodRelistPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: e.addPod,
|
||||
UpdateFunc: e.updatePod,
|
||||
DeleteFunc: e.deletePod,
|
||||
},
|
||||
)
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
// EndpointController manages selector-based service endpoints.
|
||||
type EndpointController struct {
|
||||
client *client.Client
|
||||
|
||||
serviceStore cache.StoreToServiceLister
|
||||
podStore cache.StoreToPodLister
|
||||
|
||||
// Services that need to be updated. A channel is inappropriate here,
|
||||
// because it allows services with lots of pods to be serviced much
|
||||
// more often than services with few pods; it also would cause a
|
||||
// service that's inserted multiple times to be processed more than
|
||||
// necessary.
|
||||
queue *workqueue.Type
|
||||
|
||||
// Since we join two objects, we'll watch both of them with
|
||||
// controllers.
|
||||
serviceController *framework.Controller
|
||||
podController *framework.Controller
|
||||
}
|
||||
|
||||
// Runs e; will not return until stopCh is closed. workers determines how many
|
||||
// endpoints will be handled in parallel.
|
||||
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer util.HandleCrash()
|
||||
go e.serviceController.Run(stopCh)
|
||||
go e.podController.Run(stopCh)
|
||||
for i := 0; i < workers; i++ {
|
||||
go util.Until(e.worker, time.Second, stopCh)
|
||||
}
|
||||
<-stopCh
|
||||
e.queue.ShutDown()
|
||||
}
|
||||
|
||||
func (e *EndpointController) getPodServiceMemberships(pod *api.Pod) (util.StringSet, error) {
|
||||
set := util.StringSet{}
|
||||
services, err := e.serviceStore.GetPodServices(pod)
|
||||
if err != nil {
|
||||
// don't log this error because this function makes pointless
|
||||
// errors when no services match.
|
||||
return set, nil
|
||||
}
|
||||
for i := range services {
|
||||
key, err := keyFunc(&services[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
set.Insert(key)
|
||||
}
|
||||
return set, nil
|
||||
}
|
||||
|
||||
// When a pod is added, figure out what services it will be a member of and
|
||||
// enqueue them. obj must have *api.Pod type.
|
||||
func (e *EndpointController) addPod(obj interface{}) {
|
||||
pod := obj.(*api.Pod)
|
||||
services, err := e.getPodServiceMemberships(pod)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
|
||||
return
|
||||
}
|
||||
for key := range services {
|
||||
e.queue.Add(key)
|
||||
}
|
||||
}
|
||||
|
||||
// SyncServiceEndpoints syncs endpoints for services with selectors.
|
||||
func (e *EndpointController) SyncServiceEndpoints() error {
|
||||
services, err := e.client.Services(api.NamespaceAll).List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to list services: %v", err)
|
||||
return err
|
||||
// When a pod is updated, figure out what services it used to be a member of
|
||||
// and what services it will be a member of, and enqueue the union of these.
|
||||
// old and cur must be *api.Pod types.
|
||||
func (e *EndpointController) updatePod(old, cur interface{}) {
|
||||
if api.Semantic.DeepEqual(old, cur) {
|
||||
return
|
||||
}
|
||||
newPod := old.(*api.Pod)
|
||||
services, err := e.getPodServiceMemberships(newPod)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
|
||||
return
|
||||
}
|
||||
var resultErr error
|
||||
for i := range services.Items {
|
||||
service := &services.Items[i]
|
||||
|
||||
if service.Spec.Selector == nil {
|
||||
// services without a selector receive no endpoints from this controller;
|
||||
// these services will receive the endpoints that are created out-of-band via the REST API.
|
||||
continue
|
||||
}
|
||||
|
||||
glog.V(5).Infof("About to update endpoints for service %s/%s", service.Namespace, service.Name)
|
||||
pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector())
|
||||
oldPod := cur.(*api.Pod)
|
||||
// Only need to get the old services if the labels changed.
|
||||
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
|
||||
oldServices, err := e.getPodServiceMemberships(oldPod)
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing service: %s/%s, skipping", service.Namespace, service.Name)
|
||||
resultErr = err
|
||||
continue
|
||||
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
|
||||
return
|
||||
}
|
||||
services = services.Union(oldServices)
|
||||
}
|
||||
for key := range services {
|
||||
e.queue.Add(key)
|
||||
}
|
||||
}
|
||||
|
||||
subsets := []api.EndpointSubset{}
|
||||
for i := range pods.Items {
|
||||
pod := &pods.Items[i]
|
||||
// When a pod is deleted, enqueue the services the pod used to be a member of.
|
||||
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
|
||||
func (e *EndpointController) deletePod(obj interface{}) {
|
||||
if _, ok := obj.(*api.Pod); ok {
|
||||
// Enqueue all the services that the pod used to be a member
|
||||
// of. This happens to be exactly the same thing we do when a
|
||||
// pod is added.
|
||||
e.addPod(obj)
|
||||
return
|
||||
}
|
||||
podKey, err := keyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
}
|
||||
glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
|
||||
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
// TODO: keep a map of pods to services to handle this condition.
|
||||
}
|
||||
|
||||
// TODO: Once v1beta1 and v1beta2 are EOL'ed,
|
||||
// this can safely assume that TargetPort is
|
||||
// populated, and findPort() can be removed.
|
||||
_ = v1beta1.Dependency
|
||||
_ = v1beta2.Dependency
|
||||
// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
|
||||
func (e *EndpointController) enqueueService(obj interface{}) {
|
||||
key, err := keyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
}
|
||||
|
||||
portName := servicePort.Name
|
||||
portProto := servicePort.Protocol
|
||||
portNum, err := findPort(pod, servicePort)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
|
||||
continue
|
||||
}
|
||||
if len(pod.Status.PodIP) == 0 {
|
||||
glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
|
||||
continue
|
||||
}
|
||||
e.queue.Add(key)
|
||||
}
|
||||
|
||||
inService := false
|
||||
for _, c := range pod.Status.Conditions {
|
||||
if c.Type == api.PodReady && c.Status == api.ConditionTrue {
|
||||
inService = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !inService {
|
||||
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
|
||||
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: pod.ObjectMeta.Namespace,
|
||||
Name: pod.ObjectMeta.Name,
|
||||
UID: pod.ObjectMeta.UID,
|
||||
ResourceVersion: pod.ObjectMeta.ResourceVersion,
|
||||
}}
|
||||
subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}})
|
||||
// worker runs a worker thread that just dequeues items, processes them, and
|
||||
// marks them done. You may run as many of these in parallel as you wish; the
|
||||
// workqueue guarantees that they will not end up processing the same service
|
||||
// at the same time.
|
||||
func (e *EndpointController) worker() {
|
||||
for {
|
||||
func() {
|
||||
key, quit := e.queue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
}
|
||||
subsets = endpoints.RepackSubsets(subsets)
|
||||
// Use defer: in the unlikely event that there's a
|
||||
// panic, we'd still like this to get marked done--
|
||||
// otherwise the controller will not be able to sync
|
||||
// this service again until it is restarted.
|
||||
defer e.queue.Done(key)
|
||||
e.syncService(key.(string))
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// See if there's actually an update here.
|
||||
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
|
||||
func (e *EndpointController) syncService(key string) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
obj, exists, err := e.serviceStore.Store.GetByKey(key)
|
||||
if err != nil || !exists {
|
||||
// Delete the corresponding endpoint, as the service has been deleted.
|
||||
// TODO: Please note that this will delete an endpoint when a
|
||||
// service is deleted. However, if we're down at the time when
|
||||
// the service is deleted, we will miss that deletion, so this
|
||||
// doesn't completely solve the problem. See #6877.
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
currentEndpoints = &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: service.Name,
|
||||
Labels: service.Labels,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
glog.Errorf("Error getting endpoints: %v", err)
|
||||
glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
|
||||
// Don't retry, as the key isn't going to magically become understandable.
|
||||
return
|
||||
}
|
||||
err = e.client.Endpoints(namespace).Delete(name)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
glog.Errorf("Error deleting endpoint %q: %v", key, err)
|
||||
e.queue.Add(key) // Retry
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
service := obj.(*api.Service)
|
||||
if service.Spec.Selector == nil {
|
||||
// services without a selector receive no endpoints from this controller;
|
||||
// these services will receive the endpoints that are created out-of-band via the REST API.
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(5).Infof("About to update endpoints for service %q", key)
|
||||
pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector())
|
||||
if err != nil {
|
||||
// Since we're getting stuff from a local cache, it is
|
||||
// basically impossible to get this error.
|
||||
glog.Errorf("Error syncing service %q: %v", key, err)
|
||||
e.queue.Add(key) // Retry
|
||||
return
|
||||
}
|
||||
|
||||
subsets := []api.EndpointSubset{}
|
||||
for i := range pods.Items {
|
||||
pod := &pods.Items[i]
|
||||
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
|
||||
// TODO: Once v1beta1 and v1beta2 are EOL'ed,
|
||||
// this can safely assume that TargetPort is
|
||||
// populated, and findPort() can be removed.
|
||||
_ = v1beta1.Dependency
|
||||
_ = v1beta2.Dependency
|
||||
|
||||
portName := servicePort.Name
|
||||
portProto := servicePort.Protocol
|
||||
portNum, err := findPort(pod, servicePort)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
|
||||
continue
|
||||
}
|
||||
if len(pod.Status.PodIP) == 0 {
|
||||
glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
|
||||
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
|
||||
continue
|
||||
}
|
||||
newEndpoints := currentEndpoints
|
||||
newEndpoints.Subsets = subsets
|
||||
newEndpoints.Labels = service.Labels
|
||||
|
||||
if len(currentEndpoints.ResourceVersion) == 0 {
|
||||
// No previous endpoints, create them
|
||||
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
|
||||
} else {
|
||||
// Pre-existing
|
||||
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
|
||||
}
|
||||
if err != nil {
|
||||
glog.Errorf("Error updating endpoints: %v", err)
|
||||
continue
|
||||
inService := false
|
||||
for _, c := range pod.Status.Conditions {
|
||||
if c.Type == api.PodReady && c.Status == api.ConditionTrue {
|
||||
inService = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !inService {
|
||||
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
|
||||
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: pod.ObjectMeta.Namespace,
|
||||
Name: pod.ObjectMeta.Name,
|
||||
UID: pod.ObjectMeta.UID,
|
||||
ResourceVersion: pod.ObjectMeta.ResourceVersion,
|
||||
}}
|
||||
subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}})
|
||||
}
|
||||
}
|
||||
return resultErr
|
||||
subsets = endpoints.RepackSubsets(subsets)
|
||||
|
||||
// See if there's actually an update here.
|
||||
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
currentEndpoints = &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: service.Name,
|
||||
Labels: service.Labels,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
glog.Errorf("Error getting endpoints: %v", err)
|
||||
e.queue.Add(key) // Retry
|
||||
return
|
||||
}
|
||||
}
|
||||
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
|
||||
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
|
||||
return
|
||||
}
|
||||
newEndpoints := currentEndpoints
|
||||
newEndpoints.Subsets = subsets
|
||||
newEndpoints.Labels = service.Labels
|
||||
|
||||
if len(currentEndpoints.ResourceVersion) == 0 {
|
||||
// No previous endpoints, create them
|
||||
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
|
||||
} else {
|
||||
// Pre-existing
|
||||
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
|
||||
}
|
||||
if err != nil {
|
||||
glog.Errorf("Error updating endpoints: %v", err)
|
||||
e.queue.Add(key) // Retry
|
||||
}
|
||||
}
|
||||
|
||||
func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int {
|
||||
|
@ -27,16 +27,20 @@ import (
|
||||
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
func newPodList(nPods int, nPorts int) *api.PodList {
|
||||
pods := []api.Pod{}
|
||||
func addPods(store cache.Store, namespace string, nPods int, nPorts int) {
|
||||
for i := 0; i < nPods; i++ {
|
||||
p := api.Pod{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
|
||||
ObjectMeta: api.ObjectMeta{Name: fmt.Sprintf("pod%d", i)},
|
||||
p := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
Name: fmt.Sprintf("pod%d", i),
|
||||
Labels: map[string]string{"foo": "bar"},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{{Ports: []api.ContainerPort{}}},
|
||||
},
|
||||
@ -54,11 +58,7 @@ func newPodList(nPods int, nPorts int) *api.PodList {
|
||||
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
|
||||
api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: 8080 + j})
|
||||
}
|
||||
pods = append(pods, p)
|
||||
}
|
||||
return &api.PodList{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version(), Kind: "PodList"},
|
||||
Items: pods,
|
||||
store.Add(p)
|
||||
}
|
||||
}
|
||||
|
||||
@ -222,22 +222,12 @@ type serverResponse struct {
|
||||
obj interface{}
|
||||
}
|
||||
|
||||
func makeTestServer(t *testing.T, namespace string, podResponse, serviceResponse, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) {
|
||||
fakePodHandler := util.FakeHandler{
|
||||
StatusCode: podResponse.statusCode,
|
||||
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), podResponse.obj.(runtime.Object)),
|
||||
}
|
||||
fakeServiceHandler := util.FakeHandler{
|
||||
StatusCode: serviceResponse.statusCode,
|
||||
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), serviceResponse.obj.(runtime.Object)),
|
||||
}
|
||||
func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) {
|
||||
fakeEndpointsHandler := util.FakeHandler{
|
||||
StatusCode: endpointsResponse.statusCode,
|
||||
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), endpointsResponse.obj.(runtime.Object)),
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle(testapi.ResourcePath("pods", namespace, ""), &fakePodHandler)
|
||||
mux.Handle(testapi.ResourcePath("services", "", ""), &fakeServiceHandler)
|
||||
mux.Handle(testapi.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
|
||||
mux.Handle(testapi.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler)
|
||||
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
|
||||
@ -247,47 +237,13 @@ func makeTestServer(t *testing.T, namespace string, podResponse, serviceResponse
|
||||
return httptest.NewServer(mux), &fakeEndpointsHandler
|
||||
}
|
||||
|
||||
func TestSyncEndpointsEmpty(t *testing.T) {
|
||||
testServer, _ := makeTestServer(t, api.NamespaceDefault,
|
||||
serverResponse{http.StatusOK, newPodList(0, 0)},
|
||||
serverResponse{http.StatusOK, &api.ServiceList{}},
|
||||
serverResponse{http.StatusOK, &api.Endpoints{}})
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncEndpointsError(t *testing.T) {
|
||||
testServer, _ := makeTestServer(t, api.NamespaceDefault,
|
||||
serverResponse{http.StatusOK, newPodList(0, 0)},
|
||||
serverResponse{http.StatusInternalServerError, &api.ServiceList{}},
|
||||
serverResponse{http.StatusOK, &api.Endpoints{}})
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err == nil {
|
||||
t.Errorf("unexpected non-error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault,
|
||||
serverResponse{http.StatusOK, newPodList(0, 0)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
ns := api.NamespaceDefault
|
||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
@ -298,30 +254,21 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
|
||||
})
|
||||
endpoints.syncService(ns + "/foo")
|
||||
endpointsHandler.ValidateRequestCount(t, 0)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
Ports: []api.ServicePort{{Port: 80}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t, "other",
|
||||
serverResponse{http.StatusOK, newPodList(0, 0)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
ns := "other"
|
||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
@ -332,30 +279,24 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
Ports: []api.ServicePort{{Port: 80}},
|
||||
},
|
||||
})
|
||||
endpoints.syncService(ns + "/foo")
|
||||
endpointsHandler.ValidateRequestCount(t, 0)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
Ports: []api.ServicePort{{Port: 80}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t, "other",
|
||||
serverResponse{http.StatusOK, newPodList(0, 0)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
ns := "other"
|
||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
@ -366,30 +307,24 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
Ports: []api.ServicePort{{Port: 80}},
|
||||
},
|
||||
})
|
||||
endpoints.syncService(ns + "/foo")
|
||||
endpointsHandler.ValidateRequestCount(t, 0)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t, "other",
|
||||
serverResponse{http.StatusOK, newPodList(1, 1)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
ns := "other"
|
||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Subsets: []api.EndpointSubset{},
|
||||
@ -397,40 +332,36 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
})
|
||||
endpoints.syncService(ns + "/foo")
|
||||
data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}},
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
|
||||
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
|
||||
}},
|
||||
})
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", "foo"), "PUT", &data)
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t, "bar",
|
||||
serverResponse{http.StatusOK, newPodList(1, 1)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
ns := "bar"
|
||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
@ -441,85 +372,83 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
})
|
||||
endpoints.syncService(ns + "/foo")
|
||||
data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}},
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
|
||||
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
|
||||
}},
|
||||
})
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "bar", "foo"), "PUT", &data)
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
ns := api.NamespaceDefault
|
||||
testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault,
|
||||
serverResponse{http.StatusOK, newPodList(1, 1)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
ResourceVersion: "1",
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}},
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
|
||||
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
|
||||
}},
|
||||
}})
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
})
|
||||
endpoints.syncService(ns + "/foo")
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", api.NamespaceDefault, "foo"), "GET", nil)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsItems(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)},
|
||||
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t, "other",
|
||||
serverResponse{http.StatusOK, newPodList(3, 2)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
ns := "other"
|
||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||
serverResponse{http.StatusOK, &api.Endpoints{}})
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
addPods(endpoints.podStore.Store, ns, 3, 2)
|
||||
addPods(endpoints.podStore.Store, "blah", 5, 2) // make sure these aren't found!
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)},
|
||||
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)},
|
||||
},
|
||||
},
|
||||
})
|
||||
endpoints.syncService("other/foo")
|
||||
expectedSubsets := []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{
|
||||
{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}},
|
||||
{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1"}},
|
||||
{IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2"}},
|
||||
{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
|
||||
{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
|
||||
{IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
|
||||
},
|
||||
Ports: []api.EndpointPort{
|
||||
{Name: "port0", Port: 8080, Protocol: "TCP"},
|
||||
@ -534,69 +463,38 @@ func TestSyncEndpointsItems(t *testing.T) {
|
||||
})
|
||||
// endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
|
||||
endpointsHandler.ValidateRequestCount(t, 2)
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", ""), "POST", &data)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsPodError(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, _ := makeTestServer(t, api.NamespaceDefault,
|
||||
serverResponse{http.StatusInternalServerError, &api.PodList{}},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
serverResponse{http.StatusOK, &api.Endpoints{}})
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err == nil {
|
||||
t.Error("Unexpected non-error")
|
||||
}
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, ""), "POST", &data)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsItemsWithLabels(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: "other",
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)},
|
||||
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t, "other",
|
||||
serverResponse{http.StatusOK, newPodList(3, 2)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
ns := "other"
|
||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||
serverResponse{http.StatusOK, &api.Endpoints{}})
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
addPods(endpoints.podStore.Store, ns, 3, 2)
|
||||
serviceLabels := map[string]string{"foo": "bar"}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
Labels: serviceLabels,
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)},
|
||||
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)},
|
||||
},
|
||||
},
|
||||
})
|
||||
endpoints.syncService(ns + "/foo")
|
||||
expectedSubsets := []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{
|
||||
{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}},
|
||||
{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1"}},
|
||||
{IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2"}},
|
||||
{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
|
||||
{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
|
||||
{IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
|
||||
},
|
||||
Ports: []api.EndpointPort{
|
||||
{Name: "port0", Port: 8080, Protocol: "TCP"},
|
||||
@ -606,39 +504,22 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
|
||||
data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
ResourceVersion: "",
|
||||
Labels: serviceList.Items[0].Labels,
|
||||
Labels: serviceLabels,
|
||||
},
|
||||
Subsets: endptspkg.SortSubsets(expectedSubsets),
|
||||
})
|
||||
// endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
|
||||
endpointsHandler.ValidateRequestCount(t, 2)
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", ""), "POST", &data)
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, ""), "POST", &data)
|
||||
}
|
||||
|
||||
func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
|
||||
serviceList := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: "bar",
|
||||
Labels: map[string]string{
|
||||
"baz": "blah",
|
||||
},
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testServer, endpointsHandler := makeTestServer(t, "bar",
|
||||
serverResponse{http.StatusOK, newPodList(1, 1)},
|
||||
serverResponse{http.StatusOK, &serviceList},
|
||||
ns := "bar"
|
||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||
serverResponse{http.StatusOK, &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
@ -652,19 +533,31 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
endpoints := NewEndpointController(client)
|
||||
if err := endpoints.SyncServiceEndpoints(); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
addPods(endpoints.podStore.Store, ns, 1, 1)
|
||||
serviceLabels := map[string]string{"baz": "blah"}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
Labels: serviceLabels,
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
|
||||
},
|
||||
})
|
||||
endpoints.syncService(ns + "/foo")
|
||||
data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
ResourceVersion: "1",
|
||||
Labels: serviceList.Items[0].Labels,
|
||||
Labels: serviceLabels,
|
||||
},
|
||||
Subsets: []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}},
|
||||
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
|
||||
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
|
||||
}},
|
||||
})
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "bar", "foo"), "PUT", &data)
|
||||
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data)
|
||||
}
|
||||
|
@ -91,6 +91,24 @@ func (s StringSet) Difference(s2 StringSet) StringSet {
|
||||
return result
|
||||
}
|
||||
|
||||
// Union returns a new set which includes items in either s1 or s2.
|
||||
// vof objects that are not in s2
|
||||
// For example:
|
||||
// s1 = {1, 2}
|
||||
// s2 = {3, 4}
|
||||
// s1.Union(s2) = {1, 2, 3, 4}
|
||||
// s2.Union(s1) = {1, 2, 3, 4}
|
||||
func (s1 StringSet) Union(s2 StringSet) StringSet {
|
||||
result := NewStringSet()
|
||||
for key := range s1 {
|
||||
result.Insert(key)
|
||||
}
|
||||
for key := range s2 {
|
||||
result.Insert(key)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// IsSuperset returns true iff s1 is a superset of s2.
|
||||
func (s1 StringSet) IsSuperset(s2 StringSet) bool {
|
||||
for item := range s2 {
|
||||
|
Loading…
Reference in New Issue
Block a user