From 69019a03d3ac43823e7a739c01d3eeac86d699c0 Mon Sep 17 00:00:00 2001 From: Peri Thompson Date: Fri, 21 May 2021 16:13:32 +0100 Subject: [PATCH 1/2] Add netpol tests for windows Co-authored-by: Jay Vyas --- .../k8s.io/client-go/tools/auth/clientauth.go | 3 +- test/e2e/framework/exec_util.go | 4 ++- test/e2e/network/netpol/kubemanager.go | 29 ++++++++++++++----- test/e2e/network/netpol/network_policy.go | 11 +++++-- test/e2e/network/netpol/probe.go | 7 ++++- 5 files changed, 39 insertions(+), 15 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/auth/clientauth.go b/staging/src/k8s.io/client-go/tools/auth/clientauth.go index 4c24f79977f..8526b50c565 100644 --- a/staging/src/k8s.io/client-go/tools/auth/clientauth.go +++ b/staging/src/k8s.io/client-go/tools/auth/clientauth.go @@ -66,9 +66,8 @@ package auth import ( "encoding/json" "io/ioutil" - "os" - restclient "k8s.io/client-go/rest" + "os" ) // Info holds Kubernetes API authorization config. It is intended diff --git a/test/e2e/framework/exec_util.go b/test/e2e/framework/exec_util.go index 4d96fb37b72..3b8157771cd 100644 --- a/test/e2e/framework/exec_util.go +++ b/test/e2e/framework/exec_util.go @@ -58,6 +58,7 @@ func (f *Framework) ExecWithOptions(options ExecOptions) (string, string, error) const tty = false + Logf("ExecWithOptions: Clientset creation ") req := f.ClientSet.CoreV1().RESTClient().Post(). Resource("pods"). Name(options.PodName). @@ -74,8 +75,9 @@ func (f *Framework) ExecWithOptions(options ExecOptions) (string, string, error) }, scheme.ParameterCodec) var stdout, stderr bytes.Buffer - err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty) + Logf("ExecWithOptions: execute(POST %s %s)", req.URL()) + err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty) if options.PreserveWhitespace { return stdout.String(), stderr.String(), err } diff --git a/test/e2e/network/netpol/kubemanager.go b/test/e2e/network/netpol/kubemanager.go index da60aebff18..834eabfafd4 100644 --- a/test/e2e/network/netpol/kubemanager.go +++ b/test/e2e/network/netpol/kubemanager.go @@ -63,12 +63,20 @@ func (k *kubeManager) initializeCluster(model *Model) error { for _, pod := range ns.Pods { framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name) - kubePod, err := k.createPod(pod.KubePod()) + thePod := pod.KubePod() + if framework.NodeOSDistroIs("windows") { + thePod.Spec.NodeSelector = map[string]string{ + "kubernetes.io/os": "windows", + } + } else { + framework.Logf("node distro is NOT WINDOWS !!!!!!!!!!!") + } + kubePod, err := k.createPod(thePod) if err != nil { return err } - createdPods = append(createdPods, kubePod) + createdPods = append(createdPods, kubePod) _, err = k.createService(pod.Service()) if err != nil { return err @@ -78,6 +86,7 @@ func (k *kubeManager) initializeCluster(model *Model) error { for _, podString := range model.AllPodStrings() { k8sPod, err := k.getPod(podString.Namespace(), podString.PodName()) + if err != nil { return err } @@ -115,11 +124,14 @@ func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, container var cmd []string switch protocol { case v1.ProtocolSCTP: - cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=1s", "--protocol=sctp"} + cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=sctp"} case v1.ProtocolTCP: - cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=1s", "--protocol=tcp"} + cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=tcp"} case v1.ProtocolUDP: - cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=1s", "--protocol=udp"} + cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=udp"} + if framework.NodeOSDistroIs("windows") { + framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version") + } default: framework.Failf("protocol %s not supported", protocol) } @@ -252,14 +264,15 @@ func (k *kubeManager) deleteNamespaces(namespaces []string) error { return nil } -// waitForHTTPServers waits for all webservers to be up, on all protocols, and then validates them using the same probe logic as the rest of the suite. -func (k *kubeManager) waitForHTTPServers(model *Model) error { +// waitForHTTPServers waits for all webservers to be up, on all protocols sent in the input, and then validates them using the same probe logic as the rest of the suite. +func (k *kubeManager) waitForHTTPServers(model *Model, protocols []v1.Protocol) error { const maxTries = 10 framework.Logf("waiting for HTTP servers (ports 80 and 81) to become ready") testCases := map[string]*TestCase{} for _, port := range model.Ports { - for _, protocol := range model.Protocols { + // Protocols is provided as input so that we can skip udp polling for windows + for _, protocol := range protocols { fromPort := 81 desc := fmt.Sprintf("%d->%d,%s", fromPort, port, protocol) testCases[desc] = &TestCase{ToPort: int(port), Protocol: protocol} diff --git a/test/e2e/network/netpol/network_policy.go b/test/e2e/network/netpol/network_policy.go index 256f88f2c11..722b2b0c9dc 100644 --- a/test/e2e/network/netpol/network_policy.go +++ b/test/e2e/network/netpol/network_policy.go @@ -117,7 +117,7 @@ and what is happening in practice: z/c . . . . . . . . . */ -var _ = common.SIGDescribe("Netpol [LinuxOnly]", func() { +var _ = common.SIGDescribe("Netpol", func() { f := framework.NewDefaultFramework("netpol") ginkgo.Context("NetworkPolicy between server and client", func() { @@ -1312,6 +1312,7 @@ func initializeResourcesByFixedNS(f *framework.Framework) { // model derived from the framework. It then waits for the resources described by the model to be up and running // (i.e. all pods are ready and running in their namespaces). func initializeResources(f *framework.Framework) error { + var protocols []v1.Protocol _, _, _, model, k8s := getK8SModel(f) framework.Logf("initializing cluster: ensuring namespaces, deployments, and pods exist and are ready") @@ -1322,6 +1323,10 @@ func initializeResources(f *framework.Framework) error { } framework.Logf("finished initializing cluster state") - - return k8s.waitForHTTPServers(model) + if framework.NodeOSDistroIs("windows") { + protocols = append(protocols, v1.ProtocolTCP) + } else { + protocols = model.Protocols + } + return k8s.waitForHTTPServers(model, protocols) } diff --git a/test/e2e/network/netpol/probe.go b/test/e2e/network/netpol/probe.go index 479980c83fb..b406bf9be7d 100644 --- a/test/e2e/network/netpol/probe.go +++ b/test/e2e/network/netpol/probe.go @@ -41,7 +41,12 @@ type ProbeJobResults struct { // ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability` func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCase) { - numberOfWorkers := 3 // See https://github.com/kubernetes/kubernetes/pull/97690 + var numberOfWorkers int + if framework.NodeOSDistroIs("windows") { + numberOfWorkers = 1 // See https://github.com/kubernetes/kubernetes/pull/97690 + } else { + numberOfWorkers = 3 + } allPods := model.AllPods() size := len(allPods) * len(allPods) jobs := make(chan *ProbeJob, size) From 8651fcb25a7a432c429d00ffa975555ad002010f Mon Sep 17 00:00:00 2001 From: jay vyas Date: Tue, 25 May 2021 10:03:21 -0400 Subject: [PATCH 2/2] Implement a windows Netpol NewModel --- .../k8s.io/client-go/tools/auth/clientauth.go | 3 +- test/e2e/framework/exec_util.go | 4 +-- test/e2e/network/netpol/kubemanager.go | 27 ++++++-------- test/e2e/network/netpol/model.go | 35 +++++++++++++++++-- test/e2e/network/netpol/network_policy.go | 13 ++++--- test/e2e/network/netpol/probe.go | 14 +++----- 6 files changed, 57 insertions(+), 39 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/auth/clientauth.go b/staging/src/k8s.io/client-go/tools/auth/clientauth.go index 8526b50c565..4c24f79977f 100644 --- a/staging/src/k8s.io/client-go/tools/auth/clientauth.go +++ b/staging/src/k8s.io/client-go/tools/auth/clientauth.go @@ -66,8 +66,9 @@ package auth import ( "encoding/json" "io/ioutil" - restclient "k8s.io/client-go/rest" "os" + + restclient "k8s.io/client-go/rest" ) // Info holds Kubernetes API authorization config. It is intended diff --git a/test/e2e/framework/exec_util.go b/test/e2e/framework/exec_util.go index 3b8157771cd..4d96fb37b72 100644 --- a/test/e2e/framework/exec_util.go +++ b/test/e2e/framework/exec_util.go @@ -58,7 +58,6 @@ func (f *Framework) ExecWithOptions(options ExecOptions) (string, string, error) const tty = false - Logf("ExecWithOptions: Clientset creation ") req := f.ClientSet.CoreV1().RESTClient().Post(). Resource("pods"). Name(options.PodName). @@ -75,9 +74,8 @@ func (f *Framework) ExecWithOptions(options ExecOptions) (string, string, error) }, scheme.ParameterCodec) var stdout, stderr bytes.Buffer - - Logf("ExecWithOptions: execute(POST %s %s)", req.URL()) err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty) + if options.PreserveWhitespace { return stdout.String(), stderr.String(), err } diff --git a/test/e2e/network/netpol/kubemanager.go b/test/e2e/network/netpol/kubemanager.go index 834eabfafd4..8b27fc3502b 100644 --- a/test/e2e/network/netpol/kubemanager.go +++ b/test/e2e/network/netpol/kubemanager.go @@ -63,15 +63,9 @@ func (k *kubeManager) initializeCluster(model *Model) error { for _, pod := range ns.Pods { framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name) - thePod := pod.KubePod() - if framework.NodeOSDistroIs("windows") { - thePod.Spec.NodeSelector = map[string]string{ - "kubernetes.io/os": "windows", - } - } else { - framework.Logf("node distro is NOT WINDOWS !!!!!!!!!!!") - } - kubePod, err := k.createPod(thePod) + // note that we defer the logic of pod (i.e. node selector) specifics to the model + // which is aware of linux vs windows pods + kubePod, err := k.createPod(pod.KubePod()) if err != nil { return err } @@ -86,7 +80,6 @@ func (k *kubeManager) initializeCluster(model *Model) error { for _, podString := range model.AllPodStrings() { k8sPod, err := k.getPod(podString.Namespace(), podString.PodName()) - if err != nil { return err } @@ -119,16 +112,18 @@ func (k *kubeManager) getPod(ns string, name string) (*v1.Pod, error) { } // probeConnectivity execs into a pod and checks its connectivity to another pod.. -func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int) (bool, string, error) { +func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int, timeoutSeconds int) (bool, string, error) { port := strconv.Itoa(toPort) var cmd []string + timeout := fmt.Sprintf("--timeout=%vs", timeoutSeconds) + switch protocol { case v1.ProtocolSCTP: - cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=sctp"} + cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=sctp"} case v1.ProtocolTCP: - cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=tcp"} + cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=tcp"} case v1.ProtocolUDP: - cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=udp"} + cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=udp"} if framework.NodeOSDistroIs("windows") { framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version") } @@ -265,14 +260,14 @@ func (k *kubeManager) deleteNamespaces(namespaces []string) error { } // waitForHTTPServers waits for all webservers to be up, on all protocols sent in the input, and then validates them using the same probe logic as the rest of the suite. -func (k *kubeManager) waitForHTTPServers(model *Model, protocols []v1.Protocol) error { +func (k *kubeManager) waitForHTTPServers(model *Model) error { const maxTries = 10 framework.Logf("waiting for HTTP servers (ports 80 and 81) to become ready") testCases := map[string]*TestCase{} for _, port := range model.Ports { // Protocols is provided as input so that we can skip udp polling for windows - for _, protocol := range protocols { + for _, protocol := range model.Protocols { fromPort := 81 desc := fmt.Sprintf("%d->%d,%s", fromPort, port, protocol) testCases[desc] = &TestCase{ToPort: int(port), Protocol: protocol} diff --git a/test/e2e/network/netpol/model.go b/test/e2e/network/netpol/model.go index 84a39a9e61d..8ed96df7331 100644 --- a/test/e2e/network/netpol/model.go +++ b/test/e2e/network/netpol/model.go @@ -41,6 +41,29 @@ type Model struct { DNSDomain string } +// NewWindowsModel returns a model specific to windows testing. +func NewWindowsModel(namespaces []string, podNames []string, ports []int32, dnsDomain string) *Model { + return NewModel(namespaces, podNames, ports, []v1.Protocol{v1.ProtocolTCP}, dnsDomain) +} + +// GetProbeTimeoutSeconds returns a timeout for how long the probe should work before failing a check, and takes windows heuristics into account, where requests can take longer sometimes. +func (m *Model) GetProbeTimeoutSeconds() int { + timeoutSeconds := 1 + if framework.NodeOSDistroIs("windows") { + timeoutSeconds = 3 + } + return timeoutSeconds +} + +// GetWorkers returns the number of workers suggested to run when testing, taking windows heuristics into account, where parallel probing is flakier. +func (m *Model) GetWorkers() int { + numberOfWorkers := 3 + if framework.NodeOSDistroIs("windows") { + numberOfWorkers = 1 // See https://github.com/kubernetes/kubernetes/pull/97690 + } + return numberOfWorkers +} + // NewModel instantiates a model based on: // - namespaces // - pods @@ -190,10 +213,11 @@ func (p *Pod) LabelSelector() map[string]string { } } -// KubePod returns the kube pod +// KubePod returns the kube pod (will add label selectors for windows if needed). func (p *Pod) KubePod() *v1.Pod { zero := int64(0) - return &v1.Pod{ + + thePod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: p.Name, Labels: p.LabelSelector(), @@ -204,6 +228,13 @@ func (p *Pod) KubePod() *v1.Pod { Containers: p.ContainerSpecs(), }, } + + if framework.NodeOSDistroIs("windows") { + thePod.Spec.NodeSelector = map[string]string{ + "kubernetes.io/os": "windows", + } + } + return thePod } // QualifiedServiceAddress returns the address that can be used to hit a service from diff --git a/test/e2e/network/netpol/network_policy.go b/test/e2e/network/netpol/network_policy.go index 722b2b0c9dc..8d548362b76 100644 --- a/test/e2e/network/netpol/network_policy.go +++ b/test/e2e/network/netpol/network_policy.go @@ -1269,6 +1269,10 @@ func defaultModel(namespaces []string, dnsDomain string) *Model { if addSCTPContainers { protocols = append(protocols, v1.ProtocolSCTP) } + + if framework.NodeOSDistroIs("windows") { + return NewWindowsModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, dnsDomain) + } return NewModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, protocols, dnsDomain) } @@ -1312,7 +1316,6 @@ func initializeResourcesByFixedNS(f *framework.Framework) { // model derived from the framework. It then waits for the resources described by the model to be up and running // (i.e. all pods are ready and running in their namespaces). func initializeResources(f *framework.Framework) error { - var protocols []v1.Protocol _, _, _, model, k8s := getK8SModel(f) framework.Logf("initializing cluster: ensuring namespaces, deployments, and pods exist and are ready") @@ -1323,10 +1326,6 @@ func initializeResources(f *framework.Framework) error { } framework.Logf("finished initializing cluster state") - if framework.NodeOSDistroIs("windows") { - protocols = append(protocols, v1.ProtocolTCP) - } else { - protocols = model.Protocols - } - return k8s.waitForHTTPServers(model, protocols) + + return k8s.waitForHTTPServers(model) } diff --git a/test/e2e/network/netpol/probe.go b/test/e2e/network/netpol/probe.go index b406bf9be7d..8c9f2ee01be 100644 --- a/test/e2e/network/netpol/probe.go +++ b/test/e2e/network/netpol/probe.go @@ -41,18 +41,12 @@ type ProbeJobResults struct { // ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability` func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCase) { - var numberOfWorkers int - if framework.NodeOSDistroIs("windows") { - numberOfWorkers = 1 // See https://github.com/kubernetes/kubernetes/pull/97690 - } else { - numberOfWorkers = 3 - } allPods := model.AllPods() size := len(allPods) * len(allPods) jobs := make(chan *ProbeJob, size) results := make(chan *ProbeJobResults, size) - for i := 0; i < numberOfWorkers; i++ { - go probeWorker(k8s, jobs, results) + for i := 0; i < model.GetWorkers(); i++ { + go probeWorker(k8s, jobs, results, model.GetProbeTimeoutSeconds()) } for _, podFrom := range allPods { for _, podTo := range allPods { @@ -89,11 +83,11 @@ func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCas // probeWorker continues polling a pod connectivity status, until the incoming "jobs" channel is closed, and writes results back out to the "results" channel. // it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine. -func probeWorker(k8s *kubeManager, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) { +func probeWorker(k8s *kubeManager, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults, timeoutSeconds int) { defer ginkgo.GinkgoRecover() for job := range jobs { podFrom := job.PodFrom - connected, command, err := k8s.probeConnectivity(podFrom.Namespace, podFrom.Name, podFrom.Containers[0].Name(), job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain), job.Protocol, job.ToPort) + connected, command, err := k8s.probeConnectivity(podFrom.Namespace, podFrom.Name, podFrom.Containers[0].Name(), job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain), job.Protocol, job.ToPort, timeoutSeconds) result := &ProbeJobResults{ Job: job, IsConnected: connected,