From cd43fc94f7f9df42022da472d24dd8742c3058d3 Mon Sep 17 00:00:00 2001
From: Sravanth Bangari
Date: Tue, 2 Jun 2020 14:24:57 -0700
Subject: [PATCH] Fixing refcounting of remote endpoints used across services

endpointsInfo.refCount becomes a *uint16 so that several endpointsInfo
values can point at one counter. The Proxier gains endPointsRefCount, a map
from HNS endpoint ID to that counter; getRefCount returns the counter for an
ID and creates it on first use.

In syncProxyRules the service VIP remote endpoint and every non-local
endpoint take their counter from that map. Local endpoints keep the private
counter allocated by newEndpointInfo. Map entries whose counter is zero are
dropped at the end of syncProxyRules.

The fake HNS in the tests now reports its endpoint as remote and the fake
network carries an id, and both remote-endpoint tests check that the shared
counter was incremented and matches the endpoint's own view of it.

---
Review notes (text in this section is ignored by git-am):

  * Only '+' lines differ from the patch as originally posted; context and
    '-' lines are untouched and hunk sizes are unchanged.
  * Cleanup: the decrement is skipped when the counter is already zero.
    refCount is unsigned, so decrementing zero would wrap to 65535 and the
    remote endpoint would never be deleted.
  * Comparisons of the unsigned counters against "<= 0" are written "== 0".
  * getRefCount stores the freshly allocated counter directly instead of
    reading it back out of the map.
  * The tests look the counter up once and fail with t.Fatalf when it is
    missing, rather than dereferencing a nil map entry.
  * The post-image blob ids on the "index" lines are those of the patch as
    originally posted.

 pkg/proxy/winkernel/proxier.go      | 47 +++++++++++++++++++++++------
 pkg/proxy/winkernel/proxier_test.go | 21 ++++++++++++-
 2 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go
index 3b577e96c01..1596b187efa 100644
--- a/pkg/proxy/winkernel/proxier.go
+++ b/pkg/proxy/winkernel/proxier.go
@@ -157,7 +157,7 @@ type endpointsInfo struct {
 	isLocal         bool
 	macAddress      string
 	hnsID           string
-	refCount        uint16
+	refCount        *uint16
 	providerAddress string
 	hns             HostNetworkService
 }
@@ -179,7 +179,7 @@ func newEndpointInfo(ip string, port uint16, isLocal bool, hns HostNetworkServic
 		port:       port,
 		isLocal:    isLocal,
 		macAddress: conjureMac("02-11", net.ParseIP(ip)),
-		refCount:   0,
+		refCount:   new(uint16),
 		hnsID:      "",
 		hns:        hns,
 	}
