Service Topology implementation

* Implement Service Topology for ipvs and iptables proxier
* Add test files
* API validation
This commit is contained in:
Roc Chan 2019-07-16 17:22:43 +08:00
parent cdaeabfb46
commit c9cf3f5b72
25 changed files with 1006 additions and 43 deletions

View File

@ -35,6 +35,7 @@ import (
gerrors "github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
@ -663,6 +664,7 @@ func (s *ProxyServer) Run() error {
labelSelector := labels.NewSelector()
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
// Make informers that filter out objects that want a non-default service proxy.
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
@ -690,6 +692,21 @@ func (s *ProxyServer) Run() error {
// functions must configure their shared informer event handlers first.
informerFactory.Start(wait.NeverStop)
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) {
// Make an informer that selects for our nodename.
currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
}))
nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
nodeConfig.RegisterEventHandler(s.Proxier)
go nodeConfig.Run(wait.NeverStop)
// This has to start after the calls to NewNodeConfig because that must
// configure the shared informer event handler first.
currentNodeInformerFactory.Start(wait.NeverStop)
}
// Birth Cry after the birth is successful
s.birthCry()

View File

@ -3391,6 +3391,8 @@ const (
IPv4Protocol IPFamily = "IPv4"
// IPv6Protocol indicates that this IP is IPv6 protocol
IPv6Protocol IPFamily = "IPv6"
// MaxServiceTopologyKeys is the largest number of topology keys allowed on a service
MaxServiceTopologyKeys = 16
)
// ServiceSpec describes the attributes that a user creates on a service
@ -3506,14 +3508,14 @@ type ServiceSpec struct {
// topologyKeys is a preference-order list of topology keys which
// implementations of services should use to preferentially sort endpoints
// when accessing this Service. Topology keys must be valid label keys and
// at most 16 keys may be specified.
// If any ready backends exist for index [0], they should always be chosen;
// only if no backends exist for index [0] should backends for index [1] be considered.
// when accessing this Service, it can not be used at the same time as
// externalTrafficPolicy=Local.
// Topology keys must be valid label keys and at most 16 keys may be specified.
// Endpoints are chosen based on the first topology key with available backends.
// If this field is specified and all entries have no backends that match
// the topology of the client, the service has no backends for that client
// and connections should fail.
// The special value "" may be used to mean "any node". This catch-all
// The special value "*" may be used to mean "any topology". This catch-all
// value, if used, only makes sense as the last value in the list.
// If this is not specified or empty, no topology constraints will be applied.
// +optional

View File

@ -4053,6 +4053,35 @@ func ValidateService(service *core.Service) field.ErrorList {
ports[key] = true
}
// Validate TopologyKeys
if len(service.Spec.TopologyKeys) > 0 {
topoPath := specPath.Child("topologyKeys")
// topologyKeys is mutually exclusive with 'externalTrafficPolicy=Local'
if service.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal {
allErrs = append(allErrs, field.Forbidden(topoPath, "may not be specified when `externalTrafficPolicy=Local`"))
}
if len(service.Spec.TopologyKeys) > core.MaxServiceTopologyKeys {
allErrs = append(allErrs, field.TooMany(topoPath, len(service.Spec.TopologyKeys), core.MaxServiceTopologyKeys))
}
topoKeys := sets.NewString()
for i, key := range service.Spec.TopologyKeys {
keyPath := topoPath.Index(i)
if topoKeys.Has(key) {
allErrs = append(allErrs, field.Duplicate(keyPath, key))
}
topoKeys.Insert(key)
// "Any" must be the last value specified
if key == v1.TopologyKeyAny && i != len(service.Spec.TopologyKeys)-1 {
allErrs = append(allErrs, field.Invalid(keyPath, key, `"*" must be the last value specified`))
}
if key != v1.TopologyKeyAny {
for _, msg := range validation.IsQualifiedName(key) {
allErrs = append(allErrs, field.Invalid(keyPath, service.Spec.TopologyKeys, msg))
}
}
}
}
// Validate SourceRange field and annotation
_, ok := service.Annotations[core.AnnotationLoadBalancerSourceRangesKey]
if len(service.Spec.LoadBalancerSourceRanges) > 0 || ok {
@ -4143,6 +4172,10 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("externalTrafficPolicy"), service.Spec.ExternalTrafficPolicy,
fmt.Sprintf("ExternalTrafficPolicy must be empty, %v or %v", core.ServiceExternalTrafficPolicyTypeCluster, core.ServiceExternalTrafficPolicyTypeLocal)))
}
// 'externalTrafficPolicy=Local' is mutually exclusive with topologyKeys
if service.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal && len(service.Spec.TopologyKeys) > 0 {
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec").Child("externalTrafficPolicy"), "externalTrafficPolicy must not be set to 'Local' when topologyKeys is specified"))
}
if service.Spec.HealthCheckNodePort < 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("healthCheckNodePort"), service.Spec.HealthCheckNodePort,
"HealthCheckNodePort must be not less than 0"))

