Add sync_proxy_rules_no_local_endpoints_total metric

This commit is contained in:
Max Renaud 2022-03-29 21:09:45 +00:00
parent e89b80bdd8
commit f0dfac5d07
5 changed files with 354 additions and 11 deletions

View File

@ -38,9 +38,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
@ -993,6 +995,11 @@ func (proxier *Proxier) syncProxyRules() {
}
}
// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
// metric.
serviceNoLocalEndpointsTotalInternal := 0
serviceNoLocalEndpointsTotalExternal := 0
// Build rules for each service-port.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
@ -1347,16 +1354,24 @@ func (proxier *Proxier) syncProxyRules() {
if len(localEndpoints) != 0 {
// Write rules jumping from localPolicyChain to localEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, localPolicyChain, localEndpoints, args)
} else if hasEndpoints {
// Blackhole all traffic since there are no local endpoints
args = append(args[:0],
"-A", string(localPolicyChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
"-j",
string(KubeMarkDropChain),
)
proxier.natRules.Write(args)
} else {
if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
serviceNoLocalEndpointsTotalInternal++
}
if svcInfo.ExternalPolicyLocal() {
serviceNoLocalEndpointsTotalExternal++
}
if hasEndpoints {
// Blackhole all traffic since there are no local endpoints
args = append(args[:0],
"-A", string(localPolicyChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
"-j",
string(KubeMarkDropChain),
)
proxier.natRules.Write(args)
}
}
}
}
@ -1478,6 +1493,8 @@ func (proxier *Proxier) syncProxyRules() {
}
}
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
if proxier.healthzServer != nil {
proxier.healthzServer.Updated()
}

View File

@ -6050,3 +6050,135 @@ func TestEndpointCommentElision(t *testing.T) {
t.Errorf("numComments (%d) != 0 when numEndpoints (%d) > threshold (%d)", numComments, numEndpoints, endpointChainsNumberThreshold)
}
}
func TestNoEndpointsMetric(t *testing.T) {
type endpoint struct {
ip string
hostname string
}
internalTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal
externalTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyTypeLocal
metrics.RegisterMetrics()
testCases := []struct {
name string
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
externalTrafficPolicy v1.ServiceExternalTrafficPolicyType
endpoints []endpoint
expectedSyncProxyRulesNoLocalEndpointsTotalInternal int
expectedSyncProxyRulesNoLocalEndpointsTotalExternal int
}{
{
name: "internalTrafficPolicy is set and there is non-zero local endpoints",
internalTrafficPolicy: &internalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", testHostname},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
},
{
name: "externalTrafficPolicy is set and there is non-zero local endpoints",
externalTrafficPolicy: externalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", testHostname},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
},
{
name: "internalTrafficPolicy is set and there is zero local endpoint",
internalTrafficPolicy: &internalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", "host0"},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1,
},
{
name: "externalTrafficPolicy is set and there is zero local endpoint",
externalTrafficPolicy: externalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", "host0"},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.OnServiceSynced()
fp.OnEndpointSlicesSynced()
serviceName := "svc1"
namespaceName := "ns1"
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.30.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: "", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 123}},
},
}
if tc.internalTrafficPolicy != nil {
svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy
}
if tc.externalTrafficPolicy != "" {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy
}
fp.OnServiceAdd(svc)
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
}
for _, ep := range tc.endpoints {
endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{
Addresses: []string{ep.ip},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
NodeName: utilpointer.StringPtr(ep.hostname),
})
}
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err)
}
if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) {
t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal)
}
syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external"))
if err != nil {
t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err)
}
if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) {
t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal)
}
})
}
}

View File

