Merge pull request #98077 from jayunit100/jay-netpol-win

NetworkPolicy E2E enablement for Windows
Commit 494dc731d7, authored by Kubernetes Prow Robot on 2021-05-26 06:35:21 -07:00, committed by GitHub.
4 changed files with 56 additions and 14 deletions


@@ -63,12 +63,14 @@ func (k *kubeManager) initializeCluster(model *Model) error {
 		for _, pod := range ns.Pods {
 			framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name)
+			// note that we defer the logic of pod (i.e. node selector) specifics to the model
+			// which is aware of linux vs windows pods
 			kubePod, err := k.createPod(pod.KubePod())
 			if err != nil {
 				return err
 			}
 			createdPods = append(createdPods, kubePod)
 			_, err = k.createService(pod.Service())
 			if err != nil {
 				return err
@@ -110,16 +112,21 @@ func (k *kubeManager) getPod(ns string, name string) (*v1.Pod, error) {
 }
 // probeConnectivity execs into a pod and checks its connectivity to another pod..
-func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int) (bool, string, error) {
+func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int, timeoutSeconds int) (bool, string, error) {
 	port := strconv.Itoa(toPort)
 	var cmd []string
+	timeout := fmt.Sprintf("--timeout=%vs", timeoutSeconds)
 	switch protocol {
 	case v1.ProtocolSCTP:
-		cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=1s", "--protocol=sctp"}
+		cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=sctp"}
 	case v1.ProtocolTCP:
-		cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=1s", "--protocol=tcp"}
+		cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=tcp"}
 	case v1.ProtocolUDP:
-		cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), "--timeout=1s", "--protocol=udp"}
+		cmd = []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=udp"}
+		if framework.NodeOSDistroIs("windows") {
+			framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version")
+		}
 	default:
 		framework.Failf("protocol %s not supported", protocol)
 	}
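
Note on the timeout change above: the only behavioral difference is the --timeout flag passed to agnhost. A minimal standalone sketch (example address, port, and protocol assumed; not part of the diff) of the same command construction:

    package main

    import (
        "fmt"
        "net"
    )

    func main() {
        // Assumed example target: service address for pod "b" in namespace "y", port 80.
        addrTo, port := "b.y.svc.cluster.local", "80"
        // 1s was the old hard-coded value; 3s is what the model chooses on Windows nodes.
        for _, timeoutSeconds := range []int{1, 3} {
            timeout := fmt.Sprintf("--timeout=%vs", timeoutSeconds)
            cmd := []string{"/agnhost", "connect", net.JoinHostPort(addrTo, port), timeout, "--protocol=tcp"}
            fmt.Println(cmd)
        }
    }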
@@ -252,13 +259,14 @@ func (k *kubeManager) deleteNamespaces(namespaces []string) error {
 	return nil
 }
-// waitForHTTPServers waits for all webservers to be up, on all protocols, and then validates them using the same probe logic as the rest of the suite.
+// waitForHTTPServers waits for all webservers to be up, on all protocols sent in the input, and then validates them using the same probe logic as the rest of the suite.
 func (k *kubeManager) waitForHTTPServers(model *Model) error {
 	const maxTries = 10
 	framework.Logf("waiting for HTTP servers (ports 80 and 81) to become ready")
 	testCases := map[string]*TestCase{}
 	for _, port := range model.Ports {
+		// Protocols is provided as input so that we can skip udp polling for windows
 		for _, protocol := range model.Protocols {
 			fromPort := 81
 			desc := fmt.Sprintf("%d->%d,%s", fromPort, port, protocol)
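
With the protocol list now taken from the model, a Windows (TCP-only) model generates a smaller readiness matrix here. A small self-contained sketch (values assumed, mirroring the loop above) of the probe descriptions produced for ports 80/81:

    package main

    import "fmt"

    func main() {
        ports := []int32{80, 81}
        // A Windows model carries only TCP; a Linux model would also list UDP (and SCTP when enabled).
        protocols := []string{"TCP"}
        for _, port := range ports {
            for _, protocol := range protocols {
                fromPort := 81
                fmt.Printf("%d->%d,%s\n", fromPort, port, protocol)
            }
        }
    }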


@@ -41,6 +41,29 @@ type Model struct {
 	DNSDomain string
 }
 
+// NewWindowsModel returns a model specific to windows testing.
+func NewWindowsModel(namespaces []string, podNames []string, ports []int32, dnsDomain string) *Model {
+	return NewModel(namespaces, podNames, ports, []v1.Protocol{v1.ProtocolTCP}, dnsDomain)
+}
+
+// GetProbeTimeoutSeconds returns a timeout for how long the probe should work before failing a check, and takes windows heuristics into account, where requests can take longer sometimes.
+func (m *Model) GetProbeTimeoutSeconds() int {
+	timeoutSeconds := 1
+	if framework.NodeOSDistroIs("windows") {
+		timeoutSeconds = 3
+	}
+	return timeoutSeconds
+}
+
+// GetWorkers returns the number of workers suggested to run when testing, taking windows heuristics into account, where parallel probing is flakier.
+func (m *Model) GetWorkers() int {
+	numberOfWorkers := 3
+	if framework.NodeOSDistroIs("windows") {
+		numberOfWorkers = 1 // See https://github.com/kubernetes/kubernetes/pull/97690
+	}
+	return numberOfWorkers
+}
+
 // NewModel instantiates a model based on:
 // - namespaces
 // - pods
@@ -190,10 +213,11 @@ func (p *Pod) LabelSelector() map[string]string {
 	}
 }
-// KubePod returns the kube pod
+// KubePod returns the kube pod (will add label selectors for windows if needed).
 func (p *Pod) KubePod() *v1.Pod {
 	zero := int64(0)
-	return &v1.Pod{
+
+	thePod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: p.Name,
 			Labels: p.LabelSelector(),
@@ -204,6 +228,13 @@ func (p *Pod) KubePod() *v1.Pod {
 			Containers: p.ContainerSpecs(),
 		},
 	}
+
+	if framework.NodeOSDistroIs("windows") {
+		thePod.Spec.NodeSelector = map[string]string{
+			"kubernetes.io/os": "windows",
+		}
+	}
+	return thePod
 }
 // QualifiedServiceAddress returns the address that can be used to hit a service from
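
Taken together, the model now owns the Windows heuristics. A usage sketch (namespaces, pod names, and DNS domain assumed; runs inside the netpol test package against a Windows-capable cluster):

    // Hypothetical wiring, not part of the diff:
    model := NewWindowsModel([]string{"x", "y", "z"}, []string{"a", "b", "c"}, []int32{80, 81}, "cluster.local")
    // Where framework.NodeOSDistroIs("windows") is true:
    //   model.GetProbeTimeoutSeconds() -> 3   (longer per-probe timeout)
    //   model.GetWorkers()             -> 1   (serial probing; see PR 97690)
    // and every pod built via pod.KubePod() carries NodeSelector {"kubernetes.io/os": "windows"}.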


@@ -117,7 +117,7 @@ and what is happening in practice:
 	z/c . . . . . . . . .
 */
-var _ = common.SIGDescribe("Netpol [LinuxOnly]", func() {
+var _ = common.SIGDescribe("Netpol", func() {
 	f := framework.NewDefaultFramework("netpol")
 	ginkgo.Context("NetworkPolicy between server and client", func() {
@@ -1269,6 +1269,10 @@ func defaultModel(namespaces []string, dnsDomain string) *Model {
 	if addSCTPContainers {
 		protocols = append(protocols, v1.ProtocolSCTP)
 	}
+
+	if framework.NodeOSDistroIs("windows") {
+		return NewWindowsModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, dnsDomain)
+	}
 	return NewModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, protocols, dnsDomain)
 }
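
With the [LinuxOnly] tag dropped and the model selection branching on the node OS, these specs become selectable on Windows clusters. A hedged example invocation (flag names taken from the e2e framework's standard options; exact values depend on the environment):

    ./e2e.test --ginkgo.focus='Netpol' --node-os-distro=windows --kubeconfig="$KUBECONFIG" --provider=skeleton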


@@ -41,13 +41,12 @@ type ProbeJobResults struct {
 // ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability`
 func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCase) {
-	numberOfWorkers := 3 // See https://github.com/kubernetes/kubernetes/pull/97690
 	allPods := model.AllPods()
 	size := len(allPods) * len(allPods)
 	jobs := make(chan *ProbeJob, size)
 	results := make(chan *ProbeJobResults, size)
-	for i := 0; i < numberOfWorkers; i++ {
-		go probeWorker(k8s, jobs, results)
+	for i := 0; i < model.GetWorkers(); i++ {
+		go probeWorker(k8s, jobs, results, model.GetProbeTimeoutSeconds())
 	}
 	for _, podFrom := range allPods {
 		for _, podTo := range allPods {
@@ -84,11 +83,11 @@ func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCase) {
 // probeWorker continues polling a pod connectivity status, until the incoming "jobs" channel is closed, and writes results back out to the "results" channel.
 // it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine.
-func probeWorker(k8s *kubeManager, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) {
+func probeWorker(k8s *kubeManager, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults, timeoutSeconds int) {
 	defer ginkgo.GinkgoRecover()
 	for job := range jobs {
 		podFrom := job.PodFrom
-		connected, command, err := k8s.probeConnectivity(podFrom.Namespace, podFrom.Name, podFrom.Containers[0].Name(), job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain), job.Protocol, job.ToPort)
+		connected, command, err := k8s.probeConnectivity(podFrom.Namespace, podFrom.Name, podFrom.Containers[0].Name(), job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain), job.Protocol, job.ToPort, timeoutSeconds)
 		result := &ProbeJobResults{
 			Job: job,
 			IsConnected: connected,
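
probeWorker remains a plain jobs/results channel pool; the change is only that the worker count and timeout now come from the model. A self-contained sketch of the same fan-out pattern (simplified types standing in for ProbeJob/ProbeJobResults; not the suite's real implementation):

    package main

    import "fmt"

    type probeJob struct{ from, to string }

    type probeResult struct {
        job       probeJob
        connected bool
    }

    // worker drains jobs until the channel is closed, mirroring probeWorker above;
    // a real worker would exec agnhost with --timeout=<timeoutSeconds>s between the pods.
    func worker(jobs <-chan probeJob, results chan<- probeResult, timeoutSeconds int) {
        for j := range jobs {
            _ = timeoutSeconds // stand-in for the real probe call
            results <- probeResult{job: j, connected: true}
        }
    }

    func main() {
        pods := []string{"x/a", "x/b", "y/a"}
        size := len(pods) * len(pods)
        jobs := make(chan probeJob, size)
        results := make(chan probeResult, size)

        // Windows heuristics from the model: one worker (serial probing), 3s probe timeout.
        workers, timeoutSeconds := 1, 3
        for i := 0; i < workers; i++ {
            go worker(jobs, results, timeoutSeconds)
        }
        for _, from := range pods {
            for _, to := range pods {
                jobs <- probeJob{from: from, to: to}
            }
        }
        close(jobs)
        for i := 0; i < size; i++ {
            fmt.Println(<-results)
        }
    }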