View File

@ -18,6 +18,7 @@ package validation
import (
"bytes"
"fmt"
"math"
"reflect"
"strings"
@ -9380,6 +9381,7 @@ func TestValidatePodEphemeralContainersUpdate(t *testing.T) {
func TestValidateService(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SCTPSupport, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTopology, true)()
testCases := []struct {
name string
@ -10058,6 +10060,66 @@ func TestValidateService(t *testing.T) {
},
numErrs: 1,
},
{
name: "valid topology keys",
tweakSvc: func(s *core.Service) {
s.Spec.TopologyKeys = []string{
"kubernetes.io/hostname",
"failure-domain.beta.kubernetes.io/zone",
"failure-domain.beta.kubernetes.io/region",
v1.TopologyKeyAny,
}
},
numErrs: 0,
},
{
name: "invalid topology key",
tweakSvc: func(s *core.Service) {
s.Spec.TopologyKeys = []string{"NoUppercaseOrSpecialCharsLike=Equals"}
},
numErrs: 1,
},
{
name: "too many topology keys",
tweakSvc: func(s *core.Service) {
for i := 0; i < core.MaxServiceTopologyKeys+1; i++ {
s.Spec.TopologyKeys = append(s.Spec.TopologyKeys, fmt.Sprintf("topologykey-%d", i))
}
},
numErrs: 1,
},
{
name: `"Any" was not the last key`,
tweakSvc: func(s *core.Service) {
s.Spec.TopologyKeys = []string{
"kubernetes.io/hostname",
v1.TopologyKeyAny,
"failure-domain.beta.kubernetes.io/zone",
}
},
numErrs: 1,
},
{
name: `duplicate topology key`,
tweakSvc: func(s *core.Service) {
s.Spec.TopologyKeys = []string{
"kubernetes.io/hostname",
"kubernetes.io/hostname",
"failure-domain.beta.kubernetes.io/zone",
}
},
numErrs: 1,
},
{
name: `use topology keys with externalTrafficPolicy=Local`,
tweakSvc: func(s *core.Service) {
s.Spec.ExternalTrafficPolicy = "Local"
s.Spec.TopologyKeys = []string{
"kubernetes.io/hostname",
}
},
numErrs: 2,
},
}
for _, tc := range testCases {

View File

@ -45,6 +45,7 @@ type HollowProxy struct {
type FakeProxier struct {
proxyconfig.NoopEndpointSliceHandler
proxyconfig.NoopNodeHandler
}
func (*FakeProxier) Sync() {}

View File

@ -369,3 +369,128 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) {
c.eventHandlers[i].OnServiceDelete(service)
}
}
// NodeHandler is an abstract interface of objects which receive
// notifications about node object changes.
type NodeHandler interface {
// OnNodeAdd is called whenever creation of new node object
// is observed.
OnNodeAdd(node *v1.Node)
// OnNodeUpdate is called whenever modification of an existing
// node object is observed.
OnNodeUpdate(oldNode, node *v1.Node)
// OnNodeDelete is called whever deletion of an existing node
// object is observed.
OnNodeDelete(node *v1.Node)
// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
OnNodeSynced()
}
// NoopNodeHandler is a noop handler for proxiers that have not yet
// implemented a full NodeHandler.
type NoopNodeHandler struct{}
// OnNodeAdd is a noop handler for Node creates.
func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {}
// OnNodeUpdate is a noop handler for Node updates.
func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {}
// OnNodeDelete is a noop handler for Node deletes.
func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {}
// OnNodeSynced is a noop handler for Node syncs.
func (*NoopNodeHandler) OnNodeSynced() {}
// NodeConfig tracks a set of node configurations.
// It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change.
type NodeConfig struct {
listerSynced cache.InformerSynced
eventHandlers []NodeHandler
}
// NewNodeConfig creates a new NodeConfig.
func NewNodeConfig(nodeInformer coreinformers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
result := &NodeConfig{
listerSynced: nodeInformer.Informer().HasSynced,
}
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: result.handleAddNode,
UpdateFunc: result.handleUpdateNode,
DeleteFunc: result.handleDeleteNode,
},
resyncPeriod,
)
return result
}
// RegisterEventHandler registers a handler which is called on every node change.
func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
}
// Run starts the goroutine responsible for calling registered handlers.
func (c *NodeConfig) Run(stopCh <-chan struct{}) {
klog.Info("Starting node config controller")
if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) {
return
}
for i := range c.eventHandlers {
klog.V(3).Infof("Calling handler.OnNodeSynced()")
c.eventHandlers[i].OnNodeSynced()
}
}
func (c *NodeConfig) handleAddNode(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
for i := range c.eventHandlers {
klog.V(4).Infof("Calling handler.OnNodeAdd")
c.eventHandlers[i].OnNodeAdd(node)
}
}
func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) {
oldNode, ok := oldObj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
return
}
node, ok := newObj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
}
for i := range c.eventHandlers {
klog.V(5).Infof("Calling handler.OnNodeUpdate")
c.eventHandlers[i].OnNodeUpdate(oldNode, node)
}
}
func (c *NodeConfig) handleDeleteNode(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
if node, ok = tombstone.Obj.(*v1.Node); !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
}
for i := range c.eventHandlers {
klog.V(4).Infof("Calling handler.OnNodeDelete")
c.eventHandlers[i].OnNodeDelete(node)
}
}

