Merge pull request #130256 from adrianmoisey/netlink-without-tests

Add retries to some netlink calls - without tests
This commit is contained in:
Kubernetes Prow Robot 2025-02-19 15:52:26 -08:00 committed by GitHub
commit 1aef26a7eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 69 additions and 10 deletions

View File

@ -20,6 +20,7 @@ limitations under the License.
package conntrack
import (
"errors"
"time"
"github.com/vishvananda/netlink"
@ -43,8 +44,12 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
entries, err := ct.ListEntries(ipFamilyMap[ipFamily])
if err != nil {
klog.ErrorS(err, "Failed to list conntrack entries")
return
if errors.Is(err, unix.EINTR) {
klog.V(2).ErrorS(err, "received a partial result, continuing to clean with partial result")
} else {
klog.ErrorS(err, "Failed to list conntrack entries")
return
}
}
// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs)

View File

@ -24,7 +24,9 @@ import (
"github.com/vishvananda/netlink"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/util"
)
// Interface for dealing with conntrack
@ -57,8 +59,12 @@ func newConntracker(handler netlinkHandler) Interface {
}
// ListEntries list all conntrack entries for connections of the given IP family.
func (ct *conntracker) ListEntries(ipFamily uint8) ([]*netlink.ConntrackFlow, error) {
return ct.handler.ConntrackTableList(netlink.ConntrackTable, netlink.InetFamily(ipFamily))
func (ct *conntracker) ListEntries(ipFamily uint8) (entries []*netlink.ConntrackFlow, err error) {
err = retry.OnError(util.MaxAttemptsEINTR, util.ShouldRetryOnEINTR, func() error {
entries, err = ct.handler.ConntrackTableList(netlink.ConntrackTable, netlink.InetFamily(ipFamily))
return err
})
return entries, err
}
// ClearEntries deletes conntrack entries for connections of the given IP family,
@ -69,7 +75,15 @@ func (ct *conntracker) ClearEntries(ipFamily uint8, filters ...netlink.CustomCon
return 0, nil
}
n, err := ct.handler.ConntrackDeleteFilters(netlink.ConntrackTable, netlink.InetFamily(ipFamily), filters...)
var n uint
var err error
err = retry.OnError(util.MaxAttemptsEINTR, util.ShouldRetryOnEINTR, func() error {
var count uint
count, err = ct.handler.ConntrackDeleteFilters(netlink.ConntrackTable, netlink.InetFamily(ipFamily), filters...)
n += count
return err
})
if err != nil {
return int(n), fmt.Errorf("error deleting conntrack entries, error: %w", err)
}

View File

@ -31,6 +31,6 @@ type Interface interface {
Add(name string) error
// Get retrieves the nfacct counter with the specified name, returning an error if it doesn't exist.
Get(name string) (*Counter, error)
// List retrieves all nfacct counters.
// List retrieves nfacct counters, it could receive all counters or a subset of them with an unix.EINTR error.
List() ([]*Counter, error)
}

View File

@ -29,6 +29,9 @@ import (
"github.com/vishvananda/netlink/nl"
"golang.org/x/sys/unix"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/proxy/util"
)
// MaxLength represents the maximum length allowed for the name in a nfacct counter.
@ -146,9 +149,15 @@ func (r *runner) Get(name string) (*Counter, error) {
// List is part of the interface.
func (r *runner) List() ([]*Counter, error) {
req := r.handler.newRequest(cmdGet, unix.NLM_F_REQUEST|unix.NLM_F_DUMP)
msgs, err := req.Execute(unix.NETLINK_NETFILTER, 0)
if err != nil {
var err error
var msgs [][]byte
err = retry.OnError(util.MaxAttemptsEINTR, util.ShouldRetryOnEINTR, func() error {
req := r.handler.newRequest(cmdGet, unix.NLM_F_REQUEST|unix.NLM_F_DUMP)
msgs, err = req.Execute(unix.NETLINK_NETFILTER, 0)
return err
})
if err != nil && !errors.Is(err, unix.EINTR) {
return nil, handleError(err)
}
@ -160,7 +169,7 @@ func (r *runner) List() ([]*Counter, error) {
}
counters = append(counters, counter)
}
return counters, nil
return counters, err
}
var ErrObjectNotFound = errors.New("object not found")

View File

@ -0,0 +1,31 @@
//go:build linux
// +build linux
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"errors"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/wait"
)
var MaxAttemptsEINTR = wait.Backoff{Steps: 5}
var ShouldRetryOnEINTR = func(err error) bool { return errors.Is(err, unix.EINTR) }