diff --git a/pkg/proxy/conntrack/cleanup.go b/pkg/proxy/conntrack/cleanup.go index a6105411a77..5c8dded75ef 100644 --- a/pkg/proxy/conntrack/cleanup.go +++ b/pkg/proxy/conntrack/cleanup.go @@ -20,6 +20,7 @@ limitations under the License. package conntrack import ( + "errors" "time" "github.com/vishvananda/netlink" @@ -43,8 +44,12 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, entries, err := ct.ListEntries(ipFamilyMap[ipFamily]) if err != nil { - klog.ErrorS(err, "Failed to list conntrack entries") - return + if errors.Is(err, unix.EINTR) { + klog.V(2).ErrorS(err, "received a partial result, continuing to clean with partial result") + } else { + klog.ErrorS(err, "Failed to list conntrack entries") + return + } } // serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) diff --git a/pkg/proxy/conntrack/conntrack.go b/pkg/proxy/conntrack/conntrack.go index 1e01d654680..53cf6ba57f3 100644 --- a/pkg/proxy/conntrack/conntrack.go +++ b/pkg/proxy/conntrack/conntrack.go @@ -24,7 +24,9 @@ import ( "github.com/vishvananda/netlink" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/proxy/util" ) // Interface for dealing with conntrack @@ -57,8 +59,12 @@ func newConntracker(handler netlinkHandler) Interface { } // ListEntries list all conntrack entries for connections of the given IP family. -func (ct *conntracker) ListEntries(ipFamily uint8) ([]*netlink.ConntrackFlow, error) { - return ct.handler.ConntrackTableList(netlink.ConntrackTable, netlink.InetFamily(ipFamily)) +func (ct *conntracker) ListEntries(ipFamily uint8) (entries []*netlink.ConntrackFlow, err error) { + err = retry.OnError(util.MaxAttemptsEINTR, util.ShouldRetryOnEINTR, func() error { + entries, err = ct.handler.ConntrackTableList(netlink.ConntrackTable, netlink.InetFamily(ipFamily)) + return err + }) + return entries, err } // ClearEntries deletes conntrack entries for connections of the given IP family, @@ -69,7 +75,15 @@ func (ct *conntracker) ClearEntries(ipFamily uint8, filters ...netlink.CustomCon return 0, nil } - n, err := ct.handler.ConntrackDeleteFilters(netlink.ConntrackTable, netlink.InetFamily(ipFamily), filters...) + var n uint + var err error + err = retry.OnError(util.MaxAttemptsEINTR, util.ShouldRetryOnEINTR, func() error { + var count uint + count, err = ct.handler.ConntrackDeleteFilters(netlink.ConntrackTable, netlink.InetFamily(ipFamily), filters...) + n += count + return err + }) + if err != nil { return int(n), fmt.Errorf("error deleting conntrack entries, error: %w", err) } diff --git a/pkg/proxy/util/nfacct/nfacct.go b/pkg/proxy/util/nfacct/nfacct.go index cdb7ef4cc7c..fa15a8860ef 100644 --- a/pkg/proxy/util/nfacct/nfacct.go +++ b/pkg/proxy/util/nfacct/nfacct.go @@ -31,6 +31,6 @@ type Interface interface { Add(name string) error // Get retrieves the nfacct counter with the specified name, returning an error if it doesn't exist. Get(name string) (*Counter, error) - // List retrieves all nfacct counters. + // List retrieves nfacct counters, it could receive all counters or a subset of them with an unix.EINTR error. List() ([]*Counter, error) } diff --git a/pkg/proxy/util/nfacct/nfacct_linux.go b/pkg/proxy/util/nfacct/nfacct_linux.go index a5fb7ccecb9..4a91e84aa63 100644 --- a/pkg/proxy/util/nfacct/nfacct_linux.go +++ b/pkg/proxy/util/nfacct/nfacct_linux.go @@ -29,6 +29,9 @@ import ( "github.com/vishvananda/netlink/nl" "golang.org/x/sys/unix" + + "k8s.io/client-go/util/retry" + "k8s.io/kubernetes/pkg/proxy/util" ) // MaxLength represents the maximum length allowed for the name in a nfacct counter. @@ -146,9 +149,15 @@ func (r *runner) Get(name string) (*Counter, error) { // List is part of the interface. func (r *runner) List() ([]*Counter, error) { - req := r.handler.newRequest(cmdGet, unix.NLM_F_REQUEST|unix.NLM_F_DUMP) - msgs, err := req.Execute(unix.NETLINK_NETFILTER, 0) - if err != nil { + var err error + var msgs [][]byte + err = retry.OnError(util.MaxAttemptsEINTR, util.ShouldRetryOnEINTR, func() error { + req := r.handler.newRequest(cmdGet, unix.NLM_F_REQUEST|unix.NLM_F_DUMP) + msgs, err = req.Execute(unix.NETLINK_NETFILTER, 0) + return err + }) + + if err != nil && !errors.Is(err, unix.EINTR) { return nil, handleError(err) } @@ -160,7 +169,7 @@ func (r *runner) List() ([]*Counter, error) { } counters = append(counters, counter) } - return counters, nil + return counters, err } var ErrObjectNotFound = errors.New("object not found") diff --git a/pkg/proxy/util/utils_linux.go b/pkg/proxy/util/utils_linux.go new file mode 100644 index 00000000000..6d43a01aae5 --- /dev/null +++ b/pkg/proxy/util/utils_linux.go @@ -0,0 +1,31 @@ +//go:build linux +// +build linux + +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "errors" + + "golang.org/x/sys/unix" + + "k8s.io/apimachinery/pkg/util/wait" +) + +var MaxAttemptsEINTR = wait.Backoff{Steps: 5} +var ShouldRetryOnEINTR = func(err error) bool { return errors.Is(err, unix.EINTR) }