From dc2fc1045da1d270dc5f1131b584bd2c095fae04 Mon Sep 17 00:00:00 2001 From: daschott Date: Mon, 31 Oct 2022 21:57:34 -0700 Subject: [PATCH] added backend hashing to winkernel proxier --- pkg/proxy/winkernel/hns.go | 56 ++++++++++++++++++++++++++++++--- pkg/proxy/winkernel/hns_test.go | 13 +++++--- pkg/proxy/winkernel/proxier.go | 11 +++---- 3 files changed, 66 insertions(+), 14 deletions(-) diff --git a/pkg/proxy/winkernel/hns.go b/pkg/proxy/winkernel/hns.go index 051a5b30035..443dfb09572 100644 --- a/pkg/proxy/winkernel/hns.go +++ b/pkg/proxy/winkernel/hns.go @@ -20,6 +20,7 @@ limitations under the License. package winkernel import ( + "crypto/sha1" "encoding/json" "fmt" @@ -250,11 +251,17 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo)) for _, lb := range lbs { portMap := lb.PortMappings[0] + // Compute hash from backends (endpoint IDs) + hash, err := hashEndpoints(lb.HostComputeEndpoints) + if err != nil { + klog.V(2).ErrorS(err, "Error hashing endpoints", "policy", lb) + return nil, err + } if len(lb.FrontendVIPs) == 0 { // Leave VIP uninitialized - id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsCount: len(lb.HostComputeEndpoints)} + id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsHash: hash} } else { - id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsCount: len(lb.HostComputeEndpoints)} + id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsHash: hash} } loadBalancers[id] = &loadBalancerInfo{ hnsID: lb.Id, @@ -267,11 +274,17 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { var id loadBalancerIdentifier vips := []string{} + // Compute hash from backends (endpoint IDs) + hash, err := hashEndpoints(endpoints) + if err != nil { + klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints) + return nil, err + } if len(vip) > 0 { - id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsCount: len(endpoints)} + id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash} vips = append(vips, vip) } else { - id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsCount: len(endpoints)} + id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash} } if lb, found := previousLoadBalancers[id]; found { @@ -356,3 +369,38 @@ func (hns hns) deleteLoadBalancer(hnsID string) error { err = lb.Delete() return err } + +// Calculates a hash from the given endpoint IDs. +func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err error) { + var id string + // Recover in case something goes wrong. Return error and null byte array. + defer func() { + if r := recover(); r != nil { + err = r.(error) + hash = [20]byte{} + } + }() + + // Iterate over endpoints, compute hash + for _, ep := range endpoints { + switch x := any(ep).(type) { + case endpointsInfo: + id = x.hnsID + case string: + id = x + } + if len(id) > 0 { + // We XOR the hashes of endpoints, since they are an unordered set. + // This can cause collisions, but is sufficient since we are using other keys to identify the load balancer. + hash = xor(hash, sha1.Sum(([]byte(id)))) + } + } + return +} + +func xor(b1 [20]byte, b2 [20]byte) (xorbytes [20]byte) { + for i := 0; i < 20; i++ { + xorbytes[i] = b1[i] ^ b2[i] + } + return xorbytes +} diff --git a/pkg/proxy/winkernel/hns_test.go b/pkg/proxy/winkernel/hns_test.go index 749b0c23c2f..f18fb40d1e1 100644 --- a/pkg/proxy/winkernel/hns_test.go +++ b/pkg/proxy/winkernel/hns_test.go @@ -302,15 +302,20 @@ func TestGetLoadBalancerExisting(t *testing.T) { if err != nil { t.Error(err) } - // We populate this to ensure we test for getting existing load balancer - id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsCount: len(Endpoints)} - lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id} - endpoint := &endpointsInfo{ ip: Endpoint.IpConfigurations[0].IpAddress, hnsID: Endpoint.Id, } endpoints := []endpointsInfo{*endpoint} + hash, err := hashEndpoints(endpoints) + if err != nil { + t.Error(err) + } + + // We populate this to ensure we test for getting existing load balancer + id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsHash: hash} + lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id} + lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs) if err != nil { diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 2e5ae7078c8..38e0ec67cb3 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -97,11 +97,11 @@ type loadBalancerInfo struct { } type loadBalancerIdentifier struct { - protocol uint16 - internalPort uint16 - externalPort uint16 - vip string - endpointsCount int + protocol uint16 + internalPort uint16 + externalPort uint16 + vip string + endpointsHash [20]byte } type loadBalancerFlags struct { @@ -153,7 +153,6 @@ func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) { } else { panic("Windows HNS Api V2 required. This version of windows does not support API V2") } - return h, supportedFeatures }