diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 8692e3ae7c2..5f8094ef605 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -38,9 +38,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" @@ -993,6 +995,11 @@ func (proxier *Proxier) syncProxyRules() { } } + // These two variables are used to publish the sync_proxy_rules_no_endpoints_total + // metric. + serviceNoLocalEndpointsTotalInternal := 0 + serviceNoLocalEndpointsTotalExternal := 0 + // Build rules for each service-port. for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) @@ -1347,16 +1354,24 @@ func (proxier *Proxier) syncProxyRules() { if len(localEndpoints) != 0 { // Write rules jumping from localPolicyChain to localEndpointChains proxier.writeServiceToEndpointRules(svcNameString, svcInfo, localPolicyChain, localEndpoints, args) - } else if hasEndpoints { - // Blackhole all traffic since there are no local endpoints - args = append(args[:0], - "-A", string(localPolicyChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"%s has no local endpoints"`, svcNameString), - "-j", - string(KubeMarkDropChain), - ) - proxier.natRules.Write(args) + } else { + if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { + serviceNoLocalEndpointsTotalInternal++ + } + if svcInfo.ExternalPolicyLocal() { + serviceNoLocalEndpointsTotalExternal++ + } + if hasEndpoints { + // Blackhole all traffic since there are no local endpoints + args = append(args[:0], + "-A", string(localPolicyChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"%s has no local endpoints"`, svcNameString), + "-j", + string(KubeMarkDropChain), + ) + proxier.natRules.Write(args) + } } } } @@ -1478,6 +1493,8 @@ func (proxier *Proxier) syncProxyRules() { } } + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal)) + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal)) if proxier.healthzServer != nil { proxier.healthzServer.Updated() } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 3544f748b99..c2c94981d08 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -6050,3 +6050,157 @@ func TestEndpointCommentElision(t *testing.T) { t.Errorf("numComments (%d) != 0 when numEndpoints (%d) > threshold (%d)", numComments, numEndpoints, endpointChainsNumberThreshold) } } + +func TestNoEndpointsMetric(t *testing.T) { + type endpoint struct { + ip string + hostname string + } + + internalTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal + externalTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyTypeLocal + + metrics.RegisterMetrics() + testCases := []struct { + name string + internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType + externalTrafficPolicy v1.ServiceExternalTrafficPolicyType + endpoints []endpoint + expectedSyncProxyRulesNoLocalEndpointsTotalInternal int + expectedSyncProxyRulesNoLocalEndpointsTotalExternal int + }{ + { + name: "internalTrafficPolicy is set and there is non-zero local endpoints", + internalTrafficPolicy: &internalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "externalTrafficPolicy is set and there is non-zero local endpoints", + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "both policies are set and there is non-zero local endpoints", + internalTrafficPolicy: &internalTrafficPolicyLocal, + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "internalTrafficPolicy is set and there is zero local endpoint", + internalTrafficPolicy: &internalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, + }, + { + name: "externalTrafficPolicy is set and there is zero local endpoint", + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, + }, + { + name: "both policies are set and there is zero local endpoint", + internalTrafficPolicy: &internalTrafficPolicyLocal, + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, + expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)() + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt) + fp.OnServiceSynced() + fp.OnEndpointSlicesSynced() + + serviceName := "svc1" + namespaceName := "ns1" + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.30.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 123}}, + }, + } + if tc.internalTrafficPolicy != nil { + svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy + } + if tc.externalTrafficPolicy != "" { + svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy + } + + fp.OnServiceAdd(svc) + + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr(""), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + } + for _, ep := range tc.endpoints { + endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{ + Addresses: []string{ep.ip}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + NodeName: utilpointer.StringPtr(ep.hostname), + }) + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) + } + + if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) { + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal) + } + + syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external")) + if err != nil { + t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) + } + + if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) { + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal) + } + }) + } +} diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index fe91de9691f..28c76622f96 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -273,6 +273,20 @@ type Proxier struct { // Inject for test purpose. networkInterfacer utilproxy.NetworkInterfacer gracefuldeleteManager *GracefulTerminationManager + // serviceNoLocalEndpointsInternal represents the set of services that couldn't be applied + // due to the absence of local endpoints when the internal traffic policy is "Local". + // It is used to publish the sync_proxy_rules_no_endpoints_total + // metric with the traffic_policy label set to "internal". + // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service + // if it has multiple ports but each Service should only be counted once. + serviceNoLocalEndpointsInternal sets.String + // serviceNoLocalEndpointsExternal irepresents the set of services that couldn't be applied + // due to the absence of any endpoints when the external traffic policy is "Local". + // It is used to publish the sync_proxy_rules_no_endpoints_total + // metric with the traffic_policy label set to "external". + // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service + // if it has multiple ports but each Service should only be counted once. + serviceNoLocalEndpointsExternal sets.String } // IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface @@ -1027,6 +1041,8 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).InfoS("Syncing ipvs proxier rules") + proxier.serviceNoLocalEndpointsInternal = sets.NewString() + proxier.serviceNoLocalEndpointsExternal = sets.NewString() // Begin install iptables // Reset all buffers used later. @@ -1599,6 +1615,9 @@ func (proxier *Proxier) syncProxyRules() { } } proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) + + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len())) + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed @@ -1980,6 +1999,14 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode // Traffic from an external source will be routed but the reply // will have the POD address and will be discarded. endpoints = clusterEndpoints + + if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { + proxier.serviceNoLocalEndpointsInternal.Insert(svcPortName.NamespacedName.String()) + } + + if svcInfo.ExternalPolicyLocal() { + proxier.serviceNoLocalEndpointsExternal.Insert(svcPortName.NamespacedName.String()) + } } } else { endpoints = clusterEndpoints diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 2f43083774a..2f6cbb2f05d 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -35,10 +35,12 @@ import ( "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" + "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" @@ -5799,3 +5801,162 @@ func TestIpIsValidForSet(t *testing.T) { } } } + +func TestNoEndpointsMetric(t *testing.T) { + type endpoint struct { + ip string + hostname string + } + + internalTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal + externalTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyTypeLocal + metrics.RegisterMetrics() + + testCases := []struct { + name string + internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType + externalTrafficPolicy v1.ServiceExternalTrafficPolicyType + endpoints []endpoint + expectedSyncProxyRulesNoLocalEndpointsTotalInternal int + expectedSyncProxyRulesNoLocalEndpointsTotalExternal int + }{ + { + name: "internalTrafficPolicy is set and there is non-zero local endpoints", + internalTrafficPolicy: &internalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "externalTrafficPolicy is set and there is non-zero local endpoints", + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "both policies are set and there is non-zero local endpoints", + internalTrafficPolicy: &internalTrafficPolicyLocal, + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", testHostname}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + }, + { + name: "internalTrafficPolicy is set and there is zero local endpoint", + internalTrafficPolicy: &internalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, + }, + { + name: "externalTrafficPolicy is set and there is zero local endpoint", + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, + }, + { + name: "Both policies are set and there is zero local endpoint", + internalTrafficPolicy: &internalTrafficPolicyLocal, + externalTrafficPolicy: externalTrafficPolicyLocal, + endpoints: []endpoint{ + {"10.0.1.1", "host0"}, + {"10.0.1.2", "host1"}, + {"10.0.1.3", "host2"}, + }, + expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1, + expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1, + }, + } + for _, tc := range testCases { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)() + + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{netutils.ParseIPSloppy("10.0.0.1")}, nil, v1.IPv4Protocol) + fp.servicesSynced = true + // fp.endpointsSynced = true + fp.endpointSlicesSynced = true + + // Add initial service + serviceName := "svc1" + namespaceName := "ns1" + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + Spec: v1.ServiceSpec{ + ClusterIP: "172.20.1.1", + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Name: "p80", Port: 80, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, NodePort: 30000}}, + }, + } + if tc.internalTrafficPolicy != nil { + svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy + } + if tc.externalTrafficPolicy != "" { + svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy + } + + fp.OnServiceAdd(svc) + + // Add initial endpoint slice + tcpProtocol := v1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-1", serviceName), + Namespace: namespaceName, + Labels: map[string]string{discovery.LabelServiceName: serviceName}, + }, + Ports: []discovery.EndpointPort{{ + Name: utilpointer.StringPtr("p80"), + Port: utilpointer.Int32Ptr(80), + Protocol: &tcpProtocol, + }}, + AddressType: discovery.AddressTypeIPv4, + } + + for _, ep := range tc.endpoints { + endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{ + Addresses: []string{ep.ip}, + Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, + NodeName: utilpointer.StringPtr(ep.hostname), + }) + } + + fp.OnEndpointSliceAdd(endpointSlice) + fp.syncProxyRules() + + syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal")) + if err != nil { + t.Errorf("failed to get %s value(internal), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) + } + + if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) { + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal) + } + + syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external")) + if err != nil { + t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err) + } + + if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) { + t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(external): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal) + } + } +} diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go index 9c1aeca55ba..f61cf4fc991 100644 --- a/pkg/proxy/metrics/metrics.go +++ b/pkg/proxy/metrics/metrics.go @@ -148,6 +148,19 @@ var ( StabilityLevel: metrics.ALPHA, }, ) + + // SyncProxyRulesNoLocalEndpointsTotal is the total number of rules that do + // not have an available endpoint. This can be caused by an internal + // traffic policy with no available local workload. + SyncProxyRulesNoLocalEndpointsTotal = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: kubeProxySubsystem, + Name: "sync_proxy_rules_no_local_endpoints_total", + Help: "Number of services with a Local traffic policy and no endpoints", + StabilityLevel: metrics.ALPHA, + }, + []string{"traffic_policy"}, + ) ) var registerMetricsOnce sync.Once @@ -165,6 +178,7 @@ func RegisterMetrics() { legacyregistry.MustRegister(IptablesRulesTotal) legacyregistry.MustRegister(IptablesRestoreFailuresTotal) legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp) + legacyregistry.MustRegister(SyncProxyRulesNoLocalEndpointsTotal) }) }