From bbd4a07decc7ca2add213cd1c4cc3b5c405e7bfb Mon Sep 17 00:00:00 2001
From: Vinod K L Swamy
Date: Mon, 29 Jun 2020 14:31:15 -0700
Subject: [PATCH] Changes to WinKernel to support EndpointSlices

---
 cmd/kube-proxy/app/server_windows.go |   2 +-
 pkg/proxy/winkernel/BUILD            |   8 +-
 pkg/proxy/winkernel/proxier.go       | 764 ++++++++++-----------------
 pkg/proxy/winkernel/proxier_test.go  | 190 +++++--
 4 files changed, 445 insertions(+), 519 deletions(-)

diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go
index 66ed52ccd59..8036c6ffe2d 100644
--- a/cmd/kube-proxy/app/server_windows.go
+++ b/cmd/kube-proxy/app/server_windows.go
@@ -174,7 +174,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
 		OOMScoreAdj: config.OOMScoreAdj,
 		ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
 		HealthzServer: healthzServer,
-		UseEndpointSlices: false,
+		UseEndpointSlices: utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying),
 	}, nil
 }
 
diff --git a/pkg/proxy/winkernel/BUILD b/pkg/proxy/winkernel/BUILD
index fae9fc2011e..bd62aef52bd 100644
--- a/pkg/proxy/winkernel/BUILD
+++ b/pkg/proxy/winkernel/BUILD
@@ -15,19 +15,20 @@ go_library(
         "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
     ] + select({
         "@io_bazel_rules_go//go/platform:windows": [
-            "//pkg/api/v1/service:go_default_library",
             "//pkg/apis/core/v1/helper:go_default_library",
+            "//pkg/features:go_default_library",
             "//pkg/proxy:go_default_library",
             "//pkg/proxy/apis/config:go_default_library",
             "//pkg/proxy/config:go_default_library",
             "//pkg/proxy/healthcheck:go_default_library",
             "//pkg/proxy/metaproxier:go_default_library",
             "//pkg/proxy/metrics:go_default_library",
+            "//pkg/proxy/util:go_default_library",
             "//pkg/util/async:go_default_library",
             "//staging/src/k8s.io/api/core/v1:go_default_library",
+            "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
             "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
             "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
-            "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
             "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
             "//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
             "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
@@ -67,11 +68,14 @@ go_test(
         "@io_bazel_rules_go//go/platform:windows": [
             "//pkg/proxy:go_default_library",
             "//pkg/proxy/healthcheck:go_default_library",
+            "//pkg/proxy/util:go_default_library",
             "//staging/src/k8s.io/api/core/v1:go_default_library",
             "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
             "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
             "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+            "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
             "//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library",
+            "//vendor/k8s.io/utils/pointer:go_default_library",
         ],
         "//conditions:default": [],
     }),
diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go
index 4c1cae087af..04925251f1d 100644
--- a/pkg/proxy/winkernel/proxier.go
+++ b/pkg/proxy/winkernel/proxier.go
@@ -23,33 +23,33 @@ import (
 	"fmt"
 	"net"
 	"os"
-	"reflect"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/Microsoft/hcsshim"
 	"github.com/Microsoft/hcsshim/hcn"
+	discovery "k8s.io/api/discovery/v1beta1"
 	"github.com/davecgh/go-spew/spew"
-	"k8s.io/klog/v2"
-
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" genericfeatures "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" - apiservice "k8s.io/kubernetes/pkg/api/v1/service" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/apis/config" proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" "k8s.io/kubernetes/pkg/proxy/metrics" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" utilnet "k8s.io/utils/net" ) @@ -108,25 +108,16 @@ type loadBalancerFlags struct { // internal struct for string service information type serviceInfo struct { - clusterIP net.IP - port int - protocol v1.Protocol - nodePort int - targetPort int - loadBalancerStatus v1.LoadBalancerStatus - sessionAffinityType v1.ServiceAffinity - stickyMaxAgeSeconds int - externalIPs []*externalIPInfo - loadBalancerIngressIPs []*loadBalancerIngressInfo - loadBalancerSourceRanges []string - onlyNodeLocalEndpoints bool - healthCheckNodePort int - hnsID string - nodePorthnsID string - policyApplied bool - remoteEndpoint *endpointsInfo - hns HostNetworkService - preserveDIP bool + *proxy.BaseServiceInfo + targetPort int + externalIPs []*externalIPInfo + loadBalancerIngressIPs []*loadBalancerIngressInfo + hnsID string + nodePorthnsID string + policyApplied bool + remoteEndpoint *endpointsInfo + hns HostNetworkService + preserveDIP bool } type hnsNetworkInfo struct { @@ -144,7 +135,7 @@ type remoteSubnetInfo struct { } func Log(v interface{}, message string, level klog.Level) { - klog.V(level).Infof("%s, %s", message, spew.Sdump(v)) + klog.V(level).Infof("%s, %s", message, spewSdump(v)) } func LogJson(v interface{}, message string, level klog.Level) { @@ -154,6 +145,12 @@ func LogJson(v interface{}, message string, level klog.Level) { } } +func spewSdump(v interface{}) string { + scs := spew.NewDefaultConfig() + scs.DisableMethods = true + return scs.Sdump(v) +} + // internal struct for endpoints information type endpointsInfo struct { ip string @@ -166,7 +163,37 @@ type endpointsInfo struct { hns HostNetworkService } -//Uses mac prefix and IP address to return a mac address +// String is part of proxy.Endpoint interface. +func (info *endpointsInfo) String() string { + return net.JoinHostPort(info.ip, strconv.Itoa(int(info.port))) +} + +// GetIsLocal is part of proxy.Endpoint interface. +func (info *endpointsInfo) GetIsLocal() bool { + return info.isLocal +} + +// GetTopology returns the topology information of the endpoint. +func (info *endpointsInfo) GetTopology() map[string]string { + return nil +} + +// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. +func (info *endpointsInfo) IP() string { + return info.ip +} + +// Port returns just the Port part of the endpoint. +func (info *endpointsInfo) Port() (int, error) { + return int(info.port), nil +} + +// Equal is part of proxy.Endpoint interface. 
+func (info *endpointsInfo) Equal(other proxy.Endpoint) bool { + return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal() +} + +//Uses mac prefix and IPv4 address to return a mac address //This ensures mac addresses are unique for proper load balancing //There is a possibility of MAC collisions but this Mac address is used for remote endpoints only //and not sent on the wire. @@ -181,15 +208,97 @@ func conjureMac(macPrefix string, ip net.IP) string { return "02-11-22-33-44-55" } -func newEndpointInfo(ip string, port uint16, isLocal bool, hns HostNetworkService) *endpointsInfo { +func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) { + for svcPortName := range oldEndpointsMap { + proxier.onEndpointsMapChange(&svcPortName) + } + + for svcPortName := range newEndpointsMap { + proxier.onEndpointsMapChange(&svcPortName) + } +} + +func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) { + + svc, exists := proxier.serviceMap[*svcPortName] + + if exists { + svcInfo, ok := svc.(*serviceInfo) + + if !ok { + klog.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + return + } + + klog.V(3).Infof("Endpoints are modified. Service [%v] is stale", *svcPortName) + svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName]) + } else { + // If no service exists, just cleanup the remote endpoints + klog.V(3).Infof("Endpoints are orphaned. Cleaning up") + // Cleanup Endpoints references + epInfos, exists := proxier.endpointsMap[*svcPortName] + + if exists { + // Cleanup Endpoints references + for _, ep := range epInfos { + epInfo, ok := ep.(*endpointsInfo) + + if ok { + epInfo.Cleanup() + } + + } + } + } +} + +func (proxier *Proxier) serviceMapChange(previous, current proxy.ServiceMap) { + for svcPortName := range current { + proxier.onServiceMapChange(&svcPortName) + } + + for svcPortName := range previous { + if _, ok := current[svcPortName]; ok { + continue + } + proxier.onServiceMapChange(&svcPortName) + } +} + +func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) { + + svc, exists := proxier.serviceMap[*svcPortName] + + if exists { + svcInfo, ok := svc.(*serviceInfo) + + if !ok { + klog.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + return + } + + klog.V(3).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, svcInfo.ClusterIP(), svcInfo.Port(), svcInfo.Protocol()) + svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName]) + } +} + +// returns a new proxy.Endpoint which abstracts a endpointsInfo +func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint { + + portNumber, err := baseInfo.Port() + + if err != nil { + portNumber = 0 + } + info := &endpointsInfo{ - ip: ip, - port: port, - isLocal: isLocal, - macAddress: conjureMac("02-11", net.ParseIP(ip)), + ip: baseInfo.IP(), + port: uint16(portNumber), + isLocal: baseInfo.GetIsLocal(), + macAddress: conjureMac("02-11", net.ParseIP(baseInfo.IP())), refCount: new(uint16), hnsID: "", - hns: hns, + hns: proxier.hns, } return info @@ -215,13 +324,13 @@ func (ep *endpointsInfo) Cleanup() { // Remove the remote hns endpoint, if no service is referring it // Never delete a Local Endpoint. Local Endpoints are already created by other entities. 
// Remove only remote endpoints created by this service - if (ep.refCount == nil || *ep.refCount <= 0) && !ep.isLocal { + if (ep.refCount == nil || *ep.refCount <= 0) && !ep.GetIsLocal() { klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep) err := ep.hns.deleteEndpoint(ep.hnsID) if err == nil { ep.hnsID = "" } else { - klog.Errorf("Endpoint deletion failed for %v: %v", ep.ip, err) + klog.Errorf("Endpoint deletion failed for %v: %v", ep.IP(), err) } } } @@ -235,21 +344,9 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 return refCount } -// returns a new serviceInfo struct -func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service, hns HostNetworkService) *serviceInfo { - onlyNodeLocalEndpoints := false - if apiservice.RequestsOnlyLocalTraffic(service) { - onlyNodeLocalEndpoints = true - } - - // set default session sticky max age 180min=10800s - stickyMaxAgeSeconds := 10800 - if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP && service.Spec.SessionAffinityConfig != nil { - stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) - } - - klog.Infof("Service %q preserve-destination: %v", svcPortName.NamespacedName.String(), service.Annotations["preserve-destination"]) - +// returns a new proxy.ServicePort which abstracts a serviceInfo +func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort { + info := &serviceInfo{BaseServiceInfo: baseInfo} preserveDIP := service.Annotations["preserve-destination"] == "true" err := hcn.DSRSupported() if err != nil { @@ -261,39 +358,18 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser if port.TargetPort.Type == intstr.Int { targetPort = port.TargetPort.IntValue() } - info := &serviceInfo{ - clusterIP: net.ParseIP(service.Spec.ClusterIP), - port: int(port.Port), - protocol: port.Protocol, - nodePort: int(port.NodePort), - targetPort: targetPort, - // Deep-copy in case the service instance changes - loadBalancerStatus: *service.Status.LoadBalancer.DeepCopy(), - sessionAffinityType: service.Spec.SessionAffinity, - stickyMaxAgeSeconds: stickyMaxAgeSeconds, - loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), - onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, - hns: hns, - preserveDIP: preserveDIP, - } - copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) + info.preserveDIP = preserveDIP + info.targetPort = targetPort + info.hns = proxier.hns + for _, eip := range service.Spec.ExternalIPs { info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip}) } + for _, ingress := range service.Status.LoadBalancer.Ingress { info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.IP}) } - - if apiservice.NeedsHealthCheck(service) { - p := service.Spec.HealthCheckNodePort - if p == 0 { - klog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String()) - } else { - info.healthCheckNodePort = int(p) - } - } - return info } @@ -315,180 +391,11 @@ func (network hnsNetworkInfo) findRemoteSubnetProviderAddress(ip string) string return providerAddress } -type endpointsChange struct { - previous proxyEndpointsMap - current proxyEndpointsMap -} - -type endpointsChangeMap struct { - lock sync.Mutex - hostname string - items map[types.NamespacedName]*endpointsChange -} - -type serviceChange struct 
{ - previous proxyServiceMap - current proxyServiceMap -} - -type serviceChangeMap struct { - lock sync.Mutex - items map[types.NamespacedName]*serviceChange -} - -type updateEndpointMapResult struct { - hcEndpoints map[types.NamespacedName]int - staleEndpoints map[endpointServicePair]bool - staleServiceNames map[proxy.ServicePortName]bool -} - -type updateServiceMapResult struct { - hcServices map[types.NamespacedName]uint16 - staleServices sets.String -} -type proxyServiceMap map[proxy.ServicePortName]*serviceInfo -type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo type endPointsReferenceCountMap map[string]*uint16 -func newEndpointsChangeMap(hostname string) endpointsChangeMap { - return endpointsChangeMap{ - hostname: hostname, - items: make(map[types.NamespacedName]*endpointsChange), - } -} - -func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Endpoints, hns HostNetworkService) bool { - ecm.lock.Lock() - defer ecm.lock.Unlock() - - change, exists := ecm.items[*namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = endpointsToEndpointsMap(previous, ecm.hostname, hns) - ecm.items[*namespacedName] = change - } - change.current = endpointsToEndpointsMap(current, ecm.hostname, hns) - if reflect.DeepEqual(change.previous, change.current) { - delete(ecm.items, *namespacedName) - } - return len(ecm.items) > 0 -} - -func newServiceChangeMap() serviceChangeMap { - return serviceChangeMap{ - items: make(map[types.NamespacedName]*serviceChange), - } -} - -func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Service, hns HostNetworkService) bool { - scm.lock.Lock() - defer scm.lock.Unlock() - - change, exists := scm.items[*namespacedName] - if !exists { - // Service is Added - change = &serviceChange{} - change.previous = serviceToServiceMap(previous, hns) - scm.items[*namespacedName] = change - } - change.current = serviceToServiceMap(current, hns) - if reflect.DeepEqual(change.previous, change.current) { - delete(scm.items, *namespacedName) - } - return len(scm.items) > 0 -} - -func (sm *proxyServiceMap) merge(other proxyServiceMap, curEndpoints proxyEndpointsMap) sets.String { - existingPorts := sets.NewString() - for svcPortName, info := range other { - existingPorts.Insert(svcPortName.Port) - svcInfo, exists := (*sm)[svcPortName] - if !exists { - klog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - } else { - klog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - svcInfo.cleanupAllPolicies(curEndpoints[svcPortName]) - delete(*sm, svcPortName) - } - (*sm)[svcPortName] = info - } - return existingPorts -} - -func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String, curEndpoints proxyEndpointsMap) { - for svcPortName := range other { - if existingPorts.Has(svcPortName.Port) { - continue - } - info, exists := (*sm)[svcPortName] - if exists { - klog.V(1).Infof("Removing service port %q", svcPortName) - if info.protocol == v1.ProtocolUDP { - staleServices.Insert(info.clusterIP.String()) - } - info.cleanupAllPolicies(curEndpoints[svcPortName]) - delete(*sm, svcPortName) - } else { - klog.Errorf("Service port %q removed, but doesn't exists", svcPortName) - } - } -} - -func (em proxyEndpointsMap) merge(other proxyEndpointsMap, curServices proxyServiceMap) { - // Endpoint Update/Add - for svcPortName := 
range other { - epInfos, exists := em[svcPortName] - if exists { - // - info, exists := curServices[svcPortName] - klog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) - if exists { - klog.V(2).Infof("Endpoints are modified. Service [%v] is stale", svcPortName) - info.cleanupAllPolicies(epInfos) - } else { - // If no service exists, just cleanup the remote endpoints - klog.V(2).Infof("Endpoints are orphaned. Cleaning up") - // Cleanup Endpoints references - for _, ep := range epInfos { - ep.Cleanup() - } - - } - - delete(em, svcPortName) - } - em[svcPortName] = other[svcPortName] - } -} - -func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap, curServices proxyServiceMap) { - // Endpoint Update/Removal - for svcPortName := range other { - info, exists := curServices[svcPortName] - if exists { - klog.V(2).Infof("Service [%v] is stale", info) - info.cleanupAllPolicies(em[svcPortName]) - } else { - // If no service exists, just cleanup the remote endpoints - klog.V(2).Infof("Endpoints are orphaned. Cleaning up") - // Cleanup Endpoints references - epInfos, exists := em[svcPortName] - if exists { - for _, ep := range epInfos { - ep.Cleanup() - } - } - } - - delete(em, svcPortName) - } -} - // Proxier is an hns based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { - // EndpointSlice support has not been added for this proxier yet. - proxyconfig.NoopEndpointSliceHandler // TODO(imroc): implement node handler for winkernel proxier. proxyconfig.NoopNodeHandler @@ -496,23 +403,22 @@ type Proxier struct { // services that happened since policies were synced. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges endpointsChangeMap - serviceChanges serviceChangeMap - - mu sync.Mutex // protects the following fields - serviceMap proxyServiceMap - endpointsMap proxyEndpointsMap - portsMap map[localPort]closeable + endpointsChanges *proxy.EndpointChangeTracker + serviceChanges *proxy.ServiceChangeTracker endPointsRefCount endPointsReferenceCountMap + mu sync.Mutex // protects the following fields + serviceMap proxy.ServiceMap + endpointsMap proxy.EndpointsMap + portsMap map[utilproxy.LocalPort]utilproxy.Closeable // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. This is used to avoid updating hns policies // with some partial data after kube-proxy restart. - endpointsSynced bool - servicesSynced bool - isIPv6Mode bool - initialized int32 - syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules - + endpointsSynced bool + endpointSlicesSynced bool + servicesSynced bool + isIPv6Mode bool + initialized int32 + syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules // These are effectively const and do not need the mutex to be held. 
masqueradeAll bool masqueradeMark string @@ -673,14 +579,12 @@ func NewProxier( } isIPv6 := utilnet.IsIPv6(nodeIP) - + endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) proxier := &Proxier{ - portsMap: make(map[localPort]closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), endPointsRefCount: make(endPointsReferenceCountMap), + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + serviceMap: make(proxy.ServiceMap), + endpointsMap: make(proxy.EndpointsMap), masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, clusterCIDR: clusterCIDR, @@ -698,11 +602,15 @@ func NewProxier( isIPv6Mode: isIPv6, } + serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, &isIPv6, recorder, proxier.serviceMapChange) + endPointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled, proxier.endpointsMapChange) + proxier.endpointsChanges = endPointChangeTracker + proxier.serviceChanges = serviceChanges + burstSyncs := 2 klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) return proxier, nil - } func NewDualStackProxier( @@ -748,13 +656,16 @@ func CleanupLeftovers() (encounteredError bool) { return encounteredError } -func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []*endpointsInfo) { +func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) { Log(svcInfo, "Service Cleanup", 3) // Skip the svcInfo.policyApplied check to remove all the policies svcInfo.deleteAllHnsLoadBalancerPolicy() // Cleanup Endpoints references for _, ep := range endpoints { - ep.Cleanup() + epInfo, ok := ep.(*endpointsInfo) + if ok { + epInfo.Cleanup() + } } if svcInfo.remoteEndpoint != nil { svcInfo.remoteEndpoint.Cleanup() @@ -846,17 +757,13 @@ func (proxier *Proxier) isInitialized() bool { // OnServiceAdd is called whenever creation of new service object // is observed. func (proxier *Proxier) OnServiceAdd(service *v1.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() { - proxier.Sync() - } + proxier.OnServiceUpdate(nil, service) } // OnServiceUpdate is called whenever modification of an existing // service object is observed. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() { + if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.Sync() } } @@ -864,10 +771,7 @@ func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { // OnServiceDelete is called whenever deletion of an existing service // object is observed. 
func (proxier *Proxier) OnServiceDelete(service *v1.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() { - proxier.Sync() - } + proxier.OnServiceUpdate(service, nil) } // OnServiceSynced is called once all the initial event handlers were @@ -875,7 +779,11 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) { + proxier.setInitialized(proxier.endpointSlicesSynced) + } else { + proxier.setInitialized(proxier.endpointsSynced) + } proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -896,50 +804,17 @@ func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool { return false } -// is updated by this function (based on the given changes). -// map is cleared after applying them. -func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) { - result.staleServices = sets.NewString() - - serviceMap := proxier.serviceMap - changes := &proxier.serviceChanges - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - existingPorts := serviceMap.merge(change.current, proxier.endpointsMap) - serviceMap.unmerge(change.previous, existingPorts, result.staleServices, proxier.endpointsMap) - } - changes.items = make(map[types.NamespacedName]*serviceChange) - }() - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to serviceMap. - result.hcServices = make(map[types.NamespacedName]uint16) - for svcPortName, info := range serviceMap { - if info.healthCheckNodePort != 0 { - result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) - } - } - - return result -} - // OnEndpointsAdd is called whenever creation of new endpoints object // is observed. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() { - proxier.Sync() - } + proxier.OnEndpointsUpdate(nil, endpoints) } // OnEndpointsUpdate is called whenever modification of an existing // endpoints object is observed. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() { + + if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() { proxier.Sync() } } @@ -947,10 +822,7 @@ func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) // OnEndpointsDelete is called whenever deletion of an existing endpoints // object is observed. 
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() { - proxier.Sync() - } + proxier.OnEndpointsUpdate(endpoints, nil) } // OnEndpointsSynced is called once all the initial event handlers were @@ -965,8 +837,49 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.syncProxyRules() } +// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object +// is observed. +func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) { + if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() { + proxier.Sync() + } +} + +// OnEndpointSliceUpdate is called whenever modification of an existing endpoint +// slice object is observed. +func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) { + if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() { + proxier.Sync() + } +} + +// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice +// object is observed. +func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) { + if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() { + proxier.Sync() + } +} + +// OnEndpointSlicesSynced is called once all the initial event handlers were +// called and the state is fully propagated to local cache. +func (proxier *Proxier) OnEndpointSlicesSynced() { + proxier.mu.Lock() + proxier.endpointSlicesSynced = true + proxier.setInitialized(proxier.servicesSynced) + proxier.mu.Unlock() + + // Sync unconditionally - this is called once per lifetime. + proxier.syncProxyRules() +} + func (proxier *Proxier) cleanupAllPolicies() { - for svcName, svcInfo := range proxier.serviceMap { + for svcName, svc := range proxier.serviceMap { + svcInfo, ok := svc.(*serviceInfo) + if !ok { + klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) + continue + } svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName]) } } @@ -984,118 +897,6 @@ func isNetworkNotFoundError(err error) bool { return false } -// is updated by this function (based on the given changes). -// map is cleared after applying them. -func (proxier *Proxier) updateEndpointsMap() (result updateEndpointMapResult) { - result.staleEndpoints = make(map[endpointServicePair]bool) - result.staleServiceNames = make(map[proxy.ServicePortName]bool) - - endpointsMap := proxier.endpointsMap - changes := &proxier.endpointsChanges - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - endpointsMap.unmerge(change.previous, proxier.serviceMap) - endpointsMap.merge(change.current, proxier.serviceMap) - } - changes.items = make(map[types.NamespacedName]*endpointsChange) - }() - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to endpointsMap. 
- result.hcEndpoints = make(map[types.NamespacedName]int) - localIPs := getLocalIPs(endpointsMap) - for nsn, ips := range localIPs { - result.hcEndpoints[nsn] = len(ips) - } - - return result -} -func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { - localIPs := make(map[types.NamespacedName]sets.String) - for svcPortName := range endpointsMap { - for _, ep := range endpointsMap[svcPortName] { - if ep.isLocal { - nsn := svcPortName.NamespacedName - if localIPs[nsn] == nil { - localIPs[nsn] = sets.NewString() - } - localIPs[nsn].Insert(ep.ip) // just the IP part - } - } - } - return localIPs -} - -// Translates single Endpoints object to proxyEndpointsMap. -// This function is used for incremental updated of endpointsMap. -// -// NOTE: endpoints object should NOT be modified. -func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string, hns HostNetworkService) proxyEndpointsMap { - if endpoints == nil { - return nil - } - - endpointsMap := make(proxyEndpointsMap) - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - if port.Port == 0 { - klog.Warningf("Ignoring invalid endpoint port %s", port.Name) - continue - } - svcPortName := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, - Port: port.Name, - } - for i := range ss.Addresses { - addr := &ss.Addresses[i] - if addr.IP == "" { - klog.Warningf("Ignoring invalid endpoint port %s with empty host", port.Name) - continue - } - isLocal := addr.NodeName != nil && *addr.NodeName == hostname - epInfo := newEndpointInfo(addr.IP, uint16(port.Port), isLocal, hns) - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo) - } - if klog.V(3).Enabled() { - newEPList := []*endpointsInfo{} - for _, ep := range endpointsMap[svcPortName] { - newEPList = append(newEPList, ep) - } - klog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList) - } - } - } - return endpointsMap -} - -// Translates single Service object to proxyServiceMap. -// -// NOTE: service object should NOT be modified. -func serviceToServiceMap(service *v1.Service, hns HostNetworkService) proxyServiceMap { - if service == nil { - return nil - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if shouldSkipService(svcName, service) { - return nil - } - - serviceMap := make(proxyServiceMap) - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service, hns) - } - return serviceMap -} - // This is where all of the hns save/restore calls happen. 
// assumes proxier.mu is held func (proxier *Proxier) syncProxyRules() { @@ -1108,7 +909,7 @@ func (proxier *Proxier) syncProxyRules() { klog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() // don't sync rules till we've received services and endpoints - if !proxier.endpointsSynced || !proxier.servicesSynced { + if !proxier.isInitialized() { klog.V(2).Info("Not syncing hns until Services and Endpoints have been received from master") return } @@ -1130,15 +931,15 @@ func (proxier *Proxier) syncProxyRules() { // We assume that if this was called, we really want to sync them, // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. - serviceUpdateResult := proxier.updateServiceMap() - endpointUpdateResult := proxier.updateEndpointsMap() + serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges) + endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - staleServices := serviceUpdateResult.staleServices + staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap - for svcPortName := range endpointUpdateResult.staleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == v1.ProtocolUDP { - klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) - staleServices.Insert(svcInfo.clusterIP.String()) + for _, svcPortName := range endpointUpdateResult.StaleServiceNames { + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { + klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String()) + staleServices.Insert(svcInfo.ClusterIP().String()) } } @@ -1156,18 +957,24 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).Infof("Syncing Policies") // Program HNS by adding corresponding policies for each service. 
- for svcName, svcInfo := range proxier.serviceMap { + for svcName, svc := range proxier.serviceMap { + svcInfo, ok := svc.(*serviceInfo) + if !ok { + klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) + continue + } + if svcInfo.policyApplied { - klog.V(4).Infof("Policy already applied for %s", spew.Sdump(svcInfo)) + klog.V(4).Infof("Policy already applied for %s", spewSdump(svcInfo)) continue } if proxier.network.networkType == "Overlay" { - serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.clusterIP.String(), hnsNetworkName) + serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.ClusterIP().String(), hnsNetworkName) if serviceVipEndpoint == nil { - klog.V(4).Infof("No existing remote endpoint for service VIP %v", svcInfo.clusterIP.String()) + klog.V(4).Infof("No existing remote endpoint for service VIP %v", svcInfo.ClusterIP().String()) hnsEndpoint := &endpointsInfo{ - ip: svcInfo.clusterIP.String(), + ip: svcInfo.ClusterIP().String(), isLocal: false, macAddress: proxier.hostMac, providerAddress: proxier.nodeIP.String(), @@ -1192,7 +999,14 @@ func (proxier *Proxier) syncProxyRules() { containsPublicIP := false containsNodeIP := false - for _, ep := range proxier.endpointsMap[svcName] { + for _, epInfo := range proxier.endpointsMap[svcName] { + + ep, ok := epInfo.(*endpointsInfo) + if !ok { + klog.Errorf("Failed to cast endpointsInfo %q", svcName.String()) + continue + } + var newHnsEndpoint *endpointsInfo hnsNetworkName := proxier.network.name var err error @@ -1212,11 +1026,11 @@ func (proxier *Proxier) syncProxyRules() { // First check if an endpoint resource exists for this IP, on the current host // A Local endpoint could exist here already // A remote endpoint was already created and proxy was restarted - newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.ip, hnsNetworkName) + newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.IP(), hnsNetworkName) } if newHnsEndpoint == nil { - if ep.isLocal { - klog.Errorf("Local endpoint not found for %v: err: %v on network %s", ep.ip, err, hnsNetworkName) + if ep.GetIsLocal() { + klog.Errorf("Local endpoint not found for %v: err: %v on network %s", ep.IP(), err, hnsNetworkName) continue } @@ -1230,11 +1044,9 @@ func (proxier *Proxier) syncProxyRules() { return } proxier.network = *updatedNetwork - - providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.ip) - + providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP()) if len(providerAddress) == 0 { - klog.Infof("Could not find provider address for %s. Assuming it is a public IP", ep.ip) + klog.Infof("Could not find provider address for %s. 
Assuming it is a public IP", ep.IP()) providerAddress = proxier.nodeIP.String() } @@ -1247,10 +1059,11 @@ func (proxier *Proxier) syncProxyRules() { newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName) if err != nil { - klog.Errorf("Remote endpoint creation failed: %v, %s", err, spew.Sdump(hnsEndpoint)) + klog.Errorf("Remote endpoint creation failed: %v, %s", err, spewSdump(hnsEndpoint)) continue } } else { + hnsEndpoint := &endpointsInfo{ ip: ep.ip, isLocal: false, @@ -1266,11 +1079,11 @@ func (proxier *Proxier) syncProxyRules() { } if proxier.network.networkType == "Overlay" { - providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.ip) + providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP()) - isNodeIP := (ep.ip == providerAddress) + isNodeIP := (ep.IP() == providerAddress) isPublicIP := (len(providerAddress) == 0) - klog.Infof("Endpoint %s on overlay network %s is classified as NodeIp: %v, Public Ip: %v", ep.ip, hnsNetworkName, isNodeIP, isPublicIP) + klog.Infof("Endpoint %s on overlay network %s is classified as NodeIp: %v, Public Ip: %v", ep.IP(), hnsNetworkName, isNodeIP, isPublicIP) containsNodeIP = containsNodeIP || isNodeIP containsPublicIP = containsPublicIP || isPublicIP @@ -1279,7 +1092,7 @@ func (proxier *Proxier) syncProxyRules() { // Save the hnsId for reference LogJson(newHnsEndpoint, "Hns Endpoint resource", 1) hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint) - if newHnsEndpoint.isLocal { + if newHnsEndpoint.GetIsLocal() { hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint) } else { // We only share the refCounts for remote endpoints @@ -1292,7 +1105,7 @@ func (proxier *Proxier) syncProxyRules() { Log(ep, "Endpoint resource found", 3) } - klog.V(3).Infof("Associated endpoints [%s] for service [%s]", spew.Sdump(hnsEndpoints), svcName) + klog.V(3).Infof("Associated endpoints [%s] for service [%s]", spewSdump(hnsEndpoints), svcName) if len(svcInfo.hnsID) > 0 { // This should not happen @@ -1304,14 +1117,14 @@ func (proxier *Proxier) syncProxyRules() { continue } - klog.V(4).Infof("Trying to Apply Policies for service %s", spew.Sdump(svcInfo)) + klog.V(4).Infof("Trying to Apply Policies for service %s", spewSdump(svcInfo)) var hnsLoadBalancer *loadBalancerInfo var sourceVip = proxier.sourceVip if containsPublicIP || containsNodeIP { sourceVip = proxier.nodeIP.String() } - sessionAffinityClientIP := svcInfo.sessionAffinityType == v1.ServiceAffinityClientIP + sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity { klog.Warningf("Session Affinity is not supported on this version of Windows.") } @@ -1320,10 +1133,10 @@ func (proxier *Proxier) syncProxyRules() { hnsEndpoints, loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP}, sourceVip, - svcInfo.clusterIP.String(), - Enum(svcInfo.protocol), + svcInfo.ClusterIP().String(), + Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), - uint16(svcInfo.port), + uint16(svcInfo.Port()), ) if err != nil { klog.Errorf("Policy creation failed: %v", err) @@ -1331,10 +1144,10 @@ func (proxier *Proxier) syncProxyRules() { } svcInfo.hnsID = hnsLoadBalancer.hnsID - klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID) + klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.ClusterIP(), 
hnsLoadBalancer.hnsID) // If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints - if svcInfo.nodePort > 0 { + if svcInfo.NodePort() > 0 { // If the preserve-destination service annotation is present, we will disable routing mesh for NodePort. // This means that health services can use Node Port without falsely getting results from a different node. nodePortEndpoints := hnsEndpoints @@ -1346,9 +1159,9 @@ func (proxier *Proxier) syncProxyRules() { loadBalancerFlags{localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, "", - Enum(svcInfo.protocol), + Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), - uint16(svcInfo.nodePort), + uint16(svcInfo.NodePort()), ) if err != nil { klog.Errorf("Policy creation failed: %v", err) @@ -1356,7 +1169,7 @@ func (proxier *Proxier) syncProxyRules() { } svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID - klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID) + klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.ClusterIP(), hnsLoadBalancer.hnsID) } // Create a Load Balancer Policy for each external IP @@ -1367,9 +1180,9 @@ func (proxier *Proxier) syncProxyRules() { loadBalancerFlags{sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, externalIP.ip, - Enum(svcInfo.protocol), + Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), - uint16(svcInfo.port), + uint16(svcInfo.Port()), ) if err != nil { klog.Errorf("Policy creation failed: %v", err) @@ -1390,9 +1203,9 @@ func (proxier *Proxier) syncProxyRules() { loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, lbIngressIP.ip, - Enum(svcInfo.protocol), + Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), - uint16(svcInfo.port), + uint16(svcInfo.Port()), ) if err != nil { klog.Errorf("Policy creation failed: %v", err) @@ -1413,10 +1226,10 @@ func (proxier *Proxier) syncProxyRules() { // Update service healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.hcServices); err != nil { + if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { klog.Errorf("Error syncing healthcheck services: %v", err) } - if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { klog.Errorf("Error syncing healthcheck endpoints: %v", err) } @@ -1434,8 +1247,3 @@ func (proxier *Proxier) syncProxyRules() { } } } - -type endpointServicePair struct { - endpoint string - servicePortName proxy.ServicePortName -} diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 5d87067c3f2..01a46b44aa7 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -19,18 +19,20 @@ limitations under the License. 
package winkernel import ( + "fmt" "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" - + utilproxy "k8s.io/kubernetes/pkg/proxy/util" + utilpointer "k8s.io/utils/pointer" "net" "strings" "testing" "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const testHostName = "test-hostname" @@ -99,7 +101,7 @@ func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancer func (hns fakeHNS) deleteLoadBalancer(hnsID string) error { return nil } -func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier { +func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string, endpointSliceEnabled bool) *Proxier { sourceVip := "192.168.1.2" hnsNetworkInfo := &hnsNetworkInfo{ id: strings.ToUpper(guid), @@ -107,11 +109,9 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust networkType: networkType, } proxier := &Proxier{ - portsMap: make(map[localPort]closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + serviceMap: make(proxy.ServiceMap), + endpointsMap: make(proxy.EndpointsMap), clusterCIDR: clusterCIDR, hostname: testHostName, nodeIP: nodeIP, @@ -123,12 +123,19 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust hns: newFakeHNS(), endPointsRefCount: make(endPointsReferenceCountMap), } + + isIPv6 := false + serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, &isIPv6, nil, proxier.serviceMapChange) + endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, &isIPv6, nil, endpointSliceEnabled, proxier.endpointsMapChange) + proxier.endpointsChanges = endpointChangeTracker + proxier.serviceChanges = serviceChanges + return proxier } func TestCreateServiceVip(t *testing.T) { syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false) if proxier == nil { t.Error() } @@ -140,6 +147,7 @@ func TestCreateServiceVip(t *testing.T) { svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", + Protocol: v1.ProtocolTCP, } timeoutSeconds := v1.DefaultClientIPServiceAffinitySeconds @@ -163,19 +171,26 @@ func TestCreateServiceVip(t *testing.T) { }), ) makeEndpointsMap(proxier) - + proxier.setInitialized(true) proxier.syncProxyRules() - if proxier.serviceMap[svcPortName].remoteEndpoint == nil { - t.Error() - } - if proxier.serviceMap[svcPortName].remoteEndpoint.ip != svcIP { - t.Error() + svc := proxier.serviceMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + if svcInfo.remoteEndpoint == nil { + t.Error() + } + if svcInfo.remoteEndpoint.ip != svcIP { + t.Error() + } } } func TestCreateRemoteEndpointOverlay(t *testing.T) { syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, 
syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false) if proxier == nil { t.Error() } @@ -186,6 +201,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) { svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", + Protocol: v1.ProtocolTCP, } makeServiceMap(proxier, @@ -207,30 +223,38 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) { IP: epIpAddressRemote, }}, Ports: []v1.EndpointPort{{ - Name: svcPortName.Port, - Port: int32(svcPort), + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, }}, }} }), ) - + proxier.setInitialized(true) proxier.syncProxyRules() - if proxier.endpointsMap[svcPortName][0].hnsID != guid { - t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid) + ep := proxier.endpointsMap[svcPortName][0] + epInfo, ok := ep.(*endpointsInfo) + if !ok { + t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) + + } else { + if epInfo.hnsID != guid { + t.Errorf("%v does not match %v", epInfo.hnsID, guid) + } } if *proxier.endPointsRefCount[guid] <= 0 { t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid]) } - if *proxier.endPointsRefCount[guid] != *proxier.endpointsMap[svcPortName][0].refCount { - t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *proxier.endpointsMap[svcPortName][0].refCount) + if *proxier.endPointsRefCount[guid] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount) } } func TestCreateRemoteEndpointL2Bridge(t *testing.T) { syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge") + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false) if proxier == nil { t.Error() } @@ -241,6 +265,7 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) { svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", + Protocol: v1.ProtocolTCP, } makeServiceMap(proxier, @@ -262,31 +287,38 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) { IP: epIpAddressRemote, }}, Ports: []v1.EndpointPort{{ - Name: svcPortName.Port, - Port: int32(svcPort), + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, }}, }} }), ) - + proxier.setInitialized(true) proxier.syncProxyRules() + ep := proxier.endpointsMap[svcPortName][0] + epInfo, ok := ep.(*endpointsInfo) + if !ok { + t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) - if proxier.endpointsMap[svcPortName][0].hnsID != guid { - t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid) + } else { + if epInfo.hnsID != guid { + t.Errorf("%v does not match %v", epInfo.hnsID, guid) + } } if *proxier.endPointsRefCount[guid] <= 0 { t.Errorf("RefCount not incremented. 
Current value: %v", *proxier.endPointsRefCount[guid]) } - if *proxier.endPointsRefCount[guid] != *proxier.endpointsMap[svcPortName][0].refCount { - t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *proxier.endpointsMap[svcPortName][0].refCount) + if *proxier.endPointsRefCount[guid] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount) } } func TestCreateLoadBalancer(t *testing.T) { syncPeriod := 30 * time.Second - proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false) if proxier == nil { t.Error() } @@ -297,6 +329,7 @@ func TestCreateLoadBalancer(t *testing.T) { svcPortName := proxy.ServicePortName{ NamespacedName: makeNSN("ns1", "svc1"), Port: "p80", + Protocol: v1.ProtocolTCP, } makeServiceMap(proxier, @@ -318,20 +351,101 @@ func TestCreateLoadBalancer(t *testing.T) { IP: epIpAddressRemote, }}, Ports: []v1.EndpointPort{{ - Name: svcPortName.Port, - Port: int32(svcPort), + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, }}, }} }), ) + proxier.setInitialized(true) proxier.syncProxyRules() - if proxier.serviceMap[svcPortName].hnsID != guid { - t.Errorf("%v does not match %v", proxier.serviceMap[svcPortName].hnsID, guid) + svc := proxier.serviceMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + if svcInfo.hnsID != guid { + t.Errorf("%v does not match %v", svcInfo.hnsID, guid) + } + } + +} +func TestEndpointSlice(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", true) + if proxier == nil { + t.Error() + } + + proxier.servicesSynced = true + proxier.endpointSlicesSynced = true + + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + proxier.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: svcPortName.Name, Namespace: svcPortName.Namespace}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: svcPortName.Port, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}}, + }, + }) + + // Add initial endpoint slice + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", svcPortName.Name), + Namespace: svcPortName.Namespace, + Labels: map[string]string{discovery.LabelServiceName: svcPortName.Name}, + }, + Ports: []discovery.EndpointPort{{ + Name: &svcPortName.Port, + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"192.168.2.3"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": "testhost2"}, + }}, + } + + proxier.OnEndpointSliceAdd(endpointSlice) + proxier.setInitialized(true) + proxier.syncProxyRules() + + svc := proxier.serviceMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + if svcInfo.hnsID != guid { + 
t.Errorf("The Hns Loadbalancer Id %v does not match %v. ServicePortName %q", svcInfo.hnsID, guid, svcPortName.String()) + } + } + + ep := proxier.endpointsMap[svcPortName][0] + epInfo, ok := ep.(*endpointsInfo) + if !ok { + t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String()) + + } else { + if epInfo.hnsID != guid { + t.Errorf("Hns EndpointId %v does not match %v. ServicePortName %q", epInfo.hnsID, guid, svcPortName.String()) + } } } - func TestNoopEndpointSlice(t *testing.T) { p := Proxier{} p.OnEndpointSliceAdd(&discovery.EndpointSlice{})