diff --git a/pkg/proxy/winkernel/hns.go b/pkg/proxy/winkernel/hns.go
index 051a5b30035..9d3889d60cc 100644
--- a/pkg/proxy/winkernel/hns.go
+++ b/pkg/proxy/winkernel/hns.go
@@ -20,6 +20,7 @@ limitations under the License.
 package winkernel
 
 import (
+	"crypto/sha1"
 	"encoding/json"
 	"fmt"
 
@@ -250,11 +251,17 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
 	loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
 	for _, lb := range lbs {
 		portMap := lb.PortMappings[0]
+		// Compute hash from backends (endpoint IDs)
+		hash, err := hashEndpoints(lb.HostComputeEndpoints)
+		if err != nil {
+			klog.V(2).ErrorS(err, "Error hashing endpoints", "policy", lb)
+			return nil, err
+		}
 		if len(lb.FrontendVIPs) == 0 {
 			// Leave VIP uninitialized
-			id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsCount: len(lb.HostComputeEndpoints)}
+			id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsHash: hash}
 		} else {
-			id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsCount: len(lb.HostComputeEndpoints)}
+			id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsHash: hash}
 		}
 		loadBalancers[id] = &loadBalancerInfo{
 			hnsID: lb.Id,
@@ -267,11 +274,17 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
 func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
 	var id loadBalancerIdentifier
 	vips := []string{}
+	// Compute hash from backends (endpoint IDs)
+	hash, err := hashEndpoints(endpoints)
+	if err != nil {
+		klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
+		return nil, err
+	}
 	if len(vip) > 0 {
-		id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsCount: len(endpoints)}
+		id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}
 		vips = append(vips, vip)
 	} else {
-		id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsCount: len(endpoints)}
+		id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}
 	}
 
 	if lb, found := previousLoadBalancers[id]; found {
@@ -354,5 +367,54 @@ func (hns hns) deleteLoadBalancer(hnsID string) error {
 	}
 
 	err = lb.Delete()
+	if err != nil {
+		// There is a bug in Windows Server 2019, that can cause the delete call to fail sometimes. We retry one more time.
+		// TODO: The logic in syncProxyRules should be rewritten in the future to better stage and handle a call like this failing using the policyApplied fields.
+		klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource. Attempting one more time...", "loadBalancer", lb)
+		return lb.Delete()
+	}
 	return err
 }
+
+// hashEndpoints calculates an order-independent hash from the given endpoint IDs.
+// Endpoints with an empty ID are skipped. If hashing panics, the panic is recovered
+// and reported through err together with a zeroed hash.
+func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err error) {
+	var id string
+	// Recover in case something goes wrong. Return error and null byte array.
+	defer func() {
+		if r := recover(); r != nil {
+			// A panic value is not necessarily an error, so an unchecked
+			// assertion here could itself panic inside the handler.
+			if e, ok := r.(error); ok {
+				err = e
+			} else {
+				err = fmt.Errorf("panic while hashing endpoints: %v", r)
+			}
+			hash = [20]byte{}
+		}
+	}()
+
+	// Iterate over endpoints, compute hash
+	for _, ep := range endpoints {
+		switch x := any(ep).(type) {
+		case endpointsInfo:
+			id = x.hnsID
+		case string:
+			id = x
+		}
+		if len(id) > 0 {
+			// We XOR the hashes of endpoints, since they are an unordered set.
+			// This can cause collisions, but is sufficient since we are using other keys to identify the load balancer.
+			hash = xor(hash, sha1.Sum(([]byte(id))))
+		}
+	}
+	return
+}
+
+func xor(b1 [20]byte, b2 [20]byte) (xorbytes [20]byte) {
+	for i := 0; i < 20; i++ {
+		xorbytes[i] = b1[i] ^ b2[i]
+	}
+	return xorbytes
+}
diff --git a/pkg/proxy/winkernel/hns_test.go b/pkg/proxy/winkernel/hns_test.go
index 47bccc7920b..fe3ce751ccb 100644
--- a/pkg/proxy/winkernel/hns_test.go
+++ b/pkg/proxy/winkernel/hns_test.go
@@ -37,6 +37,7 @@ const (
 	gatewayAddress    = "192.168.1.1"
 	epMacAddress      = "00-11-22-33-44-55"
 	epIpAddress       = "192.168.1.3"
+	epIpAddressB      = "192.168.1.4"
 	epIpAddressRemote = "192.168.2.3"
 	epPaAddress       = "10.0.0.3"
 	protocol          = 6
@@ -302,15 +303,20 @@ func TestGetLoadBalancerExisting(t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
-	// We populate this to ensure we test for getting existing load balancer
-	id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsCount: len(Endpoints)}
-	lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}
-
 	endpoint := &endpointsInfo{
 		ip:    Endpoint.IpConfigurations[0].IpAddress,
 		hnsID: Endpoint.Id,
 	}
 	endpoints := []endpointsInfo{*endpoint}
