mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 15:58:37 +00:00
Winkernel proxier cache HNS data to improve syncProxyRules performance
Resolved issues with proxy rules taking a long time to be synced on Windows, by caching HNS data. In particular, the following HNS data will be cached for the context of syncProxyRules: * HNS endpoints * HNS load balancers
This commit is contained in:
parent
7c46f40bdf
commit
b7466d02cd
@ -31,12 +31,14 @@ import (
|
|||||||
|
|
||||||
type HostNetworkService interface {
|
type HostNetworkService interface {
|
||||||
getNetworkByName(name string) (*hnsNetworkInfo, error)
|
getNetworkByName(name string) (*hnsNetworkInfo, error)
|
||||||
|
getAllEndpointsByNetwork(networkName string) (map[string]*endpointsInfo, error)
|
||||||
getEndpointByID(id string) (*endpointsInfo, error)
|
getEndpointByID(id string) (*endpointsInfo, error)
|
||||||
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
|
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
|
||||||
getEndpointByName(id string) (*endpointsInfo, error)
|
getEndpointByName(id string) (*endpointsInfo, error)
|
||||||
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
|
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
|
||||||
deleteEndpoint(hnsID string) error
|
deleteEndpoint(hnsID string) error
|
||||||
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error)
|
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
|
||||||
|
getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error)
|
||||||
deleteLoadBalancer(hnsID string) error
|
deleteLoadBalancer(hnsID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,6 +58,41 @@ func (hns hnsV1) getNetworkByName(name string) (*hnsNetworkInfo, error) {
|
|||||||
networkType: hnsnetwork.Type,
|
networkType: hnsnetwork.Type,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (hns hnsV1) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) {
|
||||||
|
hnsnetwork, err := hcsshim.GetHNSNetworkByName(networkName)
|
||||||
|
if err != nil {
|
||||||
|
klog.ErrorS(err, "failed to get HNS network by name", "name", networkName)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
endpoints, err := hcsshim.HNSListEndpointRequest()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to list endpoints: %w", err)
|
||||||
|
}
|
||||||
|
endpointInfos := make(map[string]*(endpointsInfo))
|
||||||
|
for _, endpoint := range endpoints {
|
||||||
|
if strings.EqualFold(endpoint.VirtualNetwork, hnsnetwork.Id) {
|
||||||
|
// Add to map with key endpoint ID or IP address
|
||||||
|
// Storing this is expensive in terms of memory, however there is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address.
|
||||||
|
// TODO: Store by IP only and remove any lookups by endpoint ID.
|
||||||
|
endpointInfos[endpoint.Id] = &endpointsInfo{
|
||||||
|
ip: endpoint.IPAddress.String(),
|
||||||
|
isLocal: !endpoint.IsRemoteEndpoint,
|
||||||
|
macAddress: endpoint.MacAddress,
|
||||||
|
hnsID: endpoint.Id,
|
||||||
|
hns: hns,
|
||||||
|
// only ready and not terminating endpoints were added to HNS
|
||||||
|
ready: true,
|
||||||
|
serving: true,
|
||||||
|
terminating: false,
|
||||||
|
}
|
||||||
|
endpointInfos[endpoint.IPAddress.String()] = endpointInfos[endpoint.Id]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
klog.V(3).InfoS("Queried endpoints from network", "network", networkName)
|
||||||
|
return endpointInfos, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (hns hnsV1) getEndpointByID(id string) (*endpointsInfo, error) {
|
func (hns hnsV1) getEndpointByID(id string) (*endpointsInfo, error) {
|
||||||
hnsendpoint, err := hcsshim.GetHNSEndpointByID(id)
|
hnsendpoint, err := hcsshim.GetHNSEndpointByID(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -186,38 +223,48 @@ func (hns hnsV1) deleteEndpoint(hnsID string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
func (hns hnsV1) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
|
||||||
plists, err := hcsshim.HNSListPolicyListRequest()
|
plists, err := hcsshim.HNSListPolicyListRequest()
|
||||||
|
var id loadBalancerIdentifier
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
|
||||||
|
for _, plist := range plists {
|
||||||
|
// Validate if input meets any of the policy lists
|
||||||
|
lb := hcsshim.ELBPolicy{}
|
||||||
|
if err = json.Unmarshal(plist.Policies[0], &lb); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Policy is ELB policy
|
||||||
|
portMap := lb.LBPolicy
|
||||||
|
if len(lb.VIPs) == 0 {
|
||||||
|
// Leave VIP uninitialized
|
||||||
|
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort}
|
||||||
|
} else {
|
||||||
|
id = loadBalancerIdentifier{protocol: portMap.Protocol, internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.VIPs[0]}
|
||||||
|
}
|
||||||
|
loadBalancers[id] = &loadBalancerInfo{
|
||||||
|
hnsID: plist.ID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return loadBalancers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
|
||||||
if flags.isDSR {
|
if flags.isDSR {
|
||||||
klog.V(3).InfoS("DSR is not supported in V1. Using non DSR instead")
|
klog.V(3).InfoS("DSR is not supported in V1. Using non DSR instead")
|
||||||
}
|
}
|
||||||
|
var id loadBalancerIdentifier
|
||||||
|
if len(vip) > 0 {
|
||||||
|
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsCount: len(endpoints)}
|
||||||
|
} else {
|
||||||
|
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsCount: len(endpoints)}
|
||||||
|
}
|
||||||
|
|
||||||
for _, plist := range plists {
|
if lb, found := previousLoadBalancers[id]; found {
|
||||||
if len(plist.EndpointReferences) != len(endpoints) {
|
klog.V(1).InfoS("Found existing Hns loadbalancer policy resource", "policies", lb)
|
||||||
continue
|
return lb, nil
|
||||||
}
|
|
||||||
// Validate if input meets any of the policy lists
|
|
||||||
elbPolicy := hcsshim.ELBPolicy{}
|
|
||||||
if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == flags.isILB {
|
|
||||||
if len(vip) > 0 {
|
|
||||||
if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else if len(elbPolicy.VIPs) != 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
klog.V(1).InfoS("Found existing Hns loadbalancer policy resource", "policies", plist)
|
|
||||||
return &loadBalancerInfo{
|
|
||||||
hnsID: plist.ID,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var hnsEndpoints []hcsshim.HNSEndpoint
|
var hnsEndpoints []hcsshim.HNSEndpoint
|
||||||
@ -243,9 +290,12 @@ func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
|
|||||||
} else {
|
} else {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &loadBalancerInfo{
|
lbInfo := &loadBalancerInfo{
|
||||||
hnsID: lb.ID,
|
hnsID: lb.ID,
|
||||||
}, err
|
}
|
||||||
|
// Add to map of load balancers
|
||||||
|
previousLoadBalancers[id] = lbInfo
|
||||||
|
return lbInfo, err
|
||||||
}
|
}
|
||||||
func (hns hnsV1) deleteLoadBalancer(hnsID string) error {
|
func (hns hnsV1) deleteLoadBalancer(hnsID string) error {
|
||||||
if len(hnsID) == 0 {
|
if len(hnsID) == 0 {
|
||||||
|
@ -68,6 +68,39 @@ func (hns hnsV2) getNetworkByName(name string) (*hnsNetworkInfo, error) {
|
|||||||
remoteSubnets: remoteSubnets,
|
remoteSubnets: remoteSubnets,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (hns hnsV2) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) {
|
||||||
|
hcnnetwork, err := hcn.GetNetworkByName(networkName)
|
||||||
|
if err != nil {
|
||||||
|
klog.ErrorS(err, "failed to get HNS network by name", "name", networkName)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
endpoints, err := hcn.ListEndpointsOfNetwork(hcnnetwork.Id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to list endpoints: %w", err)
|
||||||
|
}
|
||||||
|
endpointInfos := make(map[string]*(endpointsInfo))
|
||||||
|
for _, ep := range endpoints {
|
||||||
|
// Add to map with key endpoint ID or IP address
|
||||||
|
// Storing this is expensive in terms of memory, however there is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address.
|
||||||
|
// TODO: Store by IP only and remove any lookups by endpoint ID.
|
||||||
|
endpointInfos[ep.Id] = &endpointsInfo{
|
||||||
|
ip: ep.IpConfigurations[0].IpAddress,
|
||||||
|
isLocal: uint32(ep.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
|
||||||
|
macAddress: ep.MacAddress,
|
||||||
|
hnsID: ep.Id,
|
||||||
|
hns: hns,
|
||||||
|
// only ready and not terminating endpoints were added to HNS
|
||||||
|
ready: true,
|
||||||
|
serving: true,
|
||||||
|
terminating: false,
|
||||||
|
}
|
||||||
|
endpointInfos[ep.IpConfigurations[0].IpAddress] = endpointInfos[ep.Id]
|
||||||
|
}
|
||||||
|
klog.V(3).InfoS("Queried endpoints from network", "network", networkName)
|
||||||
|
return endpointInfos, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (hns hnsV2) getEndpointByID(id string) (*endpointsInfo, error) {
|
func (hns hnsV2) getEndpointByID(id string) (*endpointsInfo, error) {
|
||||||
hnsendpoint, err := hcn.GetEndpointByID(id)
|
hnsendpoint, err := hcn.GetEndpointByID(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -111,7 +144,6 @@ func (hns hnsV2) getEndpointByIpAddress(ip string, networkName string) (*endpoin
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
|
return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
|
||||||
}
|
}
|
||||||
func (hns hnsV2) getEndpointByName(name string) (*endpointsInfo, error) {
|
func (hns hnsV2) getEndpointByName(name string) (*endpointsInfo, error) {
|
||||||
@ -195,45 +227,43 @@ func (hns hnsV2) deleteEndpoint(hnsID string) error {
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
|
||||||
plists, err := hcn.ListLoadBalancers()
|
func (hns hnsV2) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
|
||||||
|
lbs, err := hcn.ListLoadBalancers()
|
||||||
|
var id loadBalancerIdentifier
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
|
||||||
for _, plist := range plists {
|
for _, lb := range lbs {
|
||||||
if len(plist.HostComputeEndpoints) != len(endpoints) {
|
portMap := lb.PortMappings[0]
|
||||||
continue
|
if len(lb.FrontendVIPs) == 0 {
|
||||||
|
// Leave VIP uninitialized
|
||||||
|
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsCount: len(lb.HostComputeEndpoints)}
|
||||||
|
} else {
|
||||||
|
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsCount: len(lb.HostComputeEndpoints)}
|
||||||
}
|
}
|
||||||
// Validate if input meets any of the policy lists
|
loadBalancers[id] = &loadBalancerInfo{
|
||||||
lbPortMapping := plist.PortMappings[0]
|
hnsID: lb.Id,
|
||||||
if lbPortMapping.Protocol == uint32(protocol) && lbPortMapping.InternalPort == internalPort && lbPortMapping.ExternalPort == externalPort && (lbPortMapping.Flags&1 != 0) == flags.isILB {
|
|
||||||
if len(vip) > 0 {
|
|
||||||
if len(plist.FrontendVIPs) == 0 || plist.FrontendVIPs[0] != vip {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else if len(plist.FrontendVIPs) != 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
klog.V(1).InfoS("Found existing Hns loadbalancer policy resource", "policies", plist)
|
|
||||||
return &loadBalancerInfo{
|
|
||||||
hnsID: plist.Id,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
klog.V(3).InfoS("Queried load balancers", "count", len(lbs))
|
||||||
|
return loadBalancers, nil
|
||||||
|
}
|
||||||
|
|
||||||
var hnsEndpoints []hcn.HostComputeEndpoint
|
func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
|
||||||
for _, ep := range endpoints {
|
var id loadBalancerIdentifier
|
||||||
endpoint, err := hcn.GetEndpointByID(ep.hnsID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
hnsEndpoints = append(hnsEndpoints, *endpoint)
|
|
||||||
}
|
|
||||||
|
|
||||||
vips := []string{}
|
vips := []string{}
|
||||||
if len(vip) > 0 {
|
if len(vip) > 0 {
|
||||||
|
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsCount: len(endpoints)}
|
||||||
vips = append(vips, vip)
|
vips = append(vips, vip)
|
||||||
|
} else {
|
||||||
|
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsCount: len(endpoints)}
|
||||||
|
}
|
||||||
|
|
||||||
|
if lb, found := previousLoadBalancers[id]; found {
|
||||||
|
klog.V(1).InfoS("Found cached Hns loadbalancer policy resource", "policies", lb)
|
||||||
|
return lb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lbPortMappingFlags := hcn.LoadBalancerPortMappingFlagsNone
|
lbPortMappingFlags := hcn.LoadBalancerPortMappingFlagsNone
|
||||||
@ -284,8 +314,8 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
|
|||||||
Flags: lbFlags,
|
Flags: lbFlags,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, endpoint := range hnsEndpoints {
|
for _, ep := range endpoints {
|
||||||
loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, endpoint.Id)
|
loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID)
|
||||||
}
|
}
|
||||||
|
|
||||||
lb, err := loadBalancer.Create()
|
lb, err := loadBalancer.Create()
|
||||||
@ -294,11 +324,15 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(1).InfoS("Hns loadbalancer policy resource", "loadBalancer", lb)
|
klog.V(1).InfoS("Created Hns loadbalancer policy resource", "loadBalancer", lb)
|
||||||
return &loadBalancerInfo{
|
lbInfo := &loadBalancerInfo{
|
||||||
hnsID: lb.Id,
|
hnsID: lb.Id,
|
||||||
}, err
|
}
|
||||||
|
// Add to map of load balancers
|
||||||
|
previousLoadBalancers[id] = lbInfo
|
||||||
|
return lbInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns hnsV2) deleteLoadBalancer(hnsID string) error {
|
func (hns hnsV2) deleteLoadBalancer(hnsID string) error {
|
||||||
lb, err := hcn.GetLoadBalancerByID(hnsID)
|
lb, err := hcn.GetLoadBalancerByID(hnsID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -327,6 +327,7 @@ func testDeleteEndpoint(t *testing.T, hns HostNetworkService) {
|
|||||||
|
|
||||||
func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
|
func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
|
||||||
Network := mustTestNetwork(t)
|
Network := mustTestNetwork(t)
|
||||||
|
lbs := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
|
||||||
|
|
||||||
ipConfig := &hcn.IpConfig{
|
ipConfig := &hcn.IpConfig{
|
||||||
IpAddress: epIpAddress,
|
IpAddress: epIpAddress,
|
||||||
@ -358,13 +359,16 @@ func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
// We populate this to ensure we test for getting existing load balancer
|
||||||
|
id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsCount: len(Endpoints)}
|
||||||
|
lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}
|
||||||
|
|
||||||
endpoint := &endpointsInfo{
|
endpoint := &endpointsInfo{
|
||||||
ip: Endpoint.IpConfigurations[0].IpAddress,
|
ip: Endpoint.IpConfigurations[0].IpAddress,
|
||||||
hnsID: Endpoint.Id,
|
hnsID: Endpoint.Id,
|
||||||
}
|
}
|
||||||
endpoints := []endpointsInfo{*endpoint}
|
endpoints := []endpointsInfo{*endpoint}
|
||||||
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort)
|
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -388,6 +392,8 @@ func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
|
|||||||
}
|
}
|
||||||
func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) {
|
func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) {
|
||||||
Network := mustTestNetwork(t)
|
Network := mustTestNetwork(t)
|
||||||
|
// We keep this empty to ensure we test for new load balancer creation.
|
||||||
|
lbs := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
|
||||||
|
|
||||||
ipConfig := &hcn.IpConfig{
|
ipConfig := &hcn.IpConfig{
|
||||||
IpAddress: epIpAddress,
|
IpAddress: epIpAddress,
|
||||||
@ -409,7 +415,7 @@ func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) {
|
|||||||
hnsID: Endpoint.Id,
|
hnsID: Endpoint.Id,
|
||||||
}
|
}
|
||||||
endpoints := []endpointsInfo{*endpoint}
|
endpoints := []endpointsInfo{*endpoint}
|
||||||
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort)
|
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -96,6 +96,14 @@ type loadBalancerInfo struct {
|
|||||||
hnsID string
|
hnsID string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type loadBalancerIdentifier struct {
|
||||||
|
protocol uint16
|
||||||
|
internalPort uint16
|
||||||
|
externalPort uint16
|
||||||
|
vip string
|
||||||
|
endpointsCount int
|
||||||
|
}
|
||||||
|
|
||||||
type loadBalancerFlags struct {
|
type loadBalancerFlags struct {
|
||||||
isILB bool
|
isILB bool
|
||||||
isDSR bool
|
isDSR bool
|
||||||
@ -1032,15 +1040,32 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
staleServices.Insert(svcInfo.ClusterIP().String())
|
staleServices.Insert(svcInfo.ClusterIP().String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Query HNS for endpoints and load balancers
|
||||||
|
queriedEndpoints, err := hns.getAllEndpointsByNetwork(hnsNetworkName)
|
||||||
|
if err != nil {
|
||||||
|
klog.ErrorS(err, "Querying HNS for endpoints failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if queriedEndpoints == nil {
|
||||||
|
klog.V(4).InfoS("No existing endpoints found in HNS")
|
||||||
|
queriedEndpoints = make(map[string]*(endpointsInfo))
|
||||||
|
}
|
||||||
|
queriedLoadBalancers, err := hns.getAllLoadBalancers()
|
||||||
|
if queriedLoadBalancers == nil {
|
||||||
|
klog.V(4).InfoS("No existing load balancers found in HNS")
|
||||||
|
queriedLoadBalancers = make(map[loadBalancerIdentifier]*(loadBalancerInfo))
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
klog.ErrorS(err, "Querying HNS for load balancers failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
|
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
|
||||||
existingSourceVip, err := hns.getEndpointByIpAddress(proxier.sourceVip, hnsNetworkName)
|
if _, ok := queriedEndpoints[proxier.sourceVip]; !ok {
|
||||||
if existingSourceVip == nil {
|
|
||||||
_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
|
_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
|
||||||
}
|
if err != nil {
|
||||||
if err != nil {
|
klog.ErrorS(err, "Source Vip endpoint creation failed")
|
||||||
klog.ErrorS(err, "Source Vip endpoint creation failed")
|
return
|
||||||
return
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1060,7 +1085,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
|
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
|
||||||
serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.ClusterIP().String(), hnsNetworkName)
|
serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()]
|
||||||
if serviceVipEndpoint == nil {
|
if serviceVipEndpoint == nil {
|
||||||
klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
|
klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
|
||||||
hnsEndpoint := &endpointsInfo{
|
hnsEndpoint := &endpointsInfo{
|
||||||
@ -1079,6 +1104,9 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
|
newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
|
||||||
*newHnsEndpoint.refCount++
|
*newHnsEndpoint.refCount++
|
||||||
svcInfo.remoteEndpoint = newHnsEndpoint
|
svcInfo.remoteEndpoint = newHnsEndpoint
|
||||||
|
// store newly created endpoints in queriedEndpoints
|
||||||
|
queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
|
||||||
|
queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1099,7 +1127,6 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
if !ep.IsReady() {
|
if !ep.IsReady() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var newHnsEndpoint *endpointsInfo
|
var newHnsEndpoint *endpointsInfo
|
||||||
hnsNetworkName := proxier.network.name
|
hnsNetworkName := proxier.network.name
|
||||||
var err error
|
var err error
|
||||||
@ -1110,17 +1137,19 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
if svcInfo.targetPort == 0 {
|
if svcInfo.targetPort == 0 {
|
||||||
svcInfo.targetPort = int(ep.port)
|
svcInfo.targetPort = int(ep.port)
|
||||||
}
|
}
|
||||||
|
// There is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address, so we need to check using endpoint ID first.
|
||||||
|
// TODO: Remove lookup by endpoint ID, and use the IP address only, so we don't need to maintain multiple keys for lookup.
|
||||||
if len(ep.hnsID) > 0 {
|
if len(ep.hnsID) > 0 {
|
||||||
newHnsEndpoint, err = hns.getEndpointByID(ep.hnsID)
|
newHnsEndpoint = queriedEndpoints[ep.hnsID]
|
||||||
}
|
}
|
||||||
|
|
||||||
if newHnsEndpoint == nil {
|
if newHnsEndpoint == nil {
|
||||||
// First check if an endpoint resource exists for this IP, on the current host
|
// First check if an endpoint resource exists for this IP, on the current host
|
||||||
// A Local endpoint could exist here already
|
// A Local endpoint could exist here already
|
||||||
// A remote endpoint was already created and proxy was restarted
|
// A remote endpoint was already created and proxy was restarted
|
||||||
newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.IP(), hnsNetworkName)
|
newHnsEndpoint = queriedEndpoints[ep.IP()]
|
||||||
}
|
}
|
||||||
|
|
||||||
if newHnsEndpoint == nil {
|
if newHnsEndpoint == nil {
|
||||||
if ep.GetIsLocal() {
|
if ep.GetIsLocal() {
|
||||||
klog.ErrorS(err, "Local endpoint not found: on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
|
klog.ErrorS(err, "Local endpoint not found: on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
|
||||||
@ -1170,7 +1199,6 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// For Overlay networks 'SourceVIP' on an Load balancer Policy can either be chosen as
|
// For Overlay networks 'SourceVIP' on an Load balancer Policy can either be chosen as
|
||||||
// a) Source VIP configured on kube-proxy (or)
|
// a) Source VIP configured on kube-proxy (or)
|
||||||
// b) Node IP of the current node
|
// b) Node IP of the current node
|
||||||
@ -1242,6 +1270,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
Enum(svcInfo.Protocol()),
|
Enum(svcInfo.Protocol()),
|
||||||
uint16(svcInfo.targetPort),
|
uint16(svcInfo.targetPort),
|
||||||
uint16(svcInfo.Port()),
|
uint16(svcInfo.Port()),
|
||||||
|
queriedLoadBalancers,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "Policy creation failed")
|
klog.ErrorS(err, "Policy creation failed")
|
||||||
@ -1269,6 +1298,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
Enum(svcInfo.Protocol()),
|
Enum(svcInfo.Protocol()),
|
||||||
uint16(svcInfo.targetPort),
|
uint16(svcInfo.targetPort),
|
||||||
uint16(svcInfo.NodePort()),
|
uint16(svcInfo.NodePort()),
|
||||||
|
queriedLoadBalancers,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "Policy creation failed")
|
klog.ErrorS(err, "Policy creation failed")
|
||||||
@ -1300,6 +1330,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
Enum(svcInfo.Protocol()),
|
Enum(svcInfo.Protocol()),
|
||||||
uint16(svcInfo.targetPort),
|
uint16(svcInfo.targetPort),
|
||||||
uint16(svcInfo.Port()),
|
uint16(svcInfo.Port()),
|
||||||
|
queriedLoadBalancers,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "Policy creation failed")
|
klog.ErrorS(err, "Policy creation failed")
|
||||||
@ -1328,6 +1359,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
Enum(svcInfo.Protocol()),
|
Enum(svcInfo.Protocol()),
|
||||||
uint16(svcInfo.targetPort),
|
uint16(svcInfo.targetPort),
|
||||||
uint16(svcInfo.Port()),
|
uint16(svcInfo.Port()),
|
||||||
|
queriedLoadBalancers,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "Policy creation failed")
|
klog.ErrorS(err, "Policy creation failed")
|
||||||
@ -1354,6 +1386,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
Enum(svcInfo.Protocol()),
|
Enum(svcInfo.Protocol()),
|
||||||
uint16(nodeport),
|
uint16(nodeport),
|
||||||
uint16(nodeport),
|
uint16(nodeport),
|
||||||
|
queriedLoadBalancers,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "Policy creation failed")
|
klog.ErrorS(err, "Policy creation failed")
|
||||||
|
@ -69,6 +69,10 @@ func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (hns fakeHNS) getAllEndpointsByNetwork(networkName string) (map[string]*(endpointsInfo), error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) {
|
func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -82,6 +86,10 @@ func (hns fakeHNS) getEndpointByName(name string) (*endpointsInfo, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (hns fakeHNS) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
|
func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
|
||||||
_, ipNet, _ := netutils.ParseCIDRSloppy(destinationPrefix)
|
_, ipNet, _ := netutils.ParseCIDRSloppy(destinationPrefix)
|
||||||
|
|
||||||
@ -112,7 +120,7 @@ func (hns fakeHNS) deleteEndpoint(hnsID string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
|
||||||
return &loadBalancerInfo{
|
return &loadBalancerInfo{
|
||||||
hnsID: guid,
|
hnsID: guid,
|
||||||
}, nil
|
}, nil
|
||||||
|
Loading…
Reference in New Issue
Block a user