diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 99db95f9b06..9ceec048b04 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -24,40 +24,44 @@ import ( "github.com/golang/glog" ) +// Operation is a type of operation of services or endpoints. type Operation int +// These are the available operation types. const ( SET Operation = iota ADD REMOVE ) -// Defines an operation sent on the channel. You can add or remove single services by -// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system -// to a given state for this source configuration, set Services as desired and Op to SET, -// which will reset the system state to that specified in this operation for this source -// channel. To remove all services, set Services to empty array and Op to SET +// ServiceUpdate describes an operation of services, sent on the channel. +// You can add or remove single services by sending an array of size one and Op == ADD|REMOVE. +// For setting the state of the system to a given state for this source configuration, set Services as desired and Op to SET, +// which will reset the system state to that specified in this operation for this source channel. +// To remove all services, set Services to empty array and Op to SET type ServiceUpdate struct { Services []api.Service Op Operation } -// Defines an operation sent on the channel. You can add or remove single endpoints by -// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system -// to a given state for this source configuration, set Endpoints as desired and Op to SET, -// which will reset the system state to that specified in this operation for this source -// channel. To remove all endpoints, set Endpoints to empty array and Op to SET +// EndpointsUpdate describes an operation of endpoints, sent on the channel. +// You can add or remove single endpoints by sending an array of size one and Op == ADD|REMOVE. +// For setting the state of the system to a given state for this source configuration, set Endpoints as desired and Op to SET, +// which will reset the system state to that specified in this operation for this source channel. +// To remove all endpoints, set Endpoints to empty array and Op to SET type EndpointsUpdate struct { Endpoints []api.Endpoints Op Operation } +// ServiceConfigHandler handles update notifications of the set of services. type ServiceConfigHandler interface { - // Sent when a configuration has been changed by one of the sources. This is the - // union of all the configuration sources. + // OnUpdate gets called when a configuration has been changed by one of the sources. + // This is the union of all the configuration sources. OnUpdate(services []api.Service) } +// EndpointsConfigHandler handles update notifications of the set of endpoints. type EndpointsConfigHandler interface { // OnUpdate gets called when endpoints configuration is changed for a given // service on any of the configuration sources. An example is when a new @@ -65,6 +69,8 @@ type EndpointsConfigHandler interface { OnUpdate(endpoints []api.Endpoints) } +// ServiceConfig tracks a set of service configurations and their endpoint configurations. +// It accepts "set", "add" and "remove" operations of services and endpoints via channels, and invokes registered handlers on change. type ServiceConfig struct { // Configuration sources and their lock. configSourceLock sync.RWMutex @@ -94,6 +100,8 @@ type ServiceConfig struct { endpointsNotifyChannel chan string } +// NewServiceConfig creates a new ServiceConfig. +// It immediately runs the created ServiceConfig. func NewServiceConfig() *ServiceConfig { config := &ServiceConfig{ serviceConfigSources: make(map[string]chan ServiceUpdate), @@ -109,22 +117,26 @@ func NewServiceConfig() *ServiceConfig { return config } +// Run begins a loop to accept new service configurations and new endpoint configurations. +// It never returns. func (impl *ServiceConfig) Run() { glog.Infof("Starting the config Run loop") for { select { case source := <-impl.serviceNotifyChannel: glog.Infof("Got new service configuration from source %s", source) - impl.NotifyServiceUpdate() + impl.notifyServiceUpdate() case source := <-impl.endpointsNotifyChannel: glog.Infof("Got new endpoint configuration from source %s", source) - impl.NotifyEndpointsUpdate() + impl.notifyEndpointsUpdate() case <-time.After(1 * time.Second): } } } -func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel chan ServiceUpdate) { +// serviceChannelListener begins a loop to handle incoming ServiceUpdate notifications from the channel. +// It never returns. +func (impl *ServiceConfig) serviceChannelListener(source string, listenChannel chan ServiceUpdate) { // Represents the current services configuration for this channel. serviceMap := make(map[string]api.Service) for { @@ -160,7 +172,9 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c } } -func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel chan EndpointsUpdate) { +// endpointsChannelListener begins a loop to handle incoming EndpointsUpdate notifications from the channel. +// It never returns. +func (impl *ServiceConfig) endpointsChannelListener(source string, listenChannel chan EndpointsUpdate) { endpointMap := make(map[string]api.Endpoints) for { select { @@ -214,11 +228,11 @@ func (impl *ServiceConfig) GetServiceConfigurationChannel(source string) chan Se } newChannel := make(chan ServiceUpdate) impl.serviceConfigSources[source] = newChannel - go impl.ServiceChannelListener(source, newChannel) + go impl.serviceChannelListener(source, newChannel) return newChannel } -// GetEndpointConfigurationChannel returns a channel where a configuration source +// GetEndpointsConfigurationChannel returns a channel where a configuration source // can send updates of new endpoint configurations. Multiple calls with the same // source will return the same channel. This allows change and state based sources // to use the same channel. Difference source names however will be treated as a @@ -235,11 +249,11 @@ func (impl *ServiceConfig) GetEndpointsConfigurationChannel(source string) chan } newChannel := make(chan EndpointsUpdate) impl.endpointsConfigSources[source] = newChannel - go impl.EndpointsChannelListener(source, newChannel) + go impl.endpointsChannelListener(source, newChannel) return newChannel } -// Register ServiceConfigHandler to receive updates of changes to services. +// RegisterServiceHandler registers the ServiceConfigHandler to receive updates of changes to services. func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) { impl.handlerLock.Lock() defer impl.handlerLock.Unlock() @@ -255,7 +269,7 @@ func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) panic("Only up to 10 service handlers supported for now") } -// Register ServiceConfigHandler to receive updates of changes to services. +// RegisterEndpointsHandler registers the EndpointsConfigHandler to receive updates of changes to services. func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) { impl.handlerLock.Lock() defer impl.handlerLock.Unlock() @@ -271,8 +285,9 @@ func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandl panic("Only up to 10 endpoint handlers supported for now") } -func (impl *ServiceConfig) NotifyServiceUpdate() { - services := make([]api.Service, 0) +// notifyServiceUpdate calls the registered ServiceConfigHandlers with the current states of services. +func (impl *ServiceConfig) notifyServiceUpdate() { + services := []api.Service{} impl.configLock.RLock() for _, sourceServices := range impl.serviceConfig { for _, value := range sourceServices { @@ -291,8 +306,9 @@ func (impl *ServiceConfig) NotifyServiceUpdate() { } } -func (impl *ServiceConfig) NotifyEndpointsUpdate() { - endpoints := make([]api.Endpoints, 0) +// notifyEndpointsUpdate calls the registered EndpointsConfigHandlers with the current states of endpoints. +func (impl *ServiceConfig) notifyEndpointsUpdate() { + endpoints := []api.Endpoints{} impl.configLock.RLock() for _, sourceEndpoints := range impl.endpointConfig { for _, value := range sourceEndpoints { diff --git a/pkg/proxy/config/etcd.go b/pkg/proxy/config/etcd.go index 5b536f4e9ca..cac508a649b 100644 --- a/pkg/proxy/config/etcd.go +++ b/pkg/proxy/config/etcd.go @@ -15,7 +15,7 @@ limitations under the License. */ // Watches etcd and gets the full configuration on preset intervals. -// Expects the list of exposed services to live under: +// It expects the list of exposed services to live under: // registry/services // which in etcd is exposed like so: // http:///v2/keys/registry/services @@ -30,7 +30,7 @@ limitations under the License. // '[ { "machine": , "name": }, // { "machine": , "name": } // ]', -// + package config import ( @@ -44,14 +44,18 @@ import ( "github.com/golang/glog" ) -const RegistryRoot = "registry/services" +// registryRoot is the key prefix for service configs in etcd. +const registryRoot = "registry/services" +// ConfigSourceEtcd communicates with a etcd via the client, and sends the change notification of services and endpoints to the specified channels. type ConfigSourceEtcd struct { client *etcd.Client serviceChannel chan ServiceUpdate endpointsChannel chan EndpointsUpdate } +// NewConfigSourceEtcd creates a new ConfigSourceEtcd. +// It immediately runs the created ConfigSourceEtcd in a goroutine. func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd { config := ConfigSourceEtcd{ client: client, @@ -62,13 +66,14 @@ func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, return config } +// Run begins watching for new services and their endpoints on etcd. func (impl ConfigSourceEtcd) Run() { // Initially, just wait for the etcd to come up before doing anything more complicated. var services []api.Service var endpoints []api.Endpoints var err error for { - services, endpoints, err = impl.GetServices() + services, endpoints, err = impl.getServices() if err == nil { break } @@ -87,10 +92,10 @@ func (impl ConfigSourceEtcd) Run() { // Ok, so we got something back from etcd. Let's set up a watch for new services, and // their endpoints - go impl.WatchForChanges() + go impl.watchForChanges() for { - services, endpoints, err = impl.GetServices() + services, endpoints, err = impl.getServices() if err != nil { glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err) } else { @@ -107,12 +112,12 @@ func (impl ConfigSourceEtcd) Run() { } } -// Finds the list of services and their endpoints from etcd. +// getServices finds the list of services and their endpoints from etcd. // This operation is akin to a set a known good at regular intervals. -func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) { - response, err := impl.client.Get(RegistryRoot+"/specs", true, false) +func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, error) { + response, err := impl.client.Get(registryRoot+"/specs", true, false) if err != nil { - glog.Errorf("Failed to get the key %s: %v", RegistryRoot, err) + glog.Errorf("Failed to get the key %s: %v", registryRoot, err) return make([]api.Service, 0), make([]api.Endpoints, 0), err } if response.Node.Dir == true { @@ -129,7 +134,7 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro continue } retServices[i] = svc - endpoints, err := impl.GetEndpoints(svc.ID) + endpoints, err := impl.getEndpoints(svc.ID) if err != nil { glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err) } @@ -138,23 +143,23 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro } return retServices, retEndpoints, err } - return nil, nil, fmt.Errorf("did not get the root of the registry %s", RegistryRoot) + return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot) } -func (impl ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) { - key := fmt.Sprintf(RegistryRoot + "/endpoints/" + service) +// getEndpoints finds the list of endpoints of the service from etcd. +func (impl ConfigSourceEtcd) getEndpoints(service string) (api.Endpoints, error) { + key := fmt.Sprintf(registryRoot + "/endpoints/" + service) response, err := impl.client.Get(key, true, false) if err != nil { glog.Errorf("Failed to get the key: %s %v", key, err) return api.Endpoints{}, err } // Parse all the endpoint specifications in this value. - return ParseEndpoints(response.Node.Value) + return parseEndpoints(response.Node.Value) } -// EtcdResponseToServiceAndLocalport takes an etcd response and pulls it apart to find -// service -func EtcdResponseToService(response *etcd.Response) (*api.Service, error) { +// etcdResponseToService takes an etcd response and pulls it apart to find service. +func etcdResponseToService(response *etcd.Response) (*api.Service, error) { if response.Node == nil { return nil, fmt.Errorf("invalid response from etcd: %#v", response) } @@ -166,31 +171,31 @@ func EtcdResponseToService(response *etcd.Response) (*api.Service, error) { return &svc, err } -func ParseEndpoints(jsonString string) (api.Endpoints, error) { +func parseEndpoints(jsonString string) (api.Endpoints, error) { var e api.Endpoints err := json.Unmarshal([]byte(jsonString), &e) return e, err } -func (impl ConfigSourceEtcd) WatchForChanges() { +func (impl ConfigSourceEtcd) watchForChanges() { glog.Info("Setting up a watch for new services") watchChannel := make(chan *etcd.Response) go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil) for { watchResponse := <-watchChannel - impl.ProcessChange(watchResponse) + impl.processChange(watchResponse) } } -func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) { +func (impl ConfigSourceEtcd) processChange(response *etcd.Response) { glog.Infof("Processing a change in service configuration... %s", *response) // If it's a new service being added (signified by a localport being added) // then process it as such if strings.Contains(response.Node.Key, "/endpoints/") { - impl.ProcessEndpointResponse(response) + impl.processEndpointResponse(response) } else if response.Action == "set" { - service, err := EtcdResponseToService(response) + service, err := etcdResponseToService(response) if err != nil { glog.Errorf("Failed to parse %s Port: %s", response, err) return @@ -208,13 +213,12 @@ func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) { serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}} impl.serviceChannel <- serviceUpdate return - } else { - glog.Infof("Unknown service delete: %#v", parts) } + glog.Infof("Unknown service delete: %#v", parts) } } -func (impl ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) { +func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) { glog.Infof("Processing a change in endpoint configuration... %s", *response) var endpoints api.Endpoints err := json.Unmarshal([]byte(response.Node.Value), &endpoints) diff --git a/pkg/proxy/config/etcd_test.go b/pkg/proxy/config/etcd_test.go index 7c7a832f0df..9b1dafbcccb 100644 --- a/pkg/proxy/config/etcd_test.go +++ b/pkg/proxy/config/etcd_test.go @@ -26,15 +26,15 @@ import ( const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34" const TomcatService = "tomcat" -const TomcatContainerId = "tomcat-3bd5af34" +const TomcatContainerID = "tomcat-3bd5af34" -func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) { - endpoints, err := ParseEndpoints(jsonString) +func validateJSONParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) { + endpoints, err := parseEndpoints(jsonString) if err == nil && expectError { - t.Errorf("ValidateJsonParsing did not get expected error when parsing %s", jsonString) + t.Errorf("validateJSONParsing did not get expected error when parsing %s", jsonString) } if err != nil && !expectError { - t.Errorf("ValidateJsonParsing got unexpected error %+v when parsing %s", err, jsonString) + t.Errorf("validateJSONParsing got unexpected error %+v when parsing %s", err, jsonString) } if !reflect.DeepEqual(expectedEndpoints, endpoints) { t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints) @@ -42,7 +42,7 @@ func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api. } func TestParseJsonEndpoints(t *testing.T) { - ValidateJsonParsing(t, "", api.Endpoints{}, true) + validateJSONParsing(t, "", api.Endpoints{}, true) endpoints := api.Endpoints{ Name: "foo", Endpoints: []string{"foo", "bar", "baz"}, @@ -51,6 +51,6 @@ func TestParseJsonEndpoints(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %#v", err) } - ValidateJsonParsing(t, string(data), endpoints, false) - // ValidateJsonParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false) + validateJSONParsing(t, string(data), endpoints, false) + // validateJSONParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false) } diff --git a/pkg/proxy/config/file.go b/pkg/proxy/config/file.go index 6ae0a09db8d..2053691bae2 100644 --- a/pkg/proxy/config/file.go +++ b/pkg/proxy/config/file.go @@ -28,6 +28,7 @@ limitations under the License. // } //] //} + package config import ( @@ -41,22 +42,24 @@ import ( "github.com/golang/glog" ) -// TODO: kill this struct. -type ServiceJSON struct { - Name string - Port int - Endpoints []string -} -type ConfigFile struct { - Services []ServiceJSON +// serviceConfig is a deserialized form of the config file format which ConfigSourceFile accepts. +type serviceConfig struct { + Services []struct { + Name string `json: "name"` + Port int `json: "port"` + Endpoints []string `json: "endpoints"` + } `json: "service"` } +// ConfigSourceFile periodically reads service configurations in JSON from a file, and sends the services and endpoints defined in th file to the specified channels. type ConfigSourceFile struct { serviceChannel chan ServiceUpdate endpointsChannel chan EndpointsUpdate filename string } +// NewConfigSourceFile creates a new ConfigSourceFile. +// It immediately runs the created ConfigSourceFile in a goroutine. func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceFile { config := ConfigSourceFile{ filename: filename, @@ -67,6 +70,7 @@ func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, end return config } +// Run begins watching the config file. func (impl ConfigSourceFile) Run() { glog.Infof("Watching file %s", impl.filename) var lastData []byte @@ -78,7 +82,7 @@ func (impl ConfigSourceFile) Run() { if err != nil { glog.Errorf("Couldn't read file: %s : %v", impl.filename, err) } else { - var config ConfigFile + var config serviceConfig err = json.Unmarshal(data, &config) if err != nil { glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err) diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 5e781456d5e..f957a51b06e 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -38,8 +38,8 @@ func NewProxier(loadBalancer LoadBalancer) *Proxier { return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)} } -// CopyBytes copies bytes from in to out until EOF. -func CopyBytes(in, out *net.TCPConn) { +// copyBytes copies bytes from in to out until EOF. +func copyBytes(in, out *net.TCPConn) { glog.Infof("Copying from %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) _, err := io.Copy(in, out) @@ -51,15 +51,17 @@ func CopyBytes(in, out *net.TCPConn) { out.CloseWrite() } -// ProxyConnection creates a bidirectional byte shuffler. -// Copies bytes to/from each connection. -func ProxyConnection(in, out *net.TCPConn) { +// proxyConnection creates a bidirectional byte shuffler. +// It copies bytes to/from each connection. +func proxyConnection(in, out *net.TCPConn) { glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) - go CopyBytes(in, out) - go CopyBytes(out, in) + go copyBytes(in, out) + go copyBytes(out, in) } +// AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints. +// It never returns. func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { for { inConn, err := listener.Accept() @@ -86,12 +88,12 @@ func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { inConn.Close() continue } - go ProxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) + go proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) } } -// AddService starts listening for a new service on a given port. -func (proxier Proxier) AddService(service string, port int) error { +// addService starts listening for a new service on a given port. +func (proxier Proxier) addService(service string, port int) error { // Make sure we can start listening on the port before saying all's well. l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { @@ -127,7 +129,7 @@ func (proxier Proxier) OnUpdate(services []api.Service) { port, exists := proxier.serviceMap[service.ID] if !exists || port != service.Port { glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) - err := proxier.AddService(service.ID, service.Port) + err := proxier.addService(service.ID, service.Port) if err == nil { proxier.serviceMap[service.ID] = service.Port } else { diff --git a/pkg/proxy/roundrobbin.go b/pkg/proxy/roundrobbin.go index a5474843737..d21831e8e6c 100644 --- a/pkg/proxy/roundrobbin.go +++ b/pkg/proxy/roundrobbin.go @@ -30,7 +30,7 @@ import ( "github.com/golang/glog" ) -// LoadBalancerRR is a round-robin load balancer. +// LoadBalancerRR is a round-robin load balancer. It implements LoadBalancer. type LoadBalancerRR struct { lock sync.RWMutex endpointsMap map[string][]string @@ -42,9 +42,7 @@ func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)} } -// LoadBalance registers srcAddr for the provided service. -// It returns error if no entry is available for the service -// or no previous endpoints are registered. +// LoadBalance select an endpoint of the service by round-robin algorithm. func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) { impl.lock.RLock() endpoints, exists := impl.endpointsMap[service] @@ -61,8 +59,8 @@ func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string return endpoint, nil } -// IsValid returns true if spec is valid. -func (impl LoadBalancerRR) IsValid(spec string) bool { +// isValid returns true if spec is valid. +func (impl LoadBalancerRR) isValid(spec string) bool { index := strings.Index(spec, ":") if index == -1 { return false @@ -74,11 +72,11 @@ func (impl LoadBalancerRR) IsValid(spec string) bool { return value > 0 } -// FilterValidEndpoints filters out invalid endpoints. -func (impl LoadBalancerRR) FilterValidEndpoints(endpoints []string) []string { +// filterValidEndpoints filters out invalid endpoints. +func (impl LoadBalancerRR) filterValidEndpoints(endpoints []string) []string { var result []string for _, spec := range endpoints { - if impl.IsValid(spec) { + if impl.isValid(spec) { result = append(result, spec) } } @@ -97,7 +95,7 @@ func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { existingEndpoints, exists := impl.endpointsMap[value.Name] if !exists || !reflect.DeepEqual(value.Endpoints, existingEndpoints) { glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints) - impl.endpointsMap[value.Name] = impl.FilterValidEndpoints(value.Endpoints) + impl.endpointsMap[value.Name] = impl.filterValidEndpoints(value.Endpoints) // Start RR from the beginning if added or updated. impl.rrIndex[value.Name] = 0 } diff --git a/pkg/proxy/roundrobbin_test.go b/pkg/proxy/roundrobbin_test.go index 216cdbd2de0..1112141de80 100644 --- a/pkg/proxy/roundrobbin_test.go +++ b/pkg/proxy/roundrobbin_test.go @@ -24,16 +24,16 @@ import ( func TestLoadBalanceValidateWorks(t *testing.T) { loadBalancer := NewLoadBalancerRR() - if loadBalancer.IsValid("") { + if loadBalancer.isValid("") { t.Errorf("Didn't fail for empty string") } - if loadBalancer.IsValid("foobar") { + if loadBalancer.isValid("foobar") { t.Errorf("Didn't fail with no port") } - if loadBalancer.IsValid("foobar:-1") { + if loadBalancer.isValid("foobar:-1") { t.Errorf("Didn't fail with a negative port") } - if !loadBalancer.IsValid("foobar:8080") { + if !loadBalancer.isValid("foobar:8080") { t.Errorf("Failed a valid config.") } } @@ -41,7 +41,7 @@ func TestLoadBalanceValidateWorks(t *testing.T) { func TestLoadBalanceFilterWorks(t *testing.T) { loadBalancer := NewLoadBalancerRR() endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"} - filtered := loadBalancer.FilterValidEndpoints(endpoints) + filtered := loadBalancer.filterValidEndpoints(endpoints) if len(filtered) != 3 { t.Errorf("Failed to filter to the correct size")