Merge pull request #107138 from crisboarna/feat/102334_netpol-arg-improvement

feat(netpol tests): netpol probeConnectivity refactor
This commit is contained in:
Kubernetes Prow Robot 2021-12-21 07:44:46 -08:00 committed by GitHub
commit 813671d1a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 15 deletions

View File

@ -33,6 +33,17 @@ import (
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
) )
// probeConnectivityArgs is the set of arguments for a single probeConnectivity call.
type probeConnectivityArgs struct {
nsFrom string // namespace of the pod the probe is run from
podFrom string // name of the pod the probe command is exec'd in
containerFrom string // container within podFrom that runs the probe command
addrTo string // destination address to connect to; must be non-empty
protocol v1.Protocol // L4 protocol to probe: TCP, UDP, or SCTP
toPort int // destination port to connect to
timeoutSeconds int // per-attempt connection timeout, passed to agnhost as --timeout=<n>s
}
// kubeManager provides a convenience interface to kube functionality that we leverage for polling NetworkPolicy connections. // kubeManager provides a convenience interface to kube functionality that we leverage for polling NetworkPolicy connections.
// Its responsibilities are: // Its responsibilities are:
// - creating resources (pods, deployments, namespaces, services, network policies) // - creating resources (pods, deployments, namespaces, services, network policies)
@ -116,33 +127,33 @@ func (k *kubeManager) getPod(ns string, name string) (*v1.Pod, error) {
// probeConnectivity execs into a pod and checks its connectivity to another pod. // probeConnectivity execs into a pod and checks its connectivity to another pod.
// Implements the Prober interface. // Implements the Prober interface.
func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int, timeoutSeconds int) (bool, string, error) { func (k *kubeManager) probeConnectivity(args *probeConnectivityArgs) (bool, string, error) {
port := strconv.Itoa(toPort) port := strconv.Itoa(args.toPort)
if addrTo == "" { if args.addrTo == "" {
return false, "no IP provided", fmt.Errorf("empty addrTo field") return false, "no IP provided", fmt.Errorf("empty addrTo field")
} }
framework.Logf("Starting probe from pod %v to %v", podFrom, addrTo) framework.Logf("Starting probe from pod %v to %v", args.podFrom, args.addrTo)
var cmd []string var cmd []string
timeout := fmt.Sprintf("--timeout=%vs", timeoutSeconds) timeout := fmt.Sprintf("--timeout=%vs", args.timeoutSeconds)
switch protocol { switch args.protocol {
case v1.ProtocolSCTP: case v1.ProtocolSCTP:
cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=sctp"} cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=sctp"}
case v1.ProtocolTCP: case v1.ProtocolTCP:
cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=tcp"} cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=tcp"}
case v1.ProtocolUDP: case v1.ProtocolUDP:
cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=udp"} cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=udp"}
if framework.NodeOSDistroIs("windows") { if framework.NodeOSDistroIs("windows") {
framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version") framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version")
} }
default: default:
framework.Failf("protocol %s not supported", protocol) framework.Failf("protocol %s not supported", args.protocol)
} }
commandDebugString := fmt.Sprintf("kubectl exec %s -c %s -n %s -- %s", podFrom, containerFrom, nsFrom, strings.Join(cmd, " ")) commandDebugString := fmt.Sprintf("kubectl exec %s -c %s -n %s -- %s", args.podFrom, args.containerFrom, args.nsFrom, strings.Join(cmd, " "))
stdout, stderr, err := k.executeRemoteCommand(nsFrom, podFrom, containerFrom, cmd) stdout, stderr, err := k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd)
if err != nil { if err != nil {
framework.Logf("%s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", nsFrom, podFrom, addrTo, err, stdout, stderr) framework.Logf("%s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", args.nsFrom, args.podFrom, args.addrTo, err, stdout, stderr)
return false, commandDebugString, nil return false, commandDebugString, nil
} }
return true, commandDebugString, nil return true, commandDebugString, nil

View File

@ -27,7 +27,7 @@ import (
// decouple us from k8smanager.go // decouple us from k8smanager.go
type Prober interface { type Prober interface {
probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int, timeoutSeconds int) (bool, string, error) probeConnectivity(args *probeConnectivityArgs) (bool, string, error)
} }
// ProbeJob packages the data for the input of a pod->pod connectivity probe // ProbeJob packages the data for the input of a pod->pod connectivity probe
@ -108,7 +108,15 @@ func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobR
// dnsName := job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain) // dnsName := job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain)
// TODO make this work on dual-stack clusters... // TODO make this work on dual-stack clusters...
connected, command, err := prober.probeConnectivity(podFrom.Namespace, podFrom.Name, podFrom.Containers[0].Name(), job.PodTo.ServiceIP, job.Protocol, job.ToPort, timeoutSeconds) connected, command, err := prober.probeConnectivity(&probeConnectivityArgs{
nsFrom: podFrom.Namespace,
podFrom: podFrom.Name,
containerFrom: podFrom.Containers[0].Name(),
addrTo: job.PodTo.ServiceIP,
protocol: job.Protocol,
toPort: job.ToPort,
timeoutSeconds: timeoutSeconds,
})
result := &ProbeJobResults{ result := &ProbeJobResults{
Job: job, Job: job,
IsConnected: connected, IsConnected: connected,