@ -273,6 +273,16 @@ type Proxier struct {
// Inject for test purpose.
networkInterfacer utilproxy.NetworkInterfacer
gracefuldeleteManager *GracefulTerminationManager
// serviceNoLocalEndpointsInternal is a map of services that couldn't have their rules applied
// due to the absence of local endpoints when the internal traffic policy label set to "Local".
// It is used to publish the sync_proxy_rules_no_endpoints_total
// metric with the traffic_policy label set to "internal".
serviceNoLocalEndpointsInternal map[proxy.ServicePortName]bool
// serviceNoLocalEndpointsExternal is a map of services that couldn't have their rules applied
// due to the absence of any endpoints when the external traffic policy is "Local".
// It is used to publish the sync_proxy_rules_no_endpoints_total
// metric with the traffic_policy label set to "external".
serviceNoLocalEndpointsExternal map[proxy.ServicePortName]bool
}
// IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface
@ -1027,6 +1037,8 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(3).InfoS("Syncing ipvs proxier rules")
proxier.serviceNoLocalEndpointsInternal = make(map[proxy.ServicePortName]bool)
proxier.serviceNoLocalEndpointsExternal = make(map[proxy.ServicePortName]bool)
// Begin install iptables
// Reset all buffers used later.
@ -1599,6 +1611,24 @@ func (proxier *Proxier) syncProxyRules() {
}
}
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
serviceNoLocalEndpointsInternalTotal := 0
serviceNoLocalEndpointsExternalTotal := 0
for _, v := range proxier.serviceNoLocalEndpointsInternal {
if v {
serviceNoLocalEndpointsInternalTotal++
}
}
for _, v := range proxier.serviceNoLocalEndpointsExternal {
if v {
serviceNoLocalEndpointsExternalTotal++
}
}
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsInternalTotal))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsExternalTotal))
}
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
@ -1962,6 +1992,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
endpoints := proxier.endpointsMap[svcPortName]
localEndpoints := []proxy.Endpoint{}
// Filtering for topology aware endpoints. This function will only
// filter endpoints if appropriate feature gates are enabled and the
// Service does not have conflicting configuration such as
@ -1970,7 +2001,8 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
if !ok {
klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
} else {
clusterEndpoints, localEndpoints, _, _ := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
var clusterEndpoints []proxy.Endpoint
clusterEndpoints, localEndpoints, _, _ = proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
if onlyNodeLocalEndpoints {
if len(localEndpoints) > 0 {
endpoints = localEndpoints
@ -1990,6 +2022,15 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
for _, epInfo := range endpoints {
newEndpoints.Insert(epInfo.String())
}
if svcInfo.UsesLocalEndpoints() && len(localEndpoints) == 0 {
if svcInfo.NodeLocalInternal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
proxier.serviceNoLocalEndpointsInternal[svcPortName] = true
}
if svcInfo.NodeLocalExternal() {
proxier.serviceNoLocalEndpointsExternal[svcPortName] = true
}
}
// Create new endpoints
for _, ep := range newEndpoints.List() {

View File

@ -35,10 +35,12 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
@ -5799,3 +5801,140 @@ func TestIpIsValidForSet(t *testing.T) {
}
}
}
func TestNoEndpointsMetric(t *testing.T) {
type endpoint struct {
ip string
hostname string
}
internalTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal
externalTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyTypeLocal
metrics.RegisterMetrics()
testCases := []struct {
name string
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
externalTrafficPolicy v1.ServiceExternalTrafficPolicyType
endpoints []endpoint
expectedSyncProxyRulesNoLocalEndpointsTotalInternal int
expectedSyncProxyRulesNoLocalEndpointsTotalExternal int
}{
{
name: "internalTrafficPolicy is set and there is non-zero local endpoints",
internalTrafficPolicy: &internalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", testHostname},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
},
{
name: "externalTrafficPolicy is set and there is non-zero local endpoints",
externalTrafficPolicy: externalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", testHostname},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
},
{
name: "internalTrafficPolicy is set and there is zero local endpoint",
internalTrafficPolicy: &internalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", "host0"},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1,
},
{
name: "externalTrafficPolicy is set and there is zero local endpoint",
externalTrafficPolicy: externalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", "host0"},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1,
},
}
for _, tc := range testCases {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)()
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
// fp.endpointsSynced = true
fp.endpointSlicesSynced = true
// Add initial service
serviceName := "svc1"
namespaceName := "ns1"
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, NodePort: 123}},
},
}
if tc.internalTrafficPolicy != nil {
svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy
}
if tc.externalTrafficPolicy != "" {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy
}
fp.OnServiceAdd(svc)
// Add initial endpoint slice
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
}
for _, ep := range tc.endpoints {
endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{
Addresses: []string{ep.ip},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
NodeName: utilpointer.StringPtr(ep.hostname),
})
}
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal"))
if err != nil {
t.Errorf("failed to get %s value(internal), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err)
}
if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) {
t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal)
}
syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external"))
if err != nil {
t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err)
}
if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) {
t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal)
}
}
}

View File

@ -148,6 +148,19 @@ var (
StabilityLevel: metrics.ALPHA,
},
)
// SyncProxyRulesNoLocalEndpointsTotal is the total number of rules that do
// not have an available endpoint. This can be caused by an internal
// traffic policy with no available local workload.
SyncProxyRulesNoLocalEndpointsTotal = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: kubeProxySubsystem,
Name: "sync_proxy_rules_no_local_endpoints_total",
Help: "Number of services with a Local traffic policy and no endpoints",
StabilityLevel: metrics.ALPHA,
},
[]string{"traffic_policy"},
)
)
var registerMetricsOnce sync.Once
@ -165,6 +178,7 @@ func RegisterMetrics() {
legacyregistry.MustRegister(IptablesRulesTotal)
legacyregistry.MustRegister(IptablesRestoreFailuresTotal)
legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp)
legacyregistry.MustRegister(SyncProxyRulesNoLocalEndpointsTotal)
})
}