diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index a2ae3495099..4718639c14e 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -319,19 +319,21 @@ func (ep *endpointsInfo) Cleanup() { Log(ep, "Endpoint Cleanup", 3) if ep.refCount != nil { *ep.refCount-- - } - // Remove the remote hns endpoint, if no service is referring it - // Never delete a Local Endpoint. Local Endpoints are already created by other entities. - // Remove only remote endpoints created by this service - if (ep.refCount == nil || *ep.refCount <= 0) && !ep.GetIsLocal() { - klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep) - err := ep.hns.deleteEndpoint(ep.hnsID) - if err == nil { - ep.hnsID = "" - } else { - klog.Errorf("Endpoint deletion failed for %v: %v", ep.IP(), err) + // Remove the remote hns endpoint, if no service is referring it + // Never delete a Local Endpoint. Local Endpoints are already created by other entities. + // Remove only remote endpoints created by this service + if *ep.refCount <= 0 && !ep.GetIsLocal() { + klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep) + err := ep.hns.deleteEndpoint(ep.hnsID) + if err == nil { + ep.hnsID = "" + } else { + klog.Errorf("Endpoint deletion failed for %v: %v", ep.IP(), err) + } } + + ep.refCount = nil } } diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 01a46b44aa7..f3a92c21716 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -315,7 +315,329 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) { t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount) } } +func TestSharedRemoteEndpointDelete(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false) + if proxier == nil { + t.Error() + } + svcIP1 := "10.20.30.41" + svcPort1 := 80 + svcNodePort1 := 3001 + svcPortName1 := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + svcIP2 := "10.20.30.42" + svcPort2 := 80 + svcNodePort2 := 3002 + svcPortName2 := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc2"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP1 + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName1.Port, + Port: int32(svcPort1), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort1), + }} + }), + makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP2 + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName2.Port, + Port: int32(svcPort2), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort2), + }} + }), + ) + makeEndpointsMap(proxier, + makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName1.Port, + Port: int32(svcPort1), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName2.Port, + Port: int32(svcPort2), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + ) + proxier.setInitialized(true) + proxier.syncProxyRules() + ep := proxier.endpointsMap[svcPortName1][0] + epInfo, ok := ep.(*endpointsInfo) + if !ok { + t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + + } else { + if epInfo.hnsID != guid { + t.Errorf("%v does not match %v", epInfo.hnsID, guid) + } + } + + if *proxier.endPointsRefCount[guid] != 2 { + t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid]) + } + + if *proxier.endPointsRefCount[guid] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount) + } + + proxier.setInitialized(false) + deleteServices(proxier, + makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP2 + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName2.Port, + Port: int32(svcPort2), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort2), + }} + }), + ) + + deleteEndpoints(proxier, + makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName2.Port, + Port: int32(svcPort2), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + ep = proxier.endpointsMap[svcPortName1][0] + epInfo, ok = ep.(*endpointsInfo) + if !ok { + t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + + } else { + if epInfo.hnsID != guid { + t.Errorf("%v does not match %v", epInfo.hnsID, guid) + } + } + + if *epInfo.refCount != 1 { + t.Errorf("Incorrect Refcount. Current value: %v", *epInfo.refCount) + } + + if *proxier.endPointsRefCount[guid] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount) + } +} +func TestSharedRemoteEndpointUpdate(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false) + if proxier == nil { + t.Error() + } + + svcIP1 := "10.20.30.41" + svcPort1 := 80 + svcNodePort1 := 3001 + svcPortName1 := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + svcIP2 := "10.20.30.42" + svcPort2 := 80 + svcNodePort2 := 3002 + svcPortName2 := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc2"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP1 + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName1.Port, + Port: int32(svcPort1), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort1), + }} + }), + makeTestService(svcPortName2.Namespace, svcPortName2.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP2 + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName2.Port, + Port: int32(svcPort2), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort2), + }} + }), + ) + + makeEndpointsMap(proxier, + makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName1.Port, + Port: int32(svcPort1), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName2.Port, + Port: int32(svcPort2), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + ) + proxier.setInitialized(true) + proxier.syncProxyRules() + ep := proxier.endpointsMap[svcPortName1][0] + epInfo, ok := ep.(*endpointsInfo) + if !ok { + t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + + } else { + if epInfo.hnsID != guid { + t.Errorf("%v does not match %v", epInfo.hnsID, guid) + } + } + + if *proxier.endPointsRefCount[guid] != 2 { + t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid]) + } + + if *proxier.endPointsRefCount[guid] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount) + } + + proxier.setInitialized(false) + + proxier.OnServiceUpdate( + makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP1 + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName1.Port, + Port: int32(svcPort1), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort1), + }} + }), + makeTestService(svcPortName1.Namespace, svcPortName1.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP1 + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName1.Port, + Port: int32(svcPort1), + Protocol: v1.ProtocolTCP, + NodePort: int32(3003), + }} + })) + + proxier.OnEndpointsUpdate( + makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName1.Port, + Port: int32(svcPort1), + Protocol: v1.ProtocolTCP, + }}, + }} + }), + makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{ + { + Name: svcPortName1.Port, + Port: int32(svcPort1), + Protocol: v1.ProtocolTCP, + }, + { + Name: "p443", + Port: int32(443), + Protocol: v1.ProtocolTCP, + }}, + }} + })) + + proxier.mu.Lock() + proxier.endpointsSynced = true + proxier.mu.Unlock() + + proxier.setInitialized(true) + proxier.syncProxyRules() + + ep = proxier.endpointsMap[svcPortName1][0] + epInfo, ok = ep.(*endpointsInfo) + + if !ok { + t.Errorf("Failed to cast endpointsInfo %q", svcPortName1.String()) + + } else { + if epInfo.hnsID != guid { + t.Errorf("%v does not match %v", epInfo.hnsID, guid) + } + } + + if *epInfo.refCount != 2 { + t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount) + } + + if *proxier.endPointsRefCount[guid] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount) + } +} func TestCreateLoadBalancer(t *testing.T) { syncPeriod := 30 * time.Second proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false) @@ -487,6 +809,15 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) { defer proxier.mu.Unlock() proxier.servicesSynced = true } +func deleteServices(proxier *Proxier, allServices ...*v1.Service) { + for i := range allServices { + proxier.OnServiceDelete(allServices[i]) + } + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.servicesSynced = true +} func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service { svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -511,6 +842,16 @@ func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) { proxier.endpointsSynced = true } +func deleteEndpoints(proxier *Proxier, allEndpoints ...*v1.Endpoints) { + for i := range allEndpoints { + proxier.OnEndpointsDelete(allEndpoints[i]) + } + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.endpointsSynced = true +} + func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints { ept := &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{