Merge pull request #119140 from danwinship/iptables-metrics

fix sync_proxy_rules_iptables_total metric
This commit is contained in:
Kubernetes Prow Robot 2023-07-14 07:36:01 -07:00 committed by GitHub
commit ffa4c26321
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 546 additions and 286 deletions

View File

@ -288,10 +288,10 @@ func NewProxier(ipFamily v1.IPFamily,
precomputedProbabilities: make([]string, 0, 1001), precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
existingFilterChainsData: bytes.NewBuffer(nil), existingFilterChainsData: bytes.NewBuffer(nil),
filterChains: proxyutil.LineBuffer{}, filterChains: proxyutil.NewLineBuffer(),
filterRules: proxyutil.LineBuffer{}, filterRules: proxyutil.NewLineBuffer(),
natChains: proxyutil.LineBuffer{}, natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.LineBuffer{}, natRules: proxyutil.NewLineBuffer(),
localhostNodePorts: localhostNodePorts, localhostNodePorts: localhostNodePorts,
nodePortAddresses: nodePortAddresses, nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{}, networkInterfacer: proxyutil.RealNetwork{},
@ -411,8 +411,8 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
encounteredError = true encounteredError = true
} else { } else {
existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes()) existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
natChains := &proxyutil.LineBuffer{} natChains := proxyutil.NewLineBuffer()
natRules := &proxyutil.LineBuffer{} natRules := proxyutil.NewLineBuffer()
natChains.Write("*nat") natChains.Write("*nat")
// Start with chains we know we need to remove. // Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} {
@ -448,8 +448,8 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
encounteredError = true encounteredError = true
} else { } else {
existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes()) existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
filterChains := &proxyutil.LineBuffer{} filterChains := proxyutil.NewLineBuffer()
filterRules := &proxyutil.LineBuffer{} filterRules := proxyutil.NewLineBuffer()
filterChains.Write("*filter") filterChains.Write("*filter")
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
if _, found := existingFilterChains[chain]; found { if _, found := existingFilterChains[chain]; found {
@ -852,6 +852,9 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natChains.Reset() proxier.natChains.Reset()
proxier.natRules.Reset() proxier.natRules.Reset()
skippedNatChains := proxyutil.NewDiscardLineBuffer()
skippedNatRules := proxyutil.NewDiscardLineBuffer()
// Write chain lines for all the "top-level" chains we'll be filling in // Write chain lines for all the "top-level" chains we'll be filling in
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeProxyFirewallChain} { for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeProxyFirewallChain} {
proxier.filterChains.Write(utiliptables.MakeChainLine(chainName)) proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
@ -1066,9 +1069,13 @@ func (proxier *Proxier) syncProxyRules() {
} }
} }
filterRules := proxier.filterRules
natChains := proxier.natChains
natRules := proxier.natRules
// Capture the clusterIP. // Capture the clusterIP.
if hasInternalEndpoints { if hasInternalEndpoints {
proxier.natRules.Write( natRules.Write(
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
@ -1077,7 +1084,7 @@ func (proxier *Proxier) syncProxyRules() {
"-j", string(internalTrafficChain)) "-j", string(internalTrafficChain))
} else { } else {
// No endpoints. // No endpoints.
proxier.filterRules.Write( filterRules.Write(
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", internalTrafficFilterComment, "-m", "comment", "--comment", internalTrafficFilterComment,
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
@ -1092,7 +1099,7 @@ func (proxier *Proxier) syncProxyRules() {
if hasEndpoints { if hasEndpoints {
// Send traffic bound for external IPs to the "external // Send traffic bound for external IPs to the "external
// destinations" chain. // destinations" chain.
proxier.natRules.Write( natRules.Write(
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
@ -1104,7 +1111,7 @@ func (proxier *Proxier) syncProxyRules() {
// Either no endpoints at all (REJECT) or no endpoints for // Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get // external traffic (DROP anything that didn't get
// short-circuited by the EXT chain.) // short-circuited by the EXT chain.)
proxier.filterRules.Write( filterRules.Write(
"-A", string(kubeExternalServicesChain), "-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment, "-m", "comment", "--comment", externalTrafficFilterComment,
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
@ -1118,7 +1125,7 @@ func (proxier *Proxier) syncProxyRules() {
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerIPStrings() {
if hasEndpoints { if hasEndpoints {
proxier.natRules.Write( natRules.Write(
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
@ -1128,7 +1135,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
if usesFWChain { if usesFWChain {
proxier.filterRules.Write( filterRules.Write(
"-A", string(kubeProxyFirewallChain), "-A", string(kubeProxyFirewallChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s traffic not accepted by %s"`, svcPortNameString, svcInfo.firewallChainName), "-m", "comment", "--comment", fmt.Sprintf(`"%s traffic not accepted by %s"`, svcPortNameString, svcInfo.firewallChainName),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
@ -1142,7 +1149,7 @@ func (proxier *Proxier) syncProxyRules() {
// external traffic (DROP anything that didn't get short-circuited // external traffic (DROP anything that didn't get short-circuited
// by the EXT chain.) // by the EXT chain.)
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerIPStrings() {
proxier.filterRules.Write( filterRules.Write(
"-A", string(kubeExternalServicesChain), "-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment, "-m", "comment", "--comment", externalTrafficFilterComment,
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
@ -1159,7 +1166,7 @@ func (proxier *Proxier) syncProxyRules() {
// Jump to the external destination chain. For better or for // Jump to the external destination chain. For better or for
// worse, nodeports are not subect to loadBalancerSourceRanges, // worse, nodeports are not subect to loadBalancerSourceRanges,
// and we can't change that. // and we can't change that.
proxier.natRules.Write( natRules.Write(
"-A", string(kubeNodePortsChain), "-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcPortNameString, "-m", "comment", "--comment", svcPortNameString,
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
@ -1170,7 +1177,7 @@ func (proxier *Proxier) syncProxyRules() {
// Either no endpoints at all (REJECT) or no endpoints for // Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get // external traffic (DROP anything that didn't get
// short-circuited by the EXT chain.) // short-circuited by the EXT chain.)
proxier.filterRules.Write( filterRules.Write(
"-A", string(kubeExternalServicesChain), "-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment, "-m", "comment", "--comment", externalTrafficFilterComment,
"-m", "addrtype", "--dst-type", "LOCAL", "-m", "addrtype", "--dst-type", "LOCAL",
@ -1185,7 +1192,7 @@ func (proxier *Proxier) syncProxyRules() {
if svcInfo.HealthCheckNodePort() != 0 { if svcInfo.HealthCheckNodePort() != 0 {
// no matter if node has local endpoints, healthCheckNodePorts // no matter if node has local endpoints, healthCheckNodePorts
// need to add a rule to accept the incoming connection // need to add a rule to accept the incoming connection
proxier.filterRules.Write( filterRules.Write(
"-A", string(kubeNodePortsChain), "-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcPortNameString),
"-m", "tcp", "-p", "tcp", "-m", "tcp", "-p", "tcp",
@ -1196,9 +1203,12 @@ func (proxier *Proxier) syncProxyRules() {
// If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync // If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync
// then we can omit them from the restore input. (We have already marked // then we can omit them from the restore input. (We have already marked
// them in activeNATChains, so they won't get deleted.) // them in activeNATChains, so they won't get deleted.) However, we have
// to still figure out how many chains we _would_ have written to make the
// metrics come out right, so we just compute them and throw them away.
if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) { if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) {
continue natChains = skippedNatChains
natRules = skippedNatRules
} }
// Set up internal traffic handling. // Set up internal traffic handling.
@ -1210,7 +1220,7 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", strconv.Itoa(svcInfo.Port()), "--dport", strconv.Itoa(svcInfo.Port()),
) )
if proxier.masqueradeAll { if proxier.masqueradeAll {
proxier.natRules.Write( natRules.Write(
"-A", string(internalTrafficChain), "-A", string(internalTrafficChain),
args, args,
"-j", string(kubeMarkMasqChain)) "-j", string(kubeMarkMasqChain))
@ -1220,7 +1230,7 @@ func (proxier *Proxier) syncProxyRules() {
// Service range, routing to any node, and that node will // Service range, routing to any node, and that node will
// bridge into the Service for you. Since that might bounce // bridge into the Service for you. Since that might bounce
// off-node, we masquerade here. // off-node, we masquerade here.
proxier.natRules.Write( natRules.Write(
"-A", string(internalTrafficChain), "-A", string(internalTrafficChain),
args, args,
proxier.localDetector.IfNotLocal(), proxier.localDetector.IfNotLocal(),
@ -1233,12 +1243,12 @@ func (proxier *Proxier) syncProxyRules() {
// jump to externalTrafficChain, which will handle some special cases and // jump to externalTrafficChain, which will handle some special cases and
// then jump to externalPolicyChain. // then jump to externalPolicyChain.
if usesExternalTrafficChain { if usesExternalTrafficChain {
proxier.natChains.Write(utiliptables.MakeChainLine(externalTrafficChain)) natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))
if !svcInfo.ExternalPolicyLocal() { if !svcInfo.ExternalPolicyLocal() {
// If we are using non-local endpoints we need to masquerade, // If we are using non-local endpoints we need to masquerade,
// in case we cross nodes. // in case we cross nodes.
proxier.natRules.Write( natRules.Write(
"-A", string(externalTrafficChain), "-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString),
"-j", string(kubeMarkMasqChain)) "-j", string(kubeMarkMasqChain))
@ -1251,7 +1261,7 @@ func (proxier *Proxier) syncProxyRules() {
// traffic as a special-case. It is subject to neither // traffic as a special-case. It is subject to neither
// form of traffic policy, which simulates going up-and-out // form of traffic policy, which simulates going up-and-out
// to an external load-balancer and coming back in. // to an external load-balancer and coming back in.
proxier.natRules.Write( natRules.Write(
"-A", string(externalTrafficChain), "-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString),
proxier.localDetector.IfLocal(), proxier.localDetector.IfLocal(),
@ -1261,7 +1271,7 @@ func (proxier *Proxier) syncProxyRules() {
// Locally originated traffic (not a pod, but the host node) // Locally originated traffic (not a pod, but the host node)
// still needs masquerade because the LBIP itself is a local // still needs masquerade because the LBIP itself is a local
// address, so that will be the chosen source IP. // address, so that will be the chosen source IP.
proxier.natRules.Write( natRules.Write(
"-A", string(externalTrafficChain), "-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-m", "addrtype", "--src-type", "LOCAL",
@ -1270,7 +1280,7 @@ func (proxier *Proxier) syncProxyRules() {
// Redirect all src-type=LOCAL -> external destination to the // Redirect all src-type=LOCAL -> external destination to the
// policy=cluster chain. This allows traffic originating // policy=cluster chain. This allows traffic originating
// from the host to be redirected to the service correctly. // from the host to be redirected to the service correctly.
proxier.natRules.Write( natRules.Write(
"-A", string(externalTrafficChain), "-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-m", "addrtype", "--src-type", "LOCAL",
@ -1279,7 +1289,7 @@ func (proxier *Proxier) syncProxyRules() {
// Anything else falls thru to the appropriate policy chain. // Anything else falls thru to the appropriate policy chain.
if hasExternalEndpoints { if hasExternalEndpoints {
proxier.natRules.Write( natRules.Write(
"-A", string(externalTrafficChain), "-A", string(externalTrafficChain),
"-j", string(externalPolicyChain)) "-j", string(externalPolicyChain))
} }
@ -1287,7 +1297,7 @@ func (proxier *Proxier) syncProxyRules() {
// Set up firewall chain, if needed // Set up firewall chain, if needed
if usesFWChain { if usesFWChain {
proxier.natChains.Write(utiliptables.MakeChainLine(fwChain)) natChains.Write(utiliptables.MakeChainLine(fwChain))
// The service firewall rules are created based on the // The service firewall rules are created based on the
// loadBalancerSourceRanges field. This only works for VIP-like // loadBalancerSourceRanges field. This only works for VIP-like
@ -1302,7 +1312,7 @@ func (proxier *Proxier) syncProxyRules() {
// firewall filter based on each source range // firewall filter based on each source range
allowFromNode := false allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges() { for _, src := range svcInfo.LoadBalancerSourceRanges() {
proxier.natRules.Write(args, "-s", src, "-j", string(externalTrafficChain)) natRules.Write(args, "-s", src, "-j", string(externalTrafficChain))
_, cidr, err := netutils.ParseCIDRSloppy(src) _, cidr, err := netutils.ParseCIDRSloppy(src)
if err != nil { if err != nil {
klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr) klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr)
@ -1317,7 +1327,7 @@ func (proxier *Proxier) syncProxyRules() {
// need the following rules to allow requests from this node. // need the following rules to allow requests from this node.
if allowFromNode { if allowFromNode {
for _, lbip := range svcInfo.LoadBalancerIPStrings() { for _, lbip := range svcInfo.LoadBalancerIPStrings() {
proxier.natRules.Write( natRules.Write(
args, args,
"-s", lbip, "-s", lbip,
"-j", string(externalTrafficChain)) "-j", string(externalTrafficChain))
@ -1326,7 +1336,7 @@ func (proxier *Proxier) syncProxyRules() {
// If the packet was able to reach the end of firewall chain, // If the packet was able to reach the end of firewall chain,
// then it did not get DNATed, so it will match the // then it did not get DNATed, so it will match the
// corresponding KUBE-PROXY-FIREWALL rule. // corresponding KUBE-PROXY-FIREWALL rule.
proxier.natRules.Write( natRules.Write(
"-A", string(fwChain), "-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"other traffic to %s will be dropped by KUBE-PROXY-FIREWALL"`, svcPortNameString), "-m", "comment", "--comment", fmt.Sprintf(`"other traffic to %s will be dropped by KUBE-PROXY-FIREWALL"`, svcPortNameString),
) )
@ -1335,15 +1345,15 @@ func (proxier *Proxier) syncProxyRules() {
// If Cluster policy is in use, create the chain and create rules jumping // If Cluster policy is in use, create the chain and create rules jumping
// from clusterPolicyChain to the clusterEndpoints // from clusterPolicyChain to the clusterEndpoints
if usesClusterPolicyChain { if usesClusterPolicyChain {
proxier.natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain)) natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args) proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
} }
// If Local policy is in use, create the chain and create rules jumping // If Local policy is in use, create the chain and create rules jumping
// from localPolicyChain to the localEndpoints // from localPolicyChain to the localEndpoints
if usesLocalPolicyChain { if usesLocalPolicyChain {
proxier.natChains.Write(utiliptables.MakeChainLine(localPolicyChain)) natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args) proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args)
} }
// Generate the per-endpoint chains. // Generate the per-endpoint chains.
@ -1357,13 +1367,13 @@ func (proxier *Proxier) syncProxyRules() {
endpointChain := epInfo.ChainName endpointChain := epInfo.ChainName
// Create the endpoint chain // Create the endpoint chain
proxier.natChains.Write(utiliptables.MakeChainLine(endpointChain)) natChains.Write(utiliptables.MakeChainLine(endpointChain))
activeNATChains[endpointChain] = true activeNATChains[endpointChain] = true
args = append(args[:0], "-A", string(endpointChain)) args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcPortNameString) args = proxier.appendServiceCommentLocked(args, svcPortNameString)
// Handle traffic that loops back to the originator with SNAT. // Handle traffic that loops back to the originator with SNAT.
proxier.natRules.Write( natRules.Write(
args, args,
"-s", epInfo.IP(), "-s", epInfo.IP(),
"-j", string(kubeMarkMasqChain)) "-j", string(kubeMarkMasqChain))
@ -1373,7 +1383,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// DNAT to final destination. // DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint) args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint)
proxier.natRules.Write(args) natRules.Write(args)
} }
} }
@ -1381,6 +1391,7 @@ func (proxier *Proxier) syncProxyRules() {
// to run on hosts with lots of iptables rules, we don't bother to do this on // to run on hosts with lots of iptables rules, we don't bother to do this on
// every sync in large clusters. (Stale chains will not be referenced by any // every sync in large clusters. (Stale chains will not be referenced by any
// active rules, so they're harmless other than taking up memory.) // active rules, so they're harmless other than taking up memory.)
deletedChains := 0
if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod { if !proxier.largeClusterMode || time.Since(proxier.lastIPTablesCleanup) > proxier.syncPeriod {
var existingNATChains map[utiliptables.Chain]struct{} var existingNATChains map[utiliptables.Chain]struct{}
@ -1400,6 +1411,7 @@ func (proxier *Proxier) syncProxyRules() {
// the chain. Then we can remove the chain. // the chain. Then we can remove the chain.
proxier.natChains.Write(utiliptables.MakeChainLine(chain)) proxier.natChains.Write(utiliptables.MakeChainLine(chain))
proxier.natRules.Write("-X", chainString) proxier.natRules.Write("-X", chainString)
deletedChains++
} }
} }
proxier.lastIPTablesCleanup = time.Now() proxier.lastIPTablesCleanup = time.Now()
@ -1481,7 +1493,9 @@ func (proxier *Proxier) syncProxyRules() {
) )
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines())) metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines())) metrics.IptablesRulesLastSync.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() + skippedNatRules.Lines() - deletedChains))
metrics.IptablesRulesLastSync.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() - deletedChains))
// Sync rules. // Sync rules.
proxier.iptablesData.Reset() proxier.iptablesData.Reset()
@ -1548,7 +1562,7 @@ func (proxier *Proxier) syncProxyRules() {
conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
} }
func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
// First write session affinity rules, if applicable. // First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, ep := range endpoints { for _, ep := range endpoints {
@ -1567,7 +1581,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, sv
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(epInfo.ChainName), "-j", string(epInfo.ChainName),
) )
proxier.natRules.Write(args) natRules.Write(args)
} }
} }
@ -1590,6 +1604,6 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, sv
"--probability", proxier.probability(numEndpoints-i)) "--probability", proxier.probability(numEndpoints-i))
} }
// The final (or only if n == 1) rule is a guaranteed match. // The final (or only if n == 1) rule is a guaranteed match.
proxier.natRules.Write(args, "-j", string(epInfo.ChainName)) natRules.Write(args, "-j", string(epInfo.ChainName))
} }
} }

View File

@ -330,10 +330,10 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
precomputedProbabilities: make([]string, 0, 1001), precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
existingFilterChainsData: bytes.NewBuffer(nil), existingFilterChainsData: bytes.NewBuffer(nil),
filterChains: proxyutil.LineBuffer{}, filterChains: proxyutil.NewLineBuffer(),
filterRules: proxyutil.LineBuffer{}, filterRules: proxyutil.NewLineBuffer(),
natChains: proxyutil.LineBuffer{}, natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.LineBuffer{}, natRules: proxyutil.NewLineBuffer(),
nodeIP: netutils.ParseIPSloppy(testNodeIP), nodeIP: netutils.ParseIPSloppy(testNodeIP),
localhostNodePorts: true, localhostNodePorts: true,
nodePortAddresses: proxyutil.NewNodePortAddresses(ipfamily, nil), nodePortAddresses: proxyutil.NewNodePortAddresses(ipfamily, nil),
@ -610,6 +610,24 @@ func countRules(tableName utiliptables.Table, ruleData string) int {
return rules return rules
} }
func countRulesFromMetric(tableName utiliptables.Table) int {
numRulesFloat, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(tableName)))
if err != nil {
klog.ErrorS(err, "metrics are not registered?")
return -1
}
return int(numRulesFloat)
}
func countRulesFromLastSyncMetric(tableName utiliptables.Table) int {
numRulesFloat, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesLastSync.WithLabelValues(string(tableName)))
if err != nil {
klog.ErrorS(err, "metrics are not registered?")
return -1
}
return int(numRulesFloat)
}
// findAllMatches takes an array of lines and a pattern with one parenthesized group, and // findAllMatches takes an array of lines and a pattern with one parenthesized group, and
// returns a sorted array of all of the unique matches of the parenthesized group. // returns a sorted array of all of the unique matches of the parenthesized group.
func findAllMatches(lines []string, pattern string) []string { func findAllMatches(lines []string, pattern string) []string {
@ -1967,12 +1985,7 @@ func TestOverallIPTablesRulesWithMultipleServices(t *testing.T) {
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
natRulesMetric, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT))) nNatRules := countRulesFromMetric(utiliptables.TableNAT)
if err != nil {
t.Errorf("failed to get %s value, err: %v", metrics.IptablesRulesTotal.Name, err)
}
nNatRules := int(natRulesMetric)
expectedNatRules := countRules(utiliptables.TableNAT, fp.iptablesData.String()) expectedNatRules := countRules(utiliptables.TableNAT, fp.iptablesData.String())
if nNatRules != expectedNatRules { if nNatRules != expectedNatRules {
@ -5349,22 +5362,14 @@ func TestProxierMetricsIptablesTotalRules(t *testing.T) {
fp.syncProxyRules() fp.syncProxyRules()
iptablesData := fp.iptablesData.String() iptablesData := fp.iptablesData.String()
nFilterRulesMetric, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter))) nFilterRules := countRulesFromMetric(utiliptables.TableFilter)
if err != nil {
t.Errorf("failed to get %s value, err: %v", metrics.IptablesRulesTotal.Name, err)
}
nFilterRules := int(nFilterRulesMetric)
expectedFilterRules := countRules(utiliptables.TableFilter, iptablesData) expectedFilterRules := countRules(utiliptables.TableFilter, iptablesData)
if nFilterRules != expectedFilterRules { if nFilterRules != expectedFilterRules {
t.Fatalf("Wrong number of filter rule: expected %d got %d\n%s", expectedFilterRules, nFilterRules, iptablesData) t.Fatalf("Wrong number of filter rule: expected %d got %d\n%s", expectedFilterRules, nFilterRules, iptablesData)
} }
nNatRulesMetric, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT))) nNatRules := countRulesFromMetric(utiliptables.TableNAT)
if err != nil {
t.Errorf("failed to get %s value, err: %v", metrics.IptablesRulesTotal.Name, err)
}
nNatRules := int(nNatRulesMetric)
expectedNatRules := countRules(utiliptables.TableNAT, iptablesData) expectedNatRules := countRules(utiliptables.TableNAT, iptablesData)
if nNatRules != expectedNatRules { if nNatRules != expectedNatRules {
@ -5390,22 +5395,14 @@ func TestProxierMetricsIptablesTotalRules(t *testing.T) {
fp.syncProxyRules() fp.syncProxyRules()
iptablesData = fp.iptablesData.String() iptablesData = fp.iptablesData.String()
nFilterRulesMetric, err = testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter))) nFilterRules = countRulesFromMetric(utiliptables.TableFilter)
if err != nil {
t.Errorf("failed to get %s value, err: %v", metrics.IptablesRulesTotal.Name, err)
}
nFilterRules = int(nFilterRulesMetric)
expectedFilterRules = countRules(utiliptables.TableFilter, iptablesData) expectedFilterRules = countRules(utiliptables.TableFilter, iptablesData)
if nFilterRules != expectedFilterRules { if nFilterRules != expectedFilterRules {
t.Fatalf("Wrong number of filter rule: expected %d got %d\n%s", expectedFilterRules, nFilterRules, iptablesData) t.Fatalf("Wrong number of filter rule: expected %d got %d\n%s", expectedFilterRules, nFilterRules, iptablesData)
} }
nNatRulesMetric, err = testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT))) nNatRules = countRulesFromMetric(utiliptables.TableNAT)
if err != nil {
t.Errorf("failed to get %s value, err: %v", metrics.IptablesRulesTotal.Name, err)
}
nNatRules = int(nNatRulesMetric)
expectedNatRules = countRules(utiliptables.TableNAT, iptablesData) expectedNatRules = countRules(utiliptables.TableNAT, iptablesData)
if nNatRules != expectedNatRules { if nNatRules != expectedNatRules {
@ -7578,6 +7575,18 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`) `)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
rulesSynced := countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric := countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
rulesTotal := rulesSynced
rulesTotalMetric := countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
// Add a new service and its endpoints. (This will only sync the SVC and SEP rules // Add a new service and its endpoints. (This will only sync the SVC and SEP rules
// for the new service, not the existing ones.) // for the new service, not the existing ones.)
makeServiceMap(fp, makeServiceMap(fp,
@ -7644,6 +7653,20 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`) `)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// We added 1 KUBE-SERVICES rule, 2 KUBE-SVC-X27LE4BHSL4DOUIK rules, and 2
// KUBE-SEP-BSWRHOQ77KEXZLNL rules.
rulesTotal += 5
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
// Delete a service. (Won't update the other services.) // Delete a service. (Won't update the other services.)
fp.OnServiceDelete(svc2) fp.OnServiceDelete(svc2)
fp.syncProxyRules() fp.syncProxyRules()
@ -7681,6 +7704,20 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`) `)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// We deleted 1 KUBE-SERVICES rule, 2 KUBE-SVC-2VJB64SDSIJUP5T6 rules, and 2
// KUBE-SEP-UHEGFW77JX3KXTOV rules
rulesTotal -= 5
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
// Add a service, sync, then add its endpoints. (The first sync will be a no-op other // Add a service, sync, then add its endpoints. (The first sync will be a no-op other
// than adding the REJECT rule. The second sync will create the new service.) // than adding the REJECT rule. The second sync will create the new service.)
var svc4 *v1.Service var svc4 *v1.Service
@ -7727,6 +7764,19 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`) `)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// The REJECT rule is in "filter", not NAT, so the number of NAT rules hasn't
// changed.
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
populateEndpointSlices(fp, populateEndpointSlices(fp,
makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) { makeTestEndpointSlice("ns4", "svc4", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4 eps.AddressType = discovery.AddressTypeIPv4
@ -7777,6 +7827,20 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`) `)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// We added 1 KUBE-SERVICES rule, 2 KUBE-SVC-4SW47YFZTEDKD3PK rules, and
// 2 KUBE-SEP-AYCN5HPXMIRJNJXU rules
rulesTotal += 5
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
// Change an endpoint of an existing service. This will cause its SVC and SEP // Change an endpoint of an existing service. This will cause its SVC and SEP
// chains to be rewritten. // chains to be rewritten.
eps3update := eps3.DeepCopy() eps3update := eps3.DeepCopy()
@ -7822,6 +7886,18 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`) `)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// We rewrote existing rules but did not change the overall number of rules.
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
// Add an endpoint to a service. This will cause its SVC and SEP chains to be rewritten. // Add an endpoint to a service. This will cause its SVC and SEP chains to be rewritten.
eps3update2 := eps3update.DeepCopy() eps3update2 := eps3update.DeepCopy()
eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{"10.0.3.3"}}) eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{"10.0.3.3"}})
@ -7868,6 +7944,21 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`) `)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// We added 2 KUBE-SEP-JVVZVJ7BSEPPRNBS rules and 1 KUBE-SVC-X27LE4BHSL4DOUIK rule
// jumping to the new SEP chain. The other rules related to svc3 got rewritten,
// but that does not change the count of rules.
rulesTotal += 3
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
// Sync with no new changes... This will not rewrite any SVC or SEP chains // Sync with no new changes... This will not rewrite any SVC or SEP chains
fp.syncProxyRules() fp.syncProxyRules()
@ -7901,6 +7992,18 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`) `)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// (No changes)
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
// Now force a partial resync error and ensure that it recovers correctly // Now force a partial resync error and ensure that it recovers correctly
if fp.needFullSync { if fp.needFullSync {
t.Fatalf("Proxier unexpectedly already needs a full sync?") t.Fatalf("Proxier unexpectedly already needs a full sync?")
@ -7998,6 +8101,20 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
COMMIT COMMIT
`) `)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// We deleted 1 KUBE-SERVICES rule, 2 KUBE-SVC-4SW47YFZTEDKD3PK rules, and 2
// KUBE-SEP-AYCN5HPXMIRJNJXU rules
rulesTotal -= 5
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
} }
func TestNoEndpointsMetric(t *testing.T) { func TestNoEndpointsMetric(t *testing.T) {

View File

@ -440,10 +440,10 @@ func NewProxier(ipFamily v1.IPFamily,
ipvsScheduler: scheduler, ipvsScheduler: scheduler,
iptablesData: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
filterChainsData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil),
natChains: proxyutil.LineBuffer{}, natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.LineBuffer{}, natRules: proxyutil.NewLineBuffer(),
filterChains: proxyutil.LineBuffer{}, filterChains: proxyutil.NewLineBuffer(),
filterRules: proxyutil.LineBuffer{}, filterRules: proxyutil.NewLineBuffer(),
netlinkHandle: NewNetLinkHandle(ipFamily == v1.IPv6Protocol), netlinkHandle: NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
ipset: ipset, ipset: ipset,
nodePortAddresses: nodePortAddresses, nodePortAddresses: nodePortAddresses,

View File

@ -163,10 +163,10 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
ipvsScheduler: defaultScheduler, ipvsScheduler: defaultScheduler,
iptablesData: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
filterChainsData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil),
natChains: proxyutil.LineBuffer{}, natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.LineBuffer{}, natRules: proxyutil.NewLineBuffer(),
filterChains: proxyutil.LineBuffer{}, filterChains: proxyutil.NewLineBuffer(),
filterRules: proxyutil.LineBuffer{}, filterRules: proxyutil.NewLineBuffer(),
netlinkHandle: netlinkHandle, netlinkHandle: netlinkHandle,
ipsetList: ipsetList, ipsetList: ipsetList,
nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil), nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil),

View File

@ -160,12 +160,25 @@ var (
}, },
) )
// IptablesRulesTotal is the number of iptables rules that the iptables proxy installs. // IptablesRulesTotal is the total number of iptables rules that the iptables
// proxy has installed.
IptablesRulesTotal = metrics.NewGaugeVec( IptablesRulesTotal = metrics.NewGaugeVec(
&metrics.GaugeOpts{ &metrics.GaugeOpts{
Subsystem: kubeProxySubsystem, Subsystem: kubeProxySubsystem,
Name: "sync_proxy_rules_iptables_total", Name: "sync_proxy_rules_iptables_total",
Help: "Number of proxy iptables rules programmed", Help: "Total number of iptables rules owned by kube-proxy",
StabilityLevel: metrics.ALPHA,
},
[]string{"table"},
)
// IptablesRulesLastSync is the number of iptables rules that the iptables proxy
// updated in the last sync.
IptablesRulesLastSync = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: kubeProxySubsystem,
Name: "sync_proxy_rules_iptables_last",
Help: "Number of iptables rules written by kube-proxy in last sync",
StabilityLevel: metrics.ALPHA, StabilityLevel: metrics.ALPHA,
}, },
[]string{"table"}, []string{"table"},
@ -212,6 +225,7 @@ func RegisterMetrics() {
legacyregistry.MustRegister(ServiceChangesPending) legacyregistry.MustRegister(ServiceChangesPending)
legacyregistry.MustRegister(ServiceChangesTotal) legacyregistry.MustRegister(ServiceChangesTotal)
legacyregistry.MustRegister(IptablesRulesTotal) legacyregistry.MustRegister(IptablesRulesTotal)
legacyregistry.MustRegister(IptablesRulesLastSync)
legacyregistry.MustRegister(IptablesRestoreFailuresTotal) legacyregistry.MustRegister(IptablesRestoreFailuresTotal)
legacyregistry.MustRegister(IptablesPartialRestoreFailuresTotal) legacyregistry.MustRegister(IptablesPartialRestoreFailuresTotal)
legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp) legacyregistry.MustRegister(SyncProxyRulesLastQueuedTimestamp)

View File

@ -0,0 +1,150 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"bytes"
"fmt"
)
// LineBuffer is an interface for writing lines of input to a bytes.Buffer
type LineBuffer interface {
// Write takes a list of arguments, each a string or []string, joins all the
// individual strings with spaces, terminates with newline, and writes them to the
// buffer. Any other argument type will panic.
Write(args ...interface{})
// WriteBytes writes bytes to the buffer, and terminates with newline.
WriteBytes(bytes []byte)
// Reset clears the buffer
Reset()
// Bytes returns the contents of the buffer as a []byte
Bytes() []byte
// String returns the contents of the buffer as a string
String() string
// Lines returns the number of lines in the buffer. Note that more precisely, this
// returns the number of times Write() or WriteBytes() was called; it assumes that
// you never wrote any newlines to the buffer yourself.
Lines() int
}
type realLineBuffer struct {
b bytes.Buffer
lines int
}
// NewLineBuffer returns a new "real" LineBuffer
func NewLineBuffer() LineBuffer {
return &realLineBuffer{}
}
// Write is part of LineBuffer
func (buf *realLineBuffer) Write(args ...interface{}) {
for i, arg := range args {
if i > 0 {
buf.b.WriteByte(' ')
}
switch x := arg.(type) {
case string:
buf.b.WriteString(x)
case []string:
for j, s := range x {
if j > 0 {
buf.b.WriteByte(' ')
}
buf.b.WriteString(s)
}
default:
panic(fmt.Sprintf("unknown argument type: %T", x))
}
}
buf.b.WriteByte('\n')
buf.lines++
}
// WriteBytes is part of LineBuffer
func (buf *realLineBuffer) WriteBytes(bytes []byte) {
buf.b.Write(bytes)
buf.b.WriteByte('\n')
buf.lines++
}
// Reset is part of LineBuffer
func (buf *realLineBuffer) Reset() {
buf.b.Reset()
buf.lines = 0
}
// Bytes is part of LineBuffer
func (buf *realLineBuffer) Bytes() []byte {
return buf.b.Bytes()
}
// String is part of LineBuffer
func (buf *realLineBuffer) String() string {
return buf.b.String()
}
// Lines is part of LineBuffer
func (buf *realLineBuffer) Lines() int {
return buf.lines
}
type discardLineBuffer struct {
lines int
}
// NewDiscardLineBuffer returns a dummy LineBuffer that counts the number of writes but
// throws away the data. (This is used for iptables proxy partial syncs, to keep track of
// how many rules we managed to avoid having to sync.)
func NewDiscardLineBuffer() LineBuffer {
return &discardLineBuffer{}
}
// Write is part of LineBuffer
func (buf *discardLineBuffer) Write(args ...interface{}) {
buf.lines++
}
// WriteBytes is part of LineBuffer
func (buf *discardLineBuffer) WriteBytes(bytes []byte) {
buf.lines++
}
// Reset is part of LineBuffer
func (buf *discardLineBuffer) Reset() {
buf.lines = 0
}
// Bytes is part of LineBuffer
func (buf *discardLineBuffer) Bytes() []byte {
return []byte{}
}
// String is part of LineBuffer
func (buf *discardLineBuffer) String() string {
return ""
}
// Lines is part of LineBuffer
func (buf *discardLineBuffer) Lines() int {
return buf.lines
}

View File

@ -0,0 +1,168 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"math/rand"
"strings"
"testing"
)
func TestLineBufferWrite(t *testing.T) {
testCases := []struct {
name string
input []interface{}
expected string
}{
{
name: "none",
input: []interface{}{},
expected: "\n",
},
{
name: "one string",
input: []interface{}{"test1"},
expected: "test1\n",
},
{
name: "one slice",
input: []interface{}{[]string{"test1", "test2"}},
expected: "test1 test2\n",
},
{
name: "mixed",
input: []interface{}{"s1", "s2", []string{"s3", "s4"}, "", "s5", []string{}, []string{"s6"}, "s7"},
expected: "s1 s2 s3 s4 s5 s6 s7\n",
},
}
testBuffer := NewLineBuffer()
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
testBuffer.Write(testCase.input...)
if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) {
t.Fatalf("write word is %v\n expected: %q, got: %q", testCase.input, want, got)
}
if testBuffer.Lines() != 1 {
t.Fatalf("expected 1 line, got: %d", testBuffer.Lines())
}
})
}
}
func TestLineBufferWritePanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("did not panic")
}
}()
testBuffer := NewLineBuffer()
testBuffer.Write("string", []string{"a", "slice"}, 1234)
}
func TestLineBufferWriteBytes(t *testing.T) {
testCases := []struct {
name string
bytes []byte
expected string
}{
{
name: "empty bytes",
bytes: []byte{},
expected: "\n",
},
{
name: "test bytes",
bytes: []byte("test write bytes line"),
expected: "test write bytes line\n",
},
}
testBuffer := NewLineBuffer()
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
testBuffer.WriteBytes(testCase.bytes)
if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) {
t.Fatalf("write bytes is %v\n expected: %s, got: %s", testCase.bytes, want, got)
}
})
}
}
// obtained from https://stackoverflow.com/a/22892986
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randSeq() string {
b := make([]rune, 30)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
func TestWriteCountLines(t *testing.T) {
testCases := []struct {
name string
expected int
}{
{
name: "write no line",
expected: 0,
},
{
name: "write one line",
expected: 1,
},
{
name: "write 100 lines",
expected: 100,
},
{
name: "write 1000 lines",
expected: 1000,
},
{
name: "write 10000 lines",
expected: 10000,
},
{
name: "write 100000 lines",
expected: 100000,
},
}
testBuffer := NewLineBuffer()
discardBuffer := NewDiscardLineBuffer()
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
discardBuffer.Reset()
for i := 0; i < testCase.expected; i++ {
testBuffer.Write(randSeq())
discardBuffer.Write(randSeq())
}
n := testBuffer.Lines()
if n != testCase.expected {
t.Fatalf("lines expected: %d, got: %d", testCase.expected, n)
}
n = discardBuffer.Lines()
if n != testCase.expected {
t.Fatalf("discardBuffer lines expected: %d, got: %d", testCase.expected, n)
}
})
}
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package util package util
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"net" "net"
@ -321,67 +320,6 @@ func GetClusterIPByFamily(ipFamily v1.IPFamily, service *v1.Service) string {
return "" return ""
} }
type LineBuffer struct {
b bytes.Buffer
lines int
}
// Write takes a list of arguments, each a string or []string, joins all the
// individual strings with spaces, terminates with newline, and writes to buf.
// Any other argument type will panic.
func (buf *LineBuffer) Write(args ...interface{}) {
for i, arg := range args {
if i > 0 {
buf.b.WriteByte(' ')
}
switch x := arg.(type) {
case string:
buf.b.WriteString(x)
case []string:
for j, s := range x {
if j > 0 {
buf.b.WriteByte(' ')
}
buf.b.WriteString(s)
}
default:
panic(fmt.Sprintf("unknown argument type: %T", x))
}
}
buf.b.WriteByte('\n')
buf.lines++
}
// WriteBytes writes bytes to buffer, and terminates with newline.
func (buf *LineBuffer) WriteBytes(bytes []byte) {
buf.b.Write(bytes)
buf.b.WriteByte('\n')
buf.lines++
}
// Reset clears buf
func (buf *LineBuffer) Reset() {
buf.b.Reset()
buf.lines = 0
}
// Bytes returns the contents of buf as a []byte
func (buf *LineBuffer) Bytes() []byte {
return buf.b.Bytes()
}
// String returns the contents of buf as a string
func (buf *LineBuffer) String() string {
return buf.b.String()
}
// Lines returns the number of lines in buf. Note that more precisely, this returns the
// number of times Write() or WriteBytes() was called; it assumes that you never wrote
// any newlines to the buffer yourself.
func (buf *LineBuffer) Lines() int {
return buf.lines
}
// RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only // RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only
// closes the ports opened in this sync. // closes the ports opened in this sync.
func RevertPorts(replacementPortsMap, originalPortsMap map[netutils.LocalPort]netutils.Closeable) { func RevertPorts(replacementPortsMap, originalPortsMap map[netutils.LocalPort]netutils.Closeable) {

View File

@ -17,10 +17,8 @@ limitations under the License.
package util package util
import ( import (
"math/rand"
"net" "net"
"reflect" "reflect"
"strings"
"testing" "testing"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -707,145 +705,6 @@ func TestRevertPorts(t *testing.T) {
} }
} }
func TestLineBufferWrite(t *testing.T) {
testCases := []struct {
name string
input []interface{}
expected string
}{
{
name: "none",
input: []interface{}{},
expected: "\n",
},
{
name: "one string",
input: []interface{}{"test1"},
expected: "test1\n",
},
{
name: "one slice",
input: []interface{}{[]string{"test1", "test2"}},
expected: "test1 test2\n",
},
{
name: "mixed",
input: []interface{}{"s1", "s2", []string{"s3", "s4"}, "", "s5", []string{}, []string{"s6"}, "s7"},
expected: "s1 s2 s3 s4 s5 s6 s7\n",
},
}
testBuffer := LineBuffer{}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
testBuffer.Write(testCase.input...)
if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) {
t.Fatalf("write word is %v\n expected: %q, got: %q", testCase.input, want, got)
}
if testBuffer.Lines() != 1 {
t.Fatalf("expected 1 line, got: %d", testBuffer.Lines())
}
})
}
}
func TestLineBufferWritePanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("did not panic")
}
}()
testBuffer := LineBuffer{}
testBuffer.Write("string", []string{"a", "slice"}, 1234)
}
func TestLineBufferWriteBytes(t *testing.T) {
testCases := []struct {
name string
bytes []byte
expected string
}{
{
name: "empty bytes",
bytes: []byte{},
expected: "\n",
},
{
name: "test bytes",
bytes: []byte("test write bytes line"),
expected: "test write bytes line\n",
},
}
testBuffer := LineBuffer{}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
testBuffer.WriteBytes(testCase.bytes)
if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) {
t.Fatalf("write bytes is %v\n expected: %s, got: %s", testCase.bytes, want, got)
}
})
}
}
func TestWriteCountLines(t *testing.T) {
testCases := []struct {
name string
expected int
}{
{
name: "write no line",
expected: 0,
},
{
name: "write one line",
expected: 1,
},
{
name: "write 100 lines",
expected: 100,
},
{
name: "write 1000 lines",
expected: 1000,
},
{
name: "write 10000 lines",
expected: 10000,
},
{
name: "write 100000 lines",
expected: 100000,
},
}
testBuffer := LineBuffer{}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
for i := 0; i < testCase.expected; i++ {
testBuffer.Write(randSeq())
}
n := testBuffer.Lines()
if n != testCase.expected {
t.Fatalf("lines expected: %d, got: %d", testCase.expected, n)
}
})
}
}
// obtained from https://stackoverflow.com/a/22892986
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randSeq() string {
b := make([]rune, 30)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
func mustParseIPAddr(str string) net.Addr { func mustParseIPAddr(str string) net.Addr {
a, err := net.ResolveIPAddr("ip", str) a, err := net.ResolveIPAddr("ip", str)
if err != nil { if err != nil {