View File

@ -48,7 +48,8 @@ var supportedEndpointSliceAddressTypes = sets.NewString(
type BaseEndpointInfo struct {
Endpoint string // TODO: should be an endpointString type
// IsLocal indicates whether the endpoint is running in same host as kube-proxy.
IsLocal bool
IsLocal bool
Topology map[string]string
}
var _ Endpoint = &BaseEndpointInfo{}
@ -63,6 +64,11 @@ func (info *BaseEndpointInfo) GetIsLocal() bool {
return info.IsLocal
}
// GetTopology returns the topology information of the endpoint.
func (info *BaseEndpointInfo) GetTopology() map[string]string {
return info.Topology
}
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IP() string {
return utilproxy.IPPart(info.Endpoint)
@ -78,10 +84,11 @@ func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
}
func newBaseEndpointInfo(IP string, port int, isLocal bool) *BaseEndpointInfo {
func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string) *BaseEndpointInfo {
return &BaseEndpointInfo{
Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)),
IsLocal: isLocal,
Topology: topology,
}
}
@ -358,7 +365,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
continue
}
isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname
baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal)
baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil)
if ect.makeEndpointInfo != nil {
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo))
} else {

View File

@ -400,7 +400,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
} else {
for i := range newEndpoints[x] {
ep := newEndpoints[x][i].(*BaseEndpointInfo)
if *ep != *(tc.expected[x][i]) {
if !(reflect.DeepEqual(*ep, *(tc.expected[x][i]))) {
t.Errorf("[%s] expected new[%v][%d] to be %v, got %v", tc.desc, x, i, tc.expected[x][i], *ep)
}
}
@ -1699,21 +1699,21 @@ func TestCheckoutChanges(t *testing.T) {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", ""), newTestEp("10.0.1.2:443", "")},
},
current: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
},
}},
items: map[types.NamespacedName]*endpointsChange{
{Namespace: "ns1", Name: "svc1"}: {
previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", ""), newTestEp("10.0.1.2:443", "")},
},
current: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
},
},
},
@ -1724,7 +1724,7 @@ func TestCheckoutChanges(t *testing.T) {
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{},
current: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")},
},
}},
useEndpointSlices: true,
@ -1737,11 +1737,11 @@ func TestCheckoutChanges(t *testing.T) {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")},
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", "host1"), newTestEp("10.0.1.2:443", "host1")},
},
current: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")},
},
}},
useEndpointSlices: true,
@ -1796,6 +1796,9 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
if len(newMap) != len(expected) {
t.Errorf("expected %d results, got %d: %v", len(expected), len(newMap), newMap)
}
endpointEqual := func(a, b *BaseEndpointInfo) bool {
return a.Endpoint == b.Endpoint && a.IsLocal == b.IsLocal
}
for x := range expected {
if len(newMap[x]) != len(expected[x]) {
t.Errorf("expected %d endpoints for %v, got %d", len(expected[x]), x, len(newMap[x]))
@ -1807,7 +1810,7 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
t.Errorf("Failed to cast endpointsInfo")
continue
}
if *newEp != *(expected[x][i]) {
if !endpointEqual(newEp, expected[x][i]) {
t.Errorf("expected new[%v][%d] to be %v, got %v (IsLocal expected %v, got %v)", x, i, expected[x][i], newEp, expected[x][i].IsLocal, newEp.IsLocal)
}
}
@ -1815,8 +1818,14 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
}
}
func newTestEp(ep string) *BaseEndpointInfo {
return &BaseEndpointInfo{Endpoint: ep}
func newTestEp(ep, host string) *BaseEndpointInfo {
endpointInfo := &BaseEndpointInfo{Endpoint: ep}
if host != "" {
endpointInfo.Topology = map[string]string{
"kubernetes.io/hostname": host,
}
}
return endpointInfo
}
func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*discovery.EndpointSlice) {

View File

@ -254,7 +254,7 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName
}
isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname])
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal)
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology)
// This logic ensures we're deduping potential overlapping endpoints
// isLocal should not vary between matching IPs, but if it does, we

