Implement a windows Netpol NewModel

This commit is contained in:
jay vyas 2021-05-25 10:03:21 -04:00
parent 69019a03d3
commit 8651fcb25a
6 changed files with 57 additions and 39 deletions

View File

@ -66,8 +66,9 @@ package auth
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
restclient "k8s.io/client-go/rest"
"os" "os"
restclient "k8s.io/client-go/rest"
) )
// Info holds Kubernetes API authorization config. It is intended // Info holds Kubernetes API authorization config. It is intended

View File

@ -58,7 +58,6 @@ func (f *Framework) ExecWithOptions(options ExecOptions) (string, string, error)
const tty = false const tty = false
Logf("ExecWithOptions: Clientset creation ")
req := f.ClientSet.CoreV1().RESTClient().Post(). req := f.ClientSet.CoreV1().RESTClient().Post().
Resource("pods"). Resource("pods").
Name(options.PodName). Name(options.PodName).
@ -75,9 +74,8 @@ func (f *Framework) ExecWithOptions(options ExecOptions) (string, string, error)
}, scheme.ParameterCodec) }, scheme.ParameterCodec)
var stdout, stderr bytes.Buffer var stdout, stderr bytes.Buffer
Logf("ExecWithOptions: execute(POST %s %s)", req.URL())
err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty) err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty)
if options.PreserveWhitespace { if options.PreserveWhitespace {
return stdout.String(), stderr.String(), err return stdout.String(), stderr.String(), err
} }

View File

@ -63,15 +63,9 @@ func (k *kubeManager) initializeCluster(model *Model) error {
for _, pod := range ns.Pods { for _, pod := range ns.Pods {
framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name) framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name)
thePod := pod.KubePod() // note that we defer the logic of pod (i.e. node selector) specifics to the model
if framework.NodeOSDistroIs("windows") { // which is aware of linux vs windows pods
thePod.Spec.NodeSelector = map[string]string{ kubePod, err := k.createPod(pod.KubePod())
"kubernetes.io/os": "windows",
}
} else {
framework.Logf("node distro is NOT WINDOWS !!!!!!!!!!!")
}
kubePod, err := k.createPod(thePod)
if err != nil { if err != nil {
return err return err
} }
@ -86,7 +80,6 @@ func (k *kubeManager) initializeCluster(model *Model) error {
for _, podString := range model.AllPodStrings() { for _, podString := range model.AllPodStrings() {
k8sPod, err := k.getPod(podString.Namespace(), podString.PodName()) k8sPod, err := k.getPod(podString.Namespace(), podString.PodName())
if err != nil { if err != nil {
return err return err
} }
@ -119,16 +112,18 @@ func (k *kubeManager) getPod(ns string, name string) (*v1.Pod, error) {
} }
// probeConnectivity execs into a pod and checks its connectivity to another pod.. // probeConnectivity execs into a pod and checks its connectivity to another pod..
func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int) (bool, string, error) { func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int, timeoutSeconds int) (bool, string, error) {
port := strconv.Itoa(toPort) port := strconv.Itoa(toPort)
var cmd []string var cmd []string
timeout := fmt.Sprintf("--timeout=%vs", timeoutSeconds)
switch protocol { switch protocol {
case v1.ProtocolSCTP: case v1.ProtocolSCTP:
cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=sctp"} cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=sctp"}
case v1.ProtocolTCP: case v1.ProtocolTCP:
cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=tcp"} cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=tcp"}
case v1.ProtocolUDP: case v1.ProtocolUDP:
cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=3s", "--protocol=udp"} cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=udp"}
if framework.NodeOSDistroIs("windows") { if framework.NodeOSDistroIs("windows") {
framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version") framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version")
} }
@ -265,14 +260,14 @@ func (k *kubeManager) deleteNamespaces(namespaces []string) error {
} }
// waitForHTTPServers waits for all webservers to be up, on all protocols sent in the input, and then validates them using the same probe logic as the rest of the suite. // waitForHTTPServers waits for all webservers to be up, on all protocols sent in the input, and then validates them using the same probe logic as the rest of the suite.
func (k *kubeManager) waitForHTTPServers(model *Model, protocols []v1.Protocol) error { func (k *kubeManager) waitForHTTPServers(model *Model) error {
const maxTries = 10 const maxTries = 10
framework.Logf("waiting for HTTP servers (ports 80 and 81) to become ready") framework.Logf("waiting for HTTP servers (ports 80 and 81) to become ready")
testCases := map[string]*TestCase{} testCases := map[string]*TestCase{}
for _, port := range model.Ports { for _, port := range model.Ports {
// Protocols is provided as input so that we can skip udp polling for windows // Protocols is provided as input so that we can skip udp polling for windows
for _, protocol := range protocols { for _, protocol := range model.Protocols {
fromPort := 81 fromPort := 81
desc := fmt.Sprintf("%d->%d,%s", fromPort, port, protocol) desc := fmt.Sprintf("%d->%d,%s", fromPort, port, protocol)
testCases[desc] = &TestCase{ToPort: int(port), Protocol: protocol} testCases[desc] = &TestCase{ToPort: int(port), Protocol: protocol}

View File

@ -41,6 +41,29 @@ type Model struct {
DNSDomain string DNSDomain string
} }
// NewWindowsModel returns a model specific to windows testing.
func NewWindowsModel(namespaces []string, podNames []string, ports []int32, dnsDomain string) *Model {
return NewModel(namespaces, podNames, ports, []v1.Protocol{v1.ProtocolTCP}, dnsDomain)
}
// GetProbeTimeoutSeconds returns a timeout for how long the probe should work before failing a check, and takes windows heuristics into account, where requests can take longer sometimes.
func (m *Model) GetProbeTimeoutSeconds() int {
timeoutSeconds := 1
if framework.NodeOSDistroIs("windows") {
timeoutSeconds = 3
}
return timeoutSeconds
}
// GetWorkers returns the number of workers suggested to run when testing, taking windows heuristics into account, where parallel probing is flakier.
func (m *Model) GetWorkers() int {
numberOfWorkers := 3
if framework.NodeOSDistroIs("windows") {
numberOfWorkers = 1 // See https://github.com/kubernetes/kubernetes/pull/97690
}
return numberOfWorkers
}
// NewModel instantiates a model based on: // NewModel instantiates a model based on:
// - namespaces // - namespaces
// - pods // - pods
@ -190,10 +213,11 @@ func (p *Pod) LabelSelector() map[string]string {
} }
} }
// KubePod returns the kube pod // KubePod returns the kube pod (will add label selectors for windows if needed).
func (p *Pod) KubePod() *v1.Pod { func (p *Pod) KubePod() *v1.Pod {
zero := int64(0) zero := int64(0)
return &v1.Pod{
thePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: p.Name, Name: p.Name,
Labels: p.LabelSelector(), Labels: p.LabelSelector(),
@ -204,6 +228,13 @@ func (p *Pod) KubePod() *v1.Pod {
Containers: p.ContainerSpecs(), Containers: p.ContainerSpecs(),
}, },
} }
if framework.NodeOSDistroIs("windows") {
thePod.Spec.NodeSelector = map[string]string{
"kubernetes.io/os": "windows",
}
}
return thePod
} }
// QualifiedServiceAddress returns the address that can be used to hit a service from // QualifiedServiceAddress returns the address that can be used to hit a service from