@@ -200,11 +200,14 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string,
 
 func (ep *endpointsInfo) Cleanup() {
 	Log(ep, "Endpoint Cleanup", 3)
-	ep.refCount--
+	if ep.refCount != nil && *ep.refCount > 0 {
+		*ep.refCount--
+	}
+
 	// Remove the remote hns endpoint, if no service is referring it
 	// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
 	// Remove only remote endpoints created by this service
-	if ep.refCount <= 0 && !ep.isLocal {
+	if (ep.refCount == nil || *ep.refCount == 0) && !ep.isLocal {
 		klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
 		err := ep.hns.deleteEndpoint(ep.hnsID)
 		if err == nil {
@@ -215,6 +218,15 @@ func (ep *endpointsInfo) Cleanup() {
 	}
 }
 
+func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 {
+	refCount, exists := refCountMap[hnsID]
+	if !exists {
+		refCount = new(uint16)
+		refCountMap[hnsID] = refCount
+	}
+	return refCount
+}
+
 // returns a new serviceInfo struct
 func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service, hns HostNetworkService) *serviceInfo {
 	onlyNodeLocalEndpoints := false
@@ -310,6 +322,7 @@ type updateServiceMapResult struct {
 }
 type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
 type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
+type endPointsReferenceCountMap map[string]*uint16
 
 func newEndpointsChangeMap(hostname string) endpointsChangeMap {
 	return endpointsChangeMap{
@@ -460,10 +473,11 @@ type Proxier struct {
 	endpointsChanges endpointsChangeMap
 	serviceChanges   serviceChangeMap
 
-	mu           sync.Mutex // protects the following fields
-	serviceMap   proxyServiceMap
-	endpointsMap proxyEndpointsMap
-	portsMap     map[localPort]closeable
+	mu                sync.Mutex // protects the following fields
+	serviceMap        proxyServiceMap
+	endpointsMap      proxyEndpointsMap
+	portsMap          map[localPort]closeable
+	endPointsRefCount endPointsReferenceCountMap
 	// endpointsSynced and servicesSynced are set to true when corresponding
 	// objects are synced after startup. This is used to avoid updating hns policies
 	// with some partial data after kube-proxy restart.
@@ -636,6 +650,7 @@ func NewProxier(
 		serviceChanges:      newServiceChangeMap(),
 		endpointsMap:        make(proxyEndpointsMap),
 		endpointsChanges:    newEndpointsChangeMap(hostname),
+		endPointsRefCount:   make(endPointsReferenceCountMap),
 		masqueradeAll:       masqueradeAll,
 		masqueradeMark:      masqueradeMark,
 		clusterCIDR:         clusterCIDR,
@@ -1100,7 +1115,8 @@ func (proxier *Proxier) syncProxyRules() {
 					continue
 				}
 
-				newHnsEndpoint.refCount++
+				newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
+				*newHnsEndpoint.refCount++
 				svcInfo.remoteEndpoint = newHnsEndpoint
 			}
 		}
@@ -1202,9 +1218,14 @@ func (proxier *Proxier) syncProxyRules() {
 			hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
 			if newHnsEndpoint.isLocal {
 				hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
+			} else {
+				// We only share the refCounts for remote endpoints
+				ep.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
 			}
+
 			ep.hnsID = newHnsEndpoint.hnsID
-			ep.refCount++
+			*ep.refCount++
+
 			Log(ep, "Endpoint resource found", 3)
 		}
 
@@ -1337,6 +1358,12 @@ func (proxier *Proxier) syncProxyRules() {
 		klog.V(5).Infof("Pending delete stale service IP %s connections", svcIP)
 	}
 
+	// remove stale endpoint refcount entries
+	for hnsID, referenceCount := range proxier.endPointsRefCount {
+		if *referenceCount == 0 {
+			delete(proxier.endPointsRefCount, hnsID)
+		}
+	}
 }
 
 type endpointServicePair struct {
diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go
index 935ebbee446..e2fdd84d879 100644
--- a/pkg/proxy/winkernel/proxier_test.go
+++ b/pkg/proxy/winkernel/proxier_test.go
@@ -70,7 +70,7 @@ func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpo
 	if ipNet.Contains(net.ParseIP(ip)) {
 		return &endpointsInfo{
 			ip:         ip,
-			isLocal:    true,
+			isLocal:    false,
 			macAddress: macAddress,
 			hnsID:      guid,
 			hns:        hns,
@@ -102,6 +102,7 @@ func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
 func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier {
 	sourceVip := "192.168.1.2"
 	hnsNetworkInfo := &hnsNetworkInfo{
+		id:          strings.ToUpper(guid),
 		name:        "TestNetwork",
 		networkType: networkType,
 	}
@@ -120,6 +121,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
 		hostMac:             macAddress,
 		isDSR:               false,
 		hns:                 newFakeHNS(),
+		endPointsRefCount:   make(endPointsReferenceCountMap),
 	}
 	return proxier
 }
@@ -217,6 +219,14 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
 	if proxier.endpointsMap[svcPortName][0].hnsID != guid {
 		t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid)
 	}
+
+	if rc := proxier.endPointsRefCount[guid]; rc == nil || *rc == 0 {
+		t.Fatalf("RefCount not incremented for endpoint %v", guid)
+	}
+
+	if *proxier.endPointsRefCount[guid] != *proxier.endpointsMap[svcPortName][0].refCount {
+		t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *proxier.endpointsMap[svcPortName][0].refCount)
+	}
 }
 func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
 	syncPeriod := 30 * time.Second
@@ -264,7 +274,16 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
 	if proxier.endpointsMap[svcPortName][0].hnsID != guid {
 		t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid)
 	}
+
+	if rc := proxier.endPointsRefCount[guid]; rc == nil || *rc == 0 {
+		t.Fatalf("RefCount not incremented for endpoint %v", guid)
+	}
+
+	if *proxier.endPointsRefCount[guid] != *proxier.endpointsMap[svcPortName][0].refCount {
+		t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *proxier.endpointsMap[svcPortName][0].refCount)
+	}
 }
+
 func TestCreateLoadBalancer(t *testing.T) {
 	syncPeriod := 30 * time.Second
 	proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay")