From 9665c590c7ac8801215a0bc89d12a9bafb8ceb9d Mon Sep 17 00:00:00 2001 From: Rob Scott Date: Tue, 6 Aug 2019 17:17:16 -0700 Subject: [PATCH] Adding EndpointSlice support for kube-proxy ipvs and iptables proxiers --- cmd/kube-proxy/app/BUILD | 1 + cmd/kube-proxy/app/server.go | 20 +- pkg/features/kube_features.go | 7 + pkg/kubemark/BUILD | 1 + pkg/kubemark/hollow_proxy.go | 7 +- pkg/proxy/BUILD | 5 + pkg/proxy/config/BUILD | 2 + pkg/proxy/config/config.go | 127 +++++++++++++ pkg/proxy/endpoints.go | 113 +++++++++--- pkg/proxy/endpoints_test.go | 245 ++++++++++++++++++++++++- pkg/proxy/endpointslicecache.go | 240 ++++++++++++++++++++++++ pkg/proxy/endpointslicecache_test.go | 239 ++++++++++++++++++++++++ pkg/proxy/iptables/BUILD | 5 + pkg/proxy/iptables/proxier.go | 67 +++++-- pkg/proxy/iptables/proxier_test.go | 154 ++++++++++++++-- pkg/proxy/ipvs/BUILD | 6 + pkg/proxy/ipvs/proxier.go | 60 +++++- pkg/proxy/ipvs/proxier_test.go | 182 +++++++++++++----- pkg/proxy/service_test.go | 2 +- pkg/proxy/types.go | 3 +- pkg/proxy/userspace/BUILD | 1 + pkg/proxy/userspace/proxier.go | 6 +- pkg/proxy/userspace/proxier_test.go | 9 + pkg/proxy/winkernel/BUILD | 2 + pkg/proxy/winkernel/proxier.go | 4 + pkg/proxy/winkernel/proxier_test.go | 10 + pkg/proxy/winuserspace/BUILD | 1 + pkg/proxy/winuserspace/proxier.go | 6 +- pkg/proxy/winuserspace/proxier_test.go | 9 + 29 files changed, 1401 insertions(+), 133 deletions(-) create mode 100644 pkg/proxy/endpointslicecache.go create mode 100644 pkg/proxy/endpointslicecache_test.go diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index 9d5525ae3ea..75e6f3a5718 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -19,6 +19,7 @@ go_library( importpath = "k8s.io/kubernetes/cmd/kube-proxy/app", deps = [ "//pkg/apis/core:go_default_library", + "//pkg/features:go_default_library", "//pkg/kubelet/qos:go_default_library", "//pkg/master/ports:go_default_library", "//pkg/proxy:go_default_library", diff --git 
a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 6360d968633..e1859876ec7 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -29,7 +29,7 @@ import ( "time" v1 "k8s.io/api/core/v1" - v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" @@ -50,6 +50,7 @@ import ( componentbaseconfig "k8s.io/component-base/config" "k8s.io/kube-proxy/config/v1alpha1" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/proxy" @@ -477,6 +478,7 @@ type ProxyServer struct { CleanupIPVS bool MetricsBindAddress string EnableProfiling bool + UseEndpointSlices bool OOMScoreAdj *int32 ConfigSyncPeriod time.Duration HealthzServer *healthcheck.HealthzServer @@ -619,11 +621,11 @@ func (s *ProxyServer) Run() error { labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, - informers.WithTweakListOptions(func(options *v1meta.ListOptions) { + informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labelSelector.String() })) - // Create configs (i.e. Watches for Services and Endpoints) + // Create configs (i.e. Watches for Services and Endpoints or EndpointSlices) // Note: RegisterHandler() calls need to happen before creation of Sources because sources // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. 
@@ -631,9 +633,15 @@ func (s *ProxyServer) Run() error { serviceConfig.RegisterEventHandler(s.Proxier) go serviceConfig.Run(wait.NeverStop) - endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod) - endpointsConfig.RegisterEventHandler(s.Proxier) - go endpointsConfig.Run(wait.NeverStop) + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1alpha1().EndpointSlices(), s.ConfigSyncPeriod) + endpointSliceConfig.RegisterEventHandler(s.Proxier) + go endpointSliceConfig.Run(wait.NeverStop) + } else { + endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod) + endpointsConfig.RegisterEventHandler(s.Proxier) + go endpointsConfig.Run(wait.NeverStop) + } // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those // functions must configure their shared informer event handlers first. diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 3746033a562..9a4402d945e 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -476,6 +476,12 @@ const ( // Enables ipv6 dual stack IPv6DualStack featuregate.Feature = "IPv6DualStack" + // owner: @robscott @freehan + // alpha: v1.16 + // + // Enable Endpoint Slices for more scalable Service endpoints. 
+ EndpointSlice featuregate.Feature = "EndpointSlice" + // owner: @Huang-Wei // alpha: v1.16 // @@ -559,6 +565,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS VolumePVCDataSource: {Default: false, PreRelease: featuregate.Alpha}, PodOverhead: {Default: false, PreRelease: featuregate.Alpha}, IPv6DualStack: {Default: false, PreRelease: featuregate.Alpha}, + EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, EvenPodsSpread: {Default: false, PreRelease: featuregate.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index 49bd9b1d721..b7bced83d39 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -25,6 +25,7 @@ go_library( "//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/iptables:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/mount:go_default_library", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 5a77012c74f..11c9fe2889f 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -20,13 +20,14 @@ import ( "fmt" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -41,7 +42,9 @@ type HollowProxy struct { ProxyServer *proxyapp.ProxyServer } -type FakeProxier struct{} +type FakeProxier struct { + proxyconfig.NoopEndpointSliceHandler +} func (*FakeProxier) Sync() {} func (*FakeProxier) 
SyncLoop() { diff --git a/pkg/proxy/BUILD b/pkg/proxy/BUILD index 8c69b958350..761d0aec8b9 100644 --- a/pkg/proxy/BUILD +++ b/pkg/proxy/BUILD @@ -11,6 +11,7 @@ go_library( srcs = [ "doc.go", "endpoints.go", + "endpointslicecache.go", "service.go", "types.go", ], @@ -21,6 +22,7 @@ go_library( "//pkg/proxy/metrics:go_default_library", "//pkg/proxy/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", @@ -58,15 +60,18 @@ go_test( name = "go_default_test", srcs = [ "endpoints_test.go", + "endpointslicecache_test.go", "service_test.go", ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/github.com/davecgh/go-spew/spew:go_default_library", + "//vendor/k8s.io/utils/pointer:go_default_library", ], ) diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 353a30ae224..92881120869 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -15,8 +15,10 @@ go_library( importpath = "k8s.io/kubernetes/pkg/proxy/config", deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + 
"//staging/src/k8s.io/client-go/informers/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 97f78f110fe..76947e73327 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -21,8 +21,10 @@ import ( "time" "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" + discoveryinformers "k8s.io/client-go/informers/discovery/v1alpha1" "k8s.io/client-go/tools/cache" "k8s.io/klog" ) @@ -61,6 +63,40 @@ type EndpointsHandler interface { OnEndpointsSynced() } +// EndpointSliceHandler is an abstract interface of objects which receive +// notifications about endpoint slice object changes. +type EndpointSliceHandler interface { + // OnEndpointSliceAdd is called whenever creation of new endpoint slice + // object is observed. + OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) + // OnEndpointSliceUpdate is called whenever modification of an existing + // endpoint slice object is observed. + OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) + // OnEndpointSliceDelete is called whenever deletion of an existing + // endpoint slice object is observed. + OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) + // OnEndpointSlicesSynced is called once all the initial event handlers were + // called and the state is fully propagated to local cache. + OnEndpointSlicesSynced() +} + +// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet +// implemented a full EndpointSliceHandler. +type NoopEndpointSliceHandler struct{} + +// OnEndpointSliceAdd is a noop handler for EndpointSlice creates. 
+func (*NoopEndpointSliceHandler) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {} + +// OnEndpointSliceUpdate is a noop handler for EndpointSlice updates. +func (*NoopEndpointSliceHandler) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) { +} + +// OnEndpointSliceDelete is a noop handler for EndpointSlice deletes. +func (*NoopEndpointSliceHandler) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {} + +// OnEndpointSlicesSynced is a noop handler for EndpointSlice syncs. +func (*NoopEndpointSliceHandler) OnEndpointSlicesSynced() {} + // EndpointsConfig tracks a set of endpoints configurations. type EndpointsConfig struct { listerSynced cache.InformerSynced @@ -152,6 +188,97 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { } } +// EndpointSliceConfig tracks a set of endpoint slice configurations. +type EndpointSliceConfig struct { + listerSynced cache.InformerSynced + eventHandlers []EndpointSliceHandler +} + +// NewEndpointSliceConfig creates a new EndpointSliceConfig. +func NewEndpointSliceConfig(endpointSliceInformer discoveryinformers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig { + result := &EndpointSliceConfig{ + listerSynced: endpointSliceInformer.Informer().HasSynced, + } + + endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddEndpointSlice, + UpdateFunc: result.handleUpdateEndpointSlice, + DeleteFunc: result.handleDeleteEndpointSlice, + }, + resyncPeriod, + ) + + return result +} + +// RegisterEventHandler registers a handler which is called on every endpoint slice change. +func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + +// Run waits for cache synced and invokes handlers after syncing. 
+func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) { + klog.Info("Starting endpoint slice config controller") + + if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) { + return + } + + for _, h := range c.eventHandlers { + klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()") + h.OnEndpointSlicesSynced() + } +} + +func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) { + endpointSlice, ok := obj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointSliceAdd %+v", endpointSlice) + h.OnEndpointSliceAdd(endpointSlice) + } +} + +func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) { + oldEndpointSlice, ok := oldObj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", oldObj)) + return + } + newEndpointSlice, ok := newObj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj)) + return + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate") + h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice) + } +} + +func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) { + endpointSlice, ok := obj.(*discovery.EndpointSlice) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + if endpointSlice, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", tombstone.Obj)) + return + } + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointSliceDelete") + h.OnEndpointSliceDelete(endpointSlice) + } +} + // ServiceConfig tracks a set of service configurations. 
type ServiceConfig struct { listerSynced cache.InformerSynced diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index d7e99c48304..9bf1bdd4a25 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -26,7 +26,7 @@ import ( "k8s.io/klog" v1 "k8s.io/api/core/v1" - + discovery "k8s.io/api/discovery/v1alpha1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" @@ -92,6 +92,8 @@ type EndpointChangeTracker struct { items map[types.NamespacedName]*endpointsChange // makeEndpointInfo allows proxier to inject customized information when processing endpoint. makeEndpointInfo makeEndpointFunc + // endpointSliceCache holds a simplified version of endpoint slices + endpointSliceCache *EndpointSliceCache // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. isIPv6Mode *bool recorder record.EventRecorder @@ -101,8 +103,8 @@ type EndpointChangeTracker struct { } // NewEndpointChangeTracker initializes an EndpointsChangeMap -func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker { - return &EndpointChangeTracker{ +func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder, endpointSlicesEnabled bool) *EndpointChangeTracker { + ect := &EndpointChangeTracker{ hostname: hostname, items: make(map[types.NamespacedName]*endpointsChange), makeEndpointInfo: makeEndpointInfo, @@ -110,6 +112,10 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc recorder: recorder, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), } + if endpointSlicesEnabled { + ect.endpointSliceCache = NewEndpointSliceCache(hostname, isIPv6Mode, recorder, makeEndpointInfo) + } + return ect } // Update updates given service's endpoints change map based on the endpoints pair. 
It returns true @@ -141,9 +147,9 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool { change.previous = ect.endpointsToEndpointsMap(previous) ect.items[namespacedName] = change } - if t := getLastChangeTriggerTime(current); !t.IsZero() { - ect.lastChangeTriggerTimes[namespacedName] = - append(ect.lastChangeTriggerTimes[namespacedName], t) + + if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() { + ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t) } change.current = ect.endpointsToEndpointsMap(current) @@ -163,31 +169,83 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool { return len(ect.items) > 0 } -// getLastChangeTriggerTime returns the time.Time value of the EndpointsLastChangeTriggerTime -// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set -// or was set incorrectly. -func getLastChangeTriggerTime(endpoints *v1.Endpoints) time.Time { +// EndpointSliceUpdate updates given service's endpoints change map based on the given endpoint slice. +// It returns true if any changes are pending afterwards, otherwise false. Will add/update/delete items of EndpointsChangeMap. +// If removeSlice is true, slice will be removed, otherwise it will be added or updated. 
+func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { + // This should never happen + if endpointSlice == nil { + klog.Error("Nil endpointSlice passed to EndpointSliceUpdate") + return false + } + + namespacedName, _ := endpointSliceCacheKeys(endpointSlice) + + metrics.EndpointChangesTotal.Inc() + + ect.lock.Lock() + defer ect.lock.Unlock() + + change, ok := ect.items[namespacedName] + if !ok { + change = &endpointsChange{} + change.previous = ect.endpointSliceCache.EndpointsMap(namespacedName) + ect.items[namespacedName] = change + } + + if removeSlice { + ect.endpointSliceCache.Delete(endpointSlice) + } else { + ect.endpointSliceCache.Update(endpointSlice) + } + + if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() { + ect.lastChangeTriggerTimes[namespacedName] = + append(ect.lastChangeTriggerTimes[namespacedName], t) + } + + change.current = ect.endpointSliceCache.EndpointsMap(namespacedName) + // if change.previous equal to change.current, it means no change + if reflect.DeepEqual(change.previous, change.current) { + delete(ect.items, namespacedName) + // Reset the lastChangeTriggerTimes for this service. Given that the network programming + // SLI is defined as the duration between a time of an event and a time when the network was + // programmed to incorporate that event, if there are events that happened between two + // consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted, + // there will be no network programming for them and thus no network programming latency metric + // should be exported. 
+ delete(ect.lastChangeTriggerTimes, namespacedName) + } + + metrics.EndpointChangesPending.Set(float64(len(ect.items))) + return len(ect.items) > 0 +} + +// getLastChangeTriggerTime returns the time.Time value of the +// EndpointsLastChangeTriggerTime annotation stored in the given annotations +// map or the "zero" time if the annotation wasn't set or was set +// incorrectly. +func getLastChangeTriggerTime(annotations map[string]string) time.Time { // TODO(#81360): ignore case when Endpoint is deleted. - if endpoints == nil { + if _, ok := annotations[v1.EndpointsLastChangeTriggerTime]; !ok { + // It's possible that the Endpoints or EndpointSlice object won't have the + // EndpointsLastChangeTriggerTime annotation set. In that case return + // the 'zero value', which is ignored in the upstream code. return time.Time{} } - if _, ok := endpoints.Annotations[v1.EndpointsLastChangeTriggerTime]; !ok { - // It's possible that the Endpoints object won't have the EndpointsLastChangeTriggerTime - // annotation set. In that case return the 'zero value', which is ignored in the upstream code. - return time.Time{} - } - val, err := time.Parse(time.RFC3339Nano, endpoints.Annotations[v1.EndpointsLastChangeTriggerTime]) + val, err := time.Parse(time.RFC3339Nano, annotations[v1.EndpointsLastChangeTriggerTime]) if err != nil { klog.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v", - endpoints.Annotations[v1.EndpointsLastChangeTriggerTime], err) + annotations[v1.EndpointsLastChangeTriggerTime], err) // In case of error val = time.Zero, which is ignored in the upstream code. } return val } -// endpointsChange contains all changes to endpoints that happened since proxy rules were synced. For a single object, -// changes are accumulated, i.e. previous is state from before applying the changes, -// current is state after applying the changes. +// endpointsChange contains all changes to endpoints that happened since proxy +// rules were synced. 
For a single object, changes are accumulated, i.e. +// previous is state from before applying the changes, current is state after +// applying the changes. type endpointsChange struct { previous EndpointsMap current EndpointsMap @@ -240,8 +298,8 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint } endpointsMap := make(EndpointsMap) - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. + // We need to build a map of portname -> all ip:ports for that portname. + // Explode Endpoints.Subsets[*] into this structure. for i := range endpoints.Subsets { ss := &endpoints.Subsets[i] for i := range ss.Ports { @@ -276,13 +334,8 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo) } } - if klog.V(3) { - newEPList := []string{} - for _, ep := range endpointsMap[svcPortName] { - newEPList = append(newEPList, ep.String()) - } - klog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList) - } + + klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName])) } } return endpointsMap diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 3d10e8000a7..c76e4621829 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -23,10 +23,12 @@ import ( "github.com/davecgh/go-spew/spew" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + utilpointer "k8s.io/utils/pointer" ) func (proxier *FakeProxier) addEndpoints(endpoints *v1.Endpoints) { @@ -133,7 +135,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1. 
// This is a coarse test, but it offers some modicum of confidence as the code is evolved. func TestEndpointsToEndpointsMap(t *testing.T) { - epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil) + epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil, false) trueVal := true falseVal := false @@ -1382,13 +1384,246 @@ func TestLastChangeTriggerTime(t *testing.T) { } } +func TestEndpointSliceUpdate(t *testing.T) { + testCases := map[string]struct { + startingSlices []*discovery.EndpointSlice + endpointChangeTracker *EndpointChangeTracker + namespacedName types.NamespacedName + paramEndpointSlice *discovery.EndpointSlice + paramRemoveSlice bool + expectedReturnVal bool + expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo + }{ + // test starting from an empty state + "add a simple slice that doesn't already exist": { + startingSlices: []*discovery.EndpointSlice{}, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, + expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:80"}, + }, + makeServicePortName("ns1", "svc1", "port-1"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:443"}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:443"}, + }, + }, + }, + // test no modification to state - current change should be nil as nothing changes + "add the same slice that already exists": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 
3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + paramRemoveSlice: false, + expectedReturnVal: false, + expectedCurrentChange: nil, + }, + // test additions to existing state + "add a slice that overlaps with existing state": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, + expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.4:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.5:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.2.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.2.2:80", IsLocal: true}, + }, + makeServicePortName("ns1", "svc1", "port-1"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: 
true}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.4:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.5:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.2.1:443"}, + &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true}, + }, + }, + }, + // test additions to existing state with partially overlapping slices and ports + "add a slice that overlaps with existing state and partial ports": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}), + paramRemoveSlice: false, + expectedReturnVal: true, + expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.4:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.5:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.2.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.2.2:80", IsLocal: true}, + }, + makeServicePortName("ns1", "svc1", "port-1"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:443"}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:443"}, + &BaseEndpointInfo{Endpoint: "10.0.2.1:443"}, + &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true}, 
+ }, + }, + }, + // test deletions from existing state with partially overlapping slices and ports + "remove a slice that overlaps with existing state": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + paramRemoveSlice: true, + expectedReturnVal: true, + expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.2.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.2.2:80", IsLocal: true}, + }, + makeServicePortName("ns1", "svc1", "port-1"): { + &BaseEndpointInfo{Endpoint: "10.0.2.1:443"}, + &BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true}, + }, + }, + }, + // ensure a removal that has no effect turns into a no-op + "remove a slice that doesn't even exist in current state": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + 
paramRemoveSlice: true, + expectedReturnVal: false, + expectedCurrentChange: nil, + }, + // start with all endpoints ready, transition to no endpoints ready + "transition all endpoints to unready state": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, + expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{}, + }, + // start with no endpoints ready, transition to all endpoints ready + "transition all endpoints to ready state": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 2, 1, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, + expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true}, + }, + makeServicePortName("ns1", "svc1", "port-1"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true}, + }, + }, + }, + // start with some endpoints ready, transition to more endpoints 
ready + "transition some endpoints to ready state": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + generateEndpointSlice("svc1", "ns1", 2, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + paramRemoveSlice: false, + expectedReturnVal: true, + expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.2.1:80", IsLocal: true}, + }, + makeServicePortName("ns1", "svc1", "port-1"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.2.1:443", IsLocal: true}, + }, + }, + }, + } + + for name, tc := range testCases { + for _, startingSlice := range tc.startingSlices { + tc.endpointChangeTracker.endpointSliceCache.Update(startingSlice) + } + + got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice) + if !reflect.DeepEqual(got, tc.expectedReturnVal) { + t.Errorf("[%s] EndpointSliceUpdate return value got: %v, want %v", name, got, tc.expectedReturnVal) + } + if tc.endpointChangeTracker.items == nil { + t.Errorf("[%s] Expected ect.items to not be nil", name) + } + if tc.expectedCurrentChange == nil { + if tc.endpointChangeTracker.items[tc.namespacedName] != nil { + t.Errorf("[%s] Expected ect.items[%s] to be nil", name, tc.namespacedName) + } + } 
else { + if tc.endpointChangeTracker.items[tc.namespacedName] == nil { + t.Errorf("[%s] Expected ect.items[%s] to not be nil", name, tc.namespacedName) + } + compareEndpointsMapsStr(t, name, tc.endpointChangeTracker.items[tc.namespacedName].current, tc.expectedCurrentChange) + } + } +} + +// Test helpers + func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected map[ServicePortName][]*BaseEndpointInfo) { + t.Helper() + compareEndpointsMapsStr(t, string(tci), newMap, expected) +} + +func compareEndpointsMapsStr(t *testing.T, testName string, newMap EndpointsMap, expected map[ServicePortName][]*BaseEndpointInfo) { + t.Helper() if len(newMap) != len(expected) { - t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) + t.Errorf("[%s] expected %d results, got %d: %v", testName, len(expected), len(newMap), newMap) } for x := range expected { if len(newMap[x]) != len(expected[x]) { - t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) + t.Errorf("[%s] expected %d endpoints for %v, got %d", testName, len(expected[x]), x, len(newMap[x])) + t.Logf("Endpoints %+v", newMap[x]) } else { for i := range expected[x] { newEp, ok := newMap[x][i].(*BaseEndpointInfo) @@ -1397,7 +1632,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected m continue } if *newEp != *(expected[x][i]) { - t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp) + t.Errorf("[%s] expected new[%v][%d] to be %v, got %v (IsLocal expected %v, got %v)", testName, x, i, expected[x][i], newEp, expected[x][i].IsLocal, newEp.IsLocal) } } } diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go new file mode 100644 index 00000000000..229b7744424 --- /dev/null +++ b/pkg/proxy/endpointslicecache.go @@ -0,0 +1,240 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package proxy
+
+import (
+	"sort"
+
+	"k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1alpha1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/klog"
+	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
+	utilnet "k8s.io/utils/net"
+)
+
+// EndpointSliceCache is used as a cache of EndpointSlice information.
+// NOTE(review): the cache itself takes no lock; callers are presumably
+// expected to serialize access - confirm against the change tracker.
+type EndpointSliceCache struct {
+	// sliceByServiceMap is the basis of this cache. It contains endpoint slice
+	// info grouped by service name and endpoint slice name. The first key
+	// represents a namespaced service name while the second key represents
+	// an endpoint slice name. Since endpoints can move between slices, we
+	// require slice specific caching to prevent endpoints being removed from
+	// the cache when they may have just moved to a different slice.
+	sliceByServiceMap map[types.NamespacedName]map[string]*endpointSliceInfo
+	// makeEndpointInfo converts each BaseEndpointInfo built by the cache into
+	// the Endpoint value stored in results. NewEndpointSliceCache substitutes
+	// standardEndpointInfo when nil is supplied.
+	makeEndpointInfo makeEndpointFunc
+	// hostname is compared with an endpoint's v1.LabelHostname topology value
+	// to decide whether the endpoint is local. An empty hostname means no
+	// endpoint is ever considered local.
+	hostname string
+	// isIPv6Mode, when non-nil, causes endpoints whose first address belongs
+	// to the other IP family to be skipped.
+	isIPv6Mode *bool
+	// recorder is handed to utilproxy.LogAndEmitIncorrectIPVersionEvent when
+	// an endpoint is skipped because of its IP family.
+	recorder record.EventRecorder
+}
+
+// endpointSliceInfo contains just the attributes kube-proxy cares about.
+// Used for caching. Intentionally small to limit memory util.
+type endpointSliceInfo struct {
+	// Ports are the ports of the cached EndpointSlice.
+	Ports []discovery.EndpointPort
+	// Endpoints holds only the endpoints that passed the readiness filter
+	// applied when the slice was stored.
+	Endpoints []*endpointInfo
+}
+
+// endpointInfo contains just the attributes kube-proxy cares about.
+// Used for caching. Intentionally small to limit memory util.
+// Addresses and Topology are taken from EndpointSlice Endpoints; the slice and
+// map are assigned as-is, not deep-copied.
+type endpointInfo struct {
+	// Addresses lists the endpoint's addresses; only Addresses[0] is used
+	// when endpoints are converted for the proxier.
+	Addresses []string
+	// Topology carries the endpoint's topology labels; the v1.LabelHostname
+	// entry is what locality is derived from.
+	Topology map[string]string
+}
+
+// NewEndpointSliceCache initializes an EndpointSliceCache.
+//
+// hostname is the name endpoints are matched against to be flagged local,
+// isIPv6Mode (optional) restricts results to one IP family, recorder receives
+// events about endpoints with the wrong IP family, and makeEndpointInfo
+// (optional) customizes the Endpoint values produced; when it is nil
+// standardEndpointInfo is used.
+func NewEndpointSliceCache(hostname string, isIPv6Mode *bool, recorder record.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
+	if makeEndpointInfo == nil {
+		makeEndpointInfo = standardEndpointInfo
+	}
+	return &EndpointSliceCache{
+		sliceByServiceMap: map[types.NamespacedName]map[string]*endpointSliceInfo{},
+		hostname:          hostname,
+		isIPv6Mode:        isIPv6Mode,
+		makeEndpointInfo:  makeEndpointInfo,
+		recorder:          recorder,
+	}
+}
+
+// standardEndpointInfo is the default makeEndpointFunc. It returns the
+// BaseEndpointInfo it is given, unchanged.
+func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
+	return ep
+}
+
+// Update a slice in the cache.
+//
+// The slice is stored under its owning service and its own name, replacing
+// any entry previously stored under the same pair of keys. Endpoints whose
+// Ready condition is explicitly false are left out; a nil Ready condition is
+// treated as ready. Ports, Addresses and Topology are stored by reference, so
+// the cached info shares memory with endpointSlice.
+func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) {
+	serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice)
+	// This should never actually happen
+	if serviceKey.Name == "" || serviceKey.Namespace == "" || sliceKey == "" {
+		klog.Errorf("Invalid endpoint slice, name and owner reference required %v", endpointSlice)
+		return
+	}
+	esInfo := &endpointSliceInfo{
+		Ports:     endpointSlice.Ports,
+		Endpoints: []*endpointInfo{},
+	}
+	for _, endpoint := range endpointSlice.Endpoints {
+		// Keep only endpoints that are ready (or have no readiness reported).
+		if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true {
+			esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
+				Addresses: endpoint.Addresses,
+				Topology:  endpoint.Topology,
+			})
+		}
+	}
+	// Lazily create the per-service map the first time a service is seen.
+	if _, exists := cache.sliceByServiceMap[serviceKey]; !exists {
+		cache.sliceByServiceMap[serviceKey] = map[string]*endpointSliceInfo{}
+	}
+	cache.sliceByServiceMap[serviceKey][sliceKey] = esInfo
+}
+
+// Delete a slice from the cache.
+func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) {
+	serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice)
+	sliceInfos, exists := cache.sliceByServiceMap[serviceKey]
+	if !exists {
+		// Nothing cached for this service (or the slice had no owner
+		// reference), so there is nothing to remove.
+		return
+	}
+	delete(sliceInfos, sliceKey)
+	// Drop the per-service map once its last slice is gone; otherwise every
+	// service that ever existed would leave an empty map behind for the
+	// lifetime of the process.
+	if len(sliceInfos) == 0 {
+		delete(cache.sliceByServiceMap, serviceKey)
+	}
+}
+
+// EndpointsMap computes an EndpointsMap for a given service from all of the
+// slices currently cached for it. Service ports without any usable endpoint
+// are absent from the result.
+func (cache *EndpointSliceCache) EndpointsMap(serviceNN types.NamespacedName) EndpointsMap {
+	endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN)
+	return endpointsMapFromEndpointInfo(endpointInfoBySP)
+}
+
+// endpointInfoByServicePort groups endpoint info by service port name and address.
+// Ports with a nil name, a nil port number or port number 0 are skipped with a
+// warning. Endpoints from different slices that share a port name are merged
+// into the same per-IP map.
+func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName) map[ServicePortName]map[string]Endpoint {
+	endpointInfoBySP := map[ServicePortName]map[string]Endpoint{}
+
+	// Ranging over a missing service key yields no iterations, so unknown
+	// services simply produce an empty result.
+	for _, sliceInfo := range cache.sliceByServiceMap[serviceNN] {
+		for _, port := range sliceInfo.Ports {
+			if port.Name == nil {
+				klog.Warningf("ignoring port with nil name %v", port)
+				continue
+			}
+			// TODO: handle nil ports to mean "all"
+			if port.Port == nil || *port.Port == int32(0) {
+				klog.Warningf("ignoring invalid endpoint port %s", *port.Name)
+				continue
+			}
+
+			svcPortName := ServicePortName{NamespacedName: serviceNN}
+			svcPortName.Port = *port.Name
+
+			endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
+		}
+	}
+
+	return endpointInfoBySP
+}
+
+// addEndpointsByIP adds endpointInfo for each IP.
+func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName, portNum int, endpointsByIP map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
+	if endpointsByIP == nil {
+		endpointsByIP = map[string]Endpoint{}
+	}
+
+	// iterate through endpoints to add them to endpointsByIP.
+	for _, endpoint := range endpoints {
+		if len(endpoint.Addresses) == 0 {
+			klog.Warningf("ignoring invalid endpoint port %s with empty addresses", endpoint)
+			continue
+		}
+
+		// Filter out the incorrect IP version case. Any endpoint port that
+		// contains incorrect IP version will be ignored.
+		if cache.isIPv6Mode != nil && utilnet.IsIPv6String(endpoint.Addresses[0]) != *cache.isIPv6Mode {
+			// Emit event on the corresponding service which had a different IP
+			// version than the endpoint.
+			utilproxy.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], serviceNN.Name, serviceNN.Namespace, "")
+			continue
+		}
+
+		isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname])
+		endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal)
+
+		// This logic ensures we're deduping potential overlapping endpoints
+		// isLocal should not vary between matching IPs, but if it does, we
+		// favor a true value here if it exists.
+		if _, exists := endpointsByIP[endpointInfo.IP()]; !exists || isLocal {
+			endpointsByIP[endpointInfo.IP()] = cache.makeEndpointInfo(endpointInfo)
+		}
+	}
+
+	return endpointsByIP
+}
+
+// isLocal reports whether hostname names the node this cache was created for.
+// It is always false when the cache was created with an empty hostname.
+func (cache *EndpointSliceCache) isLocal(hostname string) bool {
+	return len(cache.hostname) > 0 && hostname == cache.hostname
+}
+
+// endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that
+// has been grouped by service port and IP. Service ports with no endpoints are
+// omitted, and each endpoint list is sorted by IP string.
+func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
+	endpointsMap := EndpointsMap{}
+
+	// transform endpointInfoByServicePort into an endpointsMap with sorted IPs.
+	for svcPortName, endpointInfoByIP := range endpointInfoBySP {
+		if len(endpointInfoByIP) == 0 {
+			continue
+		}
+
+		// Build the list locally with its final capacity instead of growing
+		// it through repeated map lookups.
+		endpoints := make([]Endpoint, 0, len(endpointInfoByIP))
+		for _, endpointInfo := range endpointInfoByIP {
+			endpoints = append(endpoints, endpointInfo)
+		}
+		// Ensure IPs are always returned in the same order to simplify diffing.
+		sort.Sort(byIP(endpoints))
+		endpointsMap[svcPortName] = endpoints
+
+		// Only pay for formatting every endpoint when the message will
+		// actually be emitted.
+		if klog.V(3) {
+			klog.Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpoints))
+		}
+	}
+
+	return endpointsMap
+}
+
+// formatEndpointsList returns a string list converted from an endpoints list.
+// Each element is the endpoint's String() form, in input order.
+func formatEndpointsList(endpoints []Endpoint) []string {
+	var formattedList []string
+	for _, ep := range endpoints {
+		formattedList = append(formattedList, ep.String())
+	}
+	return formattedList
+}
+
+// endpointSliceCacheKeys returns cache keys used for a given EndpointSlice.
+// The first key is the slice's namespace combined with the name of its first
+// owner reference; the second key is the slice's own name. When the slice has
+// no owner reference the first key is the zero NamespacedName.
+func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string) {
+	if len(endpointSlice.OwnerReferences) == 0 {
+		klog.Errorf("No owner reference set on endpoint slice: %s", endpointSlice.Name)
+		return types.NamespacedName{}, endpointSlice.Name
+	}
+	if len(endpointSlice.OwnerReferences) > 1 {
+		// Logged but not fatal: the first owner reference is still used.
+		klog.Errorf("More than 1 owner reference set on endpoint slice: %s", endpointSlice.Name)
+	}
+	ownerRef := endpointSlice.OwnerReferences[0]
+	return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: ownerRef.Name}, endpointSlice.Name
+}
+
+// byIP helps sort endpoints by IP. The comparison is on the IP string, so the
+// order is lexical rather than numeric.
+type byIP []Endpoint
+
+func (e byIP) Len() int {
+	return len(e)
+}
+func (e byIP) Swap(i, j int) {
+	e[i], e[j] = e[j], e[i]
+}
+func (e byIP) Less(i, j int) bool {
+	return e[i].IP() < e[j].IP()
+}
diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go
new file mode 100644
index 00000000000..450f2d083c4
--- /dev/null
+++ b/pkg/proxy/endpointslicecache_test.go
@@ -0,0 +1,239 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "fmt" + "reflect" + "testing" + + discovery "k8s.io/api/discovery/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilpointer "k8s.io/utils/pointer" +) + +func TestEndpointsMapFromESC(t *testing.T) { + testCases := map[string]struct { + endpointSlices []*discovery.EndpointSlice + hostname string + namespacedName types.NamespacedName + expectedMap map[ServicePortName][]*BaseEndpointInfo + }{ + "1 slice, 2 hosts, ports 80,443": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + hostname: "host1", + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + expectedMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false}, + }, + makeServicePortName("ns1", "svc1", "port-1"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false}, + }, + }, + }, + "2 slices, same port": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + endpointSlices: 
[]*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + generateEndpointSlice("svc1", "ns1", 2, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + }, + expectedMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:80"}, + &BaseEndpointInfo{Endpoint: "10.0.2.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.2.2:80"}, + &BaseEndpointInfo{Endpoint: "10.0.2.3:80"}, + }, + }, + }, + // 2 slices, with some overlapping endpoints, result should be a union + // of the 2. + "2 overlapping slices, same port": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + generateEndpointSlice("svc1", "ns1", 1, 4, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + }, + expectedMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.4:80"}, + }, + }, + }, + // 2 slices with all endpoints overlapping, more unready in first than + // second. 
If an endpoint is marked ready, we add it to the + // EndpointsMap, even if conditions.Ready isn't true for another + // matching endpoint + "2 slices, overlapping endpoints, some endpoints unready in 1 or both": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 10, 3, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + generateEndpointSlice("svc1", "ns1", 1, 10, 6, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + }, + expectedMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.10:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.4:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.5:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.7:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.8:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.9:80"}, + }, + }, + }, + "2 slices, overlapping endpoints, all unready": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 10, 1, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + generateEndpointSlice("svc1", "ns1", 1, 10, 1, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + }, + expectedMap: map[ServicePortName][]*BaseEndpointInfo{}, + }, + "3 slices with different services and namespaces": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + generateEndpointSlice("svc2", "ns1", 2, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + generateEndpointSlice("svc1", "ns2", 3, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}), + }, + expectedMap: 
map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0"): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80"}, + &BaseEndpointInfo{Endpoint: "10.0.1.3:80"}, + }, + }, + }, + // Ensuring that nil port value will not break things. This will + // represent all ports in the future, but that has not been implemented + // yet. + "Nil port should not break anything": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + hostname: "host1", + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{nil}), + }, + expectedMap: map[ServicePortName][]*BaseEndpointInfo{}, + }, + } + + for name, tc := range testCases { + esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil) + + for _, endpointSlice := range tc.endpointSlices { + esCache.Update(endpointSlice) + } + + compareEndpointsMapsStr(t, name, esCache.EndpointsMap(tc.namespacedName), tc.expectedMap) + } +} + +func TestEndpointInfoByServicePort(t *testing.T) { + testCases := map[string]struct { + namespacedName types.NamespacedName + endpointSlices []*discovery.EndpointSlice + hostname string + expectedMap map[ServicePortName]map[string]Endpoint + }{ + "simple use case with 3 endpoints": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + hostname: "host1", + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80)}), + }, + expectedMap: map[ServicePortName]map[string]Endpoint{ + {NamespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, Port: "port-0"}: { + "10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false}, + "10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true}, + "10.0.1.3": &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false}, + }, + }, + }, + } + + for name, tc := range 
testCases {
+		esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
+
+		for _, endpointSlice := range tc.endpointSlices {
+			esCache.Update(endpointSlice)
+		}
+
+		got := esCache.endpointInfoByServicePort(tc.namespacedName)
+		if !reflect.DeepEqual(got, tc.expectedMap) {
+			t.Errorf("[%s] endpointInfoByServicePort does not match. Want: %v, Got: %v", name, tc.expectedMap, got)
+		}
+
+	}
+}
+
+// generateEndpointSliceWithOffset builds an EndpointSlice fixture.
+//
+// The slice is named "<serviceName>-<sliceNum>", lives in namespace and is
+// owned by a Service named serviceName. One port named "port-<index>" is
+// created per entry of portNums (a nil entry yields a port with a nil number).
+// Endpoints are numbered 1..numEndpoints and get the single address
+// "10.0.<offset>.<i>". Endpoint i is marked unready when i is a multiple of
+// unreadyMod, so 1 makes every endpoint unready and a value larger than
+// numEndpoints makes every endpoint ready; unreadyMod must not be 0 because it
+// is used as a modulus. When hosts is non-empty, endpoint i gets the hostname
+// topology entry hosts[i%len(hosts)] (so the first endpoint uses hosts[1] when
+// two hosts are given); otherwise no topology is set.
+func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, offset, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
+	ipAddressType := discovery.AddressTypeIP
+	endpointSlice := &discovery.EndpointSlice{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            fmt.Sprintf("%s-%d", serviceName, sliceNum),
+			Namespace:       namespace,
+			OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}},
+		},
+		Ports:       []discovery.EndpointPort{},
+		AddressType: &ipAddressType,
+		Endpoints:   []discovery.Endpoint{},
+	}
+
+	for i, portNum := range portNums {
+		endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{
+			Name: utilpointer.StringPtr(fmt.Sprintf("port-%d", i)),
+			Port: portNum,
+		})
+	}
+
+	for i := 1; i <= numEndpoints; i++ {
+		endpoint := discovery.Endpoint{
+			Addresses:  []string{fmt.Sprintf("10.0.%d.%d", offset, i)},
+			Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(i%unreadyMod != 0)},
+		}
+
+		if len(hosts) > 0 {
+			endpoint.Topology = map[string]string{
+				"kubernetes.io/hostname": hosts[i%len(hosts)],
+			}
+		}
+
+		endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpoint)
+	}
+
+	return endpointSlice
+}
+
+// generateEndpointSlice is generateEndpointSliceWithOffset with the address
+// offset set to sliceNum, so slices with different numbers get disjoint
+// "10.0.<sliceNum>.x" addresses while slices sharing a number overlap.
+func generateEndpointSlice(serviceName, namespace string, sliceNum, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
+	return generateEndpointSliceWithOffset(serviceName, namespace, sliceNum, sliceNum, numEndpoints, unreadyMod, hosts, portNums)
+}
diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD
index
f7e0d409c6b..b45e983765f 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -11,6 +11,7 @@ go_library( srcs = ["proxier.go"], importpath = "k8s.io/kubernetes/pkg/proxy/iptables", deps = [ + "//pkg/features:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/metrics:go_default_library", @@ -20,8 +21,10 @@ go_library( "//pkg/util/iptables:go_default_library", "//pkg/util/sysctl:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", @@ -42,9 +45,11 @@ go_test( "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index d4fe596e945..84cede1fec0 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -32,12 +32,14 @@ import ( "sync/atomic" "time" - "k8s.io/klog" - - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" 
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" + "k8s.io/klog" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metrics" @@ -179,13 +181,14 @@ type Proxier struct { serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable - // endpointsSynced and servicesSynced are set to true when corresponding - // objects are synced after startup. This is used to avoid updating iptables - // with some partial data after kube-proxy restart. - endpointsSynced bool - servicesSynced bool - initialized int32 - syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules + // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true + // when corresponding objects are synced after startup. This is used to avoid + // updating iptables with some partial data after kube-proxy restart. + endpointsSynced bool + endpointSlicesSynced bool + servicesSynced bool + initialized int32 + syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules // These are effectively const and do not need the mutex to be held. 
iptables utiliptables.Interface @@ -286,6 +289,8 @@ func NewProxier(ipt utiliptables.Interface, return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, ipt.IsIpv6()) } + endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) + healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps isIPv6 := ipt.IsIpv6() @@ -294,7 +299,7 @@ func NewProxier(ipt utiliptables.Interface, serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled), iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -500,7 +505,7 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -517,7 +522,7 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { // endpoints object is observed. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() { - proxier.syncRunner.Run() + proxier.Sync() } } @@ -539,6 +544,42 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.syncProxyRules() } +// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object +// is observed. 
+func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
+	// false: the slice is being added/updated, not removed. A sync is only
+	// requested when the tracker reports a change and the proxier has been
+	// initialized.
+	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
+		proxier.Sync()
+	}
+}
+
+// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
+// slice object is observed. The previous version of the object is ignored;
+// only the new state is handed to the change tracker.
+func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
+	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
+		proxier.Sync()
+	}
+}
+
+// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
+// object is observed.
+func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
+	// true: ask the tracker to remove this slice.
+	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
+		proxier.Sync()
+	}
+}
+
+// OnEndpointSlicesSynced is called once all the initial event handlers were
+// called and the state is fully propagated to local cache.
+func (proxier *Proxier) OnEndpointSlicesSynced() {
+	proxier.mu.Lock()
+	proxier.endpointSlicesSynced = true
+	// The proxier counts as initialized only once services have synced too.
+	proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced)
+	proxier.mu.Unlock()
+
+	// Sync unconditionally - this is called once per lifetime.
+	proxier.syncProxyRules()
+}
+
 // portProtoHash takes the ServicePortName and protocol for a service
 // returns the associated 16 character hash. This is computed by hashing (sha256)
 // then encoding to base32 and truncating to 16 chars.
