From f0dfac5d076bc38b65e463b8398adf9d6ec04640 Mon Sep 17 00:00:00 2001 From: Max Renaud Date: Tue, 29 Mar 2022 21:09:45 +0000 Subject: [PATCH 1/5] Add sync_proxy_rules_no_local_endpoints_total metric --- pkg/proxy/iptables/proxier.go | 37 +++++--- pkg/proxy/iptables/proxier_test.go | 132 +++++++++++++++++++++++++++ pkg/proxy/ipvs/proxier.go | 43 ++++++++- pkg/proxy/ipvs/proxier_test.go | 139 +++++++++++++++++++++++++++++ pkg/proxy/metrics/metrics.go | 14 +++ 5 files changed, 354 insertions(+), 11 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 8692e3ae7c2..5f8094ef605 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -38,9 +38,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" @@ -993,6 +995,11 @@ func (proxier *Proxier) syncProxyRules() { } } + // These two variables are used to publish the sync_proxy_rules_no_endpoints_total + // metric. + serviceNoLocalEndpointsTotalInternal := 0 + serviceNoLocalEndpointsTotalExternal := 0 + // Build rules for each service-port. for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) @@ -1347,16 +1354,24 @@ func (proxier *Proxier) syncProxyRules() { if len(localEndpoints) != 0 { // Write rules jumping from localPolicyChain to localEndpointChains proxier.writeServiceToEndpointRules(svcNameString, svcInfo, localPolicyChain, localEndpoints, args) - } else if hasEndpoints { - // Blackhole all traffic since there are no local endpoints - args = append(args[:0], - "-A", string(localPolicyChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"%s has no local endpoints"`, svcNameString), - "-j", - string(KubeMarkDropChain), - ) - proxier.natRules.Write(args) + } else { + if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { + serviceNoLocalEndpointsTotalInternal++ + } + if svcInfo.ExternalPolicyLocal() { + serviceNoLocalEndpointsTotalExternal++ + } + if hasEndpoints { + // Blackhole all traffic since there are no local endpoints + args = append(args[:0], + "-A", string(localPolicyChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"%s has no local endpoints"`, svcNameString), + "-j", + string(KubeMarkDropChain), + ) + proxier.natRules.Write(args) + } } } } @@ -1478,6 +1493,8 @@ func (proxier *Proxier) syncProxyRules() { } } + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal)) + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal)) if proxier.healthzServer != nil { proxier.healthzServer.Updated() } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 3544f748b99..c38ffdd71d9 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -6050,3 +6050,135 @@ func TestEndpointCommentElision(t *testing.T) { t.Errorf("numComments (%d) != 0 when numEndpoints (%d) > threshold (%d)", numComments, numEndpoints, endpointChainsNumberThreshold) } } + +func TestNoEndpointsMetric(t *testing.T) { + type endpoint struct { + ip string + hostname string + } + + internalTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal + externalTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyTypeLocal + + metrics.RegisterMetrics() + testCases := []struct { + name string + internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType + externalTrafficPolicy v1.ServiceExternalTrafficPolicyType + endpoints []endpoint + expectedSyncProxyRulesNoLocalEndpointsTotalInternal int + expectedSyncProxyRulesNoLocalEndpointsTotalExternal int + }{ + { + name: "internalTrafficPolicy is set and there is non-zero local endpoints", + internalTrafficPolicy: &internalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "externalTrafficPolicy is set and there is non-zero local endpoints", + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "internalTrafficPolicy is set and there is zero local endpoint", + internalTrafficPolicy: &internalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, + }, + { + name: "externalTrafficPolicy is set and there is zero local endpoint", + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)() + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt) + fp.OnServiceSynced() + fp.OnEndpointSlicesSynced() + + serviceName := "svc1" + namespaceName := "ns1" + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.30.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 123}}, + }, + } + if tc.internalTrafficPolicy != nil { + svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy + } + if tc.externalTrafficPolicy != "" { + svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy + } + + fp.OnServiceAdd(svc) + + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + } + for _, ep := range tc.endpoints { + endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{ + Addresses: []string{ep.ip}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + NodeName: utilpointer.StringPtr(ep.hostname), + }) + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) + } + + if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) { + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal) + } + + syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external")) + if err != nil { + t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) + } + + if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) { + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal) + } + }) + } +} diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index fe91de9691f..3efb9572141 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -273,6 +273,16 @@ type Proxier struct { // Inject for test purpose. networkInterfacer utilproxy.NetworkInterfacer gracefuldeleteManager *GracefulTerminationManager + // serviceNoLocalEndpointsInternal is a map of services that couldn't have their rules applied + // due to the absence of local endpoints when the internal traffic policy label set to "Local". + // It is used to publish the sync_proxy_rules_no_endpoints_total + // metric with the traffic_policy label set to "internal". + serviceNoLocalEndpointsInternal map[proxy.ServicePortName]bool + // serviceNoLocalEndpointsExternal is a map of services that couldn't have their rules applied + // due to the absence of any endpoints when the external traffic policy is "Local". + // It is used to publish the sync_proxy_rules_no_endpoints_total + // metric with the traffic_policy label set to "external". + serviceNoLocalEndpointsExternal map[proxy.ServicePortName]bool } // IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface @@ -1027,6 +1037,8 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).InfoS("Syncing ipvs proxier rules") + proxier.serviceNoLocalEndpointsInternal = make(map[proxy.ServicePortName]bool) + proxier.serviceNoLocalEndpointsExternal = make(map[proxy.ServicePortName]bool) // Begin install iptables // Reset all buffers used later. @@ -1599,6 +1611,24 @@ func (proxier *Proxier) syncProxyRules() { } } proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) + + serviceNoLocalEndpointsInternalTotal := 0 + serviceNoLocalEndpointsExternalTotal := 0 + + for _, v := range proxier.serviceNoLocalEndpointsInternal { + if v { + serviceNoLocalEndpointsInternalTotal++ + } + } + + for _, v := range proxier.serviceNoLocalEndpointsExternal { + if v { + serviceNoLocalEndpointsExternalTotal++ + } + } + + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsInternalTotal)) + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsExternalTotal)) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed @@ -1962,6 +1992,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode endpoints := proxier.endpointsMap[svcPortName] + localEndpoints := []proxy.Endpoint{} // Filtering for topology aware endpoints. This function will only // filter endpoints if appropriate feature gates are enabled and the // Service does not have conflicting configuration such as @@ -1970,7 +2001,8 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode if !ok { klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName) } else { - clusterEndpoints, localEndpoints, _, _ := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels) + var clusterEndpoints []proxy.Endpoint + clusterEndpoints, localEndpoints, _, _ = proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels) if onlyNodeLocalEndpoints { if len(localEndpoints) > 0 { endpoints = localEndpoints @@ -1990,6 +2022,15 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode for _, epInfo := range endpoints { newEndpoints.Insert(epInfo.String()) } + if svcInfo.UsesLocalEndpoints() && len(localEndpoints) == 0 { + if svcInfo.NodeLocalInternal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { + proxier.serviceNoLocalEndpointsInternal[svcPortName] = true + } + + if svcInfo.NodeLocalExternal() { + proxier.serviceNoLocalEndpointsExternal[svcPortName] = true + } + } // Create new endpoints for _, ep := range newEndpoints.List() { diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 2f43083774a..97719131774 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -35,10 +35,12 @@ import ( "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" + "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" @@ -5799,3 +5801,140 @@ func TestIpIsValidForSet(t *testing.T) { } } } + +func TestNoEndpointsMetric(t *testing.T) { + type endpoint struct { + ip string + hostname string + } + + internalTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal + externalTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyTypeLocal + metrics.RegisterMetrics() + + testCases := []struct { + name string + internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType + externalTrafficPolicy v1.ServiceExternalTrafficPolicyType + endpoints []endpoint + expectedSyncProxyRulesNoLocalEndpointsTotalInternal int + expectedSyncProxyRulesNoLocalEndpointsTotalExternal int + }{ + { + name: "internalTrafficPolicy is set and there is non-zero local endpoints", + internalTrafficPolicy: &internalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "externalTrafficPolicy is set and there is non-zero local endpoints", + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "internalTrafficPolicy is set and there is zero local endpoint", + internalTrafficPolicy: &internalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, + }, + { + name: "externalTrafficPolicy is set and there is zero local endpoint", + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, + }, + } + for _, tc := range testCases { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)() + + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) + fp.servicesSynced = true + // fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + // Add initial service + serviceName := "svc1" + namespaceName := "ns1" + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, NodePort: 123}}, + }, + } + if tc.internalTrafficPolicy != nil { + svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy + } + if tc.externalTrafficPolicy != "" { + svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy + } + + fp.OnServiceAdd(svc) + + // Add initial endpoint slice + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + } + + for _, ep := range tc.endpoints { + endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{ + Addresses: []string{ep.ip}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + NodeName: utilpointer.StringPtr(ep.hostname), + }) + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + + syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal")) + if err != nil { + t.Errorf("failed to get %s value(internal), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) + } + + if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) { + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal) + } + + syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external")) + if err != nil { + t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) + } + + if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) { + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal) + } + } +} diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go index 9c1aeca55ba..f61cf4fc991 100644 --- a/pkg/proxy/metrics/metrics.go +++ b/pkg/proxy/metrics/metrics.go @@ -148,6 +148,19 @@ var ( StabilityLevel: metrics.ALPHA, }, ) + + // SyncProxyRulesNoLocalEndpointsTotal is the total number of rules that do + // not have an available endpoint. This can be caused by an internal + // traffic policy with no available local workload. + SyncProxyRulesNoLocalEndpointsTotal = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: kubeProxySubsystem, + Name: "sync_proxy_rules_no_local_endpoints_total", + Help: "Number of services with a Local traffic policy and no endpoints", + StabilityLevel: metrics.ALPHA, + }, + []string{"traffic_policy"}, + ) ) var registerMetricsOnce sync.Once @@ -165,6 +178,7 @@ func RegisterMetrics() { legacyregistry.MustRegister(IptablesRulesTotal) legacyregistry.MustRegister(IptablesRestoreFailuresTotal) legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp) + legacyregistry.MustRegister(SyncProxyRulesNoLocalEndpointsTotal) }) } From ba4f5c4e7b120c9dba1851f12ab10a4f79b48a32 Mon Sep 17 00:00:00 2001 From: Max Renaud Date: Wed, 30 Mar 2022 02:45:32 +0000 Subject: [PATCH 2/5] use sets.String for tracking IPVS no local endpoint metric --- pkg/proxy/ipvs/proxier.go | 47 ++++++++++++---------------------- pkg/proxy/ipvs/proxier_test.go | 8 +++--- 2 files changed, 21 insertions(+), 34 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 3efb9572141..4abdcc283c8 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -273,16 +273,20 @@ type Proxier struct { // Inject for test purpose. networkInterfacer utilproxy.NetworkInterfacer gracefuldeleteManager *GracefulTerminationManager - // serviceNoLocalEndpointsInternal is a map of services that couldn't have their rules applied - // due to the absence of local endpoints when the internal traffic policy label set to "Local". + // serviceNoLocalEndpointsInternal represents the set of services that couldn't be applied + // due to the absence of local endpoints when the internal traffic policy is "Local". // It is used to publish the sync_proxy_rules_no_endpoints_total // metric with the traffic_policy label set to "internal". - serviceNoLocalEndpointsInternal map[proxy.ServicePortName]bool - // serviceNoLocalEndpointsExternal is a map of services that couldn't have their rules applied + // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service + // if it has mulitple ports but each Service should only be counted once. + serviceNoLocalEndpointsInternal sets.String + // serviceNoLocalEndpointsExternal irepresents the set of services that couldn't be applied // due to the absence of any endpoints when the external traffic policy is "Local". // It is used to publish the sync_proxy_rules_no_endpoints_total // metric with the traffic_policy label set to "external". - serviceNoLocalEndpointsExternal map[proxy.ServicePortName]bool + // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service + // if it has mulitple ports but each Service should only be counted once. + serviceNoLocalEndpointsExternal sets.String } // IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface @@ -1037,8 +1041,8 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).InfoS("Syncing ipvs proxier rules") - proxier.serviceNoLocalEndpointsInternal = make(map[proxy.ServicePortName]bool) - proxier.serviceNoLocalEndpointsExternal = make(map[proxy.ServicePortName]bool) + proxier.serviceNoLocalEndpointsInternal = sets.NewString() + proxier.serviceNoLocalEndpointsExternal = sets.NewString() // Begin install iptables // Reset all buffers used later. @@ -1612,23 +1616,8 @@ func (proxier *Proxier) syncProxyRules() { } proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) - serviceNoLocalEndpointsInternalTotal := 0 - serviceNoLocalEndpointsExternalTotal := 0 - - for _, v := range proxier.serviceNoLocalEndpointsInternal { - if v { - serviceNoLocalEndpointsInternalTotal++ - } - } - - for _, v := range proxier.serviceNoLocalEndpointsExternal { - if v { - serviceNoLocalEndpointsExternalTotal++ - } - } - - metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsInternalTotal)) - metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsExternalTotal)) + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len())) + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed @@ -1992,7 +1981,6 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode endpoints := proxier.endpointsMap[svcPortName] - localEndpoints := []proxy.Endpoint{} // Filtering for topology aware endpoints. This function will only // filter endpoints if appropriate feature gates are enabled and the // Service does not have conflicting configuration such as @@ -2001,8 +1989,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode if !ok { klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName) } else { - var clusterEndpoints []proxy.Endpoint - clusterEndpoints, localEndpoints, _, _ = proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels) + clusterEndpoints, localEndpoints, _, _ := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels) if onlyNodeLocalEndpoints { if len(localEndpoints) > 0 { endpoints = localEndpoints @@ -2022,13 +2009,13 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode for _, epInfo := range endpoints { newEndpoints.Insert(epInfo.String()) } - if svcInfo.UsesLocalEndpoints() && len(localEndpoints) == 0 { + if onlyNodeLocalEndpoints && len(newEndpoints) == 0 { if svcInfo.NodeLocalInternal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { - proxier.serviceNoLocalEndpointsInternal[svcPortName] = true + proxier.serviceNoLocalEndpointsInternal.Insert(svcPortName.NamespacedName.String()) } if svcInfo.NodeLocalExternal() { - proxier.serviceNoLocalEndpointsExternal[svcPortName] = true + proxier.serviceNoLocalEndpointsExternal.Insert(svcPortName.NamespacedName.String()) } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 97719131774..20dbc70650b 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -5865,7 +5865,7 @@ func TestNoEndpointsMetric(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{netutils.ParseIPSloppy("10.0.0.1")}, nil, v1.IPv4Protocol) fp.servicesSynced = true // fp.endpointsSynced = true fp.endpointSlicesSynced = true @@ -5879,7 +5879,7 @@ func TestNoEndpointsMetric(t *testing.T) { Spec: v1.ServiceSpec{ ClusterIP: "172.20.1.1", Selector: map[string]string{"foo": "bar"}, - Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, NodePort: 123}}, + Ports: []v1.ServicePort{{Name: "p80", Port: 80, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, NodePort: 30000}}, }, } if tc.internalTrafficPolicy != nil { @@ -5901,7 +5901,7 @@ func TestNoEndpointsMetric(t *testing.T) { Labels: map[string]string{discovery.LabelServiceName: serviceName}, }, Ports: []discovery.EndpointPort{{ - Name: utilpointer.StringPtr(""), + Name: utilpointer.StringPtr("p80"), Port: utilpointer.Int32Ptr(80), Protocol: &tcpProtocol, }}, @@ -5934,7 +5934,7 @@ func TestNoEndpointsMetric(t *testing.T) { } if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) { - t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal) + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(external): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal) } } } From 198367a4865161424f99d89c07dfaca6df9784f3 Mon Sep 17 00:00:00 2001 From: Max Renaud Date: Wed, 30 Mar 2022 03:11:58 +0000 Subject: [PATCH 3/5] Added test where both policies are set --- pkg/proxy/iptables/proxier_test.go | 22 ++++++++++++++++++++++ pkg/proxy/ipvs/proxier.go | 4 ++-- pkg/proxy/ipvs/proxier_test.go | 22 ++++++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index c38ffdd71d9..c2c94981d08 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -6087,6 +6087,16 @@ func TestNoEndpointsMetric(t *testing.T) { {"10.0.1.3", "host2"}, }, }, + { + name: "both policies are set and there is non-zero local endpoints", + internalTrafficPolicy: &internalTrafficPolicyLocal, + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, { name: "internalTrafficPolicy is set and there is zero local endpoint", internalTrafficPolicy: &internalTrafficPolicyLocal, @@ -6107,6 +6117,18 @@ func TestNoEndpointsMetric(t *testing.T) { }, expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, }, + { + name: "both policies are set and there is zero local endpoint", + internalTrafficPolicy: &internalTrafficPolicyLocal, + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, + expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, + }, } for _, tc := range testCases { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 4abdcc283c8..0631be80ab8 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -278,14 +278,14 @@ type Proxier struct { // It is used to publish the sync_proxy_rules_no_endpoints_total // metric with the traffic_policy label set to "internal". // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service - // if it has mulitple ports but each Service should only be counted once. + // if it has multiple ports but each Service should only be counted once. serviceNoLocalEndpointsInternal sets.String // serviceNoLocalEndpointsExternal irepresents the set of services that couldn't be applied // due to the absence of any endpoints when the external traffic policy is "Local". // It is used to publish the sync_proxy_rules_no_endpoints_total // metric with the traffic_policy label set to "external". // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service - // if it has mulitple ports but each Service should only be counted once. + // if it has multiple ports but each Service should only be counted once. serviceNoLocalEndpointsExternal sets.String } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 20dbc70650b..2f6cbb2f05d 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -5838,6 +5838,16 @@ func TestNoEndpointsMetric(t *testing.T) { {"10.0.1.3", "host2"}, }, }, + { + name: "both policies are set and there is non-zero local endpoints", + internalTrafficPolicy: &internalTrafficPolicyLocal, + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, { name: "internalTrafficPolicy is set and there is zero local endpoint", internalTrafficPolicy: &internalTrafficPolicyLocal, @@ -5858,6 +5868,18 @@ func TestNoEndpointsMetric(t *testing.T) { }, expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, }, + { + name: "Both policies are set and there is zero local endpoint", + internalTrafficPolicy: &internalTrafficPolicyLocal, + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, + expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, + }, } for _, tc := range testCases { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)() From 61b7e6c49c7afa5f55874bbe89e7c54bbbbf8dfe Mon Sep 17 00:00:00 2001 From: Max Renaud Date: Thu, 31 Mar 2022 18:55:47 +0000 Subject: [PATCH 4/5] Changed usage of NodeLocal* to *PolicyLocal --- pkg/proxy/ipvs/proxier.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 0631be80ab8..0dcd8edf35a 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -2010,11 +2010,11 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode newEndpoints.Insert(epInfo.String()) } if onlyNodeLocalEndpoints && len(newEndpoints) == 0 { - if svcInfo.NodeLocalInternal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { + if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { proxier.serviceNoLocalEndpointsInternal.Insert(svcPortName.NamespacedName.String()) } - if svcInfo.NodeLocalExternal() { + if svcInfo.ExternalPolicyLocal() { proxier.serviceNoLocalEndpointsExternal.Insert(svcPortName.NamespacedName.String()) } } From 6454248b6bdf80f0e823786b10ace46c4cb41650 Mon Sep 17 00:00:00 2001 From: Max Renaud Date: Fri, 1 Apr 2022 15:52:21 +0000 Subject: [PATCH 5/5] Moved counting logic to accommodate rebase --- pkg/proxy/ipvs/proxier.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 0dcd8edf35a..28c76622f96 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1999,6 +1999,14 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode // Traffic from an external source will be routed but the reply // will have the POD address and will be discarded. endpoints = clusterEndpoints + + if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { + proxier.serviceNoLocalEndpointsInternal.Insert(svcPortName.NamespacedName.String()) + } + + if svcInfo.ExternalPolicyLocal() { + proxier.serviceNoLocalEndpointsExternal.Insert(svcPortName.NamespacedName.String()) + } } } else { endpoints = clusterEndpoints @@ -2009,15 +2017,6 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode for _, epInfo := range endpoints { newEndpoints.Insert(epInfo.String()) } - if onlyNodeLocalEndpoints && len(newEndpoints) == 0 { - if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { - proxier.serviceNoLocalEndpointsInternal.Insert(svcPortName.NamespacedName.String()) - } - - if svcInfo.ExternalPolicyLocal() { - proxier.serviceNoLocalEndpointsExternal.Insert(svcPortName.NamespacedName.String()) - } - } // Create new endpoints for _, ep := range newEndpoints.List() {