View File

@ -176,9 +176,9 @@ func TestEndpointInfoByServicePort(t *testing.T) {
},
expectedMap: spToEndpointMap{
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
"10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false},
"10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
"10.0.1.3": &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false},
"10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}},
"10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true, Topology: map[string]string{"kubernetes.io/hostname": "host1"}},
"10.0.1.3": &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}},
},
},
},

View File

@ -181,6 +181,7 @@ type Proxier struct {
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
nodeLabels map[string]string
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid
// updating iptables with some partial data after kube-proxy restart.
@ -591,6 +592,47 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.syncProxyRules()
}
// OnNodeAdd is called whenever creation of new node object
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
return
}
proxier.mu.Lock()
proxier.nodeLabels = node.Labels
proxier.mu.Unlock()
}
// OnNodeUpdate is called whenever modification of an existing
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
return
}
proxier.mu.Lock()
proxier.nodeLabels = node.Labels
proxier.mu.Unlock()
}
// OnNodeDelete is called whever deletion of an existing node
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
return
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.mu.Unlock()
}
// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnNodeSynced() {
}
// portProtoHash takes the ServicePortName and protocol for a service
// returns the associated 16 character hash. This is computed by hashing (sha256)
// then encoding to base32 and truncating to 16 chars. We do this because IPTables
@ -858,7 +900,20 @@ func (proxier *Proxier) syncProxyRules() {
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
protocol := strings.ToLower(string(svcInfo.Protocol()))
svcNameString := svcInfo.serviceNameString
hasEndpoints := len(proxier.endpointsMap[svcName]) > 0
allEndpoints := proxier.endpointsMap[svcName]
hasEndpoints := len(allEndpoints) > 0
// Service Topology will not be enabled in the following cases:
// 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
// 2. ServiceTopology is not enabled.
// 3. EndpointSlice is not enabled (service topology depends on endpoint slice
// to get topology information).
if !svcInfo.OnlyNodeLocalEndpoints() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints)
hasEndpoints = len(allEndpoints) > 0
}
svcChain := svcInfo.servicePortChainName
if hasEndpoints {
@ -1168,12 +1223,13 @@ func (proxier *Proxier) syncProxyRules() {
endpoints = endpoints[:0]
endpointChains = endpointChains[:0]
var endpointChain utiliptables.Chain
for _, ep := range proxier.endpointsMap[svcName] {
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
continue
}
endpoints = append(endpoints, epInfo)
endpointChain = epInfo.endpointChain(svcNameString, protocol)
endpointChains = append(endpointChains, endpointChain)
@ -1220,6 +1276,7 @@ func (proxier *Proxier) syncProxyRules() {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
}
// Balancing rules in the per-service chain.
args = append(args[:0], "-A", string(svcChain))
proxier.appendServiceCommentLocked(args, svcNameString)

View File

@ -22,6 +22,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/config"
utilnet "k8s.io/utils/net"
discovery "k8s.io/api/discovery/v1beta1"
@ -30,6 +32,8 @@ import (
type metaProxier struct {
ipv4Proxier proxy.Provider
ipv6Proxier proxy.Provider
// TODO(imroc): implement node handler for meta proxier.
config.NoopNodeHandler
}
// NewMetaProxier returns a dual-stack "meta-proxier". Proxier API

View File

@ -30,6 +30,10 @@ import (
"sync/atomic"
"time"
"k8s.io/klog"
utilexec "k8s.io/utils/exec"
utilnet "k8s.io/utils/net"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
@ -38,7 +42,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
@ -50,8 +53,6 @@ import (
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
utilexec "k8s.io/utils/exec"
utilnet "k8s.io/utils/net"
)
const (
@ -200,6 +201,7 @@ type Proxier struct {
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
nodeLabels map[string]string
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when
// corresponding objects are synced after startup. This is used to avoid updating
// ipvs rules with some partial data after kube-proxy restart.
@ -896,6 +898,47 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.syncProxyRules()
}
// OnNodeAdd is called whenever creation of new node object
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
return
}
proxier.mu.Lock()
proxier.nodeLabels = node.Labels
proxier.mu.Unlock()
}
// OnNodeUpdate is called whenever modification of an existing
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
return
}
proxier.mu.Lock()
proxier.nodeLabels = node.Labels
proxier.mu.Unlock()
}
// OnNodeDelete is called whever deletion of an existing node
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
return
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.mu.Unlock()
}
// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnNodeSynced() {
}
// EntryInvalidErr indicates if an ipset entry is invalid or not
const EntryInvalidErr = "error adding entry %s to ipset %s"
@ -1866,7 +1909,18 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
curEndpoints.Insert(des.String())
}
for _, epInfo := range proxier.endpointsMap[svcPortName] {
endpoints := proxier.endpointsMap[svcPortName]
// Service Topology will not be enabled in the following cases:
// 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
// 2. ServiceTopology is not enabled.
// 3. EndpointSlice is not enabled (service topology depends on endpoint slice
// to get topology information).
if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints)
}
for _, epInfo := range endpoints {
if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
continue
}

