mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #72046 from m1093782566/service-topology-api
Service Topology implementation
This commit is contained in:
commit
d9be37e926
@ -214,6 +214,7 @@ API rule violation: list_type_missing,k8s.io/api/core/v1,ServiceAccountList,Item
|
||||
API rule violation: list_type_missing,k8s.io/api/core/v1,ServiceList,Items
|
||||
API rule violation: list_type_missing,k8s.io/api/core/v1,ServiceSpec,ExternalIPs
|
||||
API rule violation: list_type_missing,k8s.io/api/core/v1,ServiceSpec,LoadBalancerSourceRanges
|
||||
API rule violation: list_type_missing,k8s.io/api/core/v1,ServiceSpec,TopologyKeys
|
||||
API rule violation: list_type_missing,k8s.io/api/core/v1,TopologySelectorLabelRequirement,Values
|
||||
API rule violation: list_type_missing,k8s.io/api/core/v1,TopologySelectorTerm,MatchLabelExpressions
|
||||
API rule violation: list_type_missing,k8s.io/api/events/v1beta1,EventList,Items
|
||||
|
7
api/openapi-spec/swagger.json
generated
7
api/openapi-spec/swagger.json
generated
@ -11499,6 +11499,13 @@
|
||||
"$ref": "#/definitions/io.k8s.api.core.v1.SessionAffinityConfig",
|
||||
"description": "sessionAffinityConfig contains the configurations of session affinity."
|
||||
},
|
||||
"topologyKeys": {
|
||||
"description": "topologyKeys is a preference-order list of topology keys which implementations of services should use to preferentially sort endpoints when accessing this Service, it can not be used at the same time as externalTrafficPolicy=Local. Topology keys must be valid label keys and at most 16 keys may be specified. Endpoints are chosen based on the first topology key with available backends. If this field is specified and all entries have no backends that match the topology of the client, the service has no backends for that client and connections should fail. The special value \"*\" may be used to mean \"any topology\". This catch-all value, if used, only makes sense as the last value in the list. If this is not specified or empty, no topology constraints will be applied.",
|
||||
"items": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"type": {
|
||||
"description": "type determines how the Service is exposed. Defaults to ClusterIP. Valid options are ExternalName, ClusterIP, NodePort, and LoadBalancer. \"ExternalName\" maps to the specified externalName. \"ClusterIP\" allocates a cluster-internal IP address for load-balancing to endpoints. Endpoints are determined by the selector or if that is not specified, by manual construction of an Endpoints object. If clusterIP is \"None\", no virtual IP is allocated and the endpoints are published as a set of endpoints rather than a stable IP. \"NodePort\" builds on ClusterIP and allocates a port on every node which routes to the clusterIP. \"LoadBalancer\" builds on NodePort and creates an external load-balancer (if supported in the current cloud) which routes to the clusterIP. More info: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types",
|
||||
"type": "string"
|
||||
|
@ -45,6 +45,7 @@ go_library(
|
||||
"//pkg/util/sysctl:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
gerrors "github.com/pkg/errors"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
@ -663,6 +664,7 @@ func (s *ProxyServer) Run() error {
|
||||
labelSelector := labels.NewSelector()
|
||||
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
|
||||
|
||||
// Make informers that filter out objects that want a non-default service proxy.
|
||||
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
|
||||
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
||||
options.LabelSelector = labelSelector.String()
|
||||
@ -690,6 +692,21 @@ func (s *ProxyServer) Run() error {
|
||||
// functions must configure their shared informer event handlers first.
|
||||
informerFactory.Start(wait.NeverStop)
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) {
|
||||
// Make an informer that selects for our nodename.
|
||||
currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
|
||||
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
||||
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
|
||||
}))
|
||||
nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
|
||||
nodeConfig.RegisterEventHandler(s.Proxier)
|
||||
go nodeConfig.Run(wait.NeverStop)
|
||||
|
||||
// This has to start after the calls to NewNodeConfig because that must
|
||||
// configure the shared informer event handler first.
|
||||
currentNodeInformerFactory.Start(wait.NeverStop)
|
||||
}
|
||||
|
||||
// Birth Cry after the birth is successful
|
||||
s.birthCry()
|
||||
|
||||
|
@ -3391,6 +3391,8 @@ const (
|
||||
IPv4Protocol IPFamily = "IPv4"
|
||||
// IPv6Protocol indicates that this IP is IPv6 protocol
|
||||
IPv6Protocol IPFamily = "IPv6"
|
||||
// MaxServiceTopologyKeys is the largest number of topology keys allowed on a service
|
||||
MaxServiceTopologyKeys = 16
|
||||
)
|
||||
|
||||
// ServiceSpec describes the attributes that a user creates on a service
|
||||
@ -3503,6 +3505,21 @@ type ServiceSpec struct {
|
||||
// cluster (e.g. IPv6 in IPv4 only cluster) is an error condition and will fail during clusterIP assignment.
|
||||
// +optional
|
||||
IPFamily *IPFamily
|
||||
|
||||
// topologyKeys is a preference-order list of topology keys which
|
||||
// implementations of services should use to preferentially sort endpoints
|
||||
// when accessing this Service, it can not be used at the same time as
|
||||
// externalTrafficPolicy=Local.
|
||||
// Topology keys must be valid label keys and at most 16 keys may be specified.
|
||||
// Endpoints are chosen based on the first topology key with available backends.
|
||||
// If this field is specified and all entries have no backends that match
|
||||
// the topology of the client, the service has no backends for that client
|
||||
// and connections should fail.
|
||||
// The special value "*" may be used to mean "any topology". This catch-all
|
||||
// value, if used, only makes sense as the last value in the list.
|
||||
// If this is not specified or empty, no topology constraints will be applied.
|
||||
// +optional
|
||||
TopologyKeys []string
|
||||
}
|
||||
|
||||
// ServicePort represents the port on which the service is exposed
|
||||
|
2
pkg/apis/core/v1/zz_generated.conversion.go
generated
2
pkg/apis/core/v1/zz_generated.conversion.go
generated
@ -7489,6 +7489,7 @@ func autoConvert_v1_ServiceSpec_To_core_ServiceSpec(in *v1.ServiceSpec, out *cor
|
||||
out.PublishNotReadyAddresses = in.PublishNotReadyAddresses
|
||||
out.SessionAffinityConfig = (*core.SessionAffinityConfig)(unsafe.Pointer(in.SessionAffinityConfig))
|
||||
out.IPFamily = (*core.IPFamily)(unsafe.Pointer(in.IPFamily))
|
||||
out.TopologyKeys = *(*[]string)(unsafe.Pointer(&in.TopologyKeys))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -7512,6 +7513,7 @@ func autoConvert_core_ServiceSpec_To_v1_ServiceSpec(in *core.ServiceSpec, out *v
|
||||
out.HealthCheckNodePort = in.HealthCheckNodePort
|
||||
out.PublishNotReadyAddresses = in.PublishNotReadyAddresses
|
||||
out.IPFamily = (*v1.IPFamily)(unsafe.Pointer(in.IPFamily))
|
||||
out.TopologyKeys = *(*[]string)(unsafe.Pointer(&in.TopologyKeys))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -4056,6 +4056,35 @@ func ValidateService(service *core.Service) field.ErrorList {
|
||||
ports[key] = true
|
||||
}
|
||||
|
||||
// Validate TopologyKeys
|
||||
if len(service.Spec.TopologyKeys) > 0 {
|
||||
topoPath := specPath.Child("topologyKeys")
|
||||
// topologyKeys is mutually exclusive with 'externalTrafficPolicy=Local'
|
||||
if service.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal {
|
||||
allErrs = append(allErrs, field.Forbidden(topoPath, "may not be specified when `externalTrafficPolicy=Local`"))
|
||||
}
|
||||
if len(service.Spec.TopologyKeys) > core.MaxServiceTopologyKeys {
|
||||
allErrs = append(allErrs, field.TooMany(topoPath, len(service.Spec.TopologyKeys), core.MaxServiceTopologyKeys))
|
||||
}
|
||||
topoKeys := sets.NewString()
|
||||
for i, key := range service.Spec.TopologyKeys {
|
||||
keyPath := topoPath.Index(i)
|
||||
if topoKeys.Has(key) {
|
||||
allErrs = append(allErrs, field.Duplicate(keyPath, key))
|
||||
}
|
||||
topoKeys.Insert(key)
|
||||
// "Any" must be the last value specified
|
||||
if key == v1.TopologyKeyAny && i != len(service.Spec.TopologyKeys)-1 {
|
||||
allErrs = append(allErrs, field.Invalid(keyPath, key, `"*" must be the last value specified`))
|
||||
}
|
||||
if key != v1.TopologyKeyAny {
|
||||
for _, msg := range validation.IsQualifiedName(key) {
|
||||
allErrs = append(allErrs, field.Invalid(keyPath, service.Spec.TopologyKeys, msg))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Validate SourceRange field and annotation
|
||||
_, ok := service.Annotations[core.AnnotationLoadBalancerSourceRangesKey]
|
||||
if len(service.Spec.LoadBalancerSourceRanges) > 0 || ok {
|
||||
@ -4146,6 +4175,10 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro
|
||||
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("externalTrafficPolicy"), service.Spec.ExternalTrafficPolicy,
|
||||
fmt.Sprintf("ExternalTrafficPolicy must be empty, %v or %v", core.ServiceExternalTrafficPolicyTypeCluster, core.ServiceExternalTrafficPolicyTypeLocal)))
|
||||
}
|
||||
// 'externalTrafficPolicy=Local' is mutually exclusive with topologyKeys
|
||||
if service.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal && len(service.Spec.TopologyKeys) > 0 {
|
||||
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec").Child("externalTrafficPolicy"), "externalTrafficPolicy must not be set to 'Local' when topologyKeys is specified"))
|
||||
}
|
||||
if service.Spec.HealthCheckNodePort < 0 {
|
||||
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("healthCheckNodePort"), service.Spec.HealthCheckNodePort,
|
||||
"HealthCheckNodePort must be not less than 0"))
|
||||
|
@ -18,6 +18,7 @@ package validation
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"strings"
|
||||
@ -9389,6 +9390,7 @@ func TestValidatePodEphemeralContainersUpdate(t *testing.T) {
|
||||
|
||||
func TestValidateService(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SCTPSupport, true)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTopology, true)()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
@ -10067,6 +10069,66 @@ func TestValidateService(t *testing.T) {
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: "valid topology keys",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
s.Spec.TopologyKeys = []string{
|
||||
"kubernetes.io/hostname",
|
||||
"failure-domain.beta.kubernetes.io/zone",
|
||||
"failure-domain.beta.kubernetes.io/region",
|
||||
v1.TopologyKeyAny,
|
||||
}
|
||||
},
|
||||
numErrs: 0,
|
||||
},
|
||||
{
|
||||
name: "invalid topology key",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
s.Spec.TopologyKeys = []string{"NoUppercaseOrSpecialCharsLike=Equals"}
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: "too many topology keys",
|
||||
tweakSvc: func(s *core.Service) {
|
||||
for i := 0; i < core.MaxServiceTopologyKeys+1; i++ {
|
||||
s.Spec.TopologyKeys = append(s.Spec.TopologyKeys, fmt.Sprintf("topologykey-%d", i))
|
||||
}
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: `"Any" was not the last key`,
|
||||
tweakSvc: func(s *core.Service) {
|
||||
s.Spec.TopologyKeys = []string{
|
||||
"kubernetes.io/hostname",
|
||||
v1.TopologyKeyAny,
|
||||
"failure-domain.beta.kubernetes.io/zone",
|
||||
}
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: `duplicate topology key`,
|
||||
tweakSvc: func(s *core.Service) {
|
||||
s.Spec.TopologyKeys = []string{
|
||||
"kubernetes.io/hostname",
|
||||
"kubernetes.io/hostname",
|
||||
"failure-domain.beta.kubernetes.io/zone",
|
||||
}
|
||||
},
|
||||
numErrs: 1,
|
||||
},
|
||||
{
|
||||
name: `use topology keys with externalTrafficPolicy=Local`,
|
||||
tweakSvc: func(s *core.Service) {
|
||||
s.Spec.ExternalTrafficPolicy = "Local"
|
||||
s.Spec.TopologyKeys = []string{
|
||||
"kubernetes.io/hostname",
|
||||
}
|
||||
},
|
||||
numErrs: 2,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
5
pkg/apis/core/zz_generated.deepcopy.go
generated
5
pkg/apis/core/zz_generated.deepcopy.go
generated
@ -5171,6 +5171,11 @@ func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) {
|
||||
*out = new(IPFamily)
|
||||
**out = **in
|
||||
}
|
||||
if in.TopologyKeys != nil {
|
||||
in, out := &in.TopologyKeys, &out.TopologyKeys
|
||||
*out = make([]string, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -538,6 +538,12 @@ const (
|
||||
//
|
||||
// Enable all logic related to the PodDisruptionBudget API object in policy
|
||||
PodDisruptionBudget featuregate.Feature = "PodDisruptionBudget"
|
||||
|
||||
// owner: @m1093782566
|
||||
// alpha: v1.17
|
||||
//
|
||||
// Enables topology aware service routing
|
||||
ServiceTopology featuregate.Feature = "ServiceTopology"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -623,6 +629,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
StartupProbe: {Default: false, PreRelease: featuregate.Alpha},
|
||||
AllowInsecureBackendProxy: {Default: true, PreRelease: featuregate.Beta},
|
||||
PodDisruptionBudget: {Default: true, PreRelease: featuregate.Beta},
|
||||
ServiceTopology: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||
// unintentionally on either side:
|
||||
|
@ -45,6 +45,7 @@ type HollowProxy struct {
|
||||
|
||||
type FakeProxier struct {
|
||||
proxyconfig.NoopEndpointSliceHandler
|
||||
proxyconfig.NoopNodeHandler
|
||||
}
|
||||
|
||||
func (*FakeProxier) Sync() {}
|
||||
|
@ -13,6 +13,7 @@ go_library(
|
||||
"endpoints.go",
|
||||
"endpointslicecache.go",
|
||||
"service.go",
|
||||
"topology.go",
|
||||
"types.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/proxy",
|
||||
@ -62,6 +63,7 @@ go_test(
|
||||
"endpoints_test.go",
|
||||
"endpointslicecache_test.go",
|
||||
"service_test.go",
|
||||
"topology_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
|
@ -369,3 +369,128 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) {
|
||||
c.eventHandlers[i].OnServiceDelete(service)
|
||||
}
|
||||
}
|
||||
|
||||
// NodeHandler is an abstract interface of objects which receive
|
||||
// notifications about node object changes.
|
||||
type NodeHandler interface {
|
||||
// OnNodeAdd is called whenever creation of new node object
|
||||
// is observed.
|
||||
OnNodeAdd(node *v1.Node)
|
||||
// OnNodeUpdate is called whenever modification of an existing
|
||||
// node object is observed.
|
||||
OnNodeUpdate(oldNode, node *v1.Node)
|
||||
// OnNodeDelete is called whever deletion of an existing node
|
||||
// object is observed.
|
||||
OnNodeDelete(node *v1.Node)
|
||||
// OnNodeSynced is called once all the initial event handlers were
|
||||
// called and the state is fully propagated to local cache.
|
||||
OnNodeSynced()
|
||||
}
|
||||
|
||||
// NoopNodeHandler is a noop handler for proxiers that have not yet
|
||||
// implemented a full NodeHandler.
|
||||
type NoopNodeHandler struct{}
|
||||
|
||||
// OnNodeAdd is a noop handler for Node creates.
|
||||
func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {}
|
||||
|
||||
// OnNodeUpdate is a noop handler for Node updates.
|
||||
func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {}
|
||||
|
||||
// OnNodeDelete is a noop handler for Node deletes.
|
||||
func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {}
|
||||
|
||||
// OnNodeSynced is a noop handler for Node syncs.
|
||||
func (*NoopNodeHandler) OnNodeSynced() {}
|
||||
|
||||
// NodeConfig tracks a set of node configurations.
|
||||
// It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change.
|
||||
type NodeConfig struct {
|
||||
listerSynced cache.InformerSynced
|
||||
eventHandlers []NodeHandler
|
||||
}
|
||||
|
||||
// NewNodeConfig creates a new NodeConfig.
|
||||
func NewNodeConfig(nodeInformer coreinformers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
|
||||
result := &NodeConfig{
|
||||
listerSynced: nodeInformer.Informer().HasSynced,
|
||||
}
|
||||
|
||||
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: result.handleAddNode,
|
||||
UpdateFunc: result.handleUpdateNode,
|
||||
DeleteFunc: result.handleDeleteNode,
|
||||
},
|
||||
resyncPeriod,
|
||||
)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// RegisterEventHandler registers a handler which is called on every node change.
|
||||
func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) {
|
||||
c.eventHandlers = append(c.eventHandlers, handler)
|
||||
}
|
||||
|
||||
// Run starts the goroutine responsible for calling registered handlers.
|
||||
func (c *NodeConfig) Run(stopCh <-chan struct{}) {
|
||||
klog.Info("Starting node config controller")
|
||||
|
||||
if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := range c.eventHandlers {
|
||||
klog.V(3).Infof("Calling handler.OnNodeSynced()")
|
||||
c.eventHandlers[i].OnNodeSynced()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *NodeConfig) handleAddNode(obj interface{}) {
|
||||
node, ok := obj.(*v1.Node)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||
return
|
||||
}
|
||||
for i := range c.eventHandlers {
|
||||
klog.V(4).Infof("Calling handler.OnNodeAdd")
|
||||
c.eventHandlers[i].OnNodeAdd(node)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) {
|
||||
oldNode, ok := oldObj.(*v1.Node)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
|
||||
return
|
||||
}
|
||||
node, ok := newObj.(*v1.Node)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
|
||||
return
|
||||
}
|
||||
for i := range c.eventHandlers {
|
||||
klog.V(5).Infof("Calling handler.OnNodeUpdate")
|
||||
c.eventHandlers[i].OnNodeUpdate(oldNode, node)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *NodeConfig) handleDeleteNode(obj interface{}) {
|
||||
node, ok := obj.(*v1.Node)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||
return
|
||||
}
|
||||
if node, ok = tombstone.Obj.(*v1.Node); !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
|
||||
return
|
||||
}
|
||||
}
|
||||
for i := range c.eventHandlers {
|
||||
klog.V(4).Infof("Calling handler.OnNodeDelete")
|
||||
c.eventHandlers[i].OnNodeDelete(node)
|
||||
}
|
||||
}
|
||||
|
@ -48,7 +48,8 @@ var supportedEndpointSliceAddressTypes = sets.NewString(
|
||||
type BaseEndpointInfo struct {
|
||||
Endpoint string // TODO: should be an endpointString type
|
||||
// IsLocal indicates whether the endpoint is running in same host as kube-proxy.
|
||||
IsLocal bool
|
||||
IsLocal bool
|
||||
Topology map[string]string
|
||||
}
|
||||
|
||||
var _ Endpoint = &BaseEndpointInfo{}
|
||||
@ -63,6 +64,11 @@ func (info *BaseEndpointInfo) GetIsLocal() bool {
|
||||
return info.IsLocal
|
||||
}
|
||||
|
||||
// GetTopology returns the topology information of the endpoint.
|
||||
func (info *BaseEndpointInfo) GetTopology() map[string]string {
|
||||
return info.Topology
|
||||
}
|
||||
|
||||
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
|
||||
func (info *BaseEndpointInfo) IP() string {
|
||||
return utilproxy.IPPart(info.Endpoint)
|
||||
@ -78,10 +84,11 @@ func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
|
||||
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
|
||||
}
|
||||
|
||||
func newBaseEndpointInfo(IP string, port int, isLocal bool) *BaseEndpointInfo {
|
||||
func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string) *BaseEndpointInfo {
|
||||
return &BaseEndpointInfo{
|
||||
Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)),
|
||||
IsLocal: isLocal,
|
||||
Topology: topology,
|
||||
}
|
||||
}
|
||||
|
||||
@ -358,7 +365,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
|
||||
continue
|
||||
}
|
||||
isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname
|
||||
baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal)
|
||||
baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil)
|
||||
if ect.makeEndpointInfo != nil {
|
||||
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo))
|
||||
} else {
|
||||
|
@ -400,7 +400,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
|
||||
} else {
|
||||
for i := range newEndpoints[x] {
|
||||
ep := newEndpoints[x][i].(*BaseEndpointInfo)
|
||||
if *ep != *(tc.expected[x][i]) {
|
||||
if !(reflect.DeepEqual(*ep, *(tc.expected[x][i]))) {
|
||||
t.Errorf("[%s] expected new[%v][%d] to be %v, got %v", tc.desc, x, i, tc.expected[x][i], *ep)
|
||||
}
|
||||
}
|
||||
@ -1699,21 +1699,21 @@ func TestCheckoutChanges(t *testing.T) {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false),
|
||||
expectedChanges: []*endpointsChange{{
|
||||
previous: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", ""), newTestEp("10.0.1.2:443", "")},
|
||||
},
|
||||
current: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
|
||||
},
|
||||
}},
|
||||
items: map[types.NamespacedName]*endpointsChange{
|
||||
{Namespace: "ns1", Name: "svc1"}: {
|
||||
previous: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", ""), newTestEp("10.0.1.2:443", "")},
|
||||
},
|
||||
current: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -1724,7 +1724,7 @@ func TestCheckoutChanges(t *testing.T) {
|
||||
expectedChanges: []*endpointsChange{{
|
||||
previous: EndpointsMap{},
|
||||
current: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")},
|
||||
},
|
||||
}},
|
||||
useEndpointSlices: true,
|
||||
@ -1737,11 +1737,11 @@ func TestCheckoutChanges(t *testing.T) {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
|
||||
expectedChanges: []*endpointsChange{{
|
||||
previous: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", "host1"), newTestEp("10.0.1.2:443", "host1")},
|
||||
},
|
||||
current: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")},
|
||||
},
|
||||
}},
|
||||
useEndpointSlices: true,
|
||||
@ -1796,6 +1796,9 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
|
||||
if len(newMap) != len(expected) {
|
||||
t.Errorf("expected %d results, got %d: %v", len(expected), len(newMap), newMap)
|
||||
}
|
||||
endpointEqual := func(a, b *BaseEndpointInfo) bool {
|
||||
return a.Endpoint == b.Endpoint && a.IsLocal == b.IsLocal
|
||||
}
|
||||
for x := range expected {
|
||||
if len(newMap[x]) != len(expected[x]) {
|
||||
t.Errorf("expected %d endpoints for %v, got %d", len(expected[x]), x, len(newMap[x]))
|
||||
@ -1807,7 +1810,7 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
|
||||
t.Errorf("Failed to cast endpointsInfo")
|
||||
continue
|
||||
}
|
||||
if *newEp != *(expected[x][i]) {
|
||||
if !endpointEqual(newEp, expected[x][i]) {
|
||||
t.Errorf("expected new[%v][%d] to be %v, got %v (IsLocal expected %v, got %v)", x, i, expected[x][i], newEp, expected[x][i].IsLocal, newEp.IsLocal)
|
||||
}
|
||||
}
|
||||
@ -1815,8 +1818,14 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
|
||||
}
|
||||
}
|
||||
|
||||
func newTestEp(ep string) *BaseEndpointInfo {
|
||||
return &BaseEndpointInfo{Endpoint: ep}
|
||||
func newTestEp(ep, host string) *BaseEndpointInfo {
|
||||
endpointInfo := &BaseEndpointInfo{Endpoint: ep}
|
||||
if host != "" {
|
||||
endpointInfo.Topology = map[string]string{
|
||||
"kubernetes.io/hostname": host,
|
||||
}
|
||||
}
|
||||
return endpointInfo
|
||||
}
|
||||
|
||||
func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*discovery.EndpointSlice) {
|
||||
|
@ -254,7 +254,7 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName
|
||||
}
|
||||
|
||||
isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname])
|
||||
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal)
|
||||
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology)
|
||||
|
||||
// This logic ensures we're deduping potential overlapping endpoints
|
||||
// isLocal should not vary between matching IPs, but if it does, we
|
||||
|
@ -176,9 +176,9 @@ func TestEndpointInfoByServicePort(t *testing.T) {
|
||||
},
|
||||
expectedMap: spToEndpointMap{
|
||||
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
|
||||
"10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false},
|
||||
"10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
|
||||
"10.0.1.3": &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false},
|
||||
"10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}},
|
||||
"10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true, Topology: map[string]string{"kubernetes.io/hostname": "host1"}},
|
||||
"10.0.1.3": &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"encoding/base32"
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -181,6 +182,7 @@ type Proxier struct {
|
||||
serviceMap proxy.ServiceMap
|
||||
endpointsMap proxy.EndpointsMap
|
||||
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
|
||||
nodeLabels map[string]string
|
||||
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true
|
||||
// when corresponding objects are synced after startup. This is used to avoid
|
||||
// updating iptables with some partial data after kube-proxy restart.
|
||||
@ -591,6 +593,58 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
|
||||
// OnNodeAdd is called whenever creation of new node object
|
||||
// is observed.
|
||||
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
|
||||
if node.Name != proxier.hostname {
|
||||
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
|
||||
return
|
||||
}
|
||||
oldLabels := proxier.nodeLabels
|
||||
newLabels := node.Labels
|
||||
proxier.mu.Lock()
|
||||
proxier.nodeLabels = newLabels
|
||||
proxier.mu.Unlock()
|
||||
if !reflect.DeepEqual(oldLabels, newLabels) {
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
}
|
||||
|
||||
// OnNodeUpdate is called whenever modification of an existing
|
||||
// node object is observed.
|
||||
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
|
||||
if node.Name != proxier.hostname {
|
||||
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
|
||||
return
|
||||
}
|
||||
oldLabels := proxier.nodeLabels
|
||||
newLabels := node.Labels
|
||||
proxier.mu.Lock()
|
||||
proxier.nodeLabels = newLabels
|
||||
proxier.mu.Unlock()
|
||||
if !reflect.DeepEqual(oldLabels, newLabels) {
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
}
|
||||
|
||||
// OnNodeDelete is called whever deletion of an existing node
|
||||
// object is observed.
|
||||
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
|
||||
if node.Name != proxier.hostname {
|
||||
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
|
||||
return
|
||||
}
|
||||
proxier.mu.Lock()
|
||||
proxier.nodeLabels = nil
|
||||
proxier.mu.Unlock()
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
|
||||
// OnNodeSynced is called once all the initial event handlers were
|
||||
// called and the state is fully propagated to local cache.
|
||||
func (proxier *Proxier) OnNodeSynced() {
|
||||
}
|
||||
|
||||
// portProtoHash takes the ServicePortName and protocol for a service
|
||||
// returns the associated 16 character hash. This is computed by hashing (sha256)
|
||||
// then encoding to base32 and truncating to 16 chars. We do this because IPTables
|
||||
@ -858,7 +912,20 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
|
||||
protocol := strings.ToLower(string(svcInfo.Protocol()))
|
||||
svcNameString := svcInfo.serviceNameString
|
||||
hasEndpoints := len(proxier.endpointsMap[svcName]) > 0
|
||||
|
||||
allEndpoints := proxier.endpointsMap[svcName]
|
||||
|
||||
hasEndpoints := len(allEndpoints) > 0
|
||||
|
||||
// Service Topology will not be enabled in the following cases:
|
||||
// 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
|
||||
// 2. ServiceTopology is not enabled.
|
||||
// 3. EndpointSlice is not enabled (service topology depends on endpoint slice
|
||||
// to get topology information).
|
||||
if !svcInfo.OnlyNodeLocalEndpoints() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
|
||||
allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints)
|
||||
hasEndpoints = len(allEndpoints) > 0
|
||||
}
|
||||
|
||||
svcChain := svcInfo.servicePortChainName
|
||||
if hasEndpoints {
|
||||
@ -1168,12 +1235,13 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
endpoints = endpoints[:0]
|
||||
endpointChains = endpointChains[:0]
|
||||
var endpointChain utiliptables.Chain
|
||||
for _, ep := range proxier.endpointsMap[svcName] {
|
||||
for _, ep := range allEndpoints {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
if !ok {
|
||||
klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
|
||||
continue
|
||||
}
|
||||
|
||||
endpoints = append(endpoints, epInfo)
|
||||
endpointChain = epInfo.endpointChain(svcNameString, protocol)
|
||||
endpointChains = append(endpointChains, endpointChain)
|
||||
@ -1220,6 +1288,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Error parsing this endpoint has been logged. Skip to next endpoint.
|
||||
continue
|
||||
}
|
||||
|
||||
// Balancing rules in the per-service chain.
|
||||
args = append(args[:0], "-A", string(svcChain))
|
||||
proxier.appendServiceCommentLocked(args, svcNameString)
|
||||
|
@ -56,6 +56,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/proxy:go_default_library",
|
||||
"//pkg/proxy/config:go_default_library",
|
||||
"//pkg/proxy/healthcheck:go_default_library",
|
||||
"//pkg/proxy/metrics:go_default_library",
|
||||
"//pkg/proxy/util:go_default_library",
|
||||
|
@ -22,6 +22,8 @@ import (
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/config"
|
||||
|
||||
utilnet "k8s.io/utils/net"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
@ -30,6 +32,8 @@ import (
|
||||
type metaProxier struct {
|
||||
ipv4Proxier proxy.Provider
|
||||
ipv6Proxier proxy.Provider
|
||||
// TODO(imroc): implement node handler for meta proxier.
|
||||
config.NoopNodeHandler
|
||||
}
|
||||
|
||||
// NewMetaProxier returns a dual-stack "meta-proxier". Proxier API
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -30,6 +31,10 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog"
|
||||
utilexec "k8s.io/utils/exec"
|
||||
utilnet "k8s.io/utils/net"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -38,7 +43,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
@ -50,8 +54,6 @@ import (
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
|
||||
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
|
||||
utilexec "k8s.io/utils/exec"
|
||||
utilnet "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -200,6 +202,7 @@ type Proxier struct {
|
||||
serviceMap proxy.ServiceMap
|
||||
endpointsMap proxy.EndpointsMap
|
||||
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
|
||||
nodeLabels map[string]string
|
||||
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when
|
||||
// corresponding objects are synced after startup. This is used to avoid updating
|
||||
// ipvs rules with some partial data after kube-proxy restart.
|
||||
@ -896,6 +899,58 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
|
||||
// OnNodeAdd is called whenever creation of new node object
|
||||
// is observed.
|
||||
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
|
||||
if node.Name != proxier.hostname {
|
||||
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
|
||||
return
|
||||
}
|
||||
oldLabels := proxier.nodeLabels
|
||||
newLabels := node.Labels
|
||||
proxier.mu.Lock()
|
||||
proxier.nodeLabels = newLabels
|
||||
proxier.mu.Unlock()
|
||||
if !reflect.DeepEqual(oldLabels, newLabels) {
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
}
|
||||
|
||||
// OnNodeUpdate is called whenever modification of an existing
|
||||
// node object is observed.
|
||||
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
|
||||
if node.Name != proxier.hostname {
|
||||
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
|
||||
return
|
||||
}
|
||||
oldLabels := proxier.nodeLabels
|
||||
newLabels := node.Labels
|
||||
proxier.mu.Lock()
|
||||
proxier.nodeLabels = newLabels
|
||||
proxier.mu.Unlock()
|
||||
if !reflect.DeepEqual(oldLabels, newLabels) {
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
}
|
||||
|
||||
// OnNodeDelete is called whever deletion of an existing node
|
||||
// object is observed.
|
||||
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
|
||||
if node.Name != proxier.hostname {
|
||||
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
|
||||
return
|
||||
}
|
||||
proxier.mu.Lock()
|
||||
proxier.nodeLabels = nil
|
||||
proxier.mu.Unlock()
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
|
||||
// OnNodeSynced is called once all the initial event handlers were
|
||||
// called and the state is fully propagated to local cache.
|
||||
func (proxier *Proxier) OnNodeSynced() {
|
||||
}
|
||||
|
||||
// EntryInvalidErr indicates if an ipset entry is invalid or not
|
||||
const EntryInvalidErr = "error adding entry %s to ipset %s"
|
||||
|
||||
@ -1866,7 +1921,18 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
|
||||
curEndpoints.Insert(des.String())
|
||||
}
|
||||
|
||||
for _, epInfo := range proxier.endpointsMap[svcPortName] {
|
||||
endpoints := proxier.endpointsMap[svcPortName]
|
||||
|
||||
// Service Topology will not be enabled in the following cases:
|
||||
// 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
|
||||
// 2. ServiceTopology is not enabled.
|
||||
// 3. EndpointSlice is not enabled (service topology depends on endpoint slice
|
||||
// to get topology information).
|
||||
if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
|
||||
endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints)
|
||||
}
|
||||
|
||||
for _, epInfo := range endpoints {
|
||||
if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
|
||||
continue
|
||||
}
|
||||
|
@ -2951,7 +2951,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe
|
||||
t.Errorf("Failed to cast proxy.BaseEndpointInfo")
|
||||
continue
|
||||
}
|
||||
if *newEp != *(expected[x][i]) {
|
||||
if !reflect.DeepEqual(*newEp, *(expected[x][i])) {
|
||||
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
|
||||
}
|
||||
}
|
||||
@ -3702,7 +3702,6 @@ func TestEndpointSliceE2E(t *testing.T) {
|
||||
// Add initial service
|
||||
serviceName := "svc1"
|
||||
namespaceName := "ns1"
|
||||
|
||||
fp.OnServiceAdd(&v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
|
||||
Spec: v1.ServiceSpec{
|
||||
|
@ -51,6 +51,7 @@ type BaseServiceInfo struct {
|
||||
loadBalancerSourceRanges []string
|
||||
healthCheckNodePort int
|
||||
onlyNodeLocalEndpoints bool
|
||||
topologyKeys []string
|
||||
}
|
||||
|
||||
var _ ServicePort = &BaseServiceInfo{}
|
||||
@ -119,6 +120,11 @@ func (info *BaseServiceInfo) OnlyNodeLocalEndpoints() bool {
|
||||
return info.onlyNodeLocalEndpoints
|
||||
}
|
||||
|
||||
// TopologyKeys is part of ServicePort interface.
|
||||
func (info *BaseServiceInfo) TopologyKeys() []string {
|
||||
return info.topologyKeys
|
||||
}
|
||||
|
||||
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
|
||||
onlyNodeLocalEndpoints := false
|
||||
if apiservice.RequestsOnlyLocalTraffic(service) {
|
||||
@ -139,6 +145,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
|
||||
sessionAffinityType: service.Spec.SessionAffinity,
|
||||
stickyMaxAgeSeconds: stickyMaxAgeSeconds,
|
||||
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
||||
topologyKeys: service.Spec.TopologyKeys,
|
||||
}
|
||||
|
||||
if sct.isIPv6Mode == nil {
|
||||
|
80
pkg/proxy/topology.go
Normal file
80
pkg/proxy/topology.go
Normal file
@ -0,0 +1,80 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// FilterTopologyEndpoint returns the appropriate endpoints based on the cluster
|
||||
// topology.
|
||||
// This uses the current node's labels, which contain topology information, and
|
||||
// the required topologyKeys to find appropriate endpoints. If both the endpoint's
|
||||
// topology and the current node have matching values for topologyKeys[0], the
|
||||
// endpoint will be chosen. If no endpoints are chosen, toplogyKeys[1] will be
|
||||
// considered, and so on. If either the node or the endpoint do not have values
|
||||
// for a key, it is considered to not match.
|
||||
//
|
||||
// If topologyKeys is specified, but no endpoints are chosen for any key, the
|
||||
// the service has no viable endpoints for clients on this node, and connections
|
||||
// should fail.
|
||||
//
|
||||
// The special key "*" may be used as the last entry in topologyKeys to indicate
|
||||
// "any endpoint" is acceptable.
|
||||
//
|
||||
// If topologyKeys is not specified or empty, no topology constraints will be
|
||||
// applied and this will return all endpoints.
|
||||
func FilterTopologyEndpoint(nodeLabels map[string]string, topologyKeys []string, endpoints []Endpoint) []Endpoint {
|
||||
// Do not filter endpoints if service has no topology keys.
|
||||
if len(topologyKeys) == 0 {
|
||||
return endpoints
|
||||
}
|
||||
|
||||
filteredEndpoint := []Endpoint{}
|
||||
|
||||
if len(nodeLabels) == 0 {
|
||||
if topologyKeys[len(topologyKeys)-1] == v1.TopologyKeyAny {
|
||||
// edge case: include all endpoints if topology key "Any" specified
|
||||
// when we cannot determine current node's topology.
|
||||
return endpoints
|
||||
}
|
||||
// edge case: do not include any endpoints if topology key "Any" is
|
||||
// not specified when we cannot determine current node's topology.
|
||||
return filteredEndpoint
|
||||
}
|
||||
|
||||
for _, key := range topologyKeys {
|
||||
if key == v1.TopologyKeyAny {
|
||||
return endpoints
|
||||
}
|
||||
topologyValue, found := nodeLabels[key]
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
topology := ep.GetTopology()
|
||||
if value, found := topology[key]; found && value == topologyValue {
|
||||
filteredEndpoint = append(filteredEndpoint, ep)
|
||||
}
|
||||
}
|
||||
if len(filteredEndpoint) > 0 {
|
||||
return filteredEndpoint
|
||||
}
|
||||
}
|
||||
return filteredEndpoint
|
||||
}
|
478
pkg/proxy/topology_test.go
Normal file
478
pkg/proxy/topology_test.go
Normal file
@ -0,0 +1,478 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
func TestFilterTopologyEndpoint(t *testing.T) {
|
||||
type endpoint struct {
|
||||
Endpoint string
|
||||
NodeName types.NodeName
|
||||
}
|
||||
testCases := []struct {
|
||||
Name string
|
||||
nodeLabels map[types.NodeName]map[string]string
|
||||
endpoints []endpoint
|
||||
currentNodeName types.NodeName
|
||||
topologyKeys []string
|
||||
expected []endpoint
|
||||
}{
|
||||
{
|
||||
// Case[0]: no topology key and endpoints at all = 0 endpoints
|
||||
Name: "no topology key and endpoints",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "10.0.0.1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
}},
|
||||
endpoints: []endpoint{},
|
||||
currentNodeName: "testNode1",
|
||||
topologyKeys: nil,
|
||||
expected: []endpoint{},
|
||||
},
|
||||
{
|
||||
// Case[1]: no topology key, 2 nodes each with 2 endpoints = 4
|
||||
// endpoints
|
||||
Name: "no topology key but have endpoints",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
currentNodeName: "testNode1",
|
||||
topologyKeys: nil,
|
||||
expected: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Case[2]: 1 topology key (hostname), 2 nodes each with 2 endpoints
|
||||
// 1 match = 2 endpoints
|
||||
Name: "one topology key with one node matched",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
currentNodeName: "testNode1",
|
||||
topologyKeys: []string{"kubernetes.io/hostname"},
|
||||
expected: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Case[3]: 1 topology key (hostname), 2 nodes each with 2 endpoints
|
||||
// no match = 0 endpoints
|
||||
Name: "one topology key without node matched",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode3": {
|
||||
"kubernetes.io/hostname": "testNode3",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
currentNodeName: "testNode3",
|
||||
topologyKeys: []string{"kubernetes.io/hostname"},
|
||||
expected: []endpoint{},
|
||||
},
|
||||
{
|
||||
// Case[4]: 1 topology key (zone), 2 nodes in zone a, 2 nodes in
|
||||
// zone b, each with 2 endpoints = 4 endpoints
|
||||
Name: "one topology key with multiple nodes matched",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode3": {
|
||||
"kubernetes.io/hostname": "testNode3",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode4": {
|
||||
"kubernetes.io/hostname": "testNode4",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
|
||||
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
|
||||
},
|
||||
currentNodeName: "testNode2",
|
||||
topologyKeys: []string{"topology.kubernetes.io/zone"},
|
||||
expected: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Case[5]: 2 topology keys (hostname, zone), 2 nodes each with 2
|
||||
// endpoints, 1 hostname match = 2 endpoints (2nd key ignored)
|
||||
Name: "early match in multiple topology keys",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode3": {
|
||||
"kubernetes.io/hostname": "testNode3",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode4": {
|
||||
"kubernetes.io/hostname": "testNode4",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
|
||||
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
|
||||
},
|
||||
currentNodeName: "testNode2",
|
||||
topologyKeys: []string{"kubernetes.io/hostname"},
|
||||
expected: []endpoint{
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Case[6]: 2 topology keys (hostname, zone), 2 nodes in zone a, 2
|
||||
// nodes in zone b, each with 2 endpoints, no hostname match, 1 zone
|
||||
// match = 4 endpoints
|
||||
Name: "later match in multiple topology keys",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode3": {
|
||||
"kubernetes.io/hostname": "testNode3",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode4": {
|
||||
"kubernetes.io/hostname": "testNode4",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode5": {
|
||||
"kubernetes.io/hostname": "testNode5",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
|
||||
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
|
||||
},
|
||||
currentNodeName: "testNode5",
|
||||
topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone"},
|
||||
expected: []endpoint{
|
||||
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
|
||||
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Case[7]: 2 topology keys (hostname, zone), 2 nodes in zone a, 2
|
||||
// nodes in zone b, each with 2 endpoints, no hostname match, no zone
|
||||
// match = 0 endpoints
|
||||
Name: "multiple topology keys without node matched",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode3": {
|
||||
"kubernetes.io/hostname": "testNode3",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode4": {
|
||||
"kubernetes.io/hostname": "testNode4",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode5": {
|
||||
"kubernetes.io/hostname": "testNode5",
|
||||
"topology.kubernetes.io/zone": "90003",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
|
||||
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
|
||||
},
|
||||
currentNodeName: "testNode5",
|
||||
topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone"},
|
||||
expected: []endpoint{},
|
||||
},
|
||||
{
|
||||
// Case[8]: 2 topology keys (hostname, "*"), 2 nodes each with 2
|
||||
// endpoints, 1 match hostname = 2 endpoints
|
||||
Name: "multiple topology keys matched node when 'Any' key ignored",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
currentNodeName: "testNode1",
|
||||
topologyKeys: []string{"kubernetes.io/hostname", v1.TopologyKeyAny},
|
||||
expected: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Case[9]: 2 topology keys (hostname, "*"), 2 nodes each with 2
|
||||
// endpoints, no hostname match, catch-all ("*") matched with 4
|
||||
// endpoints
|
||||
Name: "two topology keys matched node with 'Any' key",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode3": {
|
||||
"kubernetes.io/hostname": "testNode3",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
currentNodeName: "testNode3",
|
||||
topologyKeys: []string{"kubernetes.io/hostname", v1.TopologyKeyAny},
|
||||
expected: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Case[10]: 3 topology keys (hostname, zone, "*"), 2 nodes in zone a,
|
||||
// 2 nodes in zone b, each with 2 endpoints, no hostname match, no
|
||||
// zone, catch-all ("*") matched with 8 endpoints
|
||||
Name: "multiple topology keys matched node with 'Any' key",
|
||||
nodeLabels: map[types.NodeName]map[string]string{
|
||||
"testNode1": {
|
||||
"kubernetes.io/hostname": "testNode1",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode2": {
|
||||
"kubernetes.io/hostname": "testNode2",
|
||||
"topology.kubernetes.io/zone": "90001",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode3": {
|
||||
"kubernetes.io/hostname": "testNode3",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode4": {
|
||||
"kubernetes.io/hostname": "testNode4",
|
||||
"topology.kubernetes.io/zone": "90002",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
"testNode5": {
|
||||
"kubernetes.io/hostname": "testNode5",
|
||||
"topology.kubernetes.io/zone": "90003",
|
||||
"topology.kubernetes.io/region": "cd",
|
||||
},
|
||||
},
|
||||
endpoints: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
|
||||
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
|
||||
},
|
||||
currentNodeName: "testNode5",
|
||||
topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone", v1.TopologyKeyAny},
|
||||
expected: []endpoint{
|
||||
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
|
||||
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
|
||||
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
|
||||
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
|
||||
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
|
||||
},
|
||||
},
|
||||
}
|
||||
endpointsToStringArray := func(endpoints []endpoint) []string {
|
||||
result := make([]string, 0, len(endpoints))
|
||||
for _, ep := range endpoints {
|
||||
result = append(result, ep.Endpoint)
|
||||
}
|
||||
return result
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
m := make(map[Endpoint]endpoint)
|
||||
endpoints := []Endpoint{}
|
||||
for _, ep := range tc.endpoints {
|
||||
var e Endpoint = &BaseEndpointInfo{Endpoint: ep.Endpoint, Topology: tc.nodeLabels[ep.NodeName]}
|
||||
m[e] = ep
|
||||
endpoints = append(endpoints, e)
|
||||
}
|
||||
currentNodeLabels := tc.nodeLabels[tc.currentNodeName]
|
||||
filteredEndpoint := []endpoint{}
|
||||
for _, ep := range FilterTopologyEndpoint(currentNodeLabels, tc.topologyKeys, endpoints) {
|
||||
filteredEndpoint = append(filteredEndpoint, m[ep])
|
||||
}
|
||||
if !reflect.DeepEqual(filteredEndpoint, tc.expected) {
|
||||
t.Errorf("expected %v, got %v", endpointsToStringArray(tc.expected), endpointsToStringArray(filteredEndpoint))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -30,6 +30,7 @@ type Provider interface {
|
||||
config.EndpointsHandler
|
||||
config.EndpointSliceHandler
|
||||
config.ServiceHandler
|
||||
config.NodeHandler
|
||||
|
||||
// Sync immediately synchronizes the Provider's current state to proxy rules.
|
||||
Sync()
|
||||
@ -77,6 +78,8 @@ type ServicePort interface {
|
||||
NodePort() int
|
||||
// GetOnlyNodeLocalEndpoints returns if a service has only node local endpoints
|
||||
OnlyNodeLocalEndpoints() bool
|
||||
// TopologyKeys returns service TopologyKeys as a string array.
|
||||
TopologyKeys() []string
|
||||
}
|
||||
|
||||
// Endpoint in an interface which abstracts information about an endpoint.
|
||||
@ -87,6 +90,8 @@ type Endpoint interface {
|
||||
String() string
|
||||
// GetIsLocal returns true if the endpoint is running in same host as kube-proxy, otherwise returns false.
|
||||
GetIsLocal() bool
|
||||
// GetTopology returns the topology information of the endpoint.
|
||||
GetTopology() map[string]string
|
||||
// IP returns IP part of the endpoint.
|
||||
IP() string
|
||||
// Port returns the Port part of the endpoint.
|
||||
|
@ -112,6 +112,8 @@ type asyncRunnerInterface interface {
|
||||
type Proxier struct {
|
||||
// EndpointSlice support has not been added for this proxier yet.
|
||||
config.NoopEndpointSliceHandler
|
||||
// TODO(imroc): implement node handler for userspace proxier.
|
||||
config.NoopNodeHandler
|
||||
|
||||
loadBalancer LoadBalancer
|
||||
mu sync.Mutex // protects serviceMap
|
||||
|
@ -444,6 +444,8 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap, curServices proxySe
|
||||
type Proxier struct {
|
||||
// EndpointSlice support has not been added for this proxier yet.
|
||||
proxyconfig.NoopEndpointSliceHandler
|
||||
// TODO(imroc): implement node handler for winkernel proxier.
|
||||
proxyconfig.NoopNodeHandler
|
||||
|
||||
// endpointsChanges and serviceChanges contains all changes to endpoints and
|
||||
// services that happened since policies were synced. For a single object,
|
||||
|
@ -83,6 +83,8 @@ func logTimeout(err error) bool {
|
||||
type Proxier struct {
|
||||
// EndpointSlice support has not been added for this proxier yet.
|
||||
config.NoopEndpointSliceHandler
|
||||
// TODO(imroc): implement node handler for winuserspace proxier.
|
||||
config.NoopNodeHandler
|
||||
|
||||
loadBalancer LoadBalancer
|
||||
mu sync.Mutex // protects serviceMap
|
||||
|
@ -23,11 +23,10 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apiserver/pkg/storage/names"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/apis/core/validation"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
@ -121,6 +120,10 @@ func dropServiceDisabledFields(newSvc *api.Service, oldSvc *api.Service) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && !serviceIPFamilyInUse(oldSvc) {
|
||||
newSvc.Spec.IPFamily = nil
|
||||
}
|
||||
// Drop TopologyKeys if ServiceTopology is not enabled
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && !topologyKeysInUse(oldSvc) {
|
||||
newSvc.Spec.TopologyKeys = nil
|
||||
}
|
||||
}
|
||||
|
||||
// returns true if svc.Spec.ServiceIPFamily field is in use
|
||||
@ -134,6 +137,14 @@ func serviceIPFamilyInUse(svc *api.Service) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// returns true if svc.Spec.TopologyKeys field is in use
|
||||
func topologyKeysInUse(svc *api.Service) bool {
|
||||
if svc == nil {
|
||||
return false
|
||||
}
|
||||
return len(svc.Spec.TopologyKeys) > 0
|
||||
}
|
||||
|
||||
type serviceStatusStrategy struct {
|
||||
svcStrategy
|
||||
}
|
||||
|
@ -461,7 +461,7 @@ func ClusterRoles() []rbacv1.ClusterRole {
|
||||
// node-proxier role is used by kube-proxy.
|
||||
nodeProxierRules := []rbacv1.PolicyRule{
|
||||
rbacv1helpers.NewRule("list", "watch").Groups(legacyGroup).Resources("services", "endpoints").RuleOrDie(),
|
||||
rbacv1helpers.NewRule("get").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
|
||||
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
|
||||
|
||||
eventsRule(),
|
||||
}
|
||||
|
@ -1040,6 +1040,8 @@ items:
|
||||
- nodes
|
||||
verbs:
|
||||
- get
|
||||
- list
|
||||
- watch
|
||||
- apiGroups:
|
||||
- ""
|
||||
- events.k8s.io
|
||||
|
1755
staging/src/k8s.io/api/core/v1/generated.pb.go
generated
1755
staging/src/k8s.io/api/core/v1/generated.pb.go
generated
File diff suppressed because it is too large
Load Diff
@ -4741,6 +4741,21 @@ message ServiceSpec {
|
||||
// cluster (e.g. IPv6 in IPv4 only cluster) is an error condition and will fail during clusterIP assignment.
|
||||
// +optional
|
||||
optional string ipFamily = 15;
|
||||
|
||||
// topologyKeys is a preference-order list of topology keys which
|
||||
// implementations of services should use to preferentially sort endpoints
|
||||
// when accessing this Service, it can not be used at the same time as
|
||||
// externalTrafficPolicy=Local.
|
||||
// Topology keys must be valid label keys and at most 16 keys may be specified.
|
||||
// Endpoints are chosen based on the first topology key with available backends.
|
||||
// If this field is specified and all entries have no backends that match
|
||||
// the topology of the client, the service has no backends for that client
|
||||
// and connections should fail.
|
||||
// The special value "*" may be used to mean "any topology". This catch-all
|
||||
// value, if used, only makes sense as the last value in the list.
|
||||
// If this is not specified or empty, no topology constraints will be applied.
|
||||
// +optional
|
||||
repeated string topologyKeys = 16;
|
||||
}
|
||||
|
||||
// ServiceStatus represents the current status of a service.
|
||||
|
@ -30,6 +30,8 @@ const (
|
||||
NamespaceAll string = ""
|
||||
// NamespaceNodeLease is the namespace where we place node lease objects (used for node heartbeats)
|
||||
NamespaceNodeLease string = "kube-node-lease"
|
||||
// TopologyKeyAny is the service topology key that matches any node
|
||||
TopologyKeyAny string = "*"
|
||||
)
|
||||
|
||||
// Volume represents a named volume in a pod that may be accessed by any container in the pod.
|
||||
@ -3826,6 +3828,8 @@ const (
|
||||
IPv4Protocol IPFamily = "IPv4"
|
||||
// IPv6Protocol indicates that this IP is IPv6 protocol
|
||||
IPv6Protocol IPFamily = "IPv6"
|
||||
// MaxServiceTopologyKeys is the largest number of topology keys allowed on a service
|
||||
MaxServiceTopologyKeys = 16
|
||||
)
|
||||
|
||||
// ServiceSpec describes the attributes that a user creates on a service.
|
||||
@ -3940,6 +3944,7 @@ type ServiceSpec struct {
|
||||
// of peer discovery.
|
||||
// +optional
|
||||
PublishNotReadyAddresses bool `json:"publishNotReadyAddresses,omitempty" protobuf:"varint,13,opt,name=publishNotReadyAddresses"`
|
||||
|
||||
// sessionAffinityConfig contains the configurations of session affinity.
|
||||
// +optional
|
||||
SessionAffinityConfig *SessionAffinityConfig `json:"sessionAffinityConfig,omitempty" protobuf:"bytes,14,opt,name=sessionAffinityConfig"`
|
||||
@ -3953,6 +3958,21 @@ type ServiceSpec struct {
|
||||
// cluster (e.g. IPv6 in IPv4 only cluster) is an error condition and will fail during clusterIP assignment.
|
||||
// +optional
|
||||
IPFamily *IPFamily `json:"ipFamily,omitempty" protobuf:"bytes,15,opt,name=ipFamily,Configcasttype=IPFamily"`
|
||||
|
||||
// topologyKeys is a preference-order list of topology keys which
|
||||
// implementations of services should use to preferentially sort endpoints
|
||||
// when accessing this Service, it can not be used at the same time as
|
||||
// externalTrafficPolicy=Local.
|
||||
// Topology keys must be valid label keys and at most 16 keys may be specified.
|
||||
// Endpoints are chosen based on the first topology key with available backends.
|
||||
// If this field is specified and all entries have no backends that match
|
||||
// the topology of the client, the service has no backends for that client
|
||||
// and connections should fail.
|
||||
// The special value "*" may be used to mean "any topology". This catch-all
|
||||
// value, if used, only makes sense as the last value in the list.
|
||||
// If this is not specified or empty, no topology constraints will be applied.
|
||||
// +optional
|
||||
TopologyKeys []string `json:"topologyKeys,omitempty" protobuf:"bytes,16,opt,name=topologyKeys"`
|
||||
}
|
||||
|
||||
// ServicePort contains information on service's port.
|
||||
|
@ -2204,6 +2204,7 @@ var map_ServiceSpec = map[string]string{
|
||||
"publishNotReadyAddresses": "publishNotReadyAddresses, when set to true, indicates that DNS implementations must publish the notReadyAddresses of subsets for the Endpoints associated with the Service. The default value is false. The primary use case for setting this field is to use a StatefulSet's Headless Service to propagate SRV records for its Pods without respect to their readiness for purpose of peer discovery.",
|
||||
"sessionAffinityConfig": "sessionAffinityConfig contains the configurations of session affinity.",
|
||||
"ipFamily": "ipFamily specifies whether this Service has a preference for a particular IP family (e.g. IPv4 vs. IPv6). If a specific IP family is requested, the clusterIP field will be allocated from that family, if it is available in the cluster. If no IP family is requested, the cluster's primary IP family will be used. Other IP fields (loadBalancerIP, loadBalancerSourceRanges, externalIPs) and controllers which allocate external load-balancers should use the same IP family. Endpoints for this Service will be of this family. This field is immutable after creation. Assigning a ServiceIPFamily not available in the cluster (e.g. IPv6 in IPv4 only cluster) is an error condition and will fail during clusterIP assignment.",
|
||||
"topologyKeys": "topologyKeys is a preference-order list of topology keys which implementations of services should use to preferentially sort endpoints when accessing this Service, it can not be used at the same time as externalTrafficPolicy=Local. Topology keys must be valid label keys and at most 16 keys may be specified. Endpoints are chosen based on the first topology key with available backends. If this field is specified and all entries have no backends that match the topology of the client, the service has no backends for that client and connections should fail. The special value \"*\" may be used to mean \"any topology\". This catch-all value, if used, only makes sense as the last value in the list. If this is not specified or empty, no topology constraints will be applied.",
|
||||
}
|
||||
|
||||
func (ServiceSpec) SwaggerDoc() map[string]string {
|
||||
|
@ -5186,6 +5186,11 @@ func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) {
|
||||
*out = new(IPFamily)
|
||||
**out = **in
|
||||
}
|
||||
if in.TopologyKeys != nil {
|
||||
in, out := &in.TopologyKeys, &out.TopologyKeys
|
||||
*out = make([]string, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -72,14 +72,17 @@
|
||||
"timeoutSeconds": -1973740160
|
||||
}
|
||||
},
|
||||
"ipFamily": "³-Ǐ忄*齧獚敆ȎțêɘIJ斬"
|
||||
"ipFamily": "³-Ǐ忄*齧獚敆ȎțêɘIJ斬",
|
||||
"topologyKeys": [
|
||||
"28"
|
||||
]
|
||||
},
|
||||
"status": {
|
||||
"loadBalancer": {
|
||||
"ingress": [
|
||||
{
|
||||
"ip": "28",
|
||||
"hostname": "29"
|
||||
"ip": "29",
|
||||
"hostname": "30"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
Binary file not shown.
@ -53,9 +53,11 @@ spec:
|
||||
sessionAffinityConfig:
|
||||
clientIP:
|
||||
timeoutSeconds: -1973740160
|
||||
topologyKeys:
|
||||
- "28"
|
||||
type: .蘯6ċV夸
|
||||
status:
|
||||
loadBalancer:
|
||||
ingress:
|
||||
- hostname: "29"
|
||||
ip: "28"
|
||||
- hostname: "30"
|
||||
ip: "29"
|
||||
|
Loading…
Reference in New Issue
Block a user