added backend hashing to winkernel proxier

This commit is contained in:
daschott 2022-10-31 21:57:34 -07:00
parent dd3dfab895
commit dc2fc1045d
3 changed files with 66 additions and 14 deletions

View File

@ -20,6 +20,7 @@ limitations under the License.
package winkernel package winkernel
import ( import (
"crypto/sha1"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -250,11 +251,17 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo)) loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
for _, lb := range lbs { for _, lb := range lbs {
portMap := lb.PortMappings[0] portMap := lb.PortMappings[0]
// Compute hash from backends (endpoint IDs)
hash, err := hashEndpoints(lb.HostComputeEndpoints)
if err != nil {
klog.V(2).ErrorS(err, "Error hashing endpoints", "policy", lb)
return nil, err
}
if len(lb.FrontendVIPs) == 0 { if len(lb.FrontendVIPs) == 0 {
// Leave VIP uninitialized // Leave VIP uninitialized
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsCount: len(lb.HostComputeEndpoints)} id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsHash: hash}
} else { } else {
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsCount: len(lb.HostComputeEndpoints)} id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsHash: hash}
} }
loadBalancers[id] = &loadBalancerInfo{ loadBalancers[id] = &loadBalancerInfo{
hnsID: lb.Id, hnsID: lb.Id,
@ -267,11 +274,17 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
var id loadBalancerIdentifier var id loadBalancerIdentifier
vips := []string{} vips := []string{}
// Compute hash from backends (endpoint IDs)
hash, err := hashEndpoints(endpoints)
if err != nil {
klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
return nil, err
}
if len(vip) > 0 { if len(vip) > 0 {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsCount: len(endpoints)} id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}
vips = append(vips, vip) vips = append(vips, vip)
} else { } else {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsCount: len(endpoints)} id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}
} }
if lb, found := previousLoadBalancers[id]; found { if lb, found := previousLoadBalancers[id]; found {
@ -356,3 +369,38 @@ func (hns hns) deleteLoadBalancer(hnsID string) error {
err = lb.Delete() err = lb.Delete()
return err return err
} }
// Calculates a hash from the given endpoint IDs.
func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err error) {
var id string
// Recover in case something goes wrong. Return error and null byte array.
defer func() {
if r := recover(); r != nil {
err = r.(error)
hash = [20]byte{}
}
}()
// Iterate over endpoints, compute hash
for _, ep := range endpoints {
switch x := any(ep).(type) {
case endpointsInfo:
id = x.hnsID
case string:
id = x
}
if len(id) > 0 {
// We XOR the hashes of endpoints, since they are an unordered set.
// This can cause collisions, but is sufficient since we are using other keys to identify the load balancer.
hash = xor(hash, sha1.Sum(([]byte(id))))
}
}
return
}
func xor(b1 [20]byte, b2 [20]byte) (xorbytes [20]byte) {
for i := 0; i < 20; i++ {
xorbytes[i] = b1[i] ^ b2[i]
}
return xorbytes
}

View File

@ -302,15 +302,20 @@ func TestGetLoadBalancerExisting(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
// We populate this to ensure we test for getting existing load balancer
id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsCount: len(Endpoints)}
lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}
endpoint := &endpointsInfo{ endpoint := &endpointsInfo{
ip: Endpoint.IpConfigurations[0].IpAddress, ip: Endpoint.IpConfigurations[0].IpAddress,
hnsID: Endpoint.Id, hnsID: Endpoint.Id,
} }
endpoints := []endpointsInfo{*endpoint} endpoints := []endpointsInfo{*endpoint}
hash, err := hashEndpoints(endpoints)
if err != nil {
t.Error(err)
}
// We populate this to ensure we test for getting existing load balancer
id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsHash: hash}
lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs) lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
if err != nil { if err != nil {

View File

@ -97,11 +97,11 @@ type loadBalancerInfo struct {
} }
type loadBalancerIdentifier struct { type loadBalancerIdentifier struct {
protocol uint16 protocol uint16
internalPort uint16 internalPort uint16
externalPort uint16 externalPort uint16
vip string vip string
endpointsCount int endpointsHash [20]byte
} }
type loadBalancerFlags struct { type loadBalancerFlags struct {
@ -153,7 +153,6 @@ func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) {
} else { } else {
panic("Windows HNS Api V2 required. This version of windows does not support API V2") panic("Windows HNS Api V2 required. This version of windows does not support API V2")
} }
return h, supportedFeatures return h, supportedFeatures
} }