View File

@ -1269,6 +1269,10 @@ func defaultModel(namespaces []string, dnsDomain string) *Model {
if addSCTPContainers { if addSCTPContainers {
protocols = append(protocols, v1.ProtocolSCTP) protocols = append(protocols, v1.ProtocolSCTP)
} }
if framework.NodeOSDistroIs("windows") {
return NewWindowsModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, dnsDomain)
}
return NewModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, protocols, dnsDomain) return NewModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, protocols, dnsDomain)
} }
@ -1312,7 +1316,6 @@ func initializeResourcesByFixedNS(f *framework.Framework) {
// model derived from the framework. It then waits for the resources described by the model to be up and running // model derived from the framework. It then waits for the resources described by the model to be up and running
// (i.e. all pods are ready and running in their namespaces). // (i.e. all pods are ready and running in their namespaces).
func initializeResources(f *framework.Framework) error { func initializeResources(f *framework.Framework) error {
var protocols []v1.Protocol
_, _, _, model, k8s := getK8SModel(f) _, _, _, model, k8s := getK8SModel(f)
framework.Logf("initializing cluster: ensuring namespaces, deployments, and pods exist and are ready") framework.Logf("initializing cluster: ensuring namespaces, deployments, and pods exist and are ready")
@ -1323,10 +1326,6 @@ func initializeResources(f *framework.Framework) error {
} }
framework.Logf("finished initializing cluster state") framework.Logf("finished initializing cluster state")
if framework.NodeOSDistroIs("windows") {
protocols = append(protocols, v1.ProtocolTCP) return k8s.waitForHTTPServers(model)
} else {
protocols = model.Protocols
}
return k8s.waitForHTTPServers(model, protocols)
} }

View File

@ -41,18 +41,12 @@ type ProbeJobResults struct {
// ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability` // ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability`
func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCase) { func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCase) {
var numberOfWorkers int
if framework.NodeOSDistroIs("windows") {
numberOfWorkers = 1 // See https://github.com/kubernetes/kubernetes/pull/97690
} else {
numberOfWorkers = 3
}
allPods := model.AllPods() allPods := model.AllPods()
size := len(allPods) * len(allPods) size := len(allPods) * len(allPods)
jobs := make(chan *ProbeJob, size) jobs := make(chan *ProbeJob, size)
results := make(chan *ProbeJobResults, size) results := make(chan *ProbeJobResults, size)
for i := 0; i < numberOfWorkers; i++ { for i := 0; i < model.GetWorkers(); i++ {
go probeWorker(k8s, jobs, results) go probeWorker(k8s, jobs, results, model.GetProbeTimeoutSeconds())
} }
for _, podFrom := range allPods { for _, podFrom := range allPods {
for _, podTo := range allPods { for _, podTo := range allPods {
@ -89,11 +83,11 @@ func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCas
// probeWorker continues polling a pod connectivity status, until the incoming "jobs" channel is closed, and writes results back out to the "results" channel. // probeWorker continues polling a pod connectivity status, until the incoming "jobs" channel is closed, and writes results back out to the "results" channel.
// it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine. // it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine.
func probeWorker(k8s *kubeManager, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) { func probeWorker(k8s *kubeManager, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults, timeoutSeconds int) {
defer ginkgo.GinkgoRecover() defer ginkgo.GinkgoRecover()
for job := range jobs { for job := range jobs {
podFrom := job.PodFrom podFrom := job.PodFrom
connected, command, err := k8s.probeConnectivity(podFrom.Namespace, podFrom.Name, podFrom.Containers[0].Name(), job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain), job.Protocol, job.ToPort) connected, command, err := k8s.probeConnectivity(podFrom.Namespace, podFrom.Name, podFrom.Containers[0].Name(), job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain), job.Protocol, job.ToPort, timeoutSeconds)
result := &ProbeJobResults{ result := &ProbeJobResults{
Job: job, Job: job,
IsConnected: connected, IsConnected: connected,