mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 05:21:58 +00:00
Userspace Proxy: Keep ref to service being proxied
This commit makes the userspace proxy keep an ObjectReference to the service being proxied. This allows the consumers of the `ServiceInfo` struct, like `ProxySockets` to emit events about or otherwise refer to the service.
This commit is contained in:
parent
f5526727fb
commit
655b338256
@ -48,6 +48,8 @@ type ServiceInfo struct {
|
|||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
// ActiveClients is the cache of active UDP clients being proxied by this proxy for this service
|
// ActiveClients is the cache of active UDP clients being proxied by this proxy for this service
|
||||||
ActiveClients *ClientCache
|
ActiveClients *ClientCache
|
||||||
|
// ServiceRef is a full object reference to the the service described by this ServiceInfo
|
||||||
|
ServiceRef api.ObjectReference
|
||||||
|
|
||||||
isAliveAtomic int32 // Only access this with atomic ops
|
isAliveAtomic int32 // Only access this with atomic ops
|
||||||
portal portal
|
portal portal
|
||||||
@ -350,7 +352,7 @@ func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *Serv
|
|||||||
// addServiceOnPort starts listening for a new service, returning the ServiceInfo.
|
// addServiceOnPort starts listening for a new service, returning the ServiceInfo.
|
||||||
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
|
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
|
||||||
// connections, for now.
|
// connections, for now.
|
||||||
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
|
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceRef api.ObjectReference, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
|
||||||
sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
|
sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -368,6 +370,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
|
|||||||
si := &ServiceInfo{
|
si := &ServiceInfo{
|
||||||
Timeout: timeout,
|
Timeout: timeout,
|
||||||
ActiveClients: newClientCache(),
|
ActiveClients: newClientCache(),
|
||||||
|
ServiceRef: serviceRef,
|
||||||
|
|
||||||
isAliveAtomic: 1,
|
isAliveAtomic: 1,
|
||||||
proxyPort: portNum,
|
proxyPort: portNum,
|
||||||
@ -404,6 +407,17 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: should this just be api.GetReference?
|
||||||
|
svcGVK := service.GetObjectKind().GroupVersionKind()
|
||||||
|
svcRef := api.ObjectReference{
|
||||||
|
Kind: svcGVK.Kind,
|
||||||
|
Namespace: service.Namespace,
|
||||||
|
Name: service.Name,
|
||||||
|
UID: service.UID,
|
||||||
|
APIVersion: svcGVK.GroupVersion().String(),
|
||||||
|
ResourceVersion: service.ResourceVersion,
|
||||||
|
}
|
||||||
|
|
||||||
for i := range service.Spec.Ports {
|
for i := range service.Spec.Ports {
|
||||||
servicePort := &service.Spec.Ports[i]
|
servicePort := &service.Spec.Ports[i]
|
||||||
serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
|
serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
|
||||||
@ -434,7 +448,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
|
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
|
||||||
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
|
info, err = proxier.addServiceOnPort(serviceName, svcRef, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
|
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
|
||||||
continue
|
continue
|
||||||
|
@ -217,7 +217,8 @@ func TestTCPProxy(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -244,7 +245,8 @@ func TestUDPProxy(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -271,7 +273,8 @@ func TestUDPProxyTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -307,14 +310,16 @@ func TestMultiPortProxy(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
|
serviceRefP := api.ObjectReference{Name: serviceP.Name, Namespace: serviceP.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfoP, err := p.addServiceOnPort(serviceP, serviceRefP, "TCP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort)
|
testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort)
|
||||||
waitForNumProxyLoops(t, p, 1)
|
waitForNumProxyLoops(t, p, 1)
|
||||||
|
|
||||||
svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second)
|
serviceRefQ := api.ObjectReference{Name: serviceQ.Name, Namespace: serviceQ.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfoQ, err := p.addServiceOnPort(serviceQ, serviceRefQ, "UDP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -397,7 +402,8 @@ func TestTCPProxyStop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -441,7 +447,8 @@ func TestUDPProxyStop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -479,7 +486,8 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -516,7 +524,8 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -552,7 +561,8 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -605,7 +615,8 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -659,7 +670,8 @@ func TestTCPProxyUpdatePort(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -707,7 +719,8 @@ func TestUDPProxyUpdatePort(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -752,7 +765,8 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
@ -803,7 +817,8 @@ func TestProxyUpdatePortal(t *testing.T) {
|
|||||||
}
|
}
|
||||||
waitForNumProxyLoops(t, p, 0)
|
waitForNumProxyLoops(t, p, 0)
|
||||||
|
|
||||||
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
|
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
|
||||||
|
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error adding new service: %#v", err)
|
t.Fatalf("error adding new service: %#v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user