e2e/network_policy: using PollImmediate for intra pod connectivity probes

Daman 2023-01-19 20:57:58 +05:30
parent 6b55f097bb
commit faee4c33de
3 changed files with 89 additions and 38 deletions

View File

@@ -19,29 +19,30 @@ package netpol
 import (
     "context"
     "fmt"
-    "net"
-    "strconv"
-    "strings"
     v1 "k8s.io/api/core/v1"
     networkingv1 "k8s.io/api/networking/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/kubernetes/test/e2e/framework"
     e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
     netutils "k8s.io/utils/net"
+    "net"
+    "strconv"
+    "strings"
+    "time"
 )

-// probeConnectivityArgs is set of arguments for a probeConnectivity
-type probeConnectivityArgs struct {
-    nsFrom         string
-    podFrom        string
-    containerFrom  string
-    addrTo         string
-    protocol       v1.Protocol
-    toPort         int
-    timeoutSeconds int
-}
+// defaultPollIntervalSeconds [seconds] is the default value for which the Prober will wait before the next attempt.
+const defaultPollIntervalSeconds = 2
+
+// defaultPollTimeoutSeconds [seconds] is the default timeout when polling on probes.
+// using this value leads to a minimum of 2 attempts for every probe
+const defaultPollTimeoutSeconds = 1
+
+// maxPollTimeoutSeconds [seconds] is the max timeout when polling on probes, this should only be used when expecting a
+// successful probe; use defaultPollTimeoutSeconds otherwise
+const maxPollTimeoutSeconds = 10

 // TestPod represents an actual running pod. For each Pod defined by the model,
 // there will be a corresponding TestPod. TestPod includes some runtime info
@@ -181,9 +182,21 @@ func (k *kubeManager) probeConnectivity(args *probeConnectivityArgs) (bool, string, error) {
     }
     commandDebugString := fmt.Sprintf("kubectl exec %s -c %s -n %s -- %s", args.podFrom, args.containerFrom, args.nsFrom, strings.Join(cmd, " "))
-    stdout, stderr, err := k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd)
+
+    attempt := 0
+    err := wait.PollImmediate(time.Duration(args.pollIntervalSeconds)*time.Second, time.Duration(args.pollTimeoutSeconds)*time.Second,
+        func() (bool, error) {
+            stdout, stderr, err := k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd)
+            if err != nil {
+                framework.Logf("retrying probe #%d :: %s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", attempt+1, args.nsFrom, args.podFrom, args.addrTo, err, stdout, stderr)
+                attempt++
+                return false, nil
+            }
+            return true, nil
+        })
+
     if err != nil {
-        framework.Logf("%s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", args.nsFrom, args.podFrom, args.addrTo, err, stdout, stderr)
         return false, commandDebugString, nil
     }
     return true, commandDebugString, nil
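
The heart of the change is visible above: the single exec of the probe command is wrapped in a wait.PollImmediate condition, where an exec failure means "not done yet" (return false, nil) so the command is retried on the next tick, instead of immediately marking the probe as failed. A minimal standalone sketch of that retry contract, outside the e2e framework; flakyExec is a made-up stand-in for executeRemoteCommand, and the interval/timeout durations are illustrative rather than the commit's constants:

package main

import (
    "errors"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// flakyExec stands in for executeRemoteCommand: it fails twice before
// succeeding, purely for illustration.
func flakyExec(calls *int) (string, error) {
    *calls++
    if *calls < 3 {
        return "", errors.New("connection refused")
    }
    return "ok", nil
}

func main() {
    calls := 0
    // Illustrative interval/timeout; the commit derives these from
    // args.pollIntervalSeconds and args.pollTimeoutSeconds.
    err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
        stdout, execErr := flakyExec(&calls)
        if execErr != nil {
            // (false, nil) means "not done, poll again after the interval";
            // returning a non-nil error here would abort the poll immediately.
            fmt.Printf("retrying after error: %v\n", execErr)
            return false, nil
        }
        fmt.Println("stdout:", stdout)
        return true, nil
    })
    // If every attempt had failed, err would be non-nil (a wait timeout error).
    fmt.Println("poll result:", err)
}

When every attempt fails, the poll returns a timeout error, which probeConnectivity above translates into a "not connected" result (false, commandDebugString, nil) rather than a test error, so probes that are expected to be blocked still report cleanly.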
@@ -290,3 +303,16 @@ func getProbeTimeoutSeconds() int {
 func getWorkers() int {
     return 3
 }
+
+// getPollIntervalSeconds returns the value for which the Prober will wait before the next attempt.
+func getPollIntervalSeconds() int {
+    return defaultPollIntervalSeconds
+}
+
+// getPollTimeoutSeconds returns the timeout for polling on probes.
+func getPollTimeoutSeconds(useMaxPollTimeout bool) int {
+    if useMaxPollTimeout {
+        return maxPollTimeoutSeconds
+    }
+    return defaultPollTimeoutSeconds
+}
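
The two helpers make the polling budget asymmetric: only probes that are expected to reach their target get maxPollTimeoutSeconds, because a probe that is supposed to be blocked must wait out its entire poll timeout before it can be reported as denied, and giving every denied probe the large budget would slow the whole matrix down. A self-contained sketch of that selection logic; pollTimeoutFor is a hypothetical helper, and the constants are repeated only to keep the sketch compilable:

package main

import (
    "fmt"
    "time"
)

// Constants mirrored from the hunk above, repeated here so the sketch is self-contained.
const (
    defaultPollTimeoutSeconds = 1
    maxPollTimeoutSeconds     = 10
)

// pollTimeoutFor has the same intent as getPollTimeoutSeconds: probes expected
// to succeed get headroom to absorb flaky infra, probes expected to be denied
// keep the short timeout so the matrix stays fast.
func pollTimeoutFor(expectConnectivity bool) time.Duration {
    if expectConnectivity {
        return maxPollTimeoutSeconds * time.Second
    }
    return defaultPollTimeoutSeconds * time.Second
}

func main() {
    fmt.Println("expected reachable:", pollTimeoutFor(true))  // 10s
    fmt.Println("expected denied:   ", pollTimeoutFor(false)) // 1s
}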

View File

@@ -25,6 +25,19 @@ import (
     netutils "k8s.io/utils/net"
 )

+// probeConnectivityArgs is set of arguments for a probeConnectivity
+type probeConnectivityArgs struct {
+    nsFrom              string
+    podFrom             string
+    containerFrom       string
+    addrTo              string
+    protocol            v1.Protocol
+    toPort              int
+    timeoutSeconds      int
+    pollIntervalSeconds int
+    pollTimeoutSeconds  int
+}
+
 // decouple us from k8smanager.go
 type Prober interface {
     probeConnectivity(args *probeConnectivityArgs) (bool, string, error)
// decouple us from k8smanager.go // decouple us from k8smanager.go
type Prober interface { type Prober interface {
probeConnectivity(args *probeConnectivityArgs) (bool, string, error) probeConnectivity(args *probeConnectivityArgs) (bool, string, error)
@@ -32,12 +45,13 @@ type Prober interface {
 // ProbeJob packages the data for the input of a pod->pod connectivity probe
 type ProbeJob struct {
     PodFrom           TestPod
     PodTo             TestPod
     PodToServiceIP    string
     ToPort            int
     ToPodDNSDomain    string
     Protocol          v1.Protocol
+    UseMaxPollTimeout bool
 }
// ProbeJobResults packages the data for the results of a pod->pod connectivity probe // ProbeJobResults packages the data for the results of a pod->pod connectivity probe
@@ -54,16 +68,22 @@ func ProbePodToPodConnectivity(prober Prober, allPods []TestPod, dnsDomain string, testCase *TestCase) {
     jobs := make(chan *ProbeJob, size)
     results := make(chan *ProbeJobResults, size)
     for i := 0; i < getWorkers(); i++ {
-        go probeWorker(prober, jobs, results, getProbeTimeoutSeconds())
+        go probeWorker(prober, jobs, results)
     }
     for _, podFrom := range allPods {
         for _, podTo := range allPods {
+            useMaxPollTimeout := false
+            // we only want to use max poll timeout for the probes where we expect connectivity from "podFrom" to "podTo".
+            if testCase.Reachability.Expected.Get(podFrom.PodString().String(), podTo.PodString().String()) {
+                useMaxPollTimeout = true
+            }
             jobs <- &ProbeJob{
                 PodFrom:           podFrom,
                 PodTo:             podTo,
                 ToPort:            testCase.ToPort,
                 ToPodDNSDomain:    dnsDomain,
                 Protocol:          testCase.Protocol,
+                UseMaxPollTimeout: useMaxPollTimeout,
             }
         }
     }
@@ -91,7 +111,7 @@ func ProbePodToPodConnectivity(prober Prober, allPods []TestPod, dnsDomain string, testCase *TestCase) {
 // probeWorker continues polling a pod connectivity status, until the incoming "jobs" channel is closed, and writes results back out to the "results" channel.
 // it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine.
-func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults, timeoutSeconds int) {
+func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) {
     defer ginkgo.GinkgoRecover()
     for job := range jobs {
         podFrom := job.PodFrom

@@ -110,13 +130,15 @@ func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) {
         // TODO make this work on dual-stack clusters...
         connected, command, err := prober.probeConnectivity(&probeConnectivityArgs{
             nsFrom:        podFrom.Namespace,
             podFrom:       podFrom.Name,
             containerFrom: podFrom.ContainerName,
             addrTo:        job.PodTo.ServiceIP,
             protocol:      job.Protocol,
             toPort:        job.ToPort,
-            timeoutSeconds: timeoutSeconds,
+            timeoutSeconds:      getProbeTimeoutSeconds(),
+            pollIntervalSeconds: getPollIntervalSeconds(),
+            pollTimeoutSeconds:  getPollTimeoutSeconds(job.UseMaxPollTimeout),
         })
         result := &ProbeJobResults{
             Job: job,
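
For readers who have not seen the surrounding harness: ProbePodToPodConnectivity fans the pod-to-pod matrix out over a fixed pool of worker goroutines through buffered jobs/results channels, and probeWorker (per the comment above) only reports pass/fail on the results channel, with defer ginkgo.GinkgoRecover() so nothing fails from inside a goroutine. A generic sketch of that fan-out/fan-in shape, assuming toy job/result types; the sync.WaitGroup is only there so the sketch can close its results channel and is not part of the production code:

package main

import (
    "fmt"
    "sync"
)

type job struct{ from, to int }

type result struct {
    job       job
    connected bool
}

// worker mirrors the shape of probeWorker: it drains the jobs channel and only
// writes results; it never fails anything from inside the goroutine.
func worker(jobs <-chan job, results chan<- result, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        // A real worker would call prober.probeConnectivity here; this toy
        // "connectivity" rule is purely illustrative.
        results <- result{job: j, connected: (j.from+j.to)%2 == 0}
    }
}

func main() {
    const pods, workers = 3, 3 // the e2e code uses getWorkers() == 3
    size := pods * pods

    jobs := make(chan job, size)       // buffered: the producer never blocks
    results := make(chan result, size) // buffered: workers never block

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go worker(jobs, results, &wg)
    }
    for from := 0; from < pods; from++ {
        for to := 0; to < pods; to++ {
            jobs <- job{from: from, to: to}
        }
    }
    close(jobs) // closing jobs is what ends each worker's range loop
    wg.Wait()
    close(results)

    for r := range results {
        fmt.Printf("%d -> %d connected=%v\n", r.job.from, r.job.to, r.connected)
    }
}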

View File

@@ -111,10 +111,13 @@ func waitForHTTPServers(k *kubeManager, model *Model) error {
 func ValidateOrFail(k8s *kubeManager, testCase *TestCase) {
     ginkgo.By("Validating reachability matrix...")

-    // 1st try
+    // 1st try, exponential backoff (starting at 1s) will happen for every probe to accommodate infra that might be
+    // network-congested, as is common in some GH actions or other heavily oversubscribed CI systems.
     ginkgo.By("Validating reachability matrix... (FIRST TRY)")
     ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)

     // 2nd try, in case first one failed
+    // the aforementioned individual probe's retries (introduced in january 2023) might be able to obviate
+    // this step; let's investigate removing this massive secondary polling of the matrix some day.
     if _, wrong, _, _ := testCase.Reachability.Summary(ignoreLoopback); wrong != 0 {
         framework.Logf("failed first probe %d wrong results ... retrying (SECOND TRY)", wrong)
         ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
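
With this commit the validation retries at two levels: each individual probe polls via wait.PollImmediate, and ValidateOrFail still re-probes the entire matrix once if the first pass reports wrong results. A toy sketch of that outer retry shape; the validate function and its hard-coded counts are purely illustrative:

package main

import "fmt"

// validate stands in for one full ProbePodToPodConnectivity pass plus the
// Reachability summary; the hard-coded counts are purely illustrative.
func validate(pass int) (wrong int) {
    if pass == 1 {
        return 2 // pretend two probes had not converged yet on the first pass
    }
    return 0
}

func main() {
    // First pass: each individual probe already retries on its own.
    wrong := validate(1)
    if wrong != 0 {
        // Second pass: re-probe the whole matrix once, mirroring the
        // "SECOND TRY" in ValidateOrFail above.
        fmt.Printf("failed first probe %d wrong results ... retrying (SECOND TRY)\n", wrong)
        wrong = validate(2)
    }
    if wrong != 0 {
        fmt.Println("reachability matrix still wrong; the e2e test would fail here")
        return
    }
    fmt.Println("reachability matrix validated")
}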