View File

@ -2951,7 +2951,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe
t.Errorf("Failed to cast proxy.BaseEndpointInfo")
continue
}
if *newEp != *(expected[x][i]) {
if !reflect.DeepEqual(*newEp, *(expected[x][i])) {
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
}
}
@ -3702,7 +3702,6 @@ func TestEndpointSliceE2E(t *testing.T) {
// Add initial service
serviceName := "svc1"
namespaceName := "ns1"
fp.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{

View File

@ -51,6 +51,7 @@ type BaseServiceInfo struct {
loadBalancerSourceRanges []string
healthCheckNodePort int
onlyNodeLocalEndpoints bool
topologyKeys []string
}
var _ ServicePort = &BaseServiceInfo{}
@ -119,6 +120,11 @@ func (info *BaseServiceInfo) OnlyNodeLocalEndpoints() bool {
return info.onlyNodeLocalEndpoints
}
// TopologyKeys is part of ServicePort interface.
func (info *BaseServiceInfo) TopologyKeys() []string {
return info.topologyKeys
}
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
onlyNodeLocalEndpoints := false
if apiservice.RequestsOnlyLocalTraffic(service) {
@ -139,6 +145,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
sessionAffinityType: service.Spec.SessionAffinity,
stickyMaxAgeSeconds: stickyMaxAgeSeconds,
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
topologyKeys: service.Spec.TopologyKeys,
}
if sct.isIPv6Mode == nil {

80
pkg/proxy/topology.go Normal file
View File

@ -0,0 +1,80 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
v1 "k8s.io/api/core/v1"
)
// FilterTopologyEndpoint returns the appropriate endpoints based on the cluster
// topology.
// This uses the current node's labels, which contain topology information, and
// the required topologyKeys to find appropriate endpoints. If both the endpoint's
// topology and the current node have matching values for topologyKeys[0], the
// endpoint will be chosen. If no endpoints are chosen, toplogyKeys[1] will be
// considered, and so on. If either the node or the endpoint do not have values
// for a key, it is considered to not match.
//
// If topologyKeys is specified, but no endpoints are chosen for any key, the
// the service has no viable endpoints for clients on this node, and connections
// should fail.
//
// The special key "*" may be used as the last entry in topologyKeys to indicate
// "any endpoint" is acceptable.
//
// If topologyKeys is not specified or empty, no topology constraints will be
// applied and this will return all endpoints.
func FilterTopologyEndpoint(nodeLabels map[string]string, topologyKeys []string, endpoints []Endpoint) []Endpoint {
// Do not filter endpoints if service has no topology keys.
if len(topologyKeys) == 0 {
return endpoints
}
filteredEndpoint := []Endpoint{}
if len(nodeLabels) == 0 {
if topologyKeys[len(topologyKeys)-1] == v1.TopologyKeyAny {
// edge case: include all endpoints if topology key "Any" specified
// when we cannot determine current node's topology.
return endpoints
}
// edge case: do not include any endpoints if topology key "Any" is
// not specified when we cannot determine current node's topology.
return filteredEndpoint
}
for _, key := range topologyKeys {
if key == v1.TopologyKeyAny {
return endpoints
}
topologyValue, found := nodeLabels[key]
if !found {
continue
}
for _, ep := range endpoints {
topology := ep.GetTopology()
if value, found := topology[key]; found && value == topologyValue {
filteredEndpoint = append(filteredEndpoint, ep)
}
}
if len(filteredEndpoint) > 0 {
return filteredEndpoint
}
}
return filteredEndpoint
}

478
pkg/proxy/topology_test.go Normal file
View File

@ -0,0 +1,478 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
func TestFilterTopologyEndpoint(t *testing.T) {
type endpoint struct {
Endpoint string
NodeName types.NodeName
}
testCases := []struct {
Name string
nodeLabels map[types.NodeName]map[string]string
endpoints []endpoint
currentNodeName types.NodeName
topologyKeys []string
expected []endpoint
}{
{
// Case[0]: no topology key and endpoints at all = 0 endpoints
Name: "no topology key and endpoints",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "10.0.0.1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
}},
endpoints: []endpoint{},
currentNodeName: "testNode1",
topologyKeys: nil,
expected: []endpoint{},
},
{
// Case[1]: no topology key, 2 nodes each with 2 endpoints = 4
// endpoints
Name: "no topology key but have endpoints",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
currentNodeName: "testNode1",
topologyKeys: nil,
expected: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
},
{
// Case[2]: 1 topology key (hostname), 2 nodes each with 2 endpoints
// 1 match = 2 endpoints
Name: "one topology key with one node matched",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
currentNodeName: "testNode1",
topologyKeys: []string{"kubernetes.io/hostname"},
expected: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
},
},
{
// Case[3]: 1 topology key (hostname), 2 nodes each with 2 endpoints
// no match = 0 endpoints
Name: "one topology key without node matched",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode3": {
"kubernetes.io/hostname": "testNode3",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
currentNodeName: "testNode3",
topologyKeys: []string{"kubernetes.io/hostname"},
expected: []endpoint{},
},
{
// Case[4]: 1 topology key (zone), 2 nodes in zone a, 2 nodes in
// zone b, each with 2 endpoints = 4 endpoints
Name: "one topology key with multiple nodes matched",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode3": {
"kubernetes.io/hostname": "testNode3",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
"testNode4": {
"kubernetes.io/hostname": "testNode4",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
},
currentNodeName: "testNode2",
topologyKeys: []string{"topology.kubernetes.io/zone"},
expected: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
},
{
// Case[5]: 2 topology keys (hostname, zone), 2 nodes each with 2
// endpoints, 1 hostname match = 2 endpoints (2nd key ignored)
Name: "early match in multiple topology keys",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode3": {
"kubernetes.io/hostname": "testNode3",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
"testNode4": {
"kubernetes.io/hostname": "testNode4",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
},
currentNodeName: "testNode2",
topologyKeys: []string{"kubernetes.io/hostname"},
expected: []endpoint{
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
},
{
// Case[6]: 2 topology keys (hostname, zone), 2 nodes in zone a, 2
// nodes in zone b, each with 2 endpoints, no hostname match, 1 zone
// match = 4 endpoints
Name: "later match in multiple topology keys",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode3": {
"kubernetes.io/hostname": "testNode3",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
"testNode4": {
"kubernetes.io/hostname": "testNode4",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
"testNode5": {
"kubernetes.io/hostname": "testNode5",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
},
currentNodeName: "testNode5",
topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone"},
expected: []endpoint{
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
},
},
{
// Case[7]: 2 topology keys (hostname, zone), 2 nodes in zone a, 2
// nodes in zone b, each with 2 endpoints, no hostname match, no zone
// match = 0 endpoints
Name: "multiple topology keys without node matched",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode3": {
"kubernetes.io/hostname": "testNode3",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
"testNode4": {
"kubernetes.io/hostname": "testNode4",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
"testNode5": {
"kubernetes.io/hostname": "testNode5",
"topology.kubernetes.io/zone": "90003",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
},
currentNodeName: "testNode5",
topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone"},
expected: []endpoint{},
},
{
// Case[8]: 2 topology keys (hostname, "*"), 2 nodes each with 2
// endpoints, 1 match hostname = 2 endpoints
Name: "multiple topology keys matched node when 'Any' key ignored",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
currentNodeName: "testNode1",
topologyKeys: []string{"kubernetes.io/hostname", v1.TopologyKeyAny},
expected: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
},
},
{
// Case[9]: 2 topology keys (hostname, "*"), 2 nodes each with 2
// endpoints, no hostname match, catch-all ("*") matched with 4
// endpoints
Name: "two topology keys matched node with 'Any' key",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode3": {
"kubernetes.io/hostname": "testNode3",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
currentNodeName: "testNode3",
topologyKeys: []string{"kubernetes.io/hostname", v1.TopologyKeyAny},
expected: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
},
},
{
// Case[10]: 3 topology keys (hostname, zone, "*"), 2 nodes in zone a,
// 2 nodes in zone b, each with 2 endpoints, no hostname match, no
// zone, catch-all ("*") matched with 8 endpoints
Name: "multiple topology keys matched node with 'Any' key",
nodeLabels: map[types.NodeName]map[string]string{
"testNode1": {
"kubernetes.io/hostname": "testNode1",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode2": {
"kubernetes.io/hostname": "testNode2",
"topology.kubernetes.io/zone": "90001",
"topology.kubernetes.io/region": "cd",
},
"testNode3": {
"kubernetes.io/hostname": "testNode3",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
"testNode4": {
"kubernetes.io/hostname": "testNode4",
"topology.kubernetes.io/zone": "90002",
"topology.kubernetes.io/region": "cd",
},
"testNode5": {
"kubernetes.io/hostname": "testNode5",
"topology.kubernetes.io/zone": "90003",
"topology.kubernetes.io/region": "cd",
},
},
endpoints: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
},
currentNodeName: "testNode5",
topologyKeys: []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone", v1.TopologyKeyAny},
expected: []endpoint{
{Endpoint: "1.1.1.1:11", NodeName: "testNode1"},
{Endpoint: "1.1.1.2:11", NodeName: "testNode1"},
{Endpoint: "1.1.2.1:11", NodeName: "testNode2"},
{Endpoint: "1.1.2.2:11", NodeName: "testNode2"},
{Endpoint: "1.1.3.1:11", NodeName: "testNode3"},
{Endpoint: "1.1.3.2:11", NodeName: "testNode3"},
{Endpoint: "1.1.4.1:11", NodeName: "testNode4"},
{Endpoint: "1.1.4.2:11", NodeName: "testNode4"},
},
},
}
endpointsToStringArray := func(endpoints []endpoint) []string {
result := make([]string, 0, len(endpoints))
for _, ep := range endpoints {
result = append(result, ep.Endpoint)
}
return result
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
m := make(map[Endpoint]endpoint)
endpoints := []Endpoint{}
for _, ep := range tc.endpoints {
var e Endpoint = &BaseEndpointInfo{Endpoint: ep.Endpoint, Topology: tc.nodeLabels[ep.NodeName]}
m[e] = ep
endpoints = append(endpoints, e)
}
currentNodeLabels := tc.nodeLabels[tc.currentNodeName]
filteredEndpoint := []endpoint{}
for _, ep := range FilterTopologyEndpoint(currentNodeLabels, tc.topologyKeys, endpoints) {
filteredEndpoint = append(filteredEndpoint, m[ep])
}
if !reflect.DeepEqual(filteredEndpoint, tc.expected) {
t.Errorf("expected %v, got %v", endpointsToStringArray(tc.expected), endpointsToStringArray(filteredEndpoint))
}
})
}
}

