diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 8925e5541a0..7b375739b8c 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -48,6 +48,8 @@ type ServiceInfo struct { Timeout time.Duration // ActiveClients is the cache of active UDP clients being proxied by this proxy for this service ActiveClients *ClientCache + // ServiceRef is a full object reference to the the service described by this ServiceInfo + ServiceRef api.ObjectReference isAliveAtomic int32 // Only access this with atomic ops portal portal @@ -350,7 +352,7 @@ func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *Serv // addServiceOnPort starts listening for a new service, returning the ServiceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. -func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceRef api.ObjectReference, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -368,6 +370,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol si := &ServiceInfo{ Timeout: timeout, ActiveClients: newClientCache(), + ServiceRef: serviceRef, isAliveAtomic: 1, proxyPort: portNum, @@ -404,6 +407,17 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { continue } + // TODO: should this just be api.GetReference? + svcGVK := service.GetObjectKind().GroupVersionKind() + svcRef := api.ObjectReference{ + Kind: svcGVK.Kind, + Namespace: service.Namespace, + Name: service.Name, + UID: service.UID, + APIVersion: svcGVK.GroupVersion().String(), + ResourceVersion: service.ResourceVersion, + } + for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} @@ -434,7 +448,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { } glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) + info, err = proxier.addServiceOnPort(serviceName, svcRef, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) if err != nil { glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) continue diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 66383e7a6be..c5f327667f5 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -217,7 +217,8 @@ func TestTCPProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -244,7 +245,8 @@ func TestUDPProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -271,7 +273,8 @@ func TestUDPProxyTimeout(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -307,14 +310,16 @@ func TestMultiPortProxy(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) + serviceRefP := api.ObjectReference{Name: serviceP.Name, Namespace: serviceP.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfoP, err := p.addServiceOnPort(serviceP, serviceRefP, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort) waitForNumProxyLoops(t, p, 1) - svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second) + serviceRefQ := api.ObjectReference{Name: serviceQ.Name, Namespace: serviceQ.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfoQ, err := p.addServiceOnPort(serviceQ, serviceRefQ, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -397,7 +402,8 @@ func TestTCPProxyStop(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -441,7 +447,8 @@ func TestUDPProxyStop(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -479,7 +486,8 @@ func TestTCPProxyUpdateDelete(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -516,7 +524,8 @@ func TestUDPProxyUpdateDelete(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -552,7 +561,8 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -605,7 +615,8 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -659,7 +670,8 @@ func TestTCPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -707,7 +719,8 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -752,7 +765,8 @@ func TestProxyUpdatePublicIPs(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -803,7 +817,8 @@ func TestProxyUpdatePortal(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"} + svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) }