diff --git a/test/e2e/network/netpol/kubemanager.go b/test/e2e/network/netpol/kubemanager.go
index f7ee9956e86..766a269be48 100644
--- a/test/e2e/network/netpol/kubemanager.go
+++ b/test/e2e/network/netpol/kubemanager.go
@@ -19,29 +19,30 @@ package netpol
 import (
 	"context"
 	"fmt"
-	"net"
-	"strconv"
-	"strings"
-
 	v1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	netutils "k8s.io/utils/net"
+	"net"
+	"strconv"
+	"strings"
+	"time"
 )
 
-// probeConnectivityArgs is set of arguments for a probeConnectivity
-type probeConnectivityArgs struct {
-	nsFrom         string
-	podFrom        string
-	containerFrom  string
-	addrTo         string
-	protocol       v1.Protocol
-	toPort         int
-	timeoutSeconds int
-}
+// defaultPollIntervalSeconds [seconds] is the default interval the prober waits between consecutive probe attempts.
+const defaultPollIntervalSeconds = 2
+
+// defaultPollTimeoutSeconds [seconds] is the default timeout when polling on probes;
+// using this value leads to a minimum of 2 attempts for every probe.
+const defaultPollTimeoutSeconds = 1
+
+// maxPollTimeoutSeconds [seconds] is the max timeout when polling on probes; it should only be used when we expect a
+// successful probe. Use defaultPollTimeoutSeconds otherwise.
+const maxPollTimeoutSeconds = 10
 
 // TestPod represents an actual running pod. For each Pod defined by the model,
 // there will be a corresponding TestPod. TestPod includes some runtime info
@@ -181,9 +182,21 @@ func (k *kubeManager) probeConnectivity(args *probeConnectivityArgs) (bool, stri
 	}
 
 	commandDebugString := fmt.Sprintf("kubectl exec %s -c %s -n %s -- %s", args.podFrom, args.containerFrom, args.nsFrom, strings.Join(cmd, " "))
-	stdout, stderr, err := k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd)
+
+	attempt := 0
+
+	err := wait.PollImmediate(time.Duration(args.pollIntervalSeconds)*time.Second, time.Duration(args.pollTimeoutSeconds)*time.Second,
+		func() (bool, error) {
+			stdout, stderr, err := k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd)
+			if err != nil {
+				framework.Logf("retrying probe #%d :: %s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", attempt+1, args.nsFrom, args.podFrom, args.addrTo, err, stdout, stderr)
+				attempt++
+				return false, nil
+			}
+			return true, nil
+		})
+
 	if err != nil {
-		framework.Logf("%s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", args.nsFrom, args.podFrom, args.addrTo, err, stdout, stderr)
 		return false, commandDebugString, nil
 	}
 	return true, commandDebugString, nil
@@ -290,3 +303,16 @@ func getProbeTimeoutSeconds() int {
 func getWorkers() int {
 	return 3
 }
+
+// getPollIntervalSeconds returns the interval the prober waits between consecutive probe attempts.
+func getPollIntervalSeconds() int {
+	return defaultPollIntervalSeconds
+}
+
+// getPollTimeoutSeconds returns the timeout for polling on probes.
+func getPollTimeoutSeconds(useMaxPollTimeout bool) int {
+	if useMaxPollTimeout {
+		return maxPollTimeoutSeconds
+	}
+	return defaultPollTimeoutSeconds
+}
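For illustration only, not part of the patch: the constants above assume that wait.PollImmediate tries the probe command at least twice when it keeps failing, once immediately and once more when the 1s timeout expires before the first 2s tick. A minimal sketch of that behavior, assuming the legacy k8s.io/apimachinery/pkg/util/wait semantics; the always-failing condition below is purely illustrative.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	// Mirrors defaultPollIntervalSeconds=2 / defaultPollTimeoutSeconds=1: the condition runs
	// immediately and once more around the timeout, so even an always-failing probe is
	// attempted at least twice.
	_ = wait.PollImmediate(2*time.Second, 1*time.Second, func() (bool, error) {
		attempts++
		return false, nil // pretend the probe command keeps failing
	})
	fmt.Println("attempts with the default timeout:", attempts)

	attempts = 0
	// Mirrors maxPollTimeoutSeconds=10: an always-failing probe is retried roughly every 2s
	// until the 10s budget is exhausted, giving congested infra time to converge.
	_ = wait.PollImmediate(2*time.Second, 10*time.Second, func() (bool, error) {
		attempts++
		return false, nil
	})
	fmt.Println("attempts with the max timeout:", attempts)
}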
diff --git a/test/e2e/network/netpol/probe.go b/test/e2e/network/netpol/probe.go
index c4d5d03e758..216203ea432 100644
--- a/test/e2e/network/netpol/probe.go
+++ b/test/e2e/network/netpol/probe.go
@@ -25,6 +25,19 @@ import (
 	netutils "k8s.io/utils/net"
 )
 
+// probeConnectivityArgs is the set of arguments for a probeConnectivity call.
+type probeConnectivityArgs struct {
+	nsFrom              string
+	podFrom             string
+	containerFrom       string
+	addrTo              string
+	protocol            v1.Protocol
+	toPort              int
+	timeoutSeconds      int
+	pollIntervalSeconds int
+	pollTimeoutSeconds  int
+}
+
 // decouple us from k8smanager.go
 type Prober interface {
 	probeConnectivity(args *probeConnectivityArgs) (bool, string, error)
@@ -32,12 +45,13 @@ type Prober interface {
 
 // ProbeJob packages the data for the input of a pod->pod connectivity probe
 type ProbeJob struct {
-	PodFrom        TestPod
-	PodTo          TestPod
-	PodToServiceIP string
-	ToPort         int
-	ToPodDNSDomain string
-	Protocol       v1.Protocol
+	PodFrom           TestPod
+	PodTo             TestPod
+	PodToServiceIP    string
+	ToPort            int
+	ToPodDNSDomain    string
+	Protocol          v1.Protocol
+	UseMaxPollTimeout bool
 }
 
 // ProbeJobResults packages the data for the results of a pod->pod connectivity probe
@@ -54,16 +68,22 @@ func ProbePodToPodConnectivity(prober Prober, allPods []TestPod, dnsDomain strin
 	jobs := make(chan *ProbeJob, size)
 	results := make(chan *ProbeJobResults, size)
 	for i := 0; i < getWorkers(); i++ {
-		go probeWorker(prober, jobs, results, getProbeTimeoutSeconds())
+		go probeWorker(prober, jobs, results)
 	}
 	for _, podFrom := range allPods {
 		for _, podTo := range allPods {
+			useMaxPollTimeout := false
+			// we only want to use max poll timeout for the probes where we expect connectivity from "podFrom" to "podTo".
+			if testCase.Reachability.Expected.Get(podFrom.PodString().String(), podTo.PodString().String()) {
+				useMaxPollTimeout = true
+			}
 			jobs <- &ProbeJob{
-				PodFrom:        podFrom,
-				PodTo:          podTo,
-				ToPort:         testCase.ToPort,
-				ToPodDNSDomain: dnsDomain,
-				Protocol:       testCase.Protocol,
+				PodFrom:           podFrom,
+				PodTo:             podTo,
+				ToPort:            testCase.ToPort,
+				ToPodDNSDomain:    dnsDomain,
+				Protocol:          testCase.Protocol,
+				UseMaxPollTimeout: useMaxPollTimeout,
 			}
 		}
 	}
@@ -91,7 +111,7 @@ func ProbePodToPodConnectivity(prober Prober, allPods []TestPod, dnsDomain strin
 
 // probeWorker continues polling a pod connectivity status, until the incoming "jobs" channel is closed, and writes results back out to the "results" channel.
 // it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine.
-func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults, timeoutSeconds int) {
+func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) {
 	defer ginkgo.GinkgoRecover()
 	for job := range jobs {
 		podFrom := job.PodFrom
@@ -110,13 +130,15 @@ func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobR
 
 		// TODO make this work on dual-stack clusters...
 		connected, command, err := prober.probeConnectivity(&probeConnectivityArgs{
-			nsFrom:         podFrom.Namespace,
-			podFrom:        podFrom.Name,
-			containerFrom:  podFrom.ContainerName,
-			addrTo:         job.PodTo.ServiceIP,
-			protocol:       job.Protocol,
-			toPort:         job.ToPort,
-			timeoutSeconds: timeoutSeconds,
+			nsFrom:              podFrom.Namespace,
+			podFrom:             podFrom.Name,
+			containerFrom:       podFrom.ContainerName,
+			addrTo:              job.PodTo.ServiceIP,
+			protocol:            job.Protocol,
+			toPort:              job.ToPort,
+			timeoutSeconds:      getProbeTimeoutSeconds(),
+			pollIntervalSeconds: getPollIntervalSeconds(),
+			pollTimeoutSeconds:  getPollTimeoutSeconds(job.UseMaxPollTimeout),
 		})
 		result := &ProbeJobResults{
 			Job:         job,
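For illustration only, not part of the patch: probeWorker no longer takes a global timeoutSeconds; each ProbeJob carries UseMaxPollTimeout, and the worker derives its poll parameters per job. A scaled-down sketch of that jobs/results fan-out, using hypothetical job and result types in place of ProbeJob and ProbeJobResults.

package main

import (
	"fmt"
	"sync"
)

// job and result are hypothetical stand-ins for ProbeJob and ProbeJobResults.
type job struct {
	name              string
	useMaxPollTimeout bool // set per job, e.g. when connectivity is expected
}

type result struct {
	name    string
	timeout int
}

func main() {
	jobs := make(chan job, 4)
	results := make(chan result, 4)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // mirrors getWorkers() == 3
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				timeout := 1 // defaultPollTimeoutSeconds: expected-deny probes stay cheap
				if j.useMaxPollTimeout {
					timeout = 10 // maxPollTimeoutSeconds: expected-allow probes get the full budget
				}
				results <- result{name: j.name, timeout: timeout}
			}
		}()
	}

	jobs <- job{name: "x/a -> x/b", useMaxPollTimeout: true}
	jobs <- job{name: "x/a -> y/b", useMaxPollTimeout: false}
	close(jobs)

	wg.Wait()
	close(results)
	for r := range results {
		fmt.Printf("%s polls with a %ds timeout\n", r.name, r.timeout)
	}
}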
diff --git a/test/e2e/network/netpol/test_helper.go b/test/e2e/network/netpol/test_helper.go
index d41b7105167..b54c7781a43 100644
--- a/test/e2e/network/netpol/test_helper.go
+++ b/test/e2e/network/netpol/test_helper.go
@@ -111,10 +111,13 @@ func waitForHTTPServers(k *kubeManager, model *Model) error {
 func ValidateOrFail(k8s *kubeManager, testCase *TestCase) {
 	ginkgo.By("Validating reachability matrix...")
 
-	// 1st try
+	// 1st try. Each probe now retries internally on a fixed 2s interval (see the poll constants in kubemanager.go) to
+	// accommodate infra that might be network-congested, as is common in some GitHub Actions runners or other heavily oversubscribed CI systems.
 	ginkgo.By("Validating reachability matrix... (FIRST TRY)")
 	ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
-	// 2nd try, in case first one failed
+
+	// the aforementioned per-probe retries (introduced in January 2023) might be able to obviate this step;
+	// let's investigate removing this full secondary pass over the matrix some day.
 	if _, wrong, _, _ := testCase.Reachability.Summary(ignoreLoopback); wrong != 0 {
 		framework.Logf("failed first probe %d wrong results ... retrying (SECOND TRY)", wrong)
 		ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
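For illustration only, not part of the patch: ValidateOrFail keeps its two-pass shape, probing the full matrix, inspecting the reachability summary, and re-probing everything only when some results were wrong. A hypothetical sketch of that control flow, where probeMatrix stands in for ProbePodToPodConnectivity plus Reachability.Summary.

package main

import "fmt"

// probeMatrix is a hypothetical stand-in: it "probes" the whole matrix and
// returns how many results disagreed with the expected reachability.
func probeMatrix(pass int) int {
	fmt.Printf("validating reachability matrix (pass %d)\n", pass)
	if pass == 1 {
		return 2 // pretend two probes were wrong on the first pass
	}
	return 0
}

func main() {
	if wrong := probeMatrix(1); wrong != 0 {
		fmt.Printf("failed first probe %d wrong results ... retrying\n", wrong)
		_ = probeMatrix(2) // second full pass, mirroring the SECOND TRY step
	}
}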