View File

@ -30,6 +30,7 @@ type Provider interface {
config.EndpointsHandler
config.EndpointSliceHandler
config.ServiceHandler
config.NodeHandler
// Sync immediately synchronizes the Provider's current state to proxy rules.
Sync()
@ -77,6 +78,8 @@ type ServicePort interface {
NodePort() int
// GetOnlyNodeLocalEndpoints returns if a service has only node local endpoints
OnlyNodeLocalEndpoints() bool
// TopologyKeys returns service TopologyKeys as a string array.
TopologyKeys() []string
}
// Endpoint in an interface which abstracts information about an endpoint.
@ -87,6 +90,8 @@ type Endpoint interface {
String() string
// GetIsLocal returns true if the endpoint is running in same host as kube-proxy, otherwise returns false.
GetIsLocal() bool
// GetTopology returns the topology information of the endpoint.
GetTopology() map[string]string
// IP returns IP part of the endpoint.
IP() string
// Port returns the Port part of the endpoint.

View File

@ -112,6 +112,8 @@ type asyncRunnerInterface interface {
type Proxier struct {
// EndpointSlice support has not been added for this proxier yet.
config.NoopEndpointSliceHandler
// TODO(imroc): implement node handler for userspace proxier.
config.NoopNodeHandler
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap

View File

@ -444,6 +444,8 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap, curServices proxySe
type Proxier struct {
// EndpointSlice support has not been added for this proxier yet.
proxyconfig.NoopEndpointSliceHandler
// TODO(imroc): implement node handler for winkernel proxier.
proxyconfig.NoopNodeHandler
// endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since policies were synced. For a single object,

View File

@ -83,6 +83,8 @@ func logTimeout(err error) bool {
type Proxier struct {
// EndpointSlice support has not been added for this proxier yet.
config.NoopEndpointSliceHandler
// TODO(imroc): implement node handler for winuserspace proxier.
config.NoopNodeHandler
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap

View File

@ -23,11 +23,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/storage/names"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/validation"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
@ -121,6 +120,10 @@ func dropServiceDisabledFields(newSvc *api.Service, oldSvc *api.Service) {
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && !serviceIPFamilyInUse(oldSvc) {
newSvc.Spec.IPFamily = nil
}
// Drop TopologyKeys if ServiceTopology is not enabled
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && !topologyKeysInUse(oldSvc) {
newSvc.Spec.TopologyKeys = nil
}
}
// returns true if svc.Spec.ServiceIPFamily field is in use
@ -134,6 +137,14 @@ func serviceIPFamilyInUse(svc *api.Service) bool {
return false
}
// returns true if svc.Spec.TopologyKeys field is in use
func topologyKeysInUse(svc *api.Service) bool {
if svc == nil {
return false
}
return len(svc.Spec.TopologyKeys) > 0
}
type serviceStatusStrategy struct {
svcStrategy
}

View File

@ -461,7 +461,7 @@ func ClusterRoles() []rbacv1.ClusterRole {
// node-proxier role is used by kube-proxy.
nodeProxierRules := []rbacv1.PolicyRule{
rbacv1helpers.NewRule("list", "watch").Groups(legacyGroup).Resources("services", "endpoints").RuleOrDie(),
rbacv1helpers.NewRule("get").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
eventsRule(),
}

View File

@ -1040,6 +1040,8 @@ items:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
- events.k8s.io

View File

@ -30,6 +30,8 @@ const (
NamespaceAll string = ""
// NamespaceNodeLease is the namespace where we place node lease objects (used for node heartbeats)
NamespaceNodeLease string = "kube-node-lease"
// TopologyKeyAny is the service topology key that matches any node
TopologyKeyAny string = "*"
)
// Volume represents a named volume in a pod that may be accessed by any container in the pod.
@ -3826,6 +3828,8 @@ const (
IPv4Protocol IPFamily = "IPv4"
// IPv6Protocol indicates that this IP is IPv6 protocol
IPv6Protocol IPFamily = "IPv6"
// MaxServiceTopologyKeys is the largest number of topology keys allowed on a service
MaxServiceTopologyKeys = 16
)
// ServiceSpec describes the attributes that a user creates on a service.
@ -3957,18 +3961,18 @@ type ServiceSpec struct {
// topologyKeys is a preference-order list of topology keys which
// implementations of services should use to preferentially sort endpoints
// when accessing this Service. Topology keys must be valid label keys and
// at most 16 keys may be specified.
// If any ready backends exist for index [0], they should always be chosen;
// only if no backends exist for index [0] should backends for index [1] be considered.
// when accessing this Service, it can not be used at the same time as
// externalTrafficPolicy=Local.
// Topology keys must be valid label keys and at most 16 keys may be specified.
// Endpoints are chosen based on the first topology key with available backends.
// If this field is specified and all entries have no backends that match
// the topology of the client, the service has no backends for that client
// and connections should fail.
// The special value "" may be used to mean "any node". This catch-all
// The special value "*" may be used to mean "any topology". This catch-all
// value, if used, only makes sense as the last value in the list.
// If this is not specified or empty, no topology constraints will be applied.
// +optional
TopologyKeys []string `json:"topologyKeys,omitempty" protobuf:"bytes,15,opt,name=topologyKeys"`
TopologyKeys []string `json:"topologyKeys,omitempty" protobuf:"bytes,16,opt,name=topologyKeys"`
}
// ServicePort contains information on service's port.