Merge pull request #5192 from quinton-hoole/2015-03-06-service-namespace-clash

Fix service namespace clash
This commit is contained in:
Tim Hockin 2015-03-17 09:10:50 -07:00
commit 879a39bcc4
10 changed files with 429 additions and 278 deletions

View File

@ -20,6 +20,7 @@ import (
"sync" "sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -81,7 +82,7 @@ type EndpointsConfig struct {
// It immediately runs the created EndpointsConfig. // It immediately runs the created EndpointsConfig.
func NewEndpointsConfig() *EndpointsConfig { func NewEndpointsConfig() *EndpointsConfig {
updates := make(chan struct{}) updates := make(chan struct{})
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)} store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
mux := config.NewMux(store) mux := config.NewMux(store)
bcaster := config.NewBroadcaster() bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates) go watchForUpdates(bcaster, store, updates)
@ -112,7 +113,7 @@ func (c *EndpointsConfig) Config() []api.Endpoints {
type endpointsStore struct { type endpointsStore struct {
endpointLock sync.RWMutex endpointLock sync.RWMutex
endpoints map[string]map[string]api.Endpoints endpoints map[string]map[types.NamespacedName]api.Endpoints
updates chan<- struct{} updates chan<- struct{}
} }
@ -120,26 +121,29 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
s.endpointLock.Lock() s.endpointLock.Lock()
endpoints := s.endpoints[source] endpoints := s.endpoints[source]
if endpoints == nil { if endpoints == nil {
endpoints = make(map[string]api.Endpoints) endpoints = make(map[types.NamespacedName]api.Endpoints)
} }
update := change.(EndpointsUpdate) update := change.(EndpointsUpdate)
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.V(4).Infof("Adding new endpoint from source %s : %+v", source, update.Endpoints) glog.V(4).Infof("Adding new endpoint from source %s : %+v", source, update.Endpoints)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
endpoints[value.Name] = value name := types.NamespacedName{value.Namespace, value.Name}
endpoints[name] = value
} }
case REMOVE: case REMOVE:
glog.V(4).Infof("Removing an endpoint %+v", update) glog.V(4).Infof("Removing an endpoint %+v", update)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
delete(endpoints, value.Name) name := types.NamespacedName{value.Namespace, value.Name}
delete(endpoints, name)
} }
case SET: case SET:
glog.V(4).Infof("Setting endpoints %+v", update) glog.V(4).Infof("Setting endpoints %+v", update)
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
endpoints = make(map[string]api.Endpoints) endpoints = make(map[types.NamespacedName]api.Endpoints)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
endpoints[value.Name] = value name := types.NamespacedName{value.Namespace, value.Name}
endpoints[name] = value
} }
default: default:
glog.V(4).Infof("Received invalid update type: %v", update) glog.V(4).Infof("Received invalid update type: %v", update)
@ -176,7 +180,7 @@ type ServiceConfig struct {
// It immediately runs the created ServiceConfig. // It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig { func NewServiceConfig() *ServiceConfig {
updates := make(chan struct{}) updates := make(chan struct{})
store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)} store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
mux := config.NewMux(store) mux := config.NewMux(store)
bcaster := config.NewBroadcaster() bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates) go watchForUpdates(bcaster, store, updates)
@ -207,7 +211,7 @@ func (c *ServiceConfig) Config() []api.Service {
type serviceStore struct { type serviceStore struct {
serviceLock sync.RWMutex serviceLock sync.RWMutex
services map[string]map[string]api.Service services map[string]map[types.NamespacedName]api.Service
updates chan<- struct{} updates chan<- struct{}
} }
@ -215,26 +219,29 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
s.serviceLock.Lock() s.serviceLock.Lock()
services := s.services[source] services := s.services[source]
if services == nil { if services == nil {
services = make(map[string]api.Service) services = make(map[types.NamespacedName]api.Service)
} }
update := change.(ServiceUpdate) update := change.(ServiceUpdate)
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.V(4).Infof("Adding new service from source %s : %+v", source, update.Services) glog.V(4).Infof("Adding new service from source %s : %+v", source, update.Services)
for _, value := range update.Services { for _, value := range update.Services {
services[value.Name] = value name := types.NamespacedName{value.Namespace, value.Name}
services[name] = value
} }
case REMOVE: case REMOVE:
glog.V(4).Infof("Removing a service %+v", update) glog.V(4).Infof("Removing a service %+v", update)
for _, value := range update.Services { for _, value := range update.Services {
delete(services, value.Name) name := types.NamespacedName{value.Namespace, value.Name}
delete(services, name)
} }
case SET: case SET:
glog.V(4).Infof("Setting services %+v", update) glog.V(4).Infof("Setting services %+v", update)
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
services = make(map[string]api.Service) services = make(map[types.NamespacedName]api.Service)
for _, value := range update.Services { for _, value := range update.Services {
services[value.Name] = value name := types.NamespacedName{value.Namespace, value.Name}
services[name] = value
} }
default: default:
glog.V(4).Infof("Received invalid update type: %v", update) glog.V(4).Infof("Received invalid update type: %v", update)

View File

@ -136,7 +136,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
handler := NewServiceHandlerMock() handler := NewServiceHandlerMock()
handler.Wait(1) handler.Wait(1)
config.RegisterHandler(handler) config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
channel <- serviceUpdate channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services) handler.ValidateServices(t, serviceUpdate.Services)
@ -147,24 +147,24 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
channel := config.Channel("one") channel := config.Channel("one")
handler := NewServiceHandlerMock() handler := NewServiceHandlerMock()
config.RegisterHandler(handler) config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
handler.Wait(1) handler.Wait(1)
channel <- serviceUpdate channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services) handler.ValidateServices(t, serviceUpdate.Services)
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(1) handler.Wait(1)
channel <- serviceUpdate2 channel <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]} services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
handler.ValidateServices(t, services) handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}) serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}})
handler.Wait(1) handler.Wait(1)
channel <- serviceUpdate3 channel <- serviceUpdate3
services = []api.Service{serviceUpdate2.Services[0]} services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services) handler.ValidateServices(t, services)
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}}) serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}})
handler.Wait(1) handler.Wait(1)
channel <- serviceUpdate4 channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]} services = []api.Service{serviceUpdate4.Services[0]}
@ -180,8 +180,8 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
} }
handler := NewServiceHandlerMock() handler := NewServiceHandlerMock()
config.RegisterHandler(handler) config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(2) handler.Wait(2)
channelOne <- serviceUpdate1 channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2 channelTwo <- serviceUpdate2
@ -197,8 +197,8 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T
handler2 := NewServiceHandlerMock() handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler) config.RegisterHandler(handler)
config.RegisterHandler(handler2) config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(2) handler.Wait(2)
handler2.Wait(2) handler2.Wait(2)
channelOne <- serviceUpdate1 channelOne <- serviceUpdate1
@ -217,11 +217,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
config.RegisterHandler(handler) config.RegisterHandler(handler)
config.RegisterHandler(handler2) config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}}, Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
}) })
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"}, ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}}, Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
}) })
handler.Wait(2) handler.Wait(2)
@ -243,11 +243,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
config.RegisterHandler(handler) config.RegisterHandler(handler)
config.RegisterHandler(handler2) config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}}, Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
}) })
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"}, ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}}, Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
}) })
handler.Wait(2) handler.Wait(2)
@ -261,7 +261,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Add one more // Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{ endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foobar"}, ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}}, Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}},
}) })
handler.Wait(1) handler.Wait(1)
@ -273,7 +273,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Update the "foo" service with new endpoints // Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{ endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint7"}}, Endpoints: []api.Endpoint{{IP: "endpoint7"}},
}) })
handler.Wait(1) handler.Wait(1)
@ -284,7 +284,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints)
// Remove "bar" service // Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}) endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}})
handler.Wait(1) handler.Wait(1)
handler2.Wait(1) handler2.Wait(1)
channelTwo <- endpointsUpdate2 channelTwo <- endpointsUpdate2

View File

@ -20,13 +20,14 @@ import (
"net" "net"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
) )
// LoadBalancer is an interface for distributing incoming requests to service endpoints. // LoadBalancer is an interface for distributing incoming requests to service endpoints.
type LoadBalancer interface { type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given // NextEndpoint returns the endpoint to handle a request for the given
// service and source address. // service and source address.
NextEndpoint(service string, srcAddr net.Addr) (string, error) NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error)
NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error NewService(service types.NamespacedName, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service string) CleanupStaleStickySessions(service types.NamespacedName)
} }

View File

@ -27,6 +27,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
@ -58,7 +59,7 @@ type proxySocket interface {
// while sessions are active. // while sessions are active.
Close() error Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints. // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, info *serviceInfo, proxier *Proxier) ProxyLoop(service types.NamespacedName, info *serviceInfo, proxier *Proxier)
} }
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
@ -67,7 +68,7 @@ type tcpProxySocket struct {
net.Listener net.Listener
} }
func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { func tryConnect(service types.NamespacedName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
for _, retryTimeout := range endpointDialTimeout { for _, retryTimeout := range endpointDialTimeout {
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr)
if err != nil { if err != nil {
@ -87,7 +88,7 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox
return nil, fmt.Errorf("failed to connect to an endpoint.") return nil, fmt.Errorf("failed to connect to an endpoint.")
} }
func (tcp *tcpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) { func (tcp *tcpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *serviceInfo, proxier *Proxier) {
for { for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// The service port was closed or replaced. // The service port was closed or replaced.
@ -162,7 +163,7 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}} return &clientCache{clients: map[string]net.Conn{}}
} }
func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) { func (udp *udpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *serviceInfo, proxier *Proxier) {
activeClients := newClientCache() activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets var buffer [4096]byte // 4KiB should be enough for most whole-packets
for { for {
@ -207,7 +208,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxie
} }
} }
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service string, timeout time.Duration) (net.Conn, error) { func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service types.NamespacedName, timeout time.Duration) (net.Conn, error) {
activeClients.mu.Lock() activeClients.mu.Lock()
defer activeClients.mu.Unlock() defer activeClients.mu.Unlock()
@ -303,7 +304,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
type Proxier struct { type Proxier struct {
loadBalancer LoadBalancer loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo serviceMap map[types.NamespacedName]*serviceInfo
numProxyLoops int32 // use atomic ops to access this; mostly for testing numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP listenIP net.IP
iptables iptables.Interface iptables iptables.Interface
@ -345,7 +346,7 @@ func CreateProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
} }
return &Proxier{ return &Proxier{
loadBalancer: loadBalancer, loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo), serviceMap: make(map[types.NamespacedName]*serviceInfo),
listenIP: listenIP, listenIP: listenIP,
iptables: iptables, iptables: iptables,
hostIP: hostIP, hostIP: hostIP,
@ -393,26 +394,26 @@ func (proxier *Proxier) cleanupStaleStickySessions() {
} }
// This assumes proxier.mu is not locked. // This assumes proxier.mu is not locked.
func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error { func (proxier *Proxier) stopProxy(service types.NamespacedName, info *serviceInfo) error {
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
return proxier.stopProxyInternal(service, info) return proxier.stopProxyInternal(service, info)
} }
// This assumes proxier.mu is locked. // This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error { func (proxier *Proxier) stopProxyInternal(service types.NamespacedName, info *serviceInfo) error {
delete(proxier.serviceMap, service) delete(proxier.serviceMap, service)
return info.socket.Close() return info.socket.Close()
} }
func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) { func (proxier *Proxier) getServiceInfo(service types.NamespacedName) (*serviceInfo, bool) {
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
info, ok := proxier.serviceMap[service] info, ok := proxier.serviceMap[service]
return info, ok return info, ok
} }
func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) { func (proxier *Proxier) setServiceInfo(service types.NamespacedName, info *serviceInfo) {
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.serviceMap[service] = info proxier.serviceMap[service] = info
@ -421,7 +422,7 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
// addServiceOnPort starts listening for a new service, returning the serviceInfo. // addServiceOnPort starts listening for a new service, returning the serviceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now. // connections, for now.
func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { func (proxier *Proxier) addServiceOnPort(service types.NamespacedName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
if err != nil { if err != nil {
return nil, err return nil, err
@ -447,7 +448,7 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol,
proxier.setServiceInfo(service, si) proxier.setServiceInfo(service, si)
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, proxier *Proxier) { go func(service types.NamespacedName, proxier *Proxier) {
defer util.HandleCrash() defer util.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1) atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier) sock.ProxyLoop(service, si, proxier)
@ -465,30 +466,31 @@ const udpIdleTimeout = 1 * time.Minute
// shutdown if missing from the update set. // shutdown if missing from the update set.
func (proxier *Proxier) OnUpdate(services []api.Service) { func (proxier *Proxier) OnUpdate(services []api.Service) {
glog.V(4).Infof("Received update notice: %+v", services) glog.V(4).Infof("Received update notice: %+v", services)
activeServices := util.StringSet{} activeServices := make(map[types.NamespacedName]bool) // use a map as a set
for _, service := range services { for _, service := range services {
activeServices.Insert(service.Name) serviceName := types.NamespacedName{service.Namespace, service.Name}
info, exists := proxier.getServiceInfo(service.Name) activeServices[serviceName] = true
info, exists := proxier.getServiceInfo(serviceName)
serviceIP := net.ParseIP(service.Spec.PortalIP) serviceIP := net.ParseIP(service.Spec.PortalIP)
// TODO: check health of the socket? What if ProxyLoop exited? // TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) { if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) {
continue continue
} }
if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) { if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) {
glog.V(4).Infof("Something changed for service %q: stopping it", service.Name) glog.V(4).Infof("Something changed for service %q: stopping it", serviceName.String())
err := proxier.closePortal(service.Name, info) err := proxier.closePortal(serviceName, info)
if err != nil { if err != nil {
glog.Errorf("Failed to close portal for %q: %v", service.Name, err) glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
} }
err = proxier.stopProxy(service.Name, info) err = proxier.stopProxy(serviceName, info)
if err != nil { if err != nil {
glog.Errorf("Failed to stop service %q: %v", service.Name, err) glog.Errorf("Failed to stop service %q: %v", serviceName, err)
} }
} }
glog.V(1).Infof("Adding new service %q at %s:%d/%s", service.Name, serviceIP, service.Spec.Port, service.Spec.Protocol) glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, service.Spec.Port, service.Spec.Protocol)
info, err := proxier.addServiceOnPort(service.Name, service.Spec.Protocol, 0, udpIdleTimeout) info, err := proxier.addServiceOnPort(serviceName, service.Spec.Protocol, 0, udpIdleTimeout)
if err != nil { if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", service.Name, err) glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue continue
} }
info.portalIP = serviceIP info.portalIP = serviceIP
@ -499,16 +501,16 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
info.stickyMaxAgeMinutes = 180 info.stickyMaxAgeMinutes = 180
glog.V(4).Infof("info: %+v", info) glog.V(4).Infof("info: %+v", info)
err = proxier.openPortal(service.Name, info) err = proxier.openPortal(serviceName, info)
if err != nil { if err != nil {
glog.Errorf("Failed to open portal for %q: %v", service.Name, err) glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
} }
proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes) proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
} }
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap { for name, info := range proxier.serviceMap {
if !activeServices.Has(name) { if !activeServices[name] {
glog.V(1).Infof("Stopping service %q", name) glog.V(1).Infof("Stopping service %q", name)
err := proxier.closePortal(name, info) err := proxier.closePortal(name, info)
if err != nil { if err != nil {
@ -534,7 +536,7 @@ func ipsEqual(lhs, rhs []string) bool {
return true return true
} }
func (proxier *Proxier) openPortal(service string, info *serviceInfo) error { func (proxier *Proxier) openPortal(service types.NamespacedName, info *serviceInfo) error {
err := proxier.openOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service) err := proxier.openOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil { if err != nil {
return err return err
@ -548,7 +550,7 @@ func (proxier *Proxier) openPortal(service string, info *serviceInfo) error {
return nil return nil
} }
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) error { func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name types.NamespacedName) error {
// Handle traffic from containers. // Handle traffic from containers.
args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name) args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...) existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...)
@ -573,7 +575,7 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
return nil return nil
} }
func (proxier *Proxier) closePortal(service string, info *serviceInfo) error { func (proxier *Proxier) closePortal(service types.NamespacedName, info *serviceInfo) error {
// Collect errors and report them all at the end. // Collect errors and report them all at the end.
el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service) el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
for _, publicIP := range info.publicIP { for _, publicIP := range info.publicIP {
@ -587,7 +589,7 @@ func (proxier *Proxier) closePortal(service string, info *serviceInfo) error {
return errors.NewAggregate(el) return errors.NewAggregate(el)
} }
func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) []error { func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name types.NamespacedName) []error {
el := []error{} el := []error{}
// Handle traffic from containers. // Handle traffic from containers.
@ -666,7 +668,7 @@ var zeroIPv6 = net.ParseIP("::0")
var localhostIPv6 = net.ParseIP("::1") var localhostIPv6 = net.ParseIP("::1")
// Build a slice of iptables args that are common to from-container and from-host portal rules. // Build a slice of iptables args that are common to from-container and from-host portal rules.
func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service string) []string { func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service types.NamespacedName) []string {
// This list needs to include all fields as they are eventually spit out // This list needs to include all fields as they are eventually spit out
// by iptables-save. This is because some systems do not support the // by iptables-save. This is because some systems do not support the
// 'iptables -C' arg, and so fall back on parsing iptables-save output. // 'iptables -C' arg, and so fall back on parsing iptables-save output.
@ -677,7 +679,7 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol
// iptables versions. // iptables versions.
args := []string{ args := []string{
"-m", "comment", "-m", "comment",
"--comment", service, "--comment", service.String(),
"-p", strings.ToLower(string(protocol)), "-p", strings.ToLower(string(protocol)),
"-m", strings.ToLower(string(protocol)), "-m", strings.ToLower(string(protocol)),
"-d", fmt.Sprintf("%s/32", destIP.String()), "-d", fmt.Sprintf("%s/32", destIP.String()),
@ -687,7 +689,7 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol
} }
// Build a slice of iptables args for a from-container portal rule. // Build a slice of iptables args for a from-container portal rule.
func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string { func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service types.NamespacedName) []string {
args := iptablesCommonPortalArgs(destIP, destPort, protocol, service) args := iptablesCommonPortalArgs(destIP, destPort, protocol, service)
// This is tricky. // This is tricky.
@ -734,7 +736,7 @@ func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int,
} }
// Build a slice of iptables args for a from-host portal rule. // Build a slice of iptables args for a from-host portal rule.
func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string { func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service types.NamespacedName) []string {
args := iptablesCommonPortalArgs(destIP, destPort, protocol, service) args := iptablesCommonPortalArgs(destIP, destPort, protocol, service)
// This is tricky. // This is tricky.

View File

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
) )
@ -194,9 +195,10 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
func TestTCPProxy(t *testing.T) { func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
}, },
}) })
@ -204,7 +206,7 @@ func TestTCPProxy(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -214,9 +216,10 @@ func TestTCPProxy(t *testing.T) {
func TestUDPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
}, },
}) })
@ -224,7 +227,7 @@ func TestUDPProxy(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -233,7 +236,7 @@ func TestUDPProxy(t *testing.T) {
} }
// Helper: Stops the proxy for the named service. // Helper: Stops the proxy for the named service.
func stopProxyByName(proxier *Proxier, service string) error { func stopProxyByName(proxier *Proxier, service types.NamespacedName) error {
info, found := proxier.getServiceInfo(service) info, found := proxier.getServiceInfo(service)
if !found { if !found {
return fmt.Errorf("unknown service: %s", service) return fmt.Errorf("unknown service: %s", service)
@ -243,9 +246,10 @@ func stopProxyByName(proxier *Proxier, service string) error {
func TestTCPProxyStop(t *testing.T) { func TestTCPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
}, },
}) })
@ -253,7 +257,7 @@ func TestTCPProxyStop(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -264,7 +268,7 @@ func TestTCPProxyStop(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, "echo") stopProxyByName(p, service)
// Wait for the port to really close. // Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
@ -274,9 +278,10 @@ func TestTCPProxyStop(t *testing.T) {
func TestUDPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
}, },
}) })
@ -284,7 +289,7 @@ func TestUDPProxyStop(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -295,7 +300,7 @@ func TestUDPProxyStop(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, "echo") stopProxyByName(p, service)
// Wait for the port to really close. // Wait for the port to really close.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
@ -305,9 +310,10 @@ func TestUDPProxyStop(t *testing.T) {
func TestTCPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
}, },
}) })
@ -315,7 +321,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -335,9 +341,10 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
func TestUDPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
}, },
}) })
@ -345,7 +352,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -365,9 +372,10 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
}, },
}) })
@ -375,7 +383,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -392,11 +400,11 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{ p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP"}, Status: api.ServiceStatus{}}, {ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP"}, Status: api.ServiceStatus{}},
}) })
svcInfo, exists := p.getServiceInfo("echo") svcInfo, exists := p.getServiceInfo(service)
if !exists { if !exists {
t.Fatalf("can't find serviceInfo") t.Fatalf("can't find serviceInfo for %s", service)
} }
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
@ -404,9 +412,10 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
}, },
}) })
@ -414,7 +423,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -431,9 +440,9 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{ p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP"}, Status: api.ServiceStatus{}}, {ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP"}, Status: api.ServiceStatus{}},
}) })
svcInfo, exists := p.getServiceInfo("echo") svcInfo, exists := p.getServiceInfo(service)
if !exists { if !exists {
t.Fatalf("can't find serviceInfo") t.Fatalf("can't find serviceInfo")
} }
@ -443,9 +452,10 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
func TestTCPProxyUpdatePort(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
}, },
}) })
@ -453,7 +463,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
@ -461,13 +471,13 @@ func TestTCPProxyUpdatePort(t *testing.T) {
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{ p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: 99, Protocol: "TCP"}, Status: api.ServiceStatus{}}, {ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "TCP"}, Status: api.ServiceStatus{}},
}) })
// Wait for the socket to actually get free. // Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
svcInfo, exists := p.getServiceInfo("echo") svcInfo, exists := p.getServiceInfo(service)
if !exists { if !exists {
t.Fatalf("can't find serviceInfo") t.Fatalf("can't find serviceInfo")
} }
@ -479,9 +489,10 @@ func TestTCPProxyUpdatePort(t *testing.T) {
func TestUDPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{ lb.OnUpdate([]api.Endpoints{
{ {
ObjectMeta: api.ObjectMeta{Name: "echo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
}, },
}) })
@ -489,20 +500,20 @@ func TestUDPProxyUpdatePort(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil { if err != nil {
t.Fatalf("error adding new service: %#v", err) t.Fatalf("error adding new service: %#v", err)
} }
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{ p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: 99, Protocol: "UDP"}, Status: api.ServiceStatus{}}, {ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "UDP"}, Status: api.ServiceStatus{}},
}) })
// Wait for the socket to actually get free. // Wait for the socket to actually get free.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
svcInfo, exists := p.getServiceInfo("echo") svcInfo, exists := p.getServiceInfo(service)
if !exists { if !exists {
t.Fatalf("can't find serviceInfo") t.Fatalf("can't find serviceInfo")
} }

