Merge pull request #109124 from daschott/daschott/winkernel-perf-fix

winkernel proxier: cache HNS data to improve syncProxyRules performance
This commit is contained in:
Kubernetes Prow Robot 2022-05-04 11:47:14 -07:00 committed by GitHub
commit 889e60ab33
5 changed files with 208 additions and 77 deletions
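At a high level, the change replaces repeated per-object HNS queries inside syncProxyRules (getEndpointByID, getEndpointByIpAddress, and a full load-balancer/policy enumeration inside every getLoadBalancer call) with one bulk query per sync pass, after which the proxier works against in-memory maps. The following is a minimal, self-contained sketch of that caching pattern; the names and types are hypothetical simplifications, not the actual kube-proxy code.

package main

import "fmt"

// endpointInfo is a stand-in for the proxier's endpointsInfo type (hypothetical,
// simplified; the real struct also carries HNS IDs, MAC addresses, and readiness flags).
type endpointInfo struct {
	id string
	ip string
}

func main() {
	// One bulk query per syncProxyRules pass (stand-in for hns.getAllEndpointsByNetwork),
	// instead of one HNS round trip per endpoint lookup.
	queried := make(map[string]*endpointInfo)
	for _, ep := range []endpointInfo{{id: "ep-1", ip: "10.0.0.2"}} {
		e := ep
		queried[e.id] = &e // keyed by endpoint ID
		queried[e.ip] = &e // keyed by IP address, pointing at the same value
	}

	// Later lookups during the same sync hit the in-memory cache.
	if ep, ok := queried["10.0.0.2"]; ok {
		fmt.Println("cache hit:", ep.id)
	}
}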

View File

@@ -31,12 +31,14 @@ import (
type HostNetworkService interface {
getNetworkByName(name string) (*hnsNetworkInfo, error)
getAllEndpointsByNetwork(networkName string) (map[string]*endpointsInfo, error)
getEndpointByID(id string) (*endpointsInfo, error)
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
getEndpointByName(id string) (*endpointsInfo, error)
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
deleteEndpoint(hnsID string) error
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error)
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error)
deleteLoadBalancer(hnsID string) error
}
@@ -56,6 +58,41 @@ func (hns hnsV1) getNetworkByName(name string) (*hnsNetworkInfo, error) {
networkType: hnsnetwork.Type,
}, nil
}
func (hns hnsV1) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) {
hnsnetwork, err := hcsshim.GetHNSNetworkByName(networkName)
if err != nil {
klog.ErrorS(err, "failed to get HNS network by name", "name", networkName)
return nil, err
}
endpoints, err := hcsshim.HNSListEndpointRequest()
if err != nil {
return nil, fmt.Errorf("failed to list endpoints: %w", err)
}
endpointInfos := make(map[string]*(endpointsInfo))
for _, endpoint := range endpoints {
if strings.EqualFold(endpoint.VirtualNetwork, hnsnetwork.Id) {
// Add to the map under both the endpoint ID and the IP address as keys.
// Storing each endpoint twice is expensive in terms of memory; however, a bug in Windows Server 2019 can cause two endpoints to be created with the same IP address, so lookups by endpoint ID must remain possible.
// TODO: Store by IP only and remove any lookups by endpoint ID.
endpointInfos[endpoint.Id] = &endpointsInfo{
ip: endpoint.IPAddress.String(),
isLocal: !endpoint.IsRemoteEndpoint,
macAddress: endpoint.MacAddress,
hnsID: endpoint.Id,
hns: hns,
// only ready, non-terminating endpoints are added to HNS
ready: true,
serving: true,
terminating: false,
}
endpointInfos[endpoint.IPAddress.String()] = endpointInfos[endpoint.Id]
}
}
klog.V(3).InfoS("Queried endpoints from network", "network", networkName)
return endpointInfos, nil
}
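Both keys stored above point at the same *endpointsInfo value, and because of the Windows Server 2019 duplicate-IP issue noted in the comment, callers in syncProxyRules try the endpoint ID first and only then fall back to the IP address. A hedged sketch of that lookup order; lookupEndpoint is a hypothetical helper for illustration, not a function added by this change.

// lookupEndpoint illustrates the ID-then-IP fallback against the cached map.
func lookupEndpoint(cache map[string]*endpointsInfo, hnsID, ip string) *endpointsInfo {
	if hnsID != "" {
		if ep, ok := cache[hnsID]; ok {
			return ep
		}
	}
	return cache[ip] // nil when neither key is present
}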
func (hns hnsV1) getEndpointByID(id string) (*endpointsInfo, error) {
hnsendpoint, err := hcsshim.GetHNSEndpointByID(id)
if err != nil {
@@ -186,38 +223,48 @@ func (hns hnsV1) deleteEndpoint(hnsID string) error {
return err
}
func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
func (hns hnsV1) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
plists, err := hcsshim.HNSListPolicyListRequest()
var id loadBalancerIdentifier
if err != nil {
return nil, err
}
loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
for _, plist := range plists {
// Check whether this policy list entry is an ELB policy
lb := hcsshim.ELBPolicy{}
if err = json.Unmarshal(plist.Policies[0], &lb); err != nil {
continue
}
// Policy is ELB policy
portMap := lb.LBPolicy
if len(lb.VIPs) == 0 {
// Leave VIP uninitialized
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort}
} else {
id = loadBalancerIdentifier{protocol: portMap.Protocol, internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.VIPs[0]}
}
loadBalancers[id] = &loadBalancerInfo{
hnsID: plist.ID,
}
}
return loadBalancers, nil
}
func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
if flags.isDSR {
klog.V(3).InfoS("DSR is not supported in V1. Using non DSR instead")
}
var id loadBalancerIdentifier
if len(vip) > 0 {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsCount: len(endpoints)}
} else {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsCount: len(endpoints)}
}
for _, plist := range plists {
if len(plist.EndpointReferences) != len(endpoints) {
continue
}
// Validate if input meets any of the policy lists
elbPolicy := hcsshim.ELBPolicy{}
if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil {
continue
}
if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == flags.isILB {
if len(vip) > 0 {
if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip {
continue
}
} else if len(elbPolicy.VIPs) != 0 {
continue
}
klog.V(1).InfoS("Found existing Hns loadbalancer policy resource", "policies", plist)
return &loadBalancerInfo{
hnsID: plist.ID,
}, nil
}
if lb, found := previousLoadBalancers[id]; found {
klog.V(1).InfoS("Found existing Hns loadbalancer policy resource", "policies", lb)
return lb, nil
}
var hnsEndpoints []hcsshim.HNSEndpoint
@@ -243,9 +290,12 @@ func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
} else {
return nil, err
}
return &loadBalancerInfo{
lbInfo := &loadBalancerInfo{
hnsID: lb.ID,
}, err
}
// Add to map of load balancers
previousLoadBalancers[id] = lbInfo
return lbInfo, err
}
func (hns hnsV1) deleteLoadBalancer(hnsID string) error {
if len(hnsID) == 0 {

View File

@@ -68,6 +68,39 @@ func (hns hnsV2) getNetworkByName(name string) (*hnsNetworkInfo, error) {
remoteSubnets: remoteSubnets,
}, nil
}
func (hns hnsV2) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) {
hcnnetwork, err := hcn.GetNetworkByName(networkName)
if err != nil {
klog.ErrorS(err, "failed to get HNS network by name", "name", networkName)
return nil, err
}
endpoints, err := hcn.ListEndpointsOfNetwork(hcnnetwork.Id)
if err != nil {
return nil, fmt.Errorf("failed to list endpoints: %w", err)
}
endpointInfos := make(map[string]*(endpointsInfo))
for _, ep := range endpoints {
// Add to the map under both the endpoint ID and the IP address as keys.
// Storing each endpoint twice is expensive in terms of memory; however, a bug in Windows Server 2019 can cause two endpoints to be created with the same IP address, so lookups by endpoint ID must remain possible.
// TODO: Store by IP only and remove any lookups by endpoint ID.
endpointInfos[ep.Id] = &endpointsInfo{
ip: ep.IpConfigurations[0].IpAddress,
isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
macAddress: ep.MacAddress,
hnsID: ep.Id,
hns: hns,
// only ready, non-terminating endpoints are added to HNS
ready: true,
serving: true,
terminating: false,
}
endpointInfos[ep.IpConfigurations[0].IpAddress] = endpointInfos[ep.Id]
}
klog.V(3).InfoS("Queried endpoints from network", "network", networkName)
return endpointInfos, nil
}
func (hns hnsV2) getEndpointByID(id string) (*endpointsInfo, error) {
hnsendpoint, err := hcn.GetEndpointByID(id)
if err != nil {
@@ -111,7 +144,6 @@ func (hns hnsV2) getEndpointByIpAddress(ip string, networkName string) (*endpoin
}, nil
}
}
return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}
func (hns hnsV2) getEndpointByName(name string) (*endpointsInfo, error) {
@@ -195,45 +227,43 @@ func (hns hnsV2) deleteEndpoint(hnsID string) error {
}
return err
}
func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
plists, err := hcn.ListLoadBalancers()
func (hns hnsV2) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
lbs, err := hcn.ListLoadBalancers()
var id loadBalancerIdentifier
if err != nil {
return nil, err
}
for _, plist := range plists {
if len(plist.HostComputeEndpoints) != len(endpoints) {
continue
loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
for _, lb := range lbs {
portMap := lb.PortMappings[0]
if len(lb.FrontendVIPs) == 0 {
// Leave VIP uninitialized
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsCount: len(lb.HostComputeEndpoints)}
} else {
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsCount: len(lb.HostComputeEndpoints)}
}
// Validate if input meets any of the policy lists
lbPortMapping := plist.PortMappings[0]
if lbPortMapping.Protocol == uint32(protocol) && lbPortMapping.InternalPort == internalPort && lbPortMapping.ExternalPort == externalPort && (lbPortMapping.Flags&1 != 0) == flags.isILB {
if len(vip) > 0 {
if len(plist.FrontendVIPs) == 0 || plist.FrontendVIPs[0] != vip {
continue
}
} else if len(plist.FrontendVIPs) != 0 {
continue
}
klog.V(1).InfoS("Found existing Hns loadbalancer policy resource", "policies", plist)
return &loadBalancerInfo{
hnsID: plist.Id,
}, nil
loadBalancers[id] = &loadBalancerInfo{
hnsID: lb.Id,
}
}
klog.V(3).InfoS("Queried load balancers", "count", len(lbs))
return loadBalancers, nil
}
var hnsEndpoints []hcn.HostComputeEndpoint
for _, ep := range endpoints {
endpoint, err := hcn.GetEndpointByID(ep.hnsID)
if err != nil {
return nil, err
}
hnsEndpoints = append(hnsEndpoints, *endpoint)
}
func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
var id loadBalancerIdentifier
vips := []string{}
if len(vip) > 0 {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsCount: len(endpoints)}
vips = append(vips, vip)
} else {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsCount: len(endpoints)}
}
if lb, found := previousLoadBalancers[id]; found {
klog.V(1).InfoS("Found cached Hns loadbalancer policy resource", "policies", lb)
return lb, nil
}
lbPortMappingFlags := hcn.LoadBalancerPortMappingFlagsNone
@@ -284,8 +314,8 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
Flags: lbFlags,
}
for _, endpoint := range hnsEndpoints {
loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, endpoint.Id)
for _, ep := range endpoints {
loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID)
}
lb, err := loadBalancer.Create()
@@ -294,11 +324,15 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
return nil, err
}
klog.V(1).InfoS("Hns loadbalancer policy resource", "loadBalancer", lb)
return &loadBalancerInfo{
klog.V(1).InfoS("Created Hns loadbalancer policy resource", "loadBalancer", lb)
lbInfo := &loadBalancerInfo{
hnsID: lb.Id,
}, err
}
// Add to map of load balancers
previousLoadBalancers[id] = lbInfo
return lbInfo, err
}
func (hns hnsV2) deleteLoadBalancer(hnsID string) error {
lb, err := hcn.GetLoadBalancerByID(hnsID)
if err != nil {

View File

@@ -327,6 +327,7 @@ func testDeleteEndpoint(t *testing.T, hns HostNetworkService) {
func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
Network := mustTestNetwork(t)
lbs := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
ipConfig := &hcn.IpConfig{
IpAddress: epIpAddress,
@@ -358,13 +359,16 @@ func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
if err != nil {
t.Error(err)
}
// We populate this map so the test exercises the path that reuses an existing load balancer
id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsCount: len(Endpoints)}
lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}
endpoint := &endpointsInfo{
ip: Endpoint.IpConfigurations[0].IpAddress,
hnsID: Endpoint.Id,
}
endpoints := []endpointsInfo{*endpoint}
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort)
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
if err != nil {
t.Error(err)
}
@@ -388,6 +392,8 @@ }
}
func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) {
Network := mustTestNetwork(t)
// We keep this map empty so the test exercises new load balancer creation.
lbs := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
ipConfig := &hcn.IpConfig{
IpAddress: epIpAddress,
@@ -409,7 +415,7 @@ func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) {
hnsID: Endpoint.Id,
}
endpoints := []endpointsInfo{*endpoint}
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort)
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
if err != nil {
t.Error(err)
}

View File

@@ -96,6 +96,14 @@ type loadBalancerInfo struct {
hnsID string
}
type loadBalancerIdentifier struct {
protocol uint16
internalPort uint16
externalPort uint16
vip string
endpointsCount int
}
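Every field of loadBalancerIdentifier is a comparable type (integers and a string), so the struct can serve directly as a Go map key for the load-balancer cache: two identifiers built from the same protocol, ports, VIP, and endpoint count address the same entry. A small illustration with made-up values (6 is the IANA protocol number for TCP):

id := loadBalancerIdentifier{protocol: 6, internalPort: 80, externalPort: 80, vip: "10.0.0.10", endpointsCount: 2}
cache := map[loadBalancerIdentifier]*loadBalancerInfo{id: {hnsID: "lb-guid"}}
// An identifier rebuilt later from the same field values finds the cached entry.
again := loadBalancerIdentifier{protocol: 6, internalPort: 80, externalPort: 80, vip: "10.0.0.10", endpointsCount: 2}
_, found := cache[again] // found == true
_ = found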
type loadBalancerFlags struct {
isILB bool
isDSR bool
@@ -1032,15 +1040,32 @@ func (proxier *Proxier) syncProxyRules() {
staleServices.Insert(svcInfo.ClusterIP().String())
}
}
// Query HNS for endpoints and load balancers
queriedEndpoints, err := hns.getAllEndpointsByNetwork(hnsNetworkName)
if err != nil {
klog.ErrorS(err, "Querying HNS for endpoints failed")
return
}
if queriedEndpoints == nil {
klog.V(4).InfoS("No existing endpoints found in HNS")
queriedEndpoints = make(map[string]*(endpointsInfo))
}
queriedLoadBalancers, err := hns.getAllLoadBalancers()
if queriedLoadBalancers == nil {
klog.V(4).InfoS("No existing load balancers found in HNS")
queriedLoadBalancers = make(map[loadBalancerIdentifier]*(loadBalancerInfo))
}
if err != nil {
klog.ErrorS(err, "Querying HNS for load balancers failed")
return
}
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
existingSourceVip, err := hns.getEndpointByIpAddress(proxier.sourceVip, hnsNetworkName)
if existingSourceVip == nil {
if _, ok := queriedEndpoints[proxier.sourceVip]; !ok {
_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
}
if err != nil {
klog.ErrorS(err, "Source Vip endpoint creation failed")
return
if err != nil {
klog.ErrorS(err, "Source Vip endpoint creation failed")
return
}
}
}
@@ -1060,7 +1085,7 @@ func (proxier *Proxier) syncProxyRules() {
}
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.ClusterIP().String(), hnsNetworkName)
serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()]
if serviceVipEndpoint == nil {
klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
hnsEndpoint := &endpointsInfo{
@@ -1079,6 +1104,9 @@ func (proxier *Proxier) syncProxyRules() {
newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
*newHnsEndpoint.refCount++
svcInfo.remoteEndpoint = newHnsEndpoint
// store newly created endpoints in queriedEndpoints
queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
}
}
@@ -1099,7 +1127,6 @@ func (proxier *Proxier) syncProxyRules() {
if !ep.IsReady() {
continue
}
var newHnsEndpoint *endpointsInfo
hnsNetworkName := proxier.network.name
var err error
@@ -1110,17 +1137,19 @@ func (proxier *Proxier) syncProxyRules() {
if svcInfo.targetPort == 0 {
svcInfo.targetPort = int(ep.port)
}
// There is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address, so we need to check using endpoint ID first.
// TODO: Remove lookup by endpoint ID, and use the IP address only, so we don't need to maintain multiple keys for lookup.
if len(ep.hnsID) > 0 {
newHnsEndpoint, err = hns.getEndpointByID(ep.hnsID)
newHnsEndpoint = queriedEndpoints[ep.hnsID]
}
if newHnsEndpoint == nil {
// First check if an endpoint resource exists for this IP on the current host.
// A local endpoint could already exist here, or a remote endpoint could have been created earlier and the proxy restarted since.
newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.IP(), hnsNetworkName)
newHnsEndpoint = queriedEndpoints[ep.IP()]
}
if newHnsEndpoint == nil {
if ep.GetIsLocal() {
klog.ErrorS(err, "Local endpoint not found: on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
@@ -1170,7 +1199,6 @@ func (proxier *Proxier) syncProxyRules() {
}
}
}
// For overlay networks, the 'SourceVIP' on a Load Balancer Policy can be chosen as either
// a) Source VIP configured on kube-proxy (or)
// b) Node IP of the current node
@@ -1242,6 +1270,7 @@ func (proxier *Proxier) syncProxyRules() {
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
@@ -1269,6 +1298,7 @@ func (proxier *Proxier) syncProxyRules() {
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.NodePort()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
@@ -1300,6 +1330,7 @@ func (proxier *Proxier) syncProxyRules() {
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
@@ -1328,6 +1359,7 @@ func (proxier *Proxier) syncProxyRules() {
Enum(svcInfo.Protocol()),
uint16(svcInfo.targetPort),
uint16(svcInfo.Port()),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
@@ -1354,6 +1386,7 @@ func (proxier *Proxier) syncProxyRules() {
Enum(svcInfo.Protocol()),
uint16(nodeport),
uint16(nodeport),
queriedLoadBalancers,
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")

View File

@@ -69,6 +69,10 @@ func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) {
}, nil
}
func (hns fakeHNS) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) {
return nil, nil
}
func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) {
return nil, nil
}
@@ -82,6 +86,10 @@ func (hns fakeHNS) getEndpointByName(name string) (*endpointsInfo, error) {
}, nil
}
func (hns fakeHNS) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
return nil, nil
}
func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
_, ipNet, _ := netutils.ParseCIDRSloppy(destinationPrefix)
@@ -112,7 +120,7 @@ func (hns fakeHNS) deleteEndpoint(hnsID string) error {
return nil
}
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
return &loadBalancerInfo{
hnsID: guid,
}, nil