We do this because IPTables diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 3ce3c378b50..41be026e368 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -28,7 +28,9 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -250,7 +252,7 @@ func TestDeleteEndpointConnections(t *testing.T) { } ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) fp.exec = &fexec for _, tc := range testCases { @@ -363,7 +365,7 @@ func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedNa const testHostname = "test-hostname" -func NewFakeProxier(ipt utiliptables.Interface) *Proxier { +func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. 
p := &Proxier{ @@ -371,7 +373,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled), iptables: ipt, clusterCIDR: "10.0.0.0/24", hostname: testHostname, @@ -575,7 +577,7 @@ func errorf(msg string, rules []iptablestest.Rule, t *testing.T) { func TestClusterIPReject(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) svcIP := "10.20.30.41" svcPort := 80 svcPortName := proxy.ServicePortName{ @@ -609,7 +611,7 @@ func TestClusterIPReject(t *testing.T) { func TestClusterIPEndpointsJump(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) svcIP := "10.20.30.41" svcPort := 80 svcPortName := proxy.ServicePortName{ @@ -666,7 +668,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { func TestLoadBalancer(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -726,7 +728,7 @@ func TestLoadBalancer(t *testing.T) { func TestNodePort(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -784,7 +786,7 @@ func TestNodePort(t *testing.T) { func TestExternalIPsReject(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) svcIP := "10.20.30.41" svcPort := 80 svcExternalIPs := "50.60.70.81" @@ -818,7 +820,7 @@ func TestExternalIPsReject(t *testing.T) { func TestNodePortReject(t *testing.T) { ipt := iptablestest.NewFake() - fp := 
NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -851,7 +853,7 @@ func TestNodePortReject(t *testing.T) { func TestOnlyLocalLoadBalancing(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -941,7 +943,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) // set cluster CIDR to empty before test fp.clusterCIDR = "" onlyLocalNodePorts(t, fp, ipt) @@ -949,7 +951,7 @@ func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) { func TestOnlyLocalNodePorts(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) onlyLocalNodePorts(t, fp, ipt) } @@ -1064,7 +1066,7 @@ func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port func TestBuildServiceMapAddRemove(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) services := []*v1.Service{ makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { @@ -1170,7 +1172,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { func TestBuildServiceMapServiceHeadless(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) makeServiceMap(fp, makeTestService("somewhere-else", "headless", func(svc *v1.Service) { @@ -1202,7 +1204,7 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) makeServiceMap(fp, makeTestService("somewhere-else", "external-name", func(svc *v1.Service) { @@ -1228,7 +1230,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { 
func TestBuildServiceMapServiceUpdate(t *testing.T) { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP @@ -2182,7 +2184,7 @@ func Test_updateEndpointsMap(t *testing.T) { for tci, tc := range testCases { ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) + fp := NewFakeProxier(ipt, false) fp.hostname = nodeName // First check that after adding all previous versions of endpoints, @@ -2250,4 +2252,120 @@ func Test_updateEndpointsMap(t *testing.T) { } } +// The majority of EndpointSlice specific tests are not iptables specific and focus on +// the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the +// iptables proxier supports translating EndpointSlices to iptables output. +func TestEndpointSliceE2E(t *testing.T) { + expectedIPTablesWithoutSlice := `*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +-A KUBE-SERVICES -m comment --comment "ns1/svc1: has no endpoints" -m -p -d 172.20.1.1/32 --dport 0 -j REJECT +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT +-A KUBE-FORWARD -s 10.0.0.0/24 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -d 10.0.0.0/24 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-3WUAALNGPYZZAWAD - [0:0] +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --set-xmark +-X KUBE-SVC-3WUAALNGPYZZAWAD +-A KUBE-SERVICES -m comment --comment 
"kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +` + + expectedIPTablesWithSlice := `*filter +:KUBE-SERVICES - [0:0] +:KUBE-EXTERNAL-SERVICES - [0:0] +:KUBE-FORWARD - [0:0] +-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT +-A KUBE-FORWARD -s 10.0.0.0/24 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -d 10.0.0.0/24 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT +COMMIT +*nat +:KUBE-SERVICES - [0:0] +:KUBE-NODEPORTS - [0:0] +:KUBE-POSTROUTING - [0:0] +:KUBE-MARK-MASQ - [0:0] +:KUBE-SVC-3WUAALNGPYZZAWAD - [0:0] +: - [0:0] +: - [0:0] +: - [0:0] +-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark -j MASQUERADE +-A KUBE-MARK-MASQ -j MARK --set-xmark +-A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m -p -d 172.20.1.1/32 --dport 0 ! 
-s 10.0.0.0/24 -j KUBE-MARK-MASQ +-A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m -p -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-3WUAALNGPYZZAWAD +-A KUBE-SVC-3WUAALNGPYZZAWAD -m statistic --mode random --probability 0.33333 -j +-A -s 10.0.1.1/32 -j KUBE-MARK-MASQ +-A -m -p -j DNAT --to-destination 10.0.1.1:80 +-A KUBE-SVC-3WUAALNGPYZZAWAD -m statistic --mode random --probability 0.50000 -j +-A -s 10.0.1.2/32 -j KUBE-MARK-MASQ +-A -m -p -j DNAT --to-destination 10.0.1.2:80 +-A KUBE-SVC-3WUAALNGPYZZAWAD -j +-A -s 10.0.1.3/32 -j KUBE-MARK-MASQ +-A -m -p -j DNAT --to-destination 10.0.1.3:80 +-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS +COMMIT +` + + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt, true) + fp.OnServiceSynced() + fp.OnEndpointsSynced() + fp.OnEndpointSlicesSynced() + + serviceName := "svc1" + namespaceName := "ns1" + + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80)}}, + }, + }) + + ipAddressType := discovery.AddressTypeIP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + }}, + AddressType: &ipAddressType, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.2"}, + Conditions: 
discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": "node2"}, + }, { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": "node3"}, + }}, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String()) + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + assert.Equal(t, expectedIPTablesWithoutSlice, fp.iptablesData.String()) +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 39b2ce85406..741dbc07af7 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -26,12 +26,15 @@ go_test( "//pkg/util/ipvs:go_default_library", "//pkg/util/ipvs/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", + "//vendor/k8s.io/utils/pointer:go_default_library", ], ) @@ -47,6 +50,7 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/proxy/ipvs", deps = [ + "//pkg/features:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/metrics:go_default_library", @@ -58,10 +62,12 @@ go_library( "//pkg/util/ipvs:go_default_library", "//pkg/util/sysctl:go_default_library", 
"//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 72ca19383ce..dae1d802763 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -31,12 +31,15 @@ import ( "time" v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/klog" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metrics" @@ -184,13 +187,14 @@ type Proxier struct { serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable - // endpointsSynced and servicesSynced are set to true when corresponding - // objects are synced after startup. This is used to avoid updating ipvs rules - // with some partial data after kube-proxy restart. - endpointsSynced bool - servicesSynced bool - initialized int32 - syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules + // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when + // corresponding objects are synced after startup. 
This is used to avoid updating + // ipvs rules with some partial data after kube-proxy restart. + endpointsSynced bool + endpointSlicesSynced bool + servicesSynced bool + initialized int32 + syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules // These are effectively const and do not need the mutex to be held. syncPeriod time.Duration @@ -406,12 +410,14 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps + endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) + proxier := &Proxier{ portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, excludeCIDRs: parseExcludedCIDRs(excludeCIDRs), @@ -745,7 +751,7 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true - proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) + proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced) proxier.mu.Unlock() // Sync unconditionally - this is called once per lifetime. @@ -780,6 +786,42 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.syncProxyRules() } +// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object +// is observed. 
+func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) { + if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() { + proxier.Sync() + } +} + +// OnEndpointSliceUpdate is called whenever modification of an existing endpoint +// slice object is observed. +func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) { + if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() { + proxier.Sync() + } +} + +// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice +// object is observed. +func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) { + if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() { + proxier.Sync() + } +} + +// OnEndpointSlicesSynced is called once all the initial event handlers were +// called and the state is fully propagated to local cache. +func (proxier *Proxier) OnEndpointSlicesSynced() { + proxier.mu.Lock() + proxier.endpointSlicesSynced = true + proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced) + proxier.mu.Unlock() + + // Sync unconditionally - this is called once per lifetime. 
+ proxier.syncProxyRules() +} + // EntryInvalidErr indicates if an ipset entry is invalid or not const EntryInvalidErr = "error adding entry %s to ipset %s" diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index a4ba2b2c600..7b5600c09f9 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -25,7 +25,9 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -42,6 +44,7 @@ import ( ipvstest "k8s.io/kubernetes/pkg/util/ipvs/testing" "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" + utilpointer "k8s.io/utils/pointer" ) const testHostname = "test-hostname" @@ -112,7 +115,7 @@ func (fake *fakeIPSetVersioner) GetVersion() (string, error) { return fake.version, fake.err } -func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP, excludeCIDRs []*net.IPNet) *Proxier { +func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP, excludeCIDRs []*net.IPNet, endpointSlicesEnabled bool) *Proxier { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ func() ([]byte, error) { return []byte("dummy device have been created"), nil }, @@ -132,33 +135,34 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment) } return &Proxier{ - exec: fexec, - serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), - excludeCIDRs: excludeCIDRs, - iptables: ipt, - ipvs: ipvs, - ipset: ipset, - clusterCIDR: 
"10.0.0.0/24", - strictARP: false, - hostname: testHostname, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, - healthChecker: newFakeHealthChecker(), - ipvsScheduler: DefaultScheduler, - ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, - iptablesData: bytes.NewBuffer(nil), - filterChainsData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - netlinkHandle: netlinktest.NewFakeNetlinkHandle(), - ipsetList: ipsetList, - nodePortAddresses: make([]string, 0), - networkInterfacer: proxyutiltest.NewFakeNetwork(), + exec: fexec, + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled), + excludeCIDRs: excludeCIDRs, + iptables: ipt, + ipvs: ipvs, + ipset: ipset, + clusterCIDR: "10.0.0.0/24", + strictARP: false, + hostname: testHostname, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, + healthChecker: newFakeHealthChecker(), + ipvsScheduler: DefaultScheduler, + ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, + iptablesData: bytes.NewBuffer(nil), + filterChainsData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + filterChains: bytes.NewBuffer(nil), + filterRules: bytes.NewBuffer(nil), + netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + ipsetList: ipsetList, + nodePortAddresses: make([]string, 0), + networkInterfacer: proxyutiltest.NewFakeNetwork(), + gracefuldeleteManager: NewGracefulTerminationManager(ipvs), } } @@ -215,7 +219,7 @@ func TestCleanupLeftovers(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := 
NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -933,7 +937,7 @@ func TestNodePort(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, false) fp.nodePortAddresses = test.nodePortAddresses makeServiceMap(fp, test.services...) @@ -1100,7 +1104,7 @@ func TestClusterIP(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) makeServiceMap(fp, test.services...) makeEndpointsMap(fp, test.endpoints...) @@ -1120,7 +1124,7 @@ func TestExternalIPsNoEndpoint(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) svcIP := "10.20.30.41" svcPort := 80 svcExternalIPs := "50.60.70.81" @@ -1175,7 +1179,7 @@ func TestExternalIPs(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) svcIP := "10.20.30.41" svcPort := 80 svcExternalIPs := sets.NewString("50.60.70.81", "2012::51", "127.0.0.1") @@ -1693,7 +1697,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) services := []*v1.Service{ makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { @@ -1803,7 +1807,7 
@@ func TestBuildServiceMapServiceHeadless(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) makeServiceMap(fp, makeTestService("somewhere-else", "headless", func(svc *v1.Service) { @@ -1842,7 +1846,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) makeServiceMap(fp, makeTestService("somewhere-else", "external-name", func(svc *v1.Service) { @@ -1870,7 +1874,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP @@ -1954,7 +1958,7 @@ func TestSessionAffinity(t *testing.T) { ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) nodeIP := net.ParseIP("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil, false) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -2817,7 +2821,7 @@ func Test_updateEndpointsMap(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) fp.hostname = nodeName // First check that after adding all previous versions of endpoints, @@ -3061,7 +3065,7 @@ func Test_syncService(t *testing.T) { ipt := iptablestest.NewFake() ipvs := 
ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice) if testCases[i].oldVirtualServer != nil { @@ -3091,7 +3095,7 @@ func buildFakeProxier() (*iptablestest.FakeIPTables, *Proxier) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil) + return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) } func hasJump(rules []iptablestest.Rule, destChain, ipSet string) bool { @@ -3168,7 +3172,7 @@ func TestCleanLegacyService(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3.3.3.0/24", "4.4.4.0/24"})) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3.3.3.0/24", "4.4.4.0/24"}), false) // All ipvs services that were processed in the latest sync loop. 
activeServices := map[string]bool{"ipvs0": true, "ipvs1": true} @@ -3274,7 +3278,7 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) // all deleted expect ipvs2 activeServices := map[string]bool{"ipvs2": true} @@ -3368,7 +3372,7 @@ func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) { ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) gtm := NewGracefulTerminationManager(ipvs) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"4.4.4.4/32"})) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"4.4.4.4/32"}), false) fp.gracefuldeleteManager = gtm vs := &utilipvs.VirtualServer{ @@ -3422,7 +3426,7 @@ func TestCleanLegacyService6(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3000::/64", "4000::/64"})) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3000::/64", "4000::/64"}), false) fp.nodeIP = net.ParseIP("::1") // All ipvs services that were processed in the latest sync loop. @@ -3529,7 +3533,7 @@ func TestMultiPortServiceBindAddr(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false) service1 := makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP @@ -3626,3 +3630,89 @@ raid10 57344 0 - Live 0xffffffffc0597000`, }) } } + +// The majority of EndpointSlice specific tests are not ipvs specific and focus on +// the shared EndpointChangeTracker and EndpointSliceCache. 
This test ensures that the +// ipvs proxier supports translating EndpointSlices to ipvs output. +func TestEndpointSliceE2E(t *testing.T) { + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true) + fp.servicesSynced = true + fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + // Add initial service + serviceName := "svc1" + namespaceName := "ns1" + + fp.OnServiceAdd(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80)}}, + }, + }) + + // Add initial endpoint slice + ipAddressType := discovery.AddressTypeIP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + }}, + AddressType: &ipAddressType, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.0.1.1"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": testHostname}, + }, { + Addresses: []string{"10.0.1.2"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": "node2"}, + }, { + Addresses: []string{"10.0.1.3"}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + Topology: map[string]string{"kubernetes.io/hostname": "node3"}, + }}, + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice update + assert.NotNil(t, 
fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-LOOP-BACK") + assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod") + virtualServers1, vsErr1 := ipvs.GetVirtualServers() + assert.Nil(t, vsErr1, "Expected no error getting virtual servers") + assert.Len(t, virtualServers1, 1, "Expected 1 virtual server") + realServers1, rsErr1 := ipvs.GetRealServers(virtualServers1[0]) + assert.Nil(t, rsErr1, "Expected no error getting real servers") + assert.Len(t, realServers1, 3, "Expected 3 real servers") + assert.Equal(t, realServers1[0].String(), "10.0.1.1:80") + assert.Equal(t, realServers1[1].String(), "10.0.1.2:80") + assert.Equal(t, realServers1[2].String(), "10.0.1.3:80") + + fp.OnEndpointSliceDelete(endpointSlice) + fp.syncProxyRules() + + // Ensure that Proxier updates ipvs appropriately after EndpointSlice delete + assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"]) + activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries + assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK") + virtualServers2, vsErr2 := ipvs.GetVirtualServers() + assert.Nil(t, vsErr2, "Expected no error getting virtual servers") + assert.Len(t, virtualServers2, 1, "Expected 1 virtual server") + realServers2, rsErr2 := ipvs.GetRealServers(virtualServers2[0]) + assert.Nil(t, rsErr2, "Expected no error getting real servers") + assert.Len(t, realServers2, 0, "Expected 0 real servers") +} diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index f79b5c68029..4d9554d649a 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -396,7 +396,7 @@ func newFakeProxier() *FakeProxier { serviceMap: make(ServiceMap), serviceChanges: NewServiceChangeTracker(nil, nil, nil), endpointsMap: make(EndpointsMap), - endpointsChanges: 
NewEndpointChangeTracker(testHostname, nil, nil, nil), + endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil, false), } } diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 7279093c15b..c8ddf924c9c 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -20,7 +20,7 @@ import ( "fmt" "net" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/proxy/config" ) @@ -28,6 +28,7 @@ import ( // Provider is the interface provided by proxier implementations. type Provider interface { config.EndpointsHandler + config.EndpointSliceHandler config.ServiceHandler // Sync immediately synchronizes the Provider's current state to proxy rules. diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index 87e3da69e96..0d756475daf 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -83,6 +83,7 @@ go_test( "//pkg/proxy:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 46d9e9c86cb..9e07f0f6073 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -26,7 +26,7 @@ import ( "sync/atomic" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -35,6 +35,7 @@ import ( "k8s.io/klog" "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/config" utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" 
"k8s.io/kubernetes/pkg/util/conntrack" @@ -109,6 +110,9 @@ type asyncRunnerInterface interface { // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { + // EndpointSlice support has not been added for this proxier yet. + config.NoopEndpointSliceHandler + loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap serviceMap map[proxy.ServicePortName]*ServiceInfo diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index e76400d114a..cb5b66c840b 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -31,6 +31,7 @@ import ( "time" "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" @@ -1137,6 +1138,14 @@ func TestOnServiceAddChangeMap(t *testing.T) { } } +func TestNoopEndpointSlice(t *testing.T) { + p := Proxier{} + p.OnEndpointSliceAdd(&discovery.EndpointSlice{}) + p.OnEndpointSliceUpdate(&discovery.EndpointSlice{}, &discovery.EndpointSlice{}) + p.OnEndpointSliceDelete(&discovery.EndpointSlice{}) + p.OnEndpointSlicesSynced() +} + func makeFakeExec() *fakeexec.FakeExec { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ diff --git a/pkg/proxy/winkernel/BUILD b/pkg/proxy/winkernel/BUILD index c91644282e7..9926904393d 100644 --- a/pkg/proxy/winkernel/BUILD +++ b/pkg/proxy/winkernel/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/apis/config:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/util/async:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -61,6 +62,7 @@ go_test( "@io_bazel_rules_go//go/platform:windows": [ "//pkg/proxy:go_default_library", 
"//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library", diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 22fde005c3e..5ef3daf2de7 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/apis/config" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/util/async" ) @@ -441,6 +442,9 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap, curServices proxySe // Proxier is an hns based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { + // EndpointSlice support has not been added for this proxier yet. + proxyconfig.NoopEndpointSliceHandler + // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since policies were synced. For a single object, // changes are accumulated, i.e. 
previous is state from before all of them, diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 3ed78bd54a6..c55618b9ed1 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -20,6 +20,7 @@ package winkernel import ( "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/proxy" @@ -331,6 +332,15 @@ func TestCreateLoadBalancer(t *testing.T) { t.Errorf("%v does not match %v", proxier.serviceMap[svcPortName].hnsID, guid) } } + +func TestNoopEndpointSlice(t *testing.T) { + p := Proxier{} + p.OnEndpointSliceAdd(&discovery.EndpointSlice{}) + p.OnEndpointSliceUpdate(&discovery.EndpointSlice{}, &discovery.EndpointSlice{}) + p.OnEndpointSliceDelete(&discovery.EndpointSlice{}) + p.OnEndpointSlicesSynced() +} + func makeNSN(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index b8ef1e38a62..f360cb67a40 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -45,6 +45,7 @@ go_test( "//pkg/proxy:go_default_library", "//pkg/util/netsh/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 717a0bce553..e39b24f873b 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -27,12 +27,13 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" utilnet "k8s.io/apimachinery/pkg/util/net" 
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/util/netsh" ) @@ -80,6 +81,9 @@ func logTimeout(err error) bool { // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { + // EndpointSlice support has not been added for this proxier yet. + config.NoopEndpointSliceHandler + loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap serviceMap map[ServicePortPortalName]*serviceInfo diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index eab4c56bb8c..cb1a7216237 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -30,6 +30,7 @@ import ( "time" "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" @@ -950,4 +951,12 @@ func TestProxyUpdatePortal(t *testing.T) { waitForNumProxyLoops(t, p, 1) } +func TestNoopEndpointSlice(t *testing.T) { + p := Proxier{} + p.OnEndpointSliceAdd(&discovery.EndpointSlice{}) + p.OnEndpointSliceUpdate(&discovery.EndpointSlice{}, &discovery.EndpointSlice{}) + p.OnEndpointSliceDelete(&discovery.EndpointSlice{}) + p.OnEndpointSlicesSynced() +} + // TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in