+	hash, err := hashEndpoints(endpoints)
+	if err != nil {
+		t.Error(err)
+	}
+
+	// We populate this to ensure we test for getting existing load balancer
+	id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsHash: hash}
+	lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}
+
 	lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
 
 	if err != nil {
@@ -452,6 +458,80 @@ func mustTestNetwork(t *testing.T) *hcn.HostComputeNetwork {
 	return network
 }
 
+func TestHashEndpoints(t *testing.T) {
+	Network := mustTestNetwork(t)
+	// Create endpoint A
+	ipConfigA := &hcn.IpConfig{
+		IpAddress: epIpAddress,
+	}
+	endpointASpec := &hcn.HostComputeEndpoint{
+		IpConfigurations: []hcn.IpConfig{*ipConfigA},
+		MacAddress:       epMacAddress,
+		SchemaVersion: hcn.SchemaVersion{
+			Major: 2,
+			Minor: 0,
+		},
+	}
+	endpointA, err := Network.CreateEndpoint(endpointASpec)
+	if err != nil {
+		t.Fatal(err)
+	}
+	endpointInfoA := &endpointsInfo{
+		ip:    endpointA.IpConfigurations[0].IpAddress,
+		hnsID: endpointA.Id,
+	}
+	// Create Endpoint B
+	ipConfigB := &hcn.IpConfig{
+		IpAddress: epIpAddressB,
+	}
+	endpointBSpec := &hcn.HostComputeEndpoint{
+		IpConfigurations: []hcn.IpConfig{*ipConfigB},
+		MacAddress:       epMacAddress,
+		SchemaVersion: hcn.SchemaVersion{
+			Major: 2,
+			Minor: 0,
+		},
+	}
+	endpointB, err := Network.CreateEndpoint(endpointBSpec)
+	if err != nil {
+		t.Fatal(err)
+	}
+	endpointInfoB := &endpointsInfo{
+		ip:    endpointB.IpConfigurations[0].IpAddress,
+		hnsID: endpointB.Id,
+	}
+	endpoints := []endpointsInfo{*endpointInfoA, *endpointInfoB}
+	endpointsReverse := []endpointsInfo{*endpointInfoB, *endpointInfoA}
+	h1, err := hashEndpoints(endpoints)
+	if err != nil {
+		t.Error(err)
+	} else if h1 == ([20]byte{}) {
+		t.Error("HashEndpoints failed for endpoints", endpoints)
+	}
+
+	h2, err := hashEndpoints(endpointsReverse)
+	if err != nil {
+		t.Error(err)
+	}
+	if h1 != h2 {
+		t.Errorf("%x does not match %x", h1, h2)
+	}
+
+	// Clean up
+	err = endpointA.Delete()
+	if err != nil {
+		t.Error(err)
+	}
+	err = endpointB.Delete()
+	if err != nil {
+		t.Error(err)
+	}
+	err = Network.Delete()
+	if err != nil {
+		t.Error(err)
+	}
+}
+
 func createTestNetwork() (*hcn.HostComputeNetwork, error) {
 	network := &hcn.HostComputeNetwork{
 		Type: NETWORK_TYPE_OVERLAY,
diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go
index 2e5ae7078c8..235ea7680d2 100644
--- a/pkg/proxy/winkernel/proxier.go
+++ b/pkg/proxy/winkernel/proxier.go
@@ -97,11 +97,11 @@ type loadBalancerInfo struct {
 }
 
 type loadBalancerIdentifier struct {
-	protocol       uint16
-	internalPort   uint16
-	externalPort   uint16
-	vip            string
-	endpointsCount int
+	protocol      uint16
+	internalPort  uint16
+	externalPort  uint16
+	vip           string
+	endpointsHash [20]byte
 }
 
 type loadBalancerFlags struct {
@@ -153,7 +153,6 @@ func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) {
 	} else {
 		panic("Windows HNS Api V2 required. This version of windows does not support API V2")
 	}
-
 	return h, supportedFeatures
 }
 
@@ -784,7 +783,7 @@ func CleanupLeftovers() (encounteredError bool) {
 func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
 	klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo)
 	// Skip the svcInfo.policyApplied check to remove all the policies
-	svcInfo.deleteAllHnsLoadBalancerPolicy()
+	svcInfo.deleteLoadBalancerPolicy()
 	// Cleanup Endpoints references
 	for _, ep := range endpoints {
 		epInfo, ok := ep.(*endpointsInfo)
@@ -799,25 +798,46 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
 	svcInfo.policyApplied = false
 }
 
-func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
+func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() {
 	// Remove the Hns Policy corresponding to this service
 	hns := svcInfo.hns
-	hns.deleteLoadBalancer(svcInfo.hnsID)
-	svcInfo.hnsID = ""
+	if err := hns.deleteLoadBalancer(svcInfo.hnsID); err != nil {
+		klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource.", "hnsID", svcInfo.hnsID, "ClusterIP", svcInfo.ClusterIP())
+	} else {
+		// On successful delete, remove hnsId
+		svcInfo.hnsID = ""
+	}
 
-	hns.deleteLoadBalancer(svcInfo.nodePorthnsID)
-	svcInfo.nodePorthnsID = ""
+	if err := hns.deleteLoadBalancer(svcInfo.nodePorthnsID); err != nil {
+		klog.V(1).ErrorS(err, "Error deleting Hns NodePort policy resource.", "hnsID", svcInfo.nodePorthnsID, "NodePort", svcInfo.NodePort())
+	} else {
+		// On successful delete, remove hnsId
+		svcInfo.nodePorthnsID = ""
+	}
 
 	for _, externalIP := range svcInfo.externalIPs {
-		hns.deleteLoadBalancer(externalIP.hnsID)
-		externalIP.hnsID = ""
+		if err := hns.deleteLoadBalancer(externalIP.hnsID); err != nil {
+			klog.V(1).ErrorS(err, "Error deleting Hns ExternalIP policy resource.", "hnsID", externalIP.hnsID, "IP", externalIP.ip)
+		} else {
+			// On successful delete, remove hnsId
+			externalIP.hnsID = ""
+		}
 	}
 	for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
-		hns.deleteLoadBalancer(lbIngressIP.hnsID)
-		lbIngressIP.hnsID = ""
+		if err := hns.deleteLoadBalancer(lbIngressIP.hnsID); err != nil {
+			klog.V(1).ErrorS(err, "Error deleting Hns IngressIP policy resource.", "hnsID", lbIngressIP.hnsID, "IP", lbIngressIP.ip)
+		} else {
+			// On successful delete, remove hnsId
+			lbIngressIP.hnsID = ""
+		}
+
 		if lbIngressIP.healthCheckHnsID != "" {
-			hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID)
-			lbIngressIP.healthCheckHnsID = ""
+			if err := hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID); err != nil {
+				klog.V(1).ErrorS(err, "Error deleting Hns IngressIP HealthCheck policy resource.", "hnsID", lbIngressIP.healthCheckHnsID, "IP", lbIngressIP.ip)
+			} else {
+				// On successful delete, remove hnsId
+				lbIngressIP.healthCheckHnsID = ""
+			}
 		}
 	}
 }