Merge pull request #91706 from sbangari/remoteendpointrefcount

Fixing refcounting of remote endpoints used across services

Commit: 86e14157d0
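
Note on the approach: the change replaces per-endpointsInfo reference counts with a single counter per HNS endpoint ID, shared by every service that routes to that endpoint. The following is a minimal, self-contained sketch of that pattern with simplified types; endpointRefCounts, svcA, and svcB are illustrative names, not the kube-proxy code.

package main

import "fmt"

// endpointRefCounts mimics the PR's endPointsReferenceCountMap: one shared
// counter per HNS endpoint ID, so every service referencing the endpoint
// sees the same count.
type endpointRefCounts map[string]*uint16

// getRefCount returns the shared counter for hnsID, creating it on first use.
func (m endpointRefCounts) getRefCount(hnsID string) *uint16 {
	rc, ok := m[hnsID]
	if !ok {
		rc = new(uint16)
		m[hnsID] = rc
	}
	return rc
}

func main() {
	refCounts := make(endpointRefCounts)

	// Two services referencing the same remote endpoint share one counter.
	svcA := refCounts.getRefCount("ep-1")
	svcB := refCounts.getRefCount("ep-1")
	*svcA++
	*svcB++
	fmt.Println(*refCounts["ep-1"]) // 2: both increments hit the same counter

	// As each service cleans up, the shared count drains toward zero, at
	// which point the remote HNS endpoint can safely be deleted.
	*svcA--
	*svcB--
	fmt.Println(*refCounts["ep-1"]) // 0
}
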
@@ -158,7 +158,7 @@ type endpointsInfo struct {
 	isLocal         bool
 	macAddress      string
 	hnsID           string
-	refCount        uint16
+	refCount        *uint16
 	providerAddress string
 	hns             HostNetworkService
 }

@@ -180,7 +180,7 @@ func newEndpointInfo(ip string, port uint16, isLocal bool, hns HostNetworkService) *endpointsInfo {
 		port:       port,
 		isLocal:    isLocal,
 		macAddress: conjureMac("02-11", net.ParseIP(ip)),
-		refCount:   0,
+		refCount:   new(uint16),
 		hnsID:      "",
 		hns:        hns,
 	}
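
The uint16 to *uint16 switch is the heart of the fix: endpointsInfo values get copied and held by multiple services, so a value counter diverges per copy while a pointer keeps every copy on one shared count. Below is a hedged illustration with stand-in structs; epValue and epShared are made up for the example and are not the real endpointsInfo.

package main

import "fmt"

// Stripped-down stand-ins for endpointsInfo; illustrative only.
type epValue struct{ refCount uint16 }
type epShared struct{ refCount *uint16 }

func main() {
	// Value counter: every copy of the struct forks the count, so a
	// decrement on one copy is invisible to the others.
	a := epValue{refCount: 1}
	b := a // copy
	b.refCount--
	fmt.Println(a.refCount, b.refCount) // 1 0 — the original still looks referenced

	// Pointer counter: every copy shares the same uint16, so all services
	// and all copies of the endpoint agree on the remaining references.
	rc := new(uint16)
	*rc = 1
	c := epShared{refCount: rc}
	d := c // the copy shares the pointer
	*d.refCount--
	fmt.Println(*c.refCount, *d.refCount) // 0 0 — one consistent count
}
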
@@ -201,11 +201,14 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string,

 func (ep *endpointsInfo) Cleanup() {
 	Log(ep, "Endpoint Cleanup", 3)
-	ep.refCount--
+	if ep.refCount != nil {
+		*ep.refCount--
+	}
+
 	// Remove the remote hns endpoint, if no service is referring it
 	// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
 	// Remove only remote endpoints created by this service
-	if ep.refCount <= 0 && !ep.isLocal {
+	if (ep.refCount == nil || *ep.refCount <= 0) && !ep.isLocal {
 		klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
 		err := ep.hns.deleteEndpoint(ep.hnsID)
 		if err == nil {

@@ -216,6 +219,15 @@ func (ep *endpointsInfo) Cleanup() {
 	}
 }

+func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 {
+	refCount, exists := refCountMap[hnsID]
+	if !exists {
+		refCountMap[hnsID] = new(uint16)
+		refCount = refCountMap[hnsID]
+	}
+	return refCount
+}
+
 // returns a new serviceInfo struct
 func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service, hns HostNetworkService) *serviceInfo {
 	onlyNodeLocalEndpoints := false
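
The nil guards exist because only remote endpoints are handed a shared counter; local endpoints keep refCount nil and are never deleted by Cleanup. A small sketch of the same decision logic follows, assuming a stand-alone shouldDelete helper (an illustrative name, not a function in the proxier).

package main

import "fmt"

// shouldDelete mirrors the Cleanup decision above: drop a reference if one
// was taken, then delete only remote endpoints whose shared count has
// drained (or that never had a counter assigned).
func shouldDelete(refCount *uint16, isLocal bool) bool {
	if refCount != nil {
		*refCount--
	}
	return (refCount == nil || *refCount <= 0) && !isLocal
}

func main() {
	last := uint16(1)
	shared := uint16(2)

	fmt.Println(shouldDelete(&last, false))   // true: last reference dropped
	fmt.Println(shouldDelete(&shared, false)) // false: another service still uses it
	fmt.Println(shouldDelete(nil, true))      // false: local endpoints are never deleted here
}
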
@@ -311,6 +323,7 @@ type updateServiceMapResult struct {
 }
 type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
 type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
+type endPointsReferenceCountMap map[string]*uint16

 func newEndpointsChangeMap(hostname string) endpointsChangeMap {
 	return endpointsChangeMap{

@@ -461,10 +474,11 @@ type Proxier struct {
 	endpointsChanges endpointsChangeMap
 	serviceChanges   serviceChangeMap

-	mu           sync.Mutex // protects the following fields
-	serviceMap   proxyServiceMap
-	endpointsMap proxyEndpointsMap
-	portsMap     map[localPort]closeable
+	mu                sync.Mutex // protects the following fields
+	serviceMap        proxyServiceMap
+	endpointsMap      proxyEndpointsMap
+	portsMap          map[localPort]closeable
+	endPointsRefCount endPointsReferenceCountMap
 	// endpointsSynced and servicesSynced are set to true when corresponding
 	// objects are synced after startup. This is used to avoid updating hns policies
 	// with some partial data after kube-proxy restart.

@@ -638,6 +652,7 @@ func NewProxier(
 		serviceChanges:    newServiceChangeMap(),
 		endpointsMap:      make(proxyEndpointsMap),
 		endpointsChanges:  newEndpointsChangeMap(hostname),
+		endPointsRefCount: make(endPointsReferenceCountMap),
 		masqueradeAll:     masqueradeAll,
 		masqueradeMark:    masqueradeMark,
 		clusterCIDR:       clusterCIDR,
@@ -1103,7 +1118,8 @@ func (proxier *Proxier) syncProxyRules() {
 				continue
 			}

-			newHnsEndpoint.refCount++
+			newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
+			*newHnsEndpoint.refCount++
 			svcInfo.remoteEndpoint = newHnsEndpoint
 		}
 	}

@@ -1205,9 +1221,14 @@ func (proxier *Proxier) syncProxyRules() {
 			hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
 			if newHnsEndpoint.isLocal {
 				hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
+			} else {
+				// We only share the refCounts for remote endpoints
+				ep.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
 			}
+
 			ep.hnsID = newHnsEndpoint.hnsID
-			ep.refCount++
+			*ep.refCount++

 			Log(ep, "Endpoint resource found", 3)
 		}
@@ -1346,6 +1367,12 @@ func (proxier *Proxier) syncProxyRules() {
 		klog.V(5).Infof("Pending delete stale service IP %s connections", svcIP)
 	}

+	// remove stale endpoint refcount entries
+	for hnsID, referenceCount := range proxier.endPointsRefCount {
+		if *referenceCount <= 0 {
+			delete(proxier.endPointsRefCount, hnsID)
+		}
+	}
 }

 type endpointServicePair struct {
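
The new loop prunes counters that have drained to zero so the map does not grow without bound. Here is a runnable sketch of the same pruning (Go explicitly allows deleting map entries while ranging over the map); the names and values are illustrative only.

package main

import "fmt"

func main() {
	two := uint16(2)

	// Drained counters are pruned; deleting entries from a map while
	// ranging over it is well-defined in Go, which is what the loop in
	// syncProxyRules relies on.
	refCounts := map[string]*uint16{
		"ep-live":  &two,
		"ep-stale": new(uint16), // zero: no service references it anymore
	}

	for hnsID, referenceCount := range refCounts {
		if *referenceCount <= 0 {
			delete(refCounts, hnsID)
		}
	}

	fmt.Println(len(refCounts)) // 1: only "ep-live" survives
}

The remaining hunks update the fake HNS fixture and the proxier unit tests so that remote endpoints exercise the shared counters.
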
@@ -70,7 +70,7 @@ func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
 	if ipNet.Contains(net.ParseIP(ip)) {
 		return &endpointsInfo{
 			ip:         ip,
-			isLocal:    true,
+			isLocal:    false,
 			macAddress: macAddress,
 			hnsID:      guid,
 			hns:        hns,

@@ -102,6 +102,7 @@ func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
 func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier {
 	sourceVip := "192.168.1.2"
 	hnsNetworkInfo := &hnsNetworkInfo{
 		id:          strings.ToUpper(guid),
 		name:        "TestNetwork",
+		networkType: networkType,
 	}

@@ -120,6 +121,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier {
 		hostMac:           macAddress,
 		isDSR:             false,
 		hns:               newFakeHNS(),
+		endPointsRefCount: make(endPointsReferenceCountMap),
 	}
 	return proxier
 }

@@ -217,6 +219,14 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
 	if proxier.endpointsMap[svcPortName][0].hnsID != guid {
 		t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid)
 	}
+
+	if *proxier.endPointsRefCount[guid] <= 0 {
+		t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid])
+	}
+
+	if *proxier.endPointsRefCount[guid] != *proxier.endpointsMap[svcPortName][0].refCount {
+		t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *proxier.endpointsMap[svcPortName][0].refCount)
+	}
 }
 func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
 	syncPeriod := 30 * time.Second

@@ -264,7 +274,16 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
 	if proxier.endpointsMap[svcPortName][0].hnsID != guid {
 		t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid)
 	}
+
+	if *proxier.endPointsRefCount[guid] <= 0 {
+		t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid])
+	}
+
+	if *proxier.endPointsRefCount[guid] != *proxier.endpointsMap[svcPortName][0].refCount {
+		t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *proxier.endpointsMap[svcPortName][0].refCount)
+	}
 }
+
 func TestCreateLoadBalancer(t *testing.T) {
 	syncPeriod := 30 * time.Second
 	proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay")
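
For reference, the invariant those assertions encode can also be exercised in isolation. The sketch below is a stand-alone test against a copy of the reference-count map, not the actual proxier test file; the package name and TestSharedRefCount are illustrative.

package winkernelrefcount_test

import "testing"

type endPointsReferenceCountMap map[string]*uint16

func (m endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 {
	refCount, exists := m[hnsID]
	if !exists {
		refCount = new(uint16)
		m[hnsID] = refCount
	}
	return refCount
}

// TestSharedRefCount checks the same invariant the new assertions above
// verify against the real proxier: after an endpoint is referenced, the
// global counter is positive and is the very counter the endpoint holds.
func TestSharedRefCount(t *testing.T) {
	refCounts := make(endPointsReferenceCountMap)

	epRefCount := refCounts.getRefCount("guid-1")
	*epRefCount++

	if *refCounts["guid-1"] <= 0 {
		t.Errorf("RefCount not incremented. Current value: %v", *refCounts["guid-1"])
	}
	if refCounts["guid-1"] != epRefCount {
		t.Errorf("global refCount and endpoint refCount should be the same counter")
	}
}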