View File

@ -26,6 +26,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/slice" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/slice"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -49,13 +50,10 @@ type affinityPolicy struct {
ttlMinutes int ttlMinutes int
} }
// balancerKey is a string that the balancer uses to key stored state.
type balancerKey string
// LoadBalancerRR is a round-robin load balancer. // LoadBalancerRR is a round-robin load balancer.
type LoadBalancerRR struct { type LoadBalancerRR struct {
lock sync.RWMutex lock sync.RWMutex
services map[balancerKey]*balancerState services map[types.NamespacedName]*balancerState
} }
type balancerState struct { type balancerState struct {
@ -75,11 +73,11 @@ func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityP
// NewLoadBalancerRR returns a new LoadBalancerRR. // NewLoadBalancerRR returns a new LoadBalancerRR.
func NewLoadBalancerRR() *LoadBalancerRR { func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{ return &LoadBalancerRR{
services: map[balancerKey]*balancerState{}, services: map[types.NamespacedName]*balancerState{},
} }
} }
func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error { func (lb *LoadBalancerRR) NewService(service types.NamespacedName, affinityType api.AffinityType, ttlMinutes int) error {
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
@ -88,17 +86,16 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy
} }
// This assumes that lb.lock is already held. // This assumes that lb.lock is already held.
func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState { func (lb *LoadBalancerRR) newServiceInternal(service types.NamespacedName, affinityType api.AffinityType, ttlMinutes int) *balancerState {
if ttlMinutes == 0 { if ttlMinutes == 0 {
ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
} }
key := balancerKey(service) if _, exists := lb.services[service]; !exists {
if _, exists := lb.services[key]; !exists { lb.services[service] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service) glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
} }
return lb.services[key] return lb.services[service]
} }
// return true if this service is using some form of session affinity. // return true if this service is using some form of session affinity.
@ -112,13 +109,13 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
// NextEndpoint returns a service endpoint. // NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm. // The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) { func (lb *LoadBalancerRR) NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error) {
// Coarse locking is simple. We can get more fine-grained if/when we // Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters. // can prove it matters.
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
key := balancerKey(service) key := service
state, exists := lb.services[key] state, exists := lb.services[key]
if !exists || state == nil { if !exists || state == nil {
return "", ErrMissingServiceEntry return "", ErrMissingServiceEntry
@ -185,7 +182,7 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string {
} }
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down). // Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) { func removeSessionAffinityByEndpoint(state *balancerState, service types.NamespacedName, endpoint string) {
for _, affinity := range state.affinity.affinityMap { for _, affinity := range state.affinity.affinityMap {
if affinity.endpoint == endpoint { if affinity.endpoint == endpoint {
glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service) glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service)
@ -197,7 +194,7 @@ func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey,
// Loop through the valid endpoints and then the endpoints associated with the Load Balancer. // Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
// Then remove any session affinity records that are not in both lists. // Then remove any session affinity records that are not in both lists.
// This assumes the lb.lock is held. // This assumes the lb.lock is held.
func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) { func (lb *LoadBalancerRR) updateAffinityMap(service types.NamespacedName, newEndpoints []string) {
allEndpoints := map[string]int{} allEndpoints := map[string]int{}
for _, newEndpoint := range newEndpoints { for _, newEndpoint := range newEndpoints {
allEndpoints[newEndpoint] = 1 allEndpoints[newEndpoint] = 1
@ -221,13 +218,14 @@ func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []
// Registered endpoints are updated if found in the update set or // Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set. // unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
registeredEndpoints := make(map[balancerKey]bool) registeredEndpoints := make(map[types.NamespacedName]bool)
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
// Update endpoints for services. // Update endpoints for services.
for _, svcEndpoints := range allEndpoints { for _, svcEndpoints := range allEndpoints {
key := balancerKey(svcEndpoints.Name) name := types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name}
key := name
state, exists := lb.services[key] state, exists := lb.services[key]
curEndpoints := []string{} curEndpoints := []string{}
if state != nil { if state != nil {
@ -240,7 +238,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
// On update can be called without NewService being called externally. // On update can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created // To be safe we will call it here. A new service will only be created
// if one does not already exist. // if one does not already exist.
state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0) state = lb.newServiceInternal(name, api.AffinityTypeNone, 0)
state.endpoints = slice.ShuffleStrings(newEndpoints) state.endpoints = slice.ShuffleStrings(newEndpoints)
// Reset the round-robin index. // Reset the round-robin index.
@ -268,11 +266,11 @@ func slicesEquiv(lhs, rhs []string) bool {
return false return false
} }
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { func (lb *LoadBalancerRR) CleanupStaleStickySessions(service types.NamespacedName) {
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
key := balancerKey(service) key := service
state, exists := lb.services[key] state, exists := lb.services[key]
if !exists { if !exists {
glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service) glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service)

View File

@ -21,6 +21,7 @@ import (
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
) )
func TestValidateWorks(t *testing.T) { func TestValidateWorks(t *testing.T) {
@ -66,7 +67,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
var endpoints []api.Endpoints var endpoints []api.Endpoints
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil { if err == nil {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -75,7 +77,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
} }
} }
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) { func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service types.NamespacedName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr) endpoint, err := loadBalancer.NextEndpoint(service, netaddr)
if err != nil { if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
@ -87,31 +89,33 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string,
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}}, Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}},
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
} }
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
@ -119,22 +123,23 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
} }
func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
@ -142,31 +147,31 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
// Then update the configuration with one fewer endpoints, make sure // Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again // we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 8}, {IP: "endpoint", Port: 8},
{IP: "endpoint", Port: 9}, {IP: "endpoint", Port: 9},
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
// Clear endpoints // Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil) endpoint, err = loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -174,13 +179,15 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) fooService := types.NewNamespacedNameOrDie("testnamespace", "foo")
barService := types.NewNamespacedNameOrDie("testnamespace", "bar")
endpoint, err := loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
endpoints := make([]api.Endpoints, 2) endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
@ -188,60 +195,61 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
}, },
} }
endpoints[1] = api.Endpoints{ endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"}, ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4}, {IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5}, {IP: "endpoint", Port: 5},
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services["foo"].endpoints shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.services["bar"].endpoints shuffledBarEndpoints := loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
// Then update the configuration by removing foo // Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:]) loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint("foo", nil) endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
// but bar is still there, and we continue RR from where we left off. // but bar is still there, and we continue RR from where we left off.
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
} }
func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) { func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}}, Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}},
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
} }
func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
@ -249,15 +257,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
@ -265,15 +274,15 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
} }
func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
@ -281,15 +290,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
loadBalancer.NewService("foo", api.AffinityTypeNone, 0) loadBalancer.NewService(service, api.AffinityTypeNone, 0)
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
@ -297,15 +307,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client3) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
} }
func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
@ -316,15 +327,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0} client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
@ -332,25 +344,25 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0] client1Endpoint := shuffledEndpoints[0]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
client2Endpoint := shuffledEndpoints[1] client2Endpoint := shuffledEndpoints[1]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
client3Endpoint := shuffledEndpoints[2] client3Endpoint := shuffledEndpoints[2]
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints shuffledEndpoints = loadBalancer.services[service].endpoints
if client1Endpoint == "endpoint:3" { if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0] client1Endpoint = shuffledEndpoints[0]
} else if client2Endpoint == "endpoint:3" { } else if client2Endpoint == "endpoint:3" {
@ -358,12 +370,12 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
} else if client3Endpoint == "endpoint:3" { } else if client3Endpoint == "endpoint:3" {
client3Endpoint = shuffledEndpoints[0] client3Endpoint = shuffledEndpoints[0]
} }
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
@ -371,13 +383,13 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client4) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client5) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client6) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6)
} }
func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
@ -385,15 +397,16 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1) endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
@ -401,35 +414,35 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
// Then update the configuration with one fewer endpoints, make sure // Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again // we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4}, {IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5}, {IP: "endpoint", Port: 5},
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
// Clear endpoints // Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil) endpoint, err = loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
@ -440,58 +453,61 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil) fooService := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) loadBalancer.NewService(fooService, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 2) endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{ endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3}, {IP: "endpoint", Port: 3},
}, },
} }
loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0) barService := types.NewNamespacedNameOrDie("testnamespace", "bar")
loadBalancer.NewService(barService, api.AffinityTypeClientIP, 0)
endpoints[1] = api.Endpoints{ endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"}, ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
Endpoints: []api.Endpoint{ Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 5}, {IP: "endpoint", Port: 5},
{IP: "endpoint", Port: 5}, {IP: "endpoint", Port: 5},
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
shuffledBarEndpoints := loadBalancer.services["bar"].endpoints shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
shuffledBarEndpoints := loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
// Then update the configuration by removing foo // Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:]) loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint("foo", nil) endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 { if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service") t.Errorf("Didn't fail with non-existent service")
} }
// but bar is still there, and we continue RR from where we left off. // but bar is still there, and we continue RR from where we left off.
shuffledBarEndpoints = loadBalancer.services["bar"].endpoints shuffledBarEndpoints = loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
} }

