From c9cf3f5b722569a5825cdd0d94d48b17cfd09d6d Mon Sep 17 00:00:00 2001 From: Roc Chan Date: Tue, 16 Jul 2019 17:22:43 +0800 Subject: [PATCH] Service Topology implementation * Implement Service Topology for ipvs and iptables proxier * Add test files * API validation --- cmd/kube-proxy/app/server.go | 17 + pkg/apis/core/types.go | 12 +- pkg/apis/core/validation/validation.go | 33 ++ pkg/apis/core/validation/validation_test.go | 62 +++ pkg/kubemark/hollow_proxy.go | 1 + pkg/proxy/config/config.go | 125 +++++ pkg/proxy/endpoints.go | 13 +- pkg/proxy/endpoints_test.go | 37 +- pkg/proxy/endpointslicecache.go | 2 +- pkg/proxy/endpointslicecache_test.go | 6 +- pkg/proxy/iptables/proxier.go | 61 ++- pkg/proxy/ipvs/meta_proxier.go | 4 + pkg/proxy/ipvs/proxier.go | 62 ++- pkg/proxy/ipvs/proxier_test.go | 3 +- pkg/proxy/service.go | 7 + pkg/proxy/topology.go | 80 +++ pkg/proxy/topology_test.go | 478 ++++++++++++++++++ pkg/proxy/types.go | 5 + pkg/proxy/userspace/proxier.go | 2 + pkg/proxy/winkernel/proxier.go | 2 + pkg/proxy/winuserspace/proxier.go | 2 + pkg/registry/core/service/strategy.go | 15 +- .../authorizer/rbac/bootstrappolicy/policy.go | 2 +- .../testdata/cluster-roles.yaml | 2 + staging/src/k8s.io/api/core/v1/types.go | 16 +- 25 files changed, 1006 insertions(+), 43 deletions(-) create mode 100644 pkg/proxy/topology.go create mode 100644 pkg/proxy/topology_test.go diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 89e3ded7b1c..51131702a2c 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -35,6 +35,7 @@ import ( gerrors "github.com/pkg/errors" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -663,6 +664,7 @@ func (s *ProxyServer) Run() error { labelSelector := labels.NewSelector() labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) + // Make informers that filter out objects that want a non-default service proxy. informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labelSelector.String() @@ -690,6 +692,21 @@ func (s *ProxyServer) Run() error { // functions must configure their shared informer event handlers first. informerFactory.Start(wait.NeverStop) + if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) { + // Make an informer that selects for our nodename. + currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String() + })) + nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod) + nodeConfig.RegisterEventHandler(s.Proxier) + go nodeConfig.Run(wait.NeverStop) + + // This has to start after the calls to NewNodeConfig because that must + // configure the shared informer event handler first. 
+ currentNodeInformerFactory.Start(wait.NeverStop) + } + // Birth Cry after the birth is successful s.birthCry() diff --git a/pkg/apis/core/types.go b/pkg/apis/core/types.go index a453f0cabf8..8c9ed45114d 100644 --- a/pkg/apis/core/types.go +++ b/pkg/apis/core/types.go @@ -3391,6 +3391,8 @@ const ( IPv4Protocol IPFamily = "IPv4" // IPv6Protocol indicates that this IP is IPv6 protocol IPv6Protocol IPFamily = "IPv6" + // MaxServiceTopologyKeys is the largest number of topology keys allowed on a service + MaxServiceTopologyKeys = 16 ) // ServiceSpec describes the attributes that a user creates on a service @@ -3506,14 +3508,14 @@ type ServiceSpec struct { // topologyKeys is a preference-order list of topology keys which // implementations of services should use to preferentially sort endpoints - // when accessing this Service. Topology keys must be valid label keys and - // at most 16 keys may be specified. - // If any ready backends exist for index [0], they should always be chosen; - // only if no backends exist for index [0] should backends for index [1] be considered. + // when accessing this Service, it can not be used at the same time as + // externalTrafficPolicy=Local. + // Topology keys must be valid label keys and at most 16 keys may be specified. + // Endpoints are chosen based on the first topology key with available backends. // If this field is specified and all entries have no backends that match // the topology of the client, the service has no backends for that client // and connections should fail. - // The special value "" may be used to mean "any node". This catch-all + // The special value "*" may be used to mean "any topology". This catch-all // value, if used, only makes sense as the last value in the list. // If this is not specified or empty, no topology constraints will be applied. 
// +optional diff --git a/pkg/apis/core/validation/validation.go b/pkg/apis/core/validation/validation.go index da110341340..7cbe0aa4c27 100644 --- a/pkg/apis/core/validation/validation.go +++ b/pkg/apis/core/validation/validation.go @@ -4053,6 +4053,35 @@ func ValidateService(service *core.Service) field.ErrorList { ports[key] = true } + // Validate TopologyKeys + if len(service.Spec.TopologyKeys) > 0 { + topoPath := specPath.Child("topologyKeys") + // topologyKeys is mutually exclusive with 'externalTrafficPolicy=Local' + if service.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal { + allErrs = append(allErrs, field.Forbidden(topoPath, "may not be specified when `externalTrafficPolicy=Local`")) + } + if len(service.Spec.TopologyKeys) > core.MaxServiceTopologyKeys { + allErrs = append(allErrs, field.TooMany(topoPath, len(service.Spec.TopologyKeys), core.MaxServiceTopologyKeys)) + } + topoKeys := sets.NewString() + for i, key := range service.Spec.TopologyKeys { + keyPath := topoPath.Index(i) + if topoKeys.Has(key) { + allErrs = append(allErrs, field.Duplicate(keyPath, key)) + } + topoKeys.Insert(key) + // "Any" must be the last value specified + if key == v1.TopologyKeyAny && i != len(service.Spec.TopologyKeys)-1 { + allErrs = append(allErrs, field.Invalid(keyPath, key, `"*" must be the last value specified`)) + } + if key != v1.TopologyKeyAny { + for _, msg := range validation.IsQualifiedName(key) { + allErrs = append(allErrs, field.Invalid(keyPath, service.Spec.TopologyKeys, msg)) + } + } + } + } + // Validate SourceRange field and annotation _, ok := service.Annotations[core.AnnotationLoadBalancerSourceRangesKey] if len(service.Spec.LoadBalancerSourceRanges) > 0 || ok { @@ -4143,6 +4172,10 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("externalTrafficPolicy"), service.Spec.ExternalTrafficPolicy, fmt.Sprintf("ExternalTrafficPolicy must be empty, %v or %v", core.ServiceExternalTrafficPolicyTypeCluster, core.ServiceExternalTrafficPolicyTypeLocal))) } + // 'externalTrafficPolicy=Local' is mutually exclusive with topologyKeys + if service.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal && len(service.Spec.TopologyKeys) > 0 { + allErrs = append(allErrs, field.Forbidden(field.NewPath("spec").Child("externalTrafficPolicy"), "externalTrafficPolicy must not be set to 'Local' when topologyKeys is specified")) + } if service.Spec.HealthCheckNodePort < 0 { allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("healthCheckNodePort"), service.Spec.HealthCheckNodePort, "HealthCheckNodePort must be not less than 0")) diff --git a/pkg/apis/core/validation/validation_test.go b/pkg/apis/core/validation/validation_test.go index dd935f33800..2716436a8fd 100644 --- a/pkg/apis/core/validation/validation_test.go +++ b/pkg/apis/core/validation/validation_test.go @@ -18,6 +18,7 @@ package validation import ( "bytes" + "fmt" "math" "reflect" "strings" @@ -9380,6 +9381,7 @@ func TestValidatePodEphemeralContainersUpdate(t *testing.T) { func TestValidateService(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SCTPSupport, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTopology, true)() testCases := []struct { name string @@ -10058,6 +10060,66 @@ func TestValidateService(t *testing.T) { }, numErrs: 1, }, + { + name: "valid 
topology keys", + tweakSvc: func(s *core.Service) { + s.Spec.TopologyKeys = []string{ + "kubernetes.io/hostname", + "failure-domain.beta.kubernetes.io/zone", + "failure-domain.beta.kubernetes.io/region", + v1.TopologyKeyAny, + } + }, + numErrs: 0, + }, + { + name: "invalid topology key", + tweakSvc: func(s *core.Service) { + s.Spec.TopologyKeys = []string{"NoUppercaseOrSpecialCharsLike=Equals"} + }, + numErrs: 1, + }, + { + name: "too many topology keys", + tweakSvc: func(s *core.Service) { + for i := 0; i < core.MaxServiceTopologyKeys+1; i++ { + s.Spec.TopologyKeys = append(s.Spec.TopologyKeys, fmt.Sprintf("topologykey-%d", i)) + } + }, + numErrs: 1, + }, + { + name: `"Any" was not the last key`, + tweakSvc: func(s *core.Service) { + s.Spec.TopologyKeys = []string{ + "kubernetes.io/hostname", + v1.TopologyKeyAny, + "failure-domain.beta.kubernetes.io/zone", + } + }, + numErrs: 1, + }, + { + name: `duplicate topology key`, + tweakSvc: func(s *core.Service) { + s.Spec.TopologyKeys = []string{ + "kubernetes.io/hostname", + "kubernetes.io/hostname", + "failure-domain.beta.kubernetes.io/zone", + } + }, + numErrs: 1, + }, + { + name: `use topology keys with externalTrafficPolicy=Local`, + tweakSvc: func(s *core.Service) { + s.Spec.ExternalTrafficPolicy = "Local" + s.Spec.TopologyKeys = []string{ + "kubernetes.io/hostname", + } + }, + numErrs: 2, + }, } for _, tc := range testCases { diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 102edba85a9..448a86bb9d5 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -45,6 +45,7 @@ type HollowProxy struct { type FakeProxier struct { proxyconfig.NoopEndpointSliceHandler + proxyconfig.NoopNodeHandler } func (*FakeProxier) Sync() {} diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 3010821f85a..387ca05e971 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -369,3 +369,128 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) { c.eventHandlers[i].OnServiceDelete(service) } } + +// NodeHandler is an abstract interface of objects which receive +// notifications about node object changes. +type NodeHandler interface { + // OnNodeAdd is called whenever creation of new node object + // is observed. + OnNodeAdd(node *v1.Node) + // OnNodeUpdate is called whenever modification of an existing + // node object is observed. + OnNodeUpdate(oldNode, node *v1.Node) + // OnNodeDelete is called whever deletion of an existing node + // object is observed. + OnNodeDelete(node *v1.Node) + // OnNodeSynced is called once all the initial event handlers were + // called and the state is fully propagated to local cache. + OnNodeSynced() +} + +// NoopNodeHandler is a noop handler for proxiers that have not yet +// implemented a full NodeHandler. +type NoopNodeHandler struct{} + +// OnNodeAdd is a noop handler for Node creates. +func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {} + +// OnNodeUpdate is a noop handler for Node updates. +func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {} + +// OnNodeDelete is a noop handler for Node deletes. +func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {} + +// OnNodeSynced is a noop handler for Node syncs. +func (*NoopNodeHandler) OnNodeSynced() {} + +// NodeConfig tracks a set of node configurations. +// It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change. 
+type NodeConfig struct { + listerSynced cache.InformerSynced + eventHandlers []NodeHandler +} + +// NewNodeConfig creates a new NodeConfig. +func NewNodeConfig(nodeInformer coreinformers.NodeInformer, resyncPeriod time.Duration) *NodeConfig { + result := &NodeConfig{ + listerSynced: nodeInformer.Informer().HasSynced, + } + + nodeInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddNode, + UpdateFunc: result.handleUpdateNode, + DeleteFunc: result.handleDeleteNode, + }, + resyncPeriod, + ) + + return result +} + +// RegisterEventHandler registers a handler which is called on every node change. +func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + +// Run starts the goroutine responsible for calling registered handlers. +func (c *NodeConfig) Run(stopCh <-chan struct{}) { + klog.Info("Starting node config controller") + + if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) { + return + } + + for i := range c.eventHandlers { + klog.V(3).Infof("Calling handler.OnNodeSynced()") + c.eventHandlers[i].OnNodeSynced() + } +} + +func (c *NodeConfig) handleAddNode(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + for i := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnNodeAdd") + c.eventHandlers[i].OnNodeAdd(node) + } +} + +func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) { + oldNode, ok := oldObj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj)) + return + } + node, ok := newObj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) + return + } + for i := range c.eventHandlers { + klog.V(5).Infof("Calling handler.OnNodeUpdate") + c.eventHandlers[i].OnNodeUpdate(oldNode, node) + } +} + +func (c *NodeConfig) handleDeleteNode(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + if node, ok = tombstone.Obj.(*v1.Node); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + } + for i := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnNodeDelete") + c.eventHandlers[i].OnNodeDelete(node) + } +} diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 52ae583f941..35c2fc58893 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -48,7 +48,8 @@ var supportedEndpointSliceAddressTypes = sets.NewString( type BaseEndpointInfo struct { Endpoint string // TODO: should be an endpointString type // IsLocal indicates whether the endpoint is running in same host as kube-proxy. - IsLocal bool + IsLocal bool + Topology map[string]string } var _ Endpoint = &BaseEndpointInfo{} @@ -63,6 +64,11 @@ func (info *BaseEndpointInfo) GetIsLocal() bool { return info.IsLocal } +// GetTopology returns the topology information of the endpoint. +func (info *BaseEndpointInfo) GetTopology() map[string]string { + return info.Topology +} + // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. 
func (info *BaseEndpointInfo) IP() string { return utilproxy.IPPart(info.Endpoint) @@ -78,10 +84,11 @@ func (info *BaseEndpointInfo) Equal(other Endpoint) bool { return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal() } -func newBaseEndpointInfo(IP string, port int, isLocal bool) *BaseEndpointInfo { +func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string) *BaseEndpointInfo { return &BaseEndpointInfo{ Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), IsLocal: isLocal, + Topology: topology, } } @@ -358,7 +365,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint continue } isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname - baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal) + baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil) if ect.makeEndpointInfo != nil { endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo)) } else { diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 8919d92f335..d6abcc1ade9 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -400,7 +400,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) { } else { for i := range newEndpoints[x] { ep := newEndpoints[x][i].(*BaseEndpointInfo) - if *ep != *(tc.expected[x][i]) { + if !(reflect.DeepEqual(*ep, *(tc.expected[x][i]))) { t.Errorf("[%s] expected new[%v][%d] to be %v, got %v", tc.desc, x, i, tc.expected[x][i], *ep) } } @@ -1699,21 +1699,21 @@ func TestCheckoutChanges(t *testing.T) { endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{ - svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, - svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")}, + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")}, + svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", ""), newTestEp("10.0.1.2:443", "")}, }, current: EndpointsMap{ - svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")}, }, }}, items: map[types.NamespacedName]*endpointsChange{ {Namespace: "ns1", Name: "svc1"}: { previous: EndpointsMap{ - svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, - svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")}, + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")}, + svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", ""), newTestEp("10.0.1.2:443", "")}, }, current: EndpointsMap{ - svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")}, }, }, }, @@ -1724,7 +1724,7 @@ func TestCheckoutChanges(t *testing.T) { expectedChanges: []*endpointsChange{{ previous: EndpointsMap{}, current: EndpointsMap{ - svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")}, }, }}, useEndpointSlices: true, @@ -1737,11 +1737,11 @@ func TestCheckoutChanges(t *testing.T) { endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{ - svcPortName0: 
[]Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, - svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")}, + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")}, + svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", "host1"), newTestEp("10.0.1.2:443", "host1")}, }, current: EndpointsMap{ - svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")}, }, }}, useEndpointSlices: true, @@ -1796,6 +1796,9 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser if len(newMap) != len(expected) { t.Errorf("expected %d results, got %d: %v", len(expected), len(newMap), newMap) } + endpointEqual := func(a, b *BaseEndpointInfo) bool { + return a.Endpoint == b.Endpoint && a.IsLocal == b.IsLocal + } for x := range expected { if len(newMap[x]) != len(expected[x]) { t.Errorf("expected %d endpoints for %v, got %d", len(expected[x]), x, len(newMap[x])) @@ -1807,7 +1810,7 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser t.Errorf("Failed to cast endpointsInfo") continue } - if *newEp != *(expected[x][i]) { + if !endpointEqual(newEp, expected[x][i]) { t.Errorf("expected new[%v][%d] to be %v, got %v (IsLocal expected %v, got %v)", x, i, expected[x][i], newEp, expected[x][i].IsLocal, newEp.IsLocal) } } @@ -1815,8 +1818,14 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser } } -func newTestEp(ep string) *BaseEndpointInfo { - return &BaseEndpointInfo{Endpoint: ep} +func newTestEp(ep, host string) *BaseEndpointInfo { + endpointInfo := &BaseEndpointInfo{Endpoint: ep} + if host != "" { + endpointInfo.Topology = map[string]string{ + "kubernetes.io/hostname": host, + } + } + return endpointInfo } func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*discovery.EndpointSlice) { diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 85882bf5c35..e9c6165353b 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -254,7 +254,7 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName } isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname]) - endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal) + endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology) // This logic ensures we're deduping potential overlapping endpoints // isLocal should not vary between matching IPs, but if it does, we diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 244348413e0..6651fa8daf0 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -176,9 +176,9 @@ func TestEndpointInfoByServicePort(t *testing.T) { }, expectedMap: spToEndpointMap{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { - "10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false}, - "10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true}, - "10.0.1.3": &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false}, + "10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}}, + "10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true, Topology: map[string]string{"kubernetes.io/hostname": "host1"}}, + "10.0.1.3": 
&BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}}, }, }, }, diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9f17033e588..3f6c5f4b9d5 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -181,6 +181,7 @@ type Proxier struct { serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable + nodeLabels map[string]string // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true // when corresponding objects are synced after startup. This is used to avoid // updating iptables with some partial data after kube-proxy restart. @@ -591,6 +592,47 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.syncProxyRules() } +// OnNodeAdd is called whenever creation of new node object +// is observed. +func (proxier *Proxier) OnNodeAdd(node *v1.Node) { + if node.Name != proxier.hostname { + klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + return + } + proxier.mu.Lock() + proxier.nodeLabels = node.Labels + proxier.mu.Unlock() +} + +// OnNodeUpdate is called whenever modification of an existing +// node object is observed. +func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { + if node.Name != proxier.hostname { + klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + return + } + proxier.mu.Lock() + proxier.nodeLabels = node.Labels + proxier.mu.Unlock() +} + +// OnNodeDelete is called whever deletion of an existing node +// object is observed. +func (proxier *Proxier) OnNodeDelete(node *v1.Node) { + if node.Name != proxier.hostname { + klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + return + } + proxier.mu.Lock() + proxier.nodeLabels = nil + proxier.mu.Unlock() +} + +// OnNodeSynced is called once all the initial event handlers were +// called and the state is fully propagated to local cache. +func (proxier *Proxier) OnNodeSynced() { +} + // portProtoHash takes the ServicePortName and protocol for a service // returns the associated 16 character hash. This is computed by hashing (sha256) // then encoding to base32 and truncating to 16 chars. We do this because IPTables @@ -858,7 +900,20 @@ func (proxier *Proxier) syncProxyRules() { isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) protocol := strings.ToLower(string(svcInfo.Protocol())) svcNameString := svcInfo.serviceNameString - hasEndpoints := len(proxier.endpointsMap[svcName]) > 0 + + allEndpoints := proxier.endpointsMap[svcName] + + hasEndpoints := len(allEndpoints) > 0 + + // Service Topology will not be enabled in the following cases: + // 1. externalTrafficPolicy=Local (mutually exclusive with service topology). + // 2. ServiceTopology is not enabled. + // 3. EndpointSlice is not enabled (service topology depends on endpoint slice + // to get topology information). 
+ if !svcInfo.OnlyNodeLocalEndpoints() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints) + hasEndpoints = len(allEndpoints) > 0 + } svcChain := svcInfo.servicePortChainName if hasEndpoints { @@ -1168,12 +1223,13 @@ func (proxier *Proxier) syncProxyRules() { endpoints = endpoints[:0] endpointChains = endpointChains[:0] var endpointChain utiliptables.Chain - for _, ep := range proxier.endpointsMap[svcName] { + for _, ep := range allEndpoints { epInfo, ok := ep.(*endpointsInfo) if !ok { klog.Errorf("Failed to cast endpointsInfo %q", ep.String()) continue } + endpoints = append(endpoints, epInfo) endpointChain = epInfo.endpointChain(svcNameString, protocol) endpointChains = append(endpointChains, endpointChain) @@ -1220,6 +1276,7 @@ func (proxier *Proxier) syncProxyRules() { // Error parsing this endpoint has been logged. Skip to next endpoint. continue } + // Balancing rules in the per-service chain. args = append(args[:0], "-A", string(svcChain)) proxier.appendServiceCommentLocked(args, svcNameString) diff --git a/pkg/proxy/ipvs/meta_proxier.go b/pkg/proxy/ipvs/meta_proxier.go index bef79df5984..062ac3feee5 100644 --- a/pkg/proxy/ipvs/meta_proxier.go +++ b/pkg/proxy/ipvs/meta_proxier.go @@ -22,6 +22,8 @@ import ( "k8s.io/api/core/v1" "k8s.io/klog" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/config" + utilnet "k8s.io/utils/net" discovery "k8s.io/api/discovery/v1beta1" @@ -30,6 +32,8 @@ import ( type metaProxier struct { ipv4Proxier proxy.Provider ipv6Proxier proxy.Provider + // TODO(imroc): implement node handler for meta proxier. + config.NoopNodeHandler } // NewMetaProxier returns a dual-stack "meta-proxier". Proxier API diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 7549c7e2665..8f71619b4b2 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -30,6 +30,10 @@ import ( "sync/atomic" "time" + "k8s.io/klog" + utilexec "k8s.io/utils/exec" + utilnet "k8s.io/utils/net" + v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/types" @@ -38,7 +42,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" - "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" @@ -50,8 +53,6 @@ import ( utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" - utilexec "k8s.io/utils/exec" - utilnet "k8s.io/utils/net" ) const ( @@ -200,6 +201,7 @@ type Proxier struct { serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable + nodeLabels map[string]string // endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when // corresponding objects are synced after startup. This is used to avoid updating // ipvs rules with some partial data after kube-proxy restart. @@ -896,6 +898,47 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.syncProxyRules() } +// OnNodeAdd is called whenever creation of new node object +// is observed. 
+func (proxier *Proxier) OnNodeAdd(node *v1.Node) { + if node.Name != proxier.hostname { + klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + return + } + proxier.mu.Lock() + proxier.nodeLabels = node.Labels + proxier.mu.Unlock() +} + +// OnNodeUpdate is called whenever modification of an existing +// node object is observed. +func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { + if node.Name != proxier.hostname { + klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + return + } + proxier.mu.Lock() + proxier.nodeLabels = node.Labels + proxier.mu.Unlock() +} + +// OnNodeDelete is called whever deletion of an existing node +// object is observed. +func (proxier *Proxier) OnNodeDelete(node *v1.Node) { + if node.Name != proxier.hostname { + klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + return + } + proxier.mu.Lock() + proxier.nodeLabels = nil + proxier.mu.Unlock() +} + +// OnNodeSynced is called once all the initial event handlers were +// called and the state is fully propagated to local cache. +func (proxier *Proxier) OnNodeSynced() { +} + // EntryInvalidErr indicates if an ipset entry is invalid or not const EntryInvalidErr = "error adding entry %s to ipset %s" @@ -1866,7 +1909,18 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode curEndpoints.Insert(des.String()) } - for _, epInfo := range proxier.endpointsMap[svcPortName] { + endpoints := proxier.endpointsMap[svcPortName] + + // Service Topology will not be enabled in the following cases: + // 1. externalTrafficPolicy=Local (mutually exclusive with service topology). + // 2. ServiceTopology is not enabled. + // 3. EndpointSlice is not enabled (service topology depends on endpoint slice + // to get topology information). 
+ if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints) + } + + for _, epInfo := range endpoints { if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() { continue } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 1332a464c3b..ab49b8f64b3 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -2951,7 +2951,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe t.Errorf("Failed to cast proxy.BaseEndpointInfo") continue } - if *newEp != *(expected[x][i]) { + if !reflect.DeepEqual(*newEp, *(expected[x][i])) { t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp) } } @@ -3702,7 +3702,6 @@ func TestEndpointSliceE2E(t *testing.T) { // Add initial service serviceName := "svc1" namespaceName := "ns1" - fp.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, Spec: v1.ServiceSpec{ diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 707947634c5..5a18d2c7941 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -51,6 +51,7 @@ type BaseServiceInfo struct { loadBalancerSourceRanges []string healthCheckNodePort int onlyNodeLocalEndpoints bool + topologyKeys []string } var _ ServicePort = &BaseServiceInfo{} @@ -119,6 +120,11 @@ func (info *BaseServiceInfo) OnlyNodeLocalEndpoints() bool { return info.onlyNodeLocalEndpoints } +// TopologyKeys is part of ServicePort interface. +func (info *BaseServiceInfo) TopologyKeys() []string { + return info.topologyKeys +} + func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { onlyNodeLocalEndpoints := false if apiservice.RequestsOnlyLocalTraffic(service) { @@ -139,6 +145,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic sessionAffinityType: service.Spec.SessionAffinity, stickyMaxAgeSeconds: stickyMaxAgeSeconds, onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, + topologyKeys: service.Spec.TopologyKeys, } if sct.isIPv6Mode == nil { diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go new file mode 100644 index 00000000000..fda3348e410 --- /dev/null +++ b/pkg/proxy/topology.go @@ -0,0 +1,80 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + v1 "k8s.io/api/core/v1" +) + +// FilterTopologyEndpoint returns the appropriate endpoints based on the cluster +// topology. +// This uses the current node's labels, which contain topology information, and +// the required topologyKeys to find appropriate endpoints. If both the endpoint's +// topology and the current node have matching values for topologyKeys[0], the +// endpoint will be chosen. 
If no endpoints are chosen, topologyKeys[1] will be +// considered, and so on. If either the node or the endpoint does not have a value +// for a key, it is considered to not match. +// +// If topologyKeys is specified, but no endpoints are chosen for any key, the +// service has no viable endpoints for clients on this node, and connections +// should fail. +// +// The special key "*" may be used as the last entry in topologyKeys to indicate +// "any endpoint" is acceptable. +// +// If topologyKeys is not specified or empty, no topology constraints will be +// applied and this will return all endpoints. +func FilterTopologyEndpoint(nodeLabels map[string]string, topologyKeys []string, endpoints []Endpoint) []Endpoint { + // Do not filter endpoints if service has no topology keys. + if len(topologyKeys) == 0 { + return endpoints + } + + filteredEndpoint := []Endpoint{} + + if len(nodeLabels) == 0 { + if topologyKeys[len(topologyKeys)-1] == v1.TopologyKeyAny { + // edge case: include all endpoints if topology key "Any" specified + // when we cannot determine current node's topology. + return endpoints + } + // edge case: do not include any endpoints if topology key "Any" is + // not specified when we cannot determine current node's topology. + return filteredEndpoint + } + + for _, key := range topologyKeys { + if key == v1.TopologyKeyAny { + return endpoints + } + topologyValue, found := nodeLabels[key] + if !found { + continue + } + + for _, ep := range endpoints { + topology := ep.GetTopology() + if value, found := topology[key]; found && value == topologyValue { + filteredEndpoint = append(filteredEndpoint, ep) + } + } + if len(filteredEndpoint) > 0 { + return filteredEndpoint + } + } + return filteredEndpoint +} diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go new file mode 100644 index 00000000000..c874e0217c1 --- /dev/null +++ b/pkg/proxy/topology_test.go @@ -0,0 +1,478 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package proxy + +import ( + "reflect" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +func TestFilterTopologyEndpoint(t *testing.T) { + type endpoint struct { + Endpoint string + NodeName types.NodeName + } + testCases := []struct { + Name string + nodeLabels map[types.NodeName]map[string]string + endpoints []endpoint + currentNodeName types.NodeName + topologyKeys []string + expected []endpoint + }{ + { + // Case[0]: no topology key and endpoints at all = 0 endpoints + Name: "no topology key and endpoints", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "10.0.0.1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }}, + endpoints: []endpoint{}, + currentNodeName: "testNode1", + topologyKeys: nil, + expected: []endpoint{}, + }, + { + // Case[1]: no topology key, 2 nodes each with 2 endpoints = 4 + // endpoints + Name: "no topology key but have endpoints", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + currentNodeName: "testNode1", + topologyKeys: nil, + expected: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + }, + { + // Case[2]: 1 topology key (hostname), 2 nodes each with 2 endpoints + // 1 match = 2 endpoints + Name: "one topology key with one node matched", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + currentNodeName: "testNode1", + topologyKeys: []string{"kubernetes.io/hostname"}, + expected: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + }, + }, + { + // Case[3]: 1 topology key (hostname), 2 nodes each with 2 endpoints + // no match = 0 endpoints + Name: "one topology key without node matched", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode3": { + "kubernetes.io/hostname": "testNode3", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + 
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + currentNodeName: "testNode3", + topologyKeys: []string{"kubernetes.io/hostname"}, + expected: []endpoint{}, + }, + { + // Case[4]: 1 topology key (zone), 2 nodes in zone a, 2 nodes in + // zone b, each with 2 endpoints = 4 endpoints + Name: "one topology key with multiple nodes matched", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode3": { + "kubernetes.io/hostname": "testNode3", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + "testNode4": { + "kubernetes.io/hostname": "testNode4", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + {Endpoint: "1.1.3.1:11", NodeName: "testNode3"}, + {Endpoint: "1.1.3.2:11", NodeName: "testNode3"}, + {Endpoint: "1.1.4.1:11", NodeName: "testNode4"}, + {Endpoint: "1.1.4.2:11", NodeName: "testNode4"}, + }, + currentNodeName: "testNode2", + topologyKeys: []string{"topology.kubernetes.io/zone"}, + expected: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + }, + { + // Case[5]: 2 topology keys (hostname, zone), 2 nodes each with 2 + // endpoints, 1 hostname match = 2 endpoints (2nd key ignored) + Name: "early match in multiple topology keys", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode3": { + "kubernetes.io/hostname": "testNode3", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + "testNode4": { + "kubernetes.io/hostname": "testNode4", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + {Endpoint: "1.1.3.1:11", NodeName: "testNode3"}, + {Endpoint: "1.1.3.2:11", NodeName: "testNode3"}, + {Endpoint: "1.1.4.1:11", NodeName: "testNode4"}, + {Endpoint: "1.1.4.2:11", NodeName: "testNode4"}, + }, + currentNodeName: "testNode2", + topologyKeys: []string{"kubernetes.io/hostname"}, + expected: []endpoint{ + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + }, + { + // Case[6]: 2 topology keys (hostname, zone), 2 nodes in zone a, 2 + // nodes in zone b, each with 2 endpoints, no hostname match, 1 zone + // match = 4 endpoints + Name: "later match in multiple topology 
keys", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode3": { + "kubernetes.io/hostname": "testNode3", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + "testNode4": { + "kubernetes.io/hostname": "testNode4", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + "testNode5": { + "kubernetes.io/hostname": "testNode5", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + {Endpoint: "1.1.3.1:11", NodeName: "testNode3"}, + {Endpoint: "1.1.3.2:11", NodeName: "testNode3"}, + {Endpoint: "1.1.4.1:11", NodeName: "testNode4"}, + {Endpoint: "1.1.4.2:11", NodeName: "testNode4"}, + }, + currentNodeName: "testNode5", + topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone"}, + expected: []endpoint{ + {Endpoint: "1.1.3.1:11", NodeName: "testNode3"}, + {Endpoint: "1.1.3.2:11", NodeName: "testNode3"}, + {Endpoint: "1.1.4.1:11", NodeName: "testNode4"}, + {Endpoint: "1.1.4.2:11", NodeName: "testNode4"}, + }, + }, + { + // Case[7]: 2 topology keys (hostname, zone), 2 nodes in zone a, 2 + // nodes in zone b, each with 2 endpoints, no hostname match, no zone + // match = 0 endpoints + Name: "multiple topology keys without node matched", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode3": { + "kubernetes.io/hostname": "testNode3", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + "testNode4": { + "kubernetes.io/hostname": "testNode4", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + "testNode5": { + "kubernetes.io/hostname": "testNode5", + "topology.kubernetes.io/zone": "90003", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + {Endpoint: "1.1.3.1:11", NodeName: "testNode3"}, + {Endpoint: "1.1.3.2:11", NodeName: "testNode3"}, + {Endpoint: "1.1.4.1:11", NodeName: "testNode4"}, + {Endpoint: "1.1.4.2:11", NodeName: "testNode4"}, + }, + currentNodeName: "testNode5", + topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone"}, + expected: []endpoint{}, + }, + { + // Case[8]: 2 topology keys (hostname, "*"), 2 nodes each with 2 + // endpoints, 1 match hostname = 2 endpoints + Name: "multiple topology keys matched node when 'Any' key ignored", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + 
"topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + currentNodeName: "testNode1", + topologyKeys: []string{"kubernetes.io/hostname", v1.TopologyKeyAny}, + expected: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + }, + }, + { + // Case[9]: 2 topology keys (hostname, "*"), 2 nodes each with 2 + // endpoints, no hostname match, catch-all ("*") matched with 4 + // endpoints + Name: "two topology keys matched node with 'Any' key", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode3": { + "kubernetes.io/hostname": "testNode3", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + currentNodeName: "testNode3", + topologyKeys: []string{"kubernetes.io/hostname", v1.TopologyKeyAny}, + expected: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + }, + }, + { + // Case[10]: 3 topology keys (hostname, zone, "*"), 2 nodes in zone a, + // 2 nodes in zone b, each with 2 endpoints, no hostname match, no + // zone, catch-all ("*") matched with 8 endpoints + Name: "multiple topology keys matched node with 'Any' key", + nodeLabels: map[types.NodeName]map[string]string{ + "testNode1": { + "kubernetes.io/hostname": "testNode1", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode2": { + "kubernetes.io/hostname": "testNode2", + "topology.kubernetes.io/zone": "90001", + "topology.kubernetes.io/region": "cd", + }, + "testNode3": { + "kubernetes.io/hostname": "testNode3", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + "testNode4": { + "kubernetes.io/hostname": "testNode4", + "topology.kubernetes.io/zone": "90002", + "topology.kubernetes.io/region": "cd", + }, + "testNode5": { + "kubernetes.io/hostname": "testNode5", + "topology.kubernetes.io/zone": "90003", + "topology.kubernetes.io/region": "cd", + }, + }, + endpoints: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + {Endpoint: "1.1.3.1:11", NodeName: "testNode3"}, + {Endpoint: "1.1.3.2:11", NodeName: "testNode3"}, + {Endpoint: "1.1.4.1:11", NodeName: "testNode4"}, + {Endpoint: "1.1.4.2:11", NodeName: "testNode4"}, + }, + currentNodeName: "testNode5", + topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone", 
v1.TopologyKeyAny}, + expected: []endpoint{ + {Endpoint: "1.1.1.1:11", NodeName: "testNode1"}, + {Endpoint: "1.1.1.2:11", NodeName: "testNode1"}, + {Endpoint: "1.1.2.1:11", NodeName: "testNode2"}, + {Endpoint: "1.1.2.2:11", NodeName: "testNode2"}, + {Endpoint: "1.1.3.1:11", NodeName: "testNode3"}, + {Endpoint: "1.1.3.2:11", NodeName: "testNode3"}, + {Endpoint: "1.1.4.1:11", NodeName: "testNode4"}, + {Endpoint: "1.1.4.2:11", NodeName: "testNode4"}, + }, + }, + } + endpointsToStringArray := func(endpoints []endpoint) []string { + result := make([]string, 0, len(endpoints)) + for _, ep := range endpoints { + result = append(result, ep.Endpoint) + } + return result + } + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + m := make(map[Endpoint]endpoint) + endpoints := []Endpoint{} + for _, ep := range tc.endpoints { + var e Endpoint = &BaseEndpointInfo{Endpoint: ep.Endpoint, Topology: tc.nodeLabels[ep.NodeName]} + m[e] = ep + endpoints = append(endpoints, e) + } + currentNodeLabels := tc.nodeLabels[tc.currentNodeName] + filteredEndpoint := []endpoint{} + for _, ep := range FilterTopologyEndpoint(currentNodeLabels, tc.topologyKeys, endpoints) { + filteredEndpoint = append(filteredEndpoint, m[ep]) + } + if !reflect.DeepEqual(filteredEndpoint, tc.expected) { + t.Errorf("expected %v, got %v", endpointsToStringArray(tc.expected), endpointsToStringArray(filteredEndpoint)) + } + }) + } +} diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 7055446e075..fae2c549ce3 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -30,6 +30,7 @@ type Provider interface { config.EndpointsHandler config.EndpointSliceHandler config.ServiceHandler + config.NodeHandler // Sync immediately synchronizes the Provider's current state to proxy rules. Sync() @@ -77,6 +78,8 @@ type ServicePort interface { NodePort() int // GetOnlyNodeLocalEndpoints returns if a service has only node local endpoints OnlyNodeLocalEndpoints() bool + // TopologyKeys returns service TopologyKeys as a string array. + TopologyKeys() []string } // Endpoint in an interface which abstracts information about an endpoint. @@ -87,6 +90,8 @@ type Endpoint interface { String() string // GetIsLocal returns true if the endpoint is running in same host as kube-proxy, otherwise returns false. GetIsLocal() bool + // GetTopology returns the topology information of the endpoint. + GetTopology() map[string]string // IP returns IP part of the endpoint. IP() string // Port returns the Port part of the endpoint. diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 9afa4c0adc7..7a34529d6ff 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -112,6 +112,8 @@ type asyncRunnerInterface interface { type Proxier struct { // EndpointSlice support has not been added for this proxier yet. config.NoopEndpointSliceHandler + // TODO(imroc): implement node handler for userspace proxier. + config.NoopNodeHandler loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index eb481a81e6b..b5a745acb89 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -444,6 +444,8 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap, curServices proxySe type Proxier struct { // EndpointSlice support has not been added for this proxier yet. proxyconfig.NoopEndpointSliceHandler + // TODO(imroc): implement node handler for winkernel proxier. 
+ proxyconfig.NoopNodeHandler // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since policies were synced. For a single object, diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 3e2dbd4036f..544a39986b9 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -83,6 +83,8 @@ func logTimeout(err error) bool { type Proxier struct { // EndpointSlice support has not been added for this proxier yet. config.NoopEndpointSliceHandler + // TODO(imroc): implement node handler for winuserspace proxier. + config.NoopNodeHandler loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap diff --git a/pkg/registry/core/service/strategy.go b/pkg/registry/core/service/strategy.go index 7327a4da21b..cca5e12ddc4 100644 --- a/pkg/registry/core/service/strategy.go +++ b/pkg/registry/core/service/strategy.go @@ -23,11 +23,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apiserver/pkg/storage/names" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/validation" - - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" ) @@ -121,6 +120,10 @@ func dropServiceDisabledFields(newSvc *api.Service, oldSvc *api.Service) { if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && !serviceIPFamilyInUse(oldSvc) { newSvc.Spec.IPFamily = nil } + // Drop TopologyKeys if ServiceTopology is not enabled + if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && !topologyKeysInUse(oldSvc) { + newSvc.Spec.TopologyKeys = nil + } } // returns true if svc.Spec.ServiceIPFamily field is in use @@ -134,6 +137,14 @@ func serviceIPFamilyInUse(svc *api.Service) bool { return false } +// returns true if svc.Spec.TopologyKeys field is in use +func topologyKeysInUse(svc *api.Service) bool { + if svc == nil { + return false + } + return len(svc.Spec.TopologyKeys) > 0 +} + type serviceStatusStrategy struct { svcStrategy } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 10ba64be035..7e17f31dd5e 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -461,7 +461,7 @@ func ClusterRoles() []rbacv1.ClusterRole { // node-proxier role is used by kube-proxy. 
nodeProxierRules := []rbacv1.PolicyRule{ rbacv1helpers.NewRule("list", "watch").Groups(legacyGroup).Resources("services", "endpoints").RuleOrDie(), - rbacv1helpers.NewRule("get").Groups(legacyGroup).Resources("nodes").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), eventsRule(), } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index f2902321090..ad6468a53d2 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -1040,6 +1040,8 @@ items: - nodes verbs: - get + - list + - watch - apiGroups: - "" - events.k8s.io diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index ee12e3aa231..86543eacc51 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -30,6 +30,8 @@ const ( NamespaceAll string = "" // NamespaceNodeLease is the namespace where we place node lease objects (used for node heartbeats) NamespaceNodeLease string = "kube-node-lease" + // TopologyKeyAny is the service topology key that matches any node + TopologyKeyAny string = "*" ) // Volume represents a named volume in a pod that may be accessed by any container in the pod. @@ -3826,6 +3828,8 @@ const ( IPv4Protocol IPFamily = "IPv4" // IPv6Protocol indicates that this IP is IPv6 protocol IPv6Protocol IPFamily = "IPv6" + // MaxServiceTopologyKeys is the largest number of topology keys allowed on a service + MaxServiceTopologyKeys = 16 ) // ServiceSpec describes the attributes that a user creates on a service. @@ -3957,18 +3961,18 @@ type ServiceSpec struct { // topologyKeys is a preference-order list of topology keys which // implementations of services should use to preferentially sort endpoints - // when accessing this Service. Topology keys must be valid label keys and - // at most 16 keys may be specified. - // If any ready backends exist for index [0], they should always be chosen; - // only if no backends exist for index [0] should backends for index [1] be considered. + // when accessing this Service, it can not be used at the same time as + // externalTrafficPolicy=Local. + // Topology keys must be valid label keys and at most 16 keys may be specified. + // Endpoints are chosen based on the first topology key with available backends. // If this field is specified and all entries have no backends that match // the topology of the client, the service has no backends for that client // and connections should fail. - // The special value "" may be used to mean "any node". This catch-all + // The special value "*" may be used to mean "any topology". This catch-all // value, if used, only makes sense as the last value in the list. // If this is not specified or empty, no topology constraints will be applied. // +optional - TopologyKeys []string `json:"topologyKeys,omitempty" protobuf:"bytes,15,opt,name=topologyKeys"` + TopologyKeys []string `json:"topologyKeys,omitempty" protobuf:"bytes,16,opt,name=topologyKeys"` } // ServicePort contains information on service's port.
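
For reviewers, an illustrative sketch (not part of the patch) of how a Service might opt into topology-aware routing once this lands. It assumes the patched k8s.io/api types (Spec.TopologyKeys, v1.TopologyKeyAny); the service name, namespace, selector, and ports are made up.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// A Service that prefers endpoints on the same node, then the same zone,
	// and finally falls back to any endpoint via the "*" catch-all key.
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"app": "demo"},
			Ports: []v1.ServicePort{{
				Port:       80,
				TargetPort: intstr.FromInt(8080),
				Protocol:   v1.ProtocolTCP,
			}},
			TopologyKeys: []string{
				"kubernetes.io/hostname",
				"topology.kubernetes.io/zone",
				v1.TopologyKeyAny, // "*" only makes sense as the last entry
			},
		},
	}
	fmt.Println(svc.Spec.TopologyKeys)
}

Per the validation added in this patch, such a spec is rejected if it carries more than 16 keys, duplicates a key, places "*" anywhere but last, or is combined with externalTrafficPolicy=Local.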
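
The selection order implemented by FilterTopologyEndpoint can also be summarized with a standalone sketch that works on plain label maps instead of the proxy.Endpoint interface; pickEndpoints and the sample node/endpoint values are hypothetical, but the ordering mirrors the function in this patch: the first topology key with at least one matching endpoint wins, "*" accepts everything, and an empty key list disables filtering.

package main

import "fmt"

// pickEndpoints walks the service's topologyKeys in order, keeps only the
// endpoints whose topology value for that key equals the node's label, and
// stops at the first key that yields a non-empty result.
func pickEndpoints(nodeLabels map[string]string, topologyKeys []string, endpoints []map[string]string) []map[string]string {
	if len(topologyKeys) == 0 {
		return endpoints // no topology constraints
	}
	for _, key := range topologyKeys {
		if key == "*" {
			return endpoints // catch-all: any topology is acceptable
		}
		nodeValue, ok := nodeLabels[key]
		if !ok {
			continue // the node itself has no value for this key
		}
		var matched []map[string]string
		for _, ep := range endpoints {
			if v, ok := ep[key]; ok && v == nodeValue {
				matched = append(matched, ep)
			}
		}
		if len(matched) > 0 {
			return matched // first key with any match wins
		}
	}
	return nil // no key matched: no usable endpoints for clients on this node
}

func main() {
	node := map[string]string{
		"kubernetes.io/hostname":      "nodeA",
		"topology.kubernetes.io/zone": "zone-1",
	}
	endpoints := []map[string]string{
		{"kubernetes.io/hostname": "nodeB", "topology.kubernetes.io/zone": "zone-1"},
		{"kubernetes.io/hostname": "nodeC", "topology.kubernetes.io/zone": "zone-2"},
	}
	// No endpoint is on nodeA, so the hostname key yields nothing and the
	// zone key selects the zone-1 endpoint.
	fmt.Println(pickEndpoints(node, []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone", "*"}, endpoints))
}

As in the patched function, when the node's labels are unknown every concrete key is skipped, so the result is all endpoints if the list ends in "*" and none otherwise.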