refactor services to v1beta3

This commit is contained in:
markturansky
2014-10-30 09:29:11 -04:00
parent 5a649f2b93
commit bd7643c033
26 changed files with 466 additions and 340 deletions

View File

@@ -136,7 +136,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
handler := NewServiceHandlerMock()
handler.Wait(1)
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Port: 10})
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
@@ -147,12 +147,12 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Port: 10})
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
handler.Wait(1)
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Port: 20})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(1)
channel <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
@@ -164,7 +164,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Name: "foobar"}, Port: 99})
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}})
handler.Wait(1)
channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]}
@@ -180,8 +180,8 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
}
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Port: 10})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Port: 20})
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(2)
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
@@ -197,8 +197,8 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T
handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Port: 10})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Port: 20})
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(2)
handler2.Wait(2)
channelOne <- serviceUpdate1

View File

@@ -148,7 +148,7 @@ func (s ConfigSourceEtcd) decodeServices(node *etcd.Node, retServices []api.Serv
glog.Errorf("Couldn't get endpoints for %s %s : %v skipping", svc.Namespace, svc.Name, err)
endpoints = api.Endpoints{}
} else {
glog.V(3).Infof("Got service: %s %s on localport %d mapping to: %s", svc.Namespace, svc.Name, svc.Port, endpoints)
glog.V(3).Infof("Got service: %s %s on localport %d mapping to: %s", svc.Namespace, svc.Name, svc.Spec.Port, endpoints)
}
retEndpoints = append(retEndpoints, endpoints)
}

View File

@@ -435,12 +435,12 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
for _, service := range services {
activeServices.Insert(service.Name)
info, exists := proxier.getServiceInfo(service.Name)
serviceIP := net.ParseIP(service.PortalIP)
serviceIP := net.ParseIP(service.Spec.PortalIP)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.isActive() && info.portalPort == service.Port && info.portalIP.Equal(serviceIP) {
if exists && info.isActive() && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) {
continue
}
if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) {
if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP)) {
glog.V(4).Infof("Something changed for service %q: stopping it", service.Name)
err := proxier.closePortal(service.Name, info)
if err != nil {
@@ -451,14 +451,14 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
glog.Errorf("Failed to stop service %q: %s", service.Name, err)
}
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s (local :%d)", service.Name, serviceIP, service.Port, service.Protocol, service.ProxyPort)
info, err := proxier.addServiceOnPort(service.Name, service.Protocol, service.ProxyPort, udpIdleTimeout)
glog.V(1).Infof("Adding new service %q at %s:%d/%s (local :%d)", service.Name, serviceIP, service.Spec.Port, service.Spec.Protocol, service.Spec.ProxyPort)
info, err := proxier.addServiceOnPort(service.Name, service.Spec.Protocol, service.Spec.ProxyPort, udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %+v", service.Name, err)
continue
}
info.portalIP = serviceIP
info.portalPort = service.Port
info.portalPort = service.Spec.Port
err = proxier.openPortal(service.Name, info)
if err != nil {
glog.Errorf("Failed to open portal for %q: %s", service.Name, err)

View File

@@ -340,7 +340,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatalf(err.Error())
}
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Port: svcInfo.proxyPort, ProxyPort: svcInfo.proxyPort, Protocol: "TCP"},
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}},
})
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
}
@@ -371,7 +371,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
t.Fatalf(err.Error())
}
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Port: svcInfo.proxyPort, ProxyPort: svcInfo.proxyPort, Protocol: "UDP"},
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP", ProxyPort: svcInfo.proxyPort}, Status: api.ServiceStatus{}},
})
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
}
@@ -406,7 +406,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
t.Errorf("expected difference, got %d %d", newPort, svcInfo.proxyPort)
}
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Port: newPort, ProxyPort: newPort, Protocol: "TCP"},
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: newPort, Protocol: "TCP", ProxyPort: newPort}, Status: api.ServiceStatus{}},
})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
@@ -451,7 +451,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
t.Errorf("expected difference, got %d %d", newPort, svcInfo.proxyPort)
}
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Port: newPort, ProxyPort: newPort, Protocol: "UDP"},
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: newPort, Protocol: "UDP", ProxyPort: newPort}, Status: api.ServiceStatus{}},
})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())