View File

@ -0,0 +1,35 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
// NamespacedName comprises a resource name, with a mandatory namespace,
// rendered as "<namespace>/<name>". Being a type captures intent and
// helps make sure that UIDs, namespaced names and non-namespaced names
// do not get conflated in code. For most use cases, namespace and name
// will already have been format validated at the API entry point, so we
// don't do that here. Where that's not the case (e.g. in testing),
// consider using NamespacedNameOrDie() in testing.go in this package.
type NamespacedName struct {
Namespace string
Name string
}
// String returns the general purpose string representation
func (n NamespacedName) String() string {
return n.Namespace + "/" + n.Name
}

26
pkg/types/testing.go Normal file
View File

@ -0,0 +1,26 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import "fmt"
func NewNamespacedNameOrDie(namespace, name string) (ret NamespacedName) {
if len(namespace) == 0 || len(name) == 0 {
panic(fmt.Sprintf("invalid call to NewNamespacedNameOrDie(%q, %q)", namespace, name))
}
return NamespacedName{namespace, name}
}

View File

@ -18,6 +18,7 @@ package e2e
import ( import (
"fmt" "fmt"
"sort"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -178,8 +179,8 @@ var _ = Describe("Services", func() {
Expect(foundRO).To(Equal(true)) Expect(foundRO).To(Equal(true))
}) })
It("should serve basic a endpoint from pods", func(done Done) { It("should serve a basic endpoint from pods", func(done Done) {
serviceName := "endpoint-test" serviceName := "endpoint-test2"
ns := api.NamespaceDefault ns := api.NamespaceDefault
labels := map[string]string{ labels := map[string]string{
"foo": "bar", "foo": "bar",
@ -243,9 +244,59 @@ var _ = Describe("Services", func() {
defer func() { defer func() {
close(done) close(done)
}() }()
}, 120.0) }, 240.0)
It("should correctly serve identically named services in different namespaces on different external IP addresses", func(done Done) {
serviceNames := []string{"services-namespace-test0"} // Could add more here, but then it takes longer.
namespaces := []string{"namespace0", "namespace1"} // As above.
labels := map[string]string{
"key0": "value0",
"key1": "value1",
}
service := &api.Service{
ObjectMeta: api.ObjectMeta{},
Spec: api.ServiceSpec{
Port: 80,
Selector: labels,
ContainerPort: util.NewIntOrStringFromInt(80),
CreateExternalLoadBalancer: true,
},
}
publicIPs := []string{}
// We defer Gingko pieces that may Fail, so clean up at the end.
defer func() {
close(done)
}()
for _, namespace := range namespaces {
for _, serviceName := range serviceNames {
service.ObjectMeta.Name = serviceName
service.ObjectMeta.Namespace = namespace
By("creating service " + serviceName + " in namespace " + namespace)
result, err := c.Services(namespace).Create(service)
Expect(err).NotTo(HaveOccurred())
defer func(namespace, serviceName string) { // clean up when we're done
By("deleting service " + serviceName + " in namespace " + namespace)
err := c.Services(namespace).Delete(serviceName)
Expect(err).NotTo(HaveOccurred())
}(namespace, serviceName)
publicIPs = append(publicIPs, result.Spec.PublicIPs...) // Save 'em to check uniqueness
}
}
validateUniqueOrFail(publicIPs)
}, 240.0)
}) })
func validateUniqueOrFail(s []string) {
By(fmt.Sprintf("validating unique: %v", s))
sort.Strings(s)
var prev string
for i, elem := range s {
if i > 0 && elem == prev {
Fail("duplicate found: " + elem)
}
prev = elem
}
}
func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) { func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) {
ips := util.StringSet{} ips := util.StringSet{}
for _, ep := range endpoints.Endpoints { for _, ep := range endpoints.Endpoints {
@ -263,7 +314,10 @@ func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEn
if !ips.Has(pod.Status.PodIP) { if !ips.Has(pod.Status.PodIP) {
Failf("ip validation failed, expected: %v, saw: %v", ips, pod.Status.PodIP) Failf("ip validation failed, expected: %v, saw: %v", ips, pod.Status.PodIP)
} }
By(fmt.Sprintf(""))
} }
By(fmt.Sprintf("successfully validated IPs %v against expected endpoints %v port %d on namespace %s", ips, expectedEndpoints, expectedPort, ns))
} }
func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedPort int, expectedEndpoints []string) { func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedPort int, expectedEndpoints []string) {
@ -274,17 +328,18 @@ func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedP
validateIPsOrFail(c, ns, expectedPort, expectedEndpoints, endpoints) validateIPsOrFail(c, ns, expectedPort, expectedEndpoints, endpoints)
return return
} else { } else {
By(fmt.Sprintf("Unexpected endpoints: %v, expected %v", endpoints.Endpoints, expectedEndpoints)) By(fmt.Sprintf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", endpoints.Endpoints, expectedEndpoints))
} }
} else { } else {
By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1s)", err)) By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1 second)", err))
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} }
By(fmt.Sprintf("successfully validated endpoints %v port %d on service %s/%s", expectedEndpoints, expectedPort, ns, serviceName))
} }
func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]string) { func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]string) {
By(fmt.Sprintf("Adding pod %v", name)) By(fmt.Sprintf("Adding pod %v in namespace %v", name, ns))
pod := &api.Pod{ pod := &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: name, Name: name,