Merge pull request #93638 from sbangari/refcountfix3

Avoid dereferencing same endpoint twice on the deletion or update of a service
This commit is contained in:
Kubernetes Prow Robot 2020-09-01 16:35:06 -07:00 committed by GitHub
commit 6e7086d7ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 354 additions and 11 deletions

View File

@ -319,19 +319,21 @@ func (ep *endpointsInfo) Cleanup() {
Log(ep, "Endpoint Cleanup", 3)
if ep.refCount != nil {
*ep.refCount--
}
// Remove the remote hns endpoint, if no service is referring it
// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
// Remove only remote endpoints created by this service
if (ep.refCount == nil || *ep.refCount <= 0) && !ep.GetIsLocal() {
klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
err := ep.hns.deleteEndpoint(ep.hnsID)
if err == nil {
ep.hnsID = ""
} else {
klog.Errorf("Endpoint deletion failed for %v: %v", ep.IP(), err)
// Remove the remote hns endpoint, if no service is referring it
// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
// Remove only remote endpoints created by this service
if *ep.refCount <= 0 && !ep.GetIsLocal() {
klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
err := ep.hns.deleteEndpoint(ep.hnsID)
if err == nil {
ep.hnsID = ""
} else {
klog.Errorf("Endpoint deletion failed for %v: %v", ep.IP(), err)
}
}
ep.refCount = nil
}
}

View File

@ -315,7 +315,329 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
}
}
func TestSharedRemoteEndpointDelete(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
if proxier == nil {
t.Error()
}
svcIP1 := "10.20.30.41"
svcPort1 := 80
svcNodePort1 := 3001
svcPortName1 := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
svcIP2 := "10.20.30.42"
svcPort2 := 80
svcNodePort2 := 3002
svcPortName2 := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc2"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
makeServiceMap(proxier,
makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP1
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort1),
}}
}),
makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP2
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort2),
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
}},
}}
}),
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
}},
}}
}),
)
proxier.setInitialized(true)
proxier.syncProxyRules()
ep := proxier.endpointsMap[svcPortName1][0]
epInfo, ok := ep.(*endpointsInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
} else {
if epInfo.hnsID != guid {
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
}
}
if *proxier.endPointsRefCount[guid] != 2 {
t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid])
}
if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
}
proxier.setInitialized(false)
deleteServices(proxier,
makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP2
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort2),
}}
}),
)
deleteEndpoints(proxier,
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
}},
}}
}),
)
proxier.setInitialized(true)
proxier.syncProxyRules()
ep = proxier.endpointsMap[svcPortName1][0]
epInfo, ok = ep.(*endpointsInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
} else {
if epInfo.hnsID != guid {
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
}
}
if *epInfo.refCount != 1 {
t.Errorf("Incorrect Refcount. Current value: %v", *epInfo.refCount)
}
if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
}
}
func TestSharedRemoteEndpointUpdate(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
if proxier == nil {
t.Error()
}
svcIP1 := "10.20.30.41"
svcPort1 := 80
svcNodePort1 := 3001
svcPortName1 := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
svcIP2 := "10.20.30.42"
svcPort2 := 80
svcNodePort2 := 3002
svcPortName2 := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc2"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
makeServiceMap(proxier,
makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP1
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort1),
}}
}),
makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP2
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort2),
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
}},
}}
}),
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
}},
}}
}),
)
proxier.setInitialized(true)
proxier.syncProxyRules()
ep := proxier.endpointsMap[svcPortName1][0]
epInfo, ok := ep.(*endpointsInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
} else {
if epInfo.hnsID != guid {
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
}
}
if *proxier.endPointsRefCount[guid] != 2 {
t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid])
}
if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
}
proxier.setInitialized(false)
proxier.OnServiceUpdate(
makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP1
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort1),
}}
}),
makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP1
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
NodePort: int32(3003),
}}
}))
proxier.OnEndpointsUpdate(
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
}},
}}
}),
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{
{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
},
{
Name: "p443",
Port: int32(443),
Protocol: v1.ProtocolTCP,
}},
}}
}))
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.mu.Unlock()
proxier.setInitialized(true)
proxier.syncProxyRules()
ep = proxier.endpointsMap[svcPortName1][0]
epInfo, ok = ep.(*endpointsInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String())
} else {
if epInfo.hnsID != guid {
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
}
}
if *epInfo.refCount != 2 {
t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount)
}
if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
}
}
func TestCreateLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
@ -487,6 +809,15 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
defer proxier.mu.Unlock()
proxier.servicesSynced = true
}
func deleteServices(proxier *Proxier, allServices ...*v1.Service) {
for i := range allServices {
proxier.OnServiceDelete(allServices[i])
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.servicesSynced = true
}
func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
@ -511,6 +842,16 @@ func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
proxier.endpointsSynced = true
}
func deleteEndpoints(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
for i := range allEndpoints {
proxier.OnEndpointsDelete(allEndpoints[i])
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointsSynced = true
}
func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
ept := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{