Merge pull request #97571 from mattfenwick/issue-97425

97425: improve netpol comments
This commit is contained in:
Kubernetes Prow Robot 2021-01-06 13:05:52 -08:00 committed by GitHub
commit 24f4fe7c8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 147 deletions

View File

@ -5,7 +5,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"k8s_util.go", "kubemanager.go",
"model.go", "model.go",
"network_policy.go", "network_policy.go",
"policies.go", "policies.go",

View File

@ -20,7 +20,6 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@ -33,28 +32,28 @@ import (
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
) )
// Scenario provides a convenience interface to kube functionality that we leverage for polling NetworkPolicy connections. // kubeManager provides a convenience interface to kube functionality that we leverage for polling NetworkPolicy connections.
type Scenario struct { // Its responsibilities are:
mutex sync.Mutex // - creating resources (pods, deployments, namespaces, services, network policies)
podCache map[string][]v1.Pod // - modifying and cleaning up resources
type kubeManager struct {
framework *framework.Framework framework *framework.Framework
ClientSet clientset.Interface clientSet clientset.Interface
} }
// NewScenario is a utility function that wraps creation of the stuff we're using for creating a cluster that expresses the global policy scenario we're testing. // newKubeManager is a utility function that wraps creation of the kubeManager instance.
func NewScenario(framework *framework.Framework) *Scenario { func newKubeManager(framework *framework.Framework) *kubeManager {
return &Scenario{ return &kubeManager{
podCache: map[string][]v1.Pod{},
framework: framework, framework: framework,
ClientSet: framework.ClientSet, clientSet: framework.ClientSet,
} }
} }
// InitializeCluster checks the state of the cluster, creating or updating namespaces and deployments as needed // initializeCluster checks the state of the cluster, creating or updating namespaces and deployments as needed.
func (k *Scenario) InitializeCluster(model *Model) error { func (k *kubeManager) initializeCluster(model *Model) error {
var createdPods []*v1.Pod var createdPods []*v1.Pod
for _, ns := range model.Namespaces { for _, ns := range model.Namespaces {
_, err := k.CreateNamespace(ns.Spec()) _, err := k.createNamespace(ns.Spec())
if err != nil { if err != nil {
return err return err
} }
@ -62,13 +61,13 @@ func (k *Scenario) InitializeCluster(model *Model) error {
for _, pod := range ns.Pods { for _, pod := range ns.Pods {
framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name) framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name)
kubePod, err := k.CreatePod(pod.KubePod()) kubePod, err := k.createPod(pod.KubePod())
if err != nil { if err != nil {
return err return err
} }
createdPods = append(createdPods, kubePod) createdPods = append(createdPods, kubePod)
_, err = k.CreateService(pod.Service()) _, err = k.createService(pod.Service())
if err != nil { if err != nil {
return err return err
} }
@ -76,21 +75,21 @@ func (k *Scenario) InitializeCluster(model *Model) error {
} }
for _, podString := range model.AllPodStrings() { for _, podString := range model.AllPodStrings() {
k8sPod, err := k.GetPodFromCache(podString.Namespace(), podString.PodName()) k8sPod, err := k.getPod(podString.Namespace(), podString.PodName())
if err != nil { if err != nil {
return err return err
} }
if k8sPod == nil { if k8sPod == nil {
return errors.Errorf("unable to find pod in ns %s with key/val pod=%s", podString.Namespace(), podString.PodName()) return errors.Errorf("unable to find pod in ns %s with key/val pod=%s", podString.Namespace(), podString.PodName())
} }
err = e2epod.WaitForPodNameRunningInNamespace(k.ClientSet, k8sPod.Name, k8sPod.Namespace) err = e2epod.WaitForPodNameRunningInNamespace(k.clientSet, k8sPod.Name, k8sPod.Namespace)
if err != nil { if err != nil {
return errors.Wrapf(err, "unable to wait for pod %s/%s", podString.Namespace(), podString.PodName()) return errors.Wrapf(err, "unable to wait for pod %s/%s", podString.Namespace(), podString.PodName())
} }
} }
for _, createdPod := range createdPods { for _, createdPod := range createdPods {
err := e2epod.WaitForPodRunningInNamespace(k.ClientSet, createdPod) err := e2epod.WaitForPodRunningInNamespace(k.clientSet, createdPod)
if err != nil { if err != nil {
return errors.Wrapf(err, "unable to wait for pod %s/%s", createdPod.Namespace, createdPod.Name) return errors.Wrapf(err, "unable to wait for pod %s/%s", createdPod.Namespace, createdPod.Name)
} }
@ -99,69 +98,17 @@ func (k *Scenario) InitializeCluster(model *Model) error {
return nil return nil
} }
// GetPodFromCache returns a pod with the matching namespace and name // getPod gets a pod by namespace and name.
func (k *Scenario) GetPodFromCache(ns string, name string) (*v1.Pod, error) { func (k *kubeManager) getPod(ns string, name string) (*v1.Pod, error) {
pods, err := k.getPodsUncached(ns, "pod", name) kubePod, err := k.clientSet.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if len(pods) == 0 {
return nil, nil
}
return &pods[0], nil
}
func (k *Scenario) getPodsUncached(ns string, key string, val string) ([]v1.Pod, error) {
v1PodList, err := k.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("%v=%v", key, val),
})
if err != nil {
return nil, errors.Wrapf(err, "unable to list Pods in ns %s with key/val %s=%s", ns, key, val)
}
return v1PodList.Items, nil
}
// GetPodsFromCacheByKeyVal returns an array of all Pods in the given namespace having a k/v label pair.
func (k *Scenario) GetPodsFromCacheByKeyVal(ns string, key string, val string) ([]v1.Pod, error) {
k.mutex.Lock()
p, ok := k.podCache[fmt.Sprintf("%v_%v_%v", ns, key, val)]
k.mutex.Unlock()
if ok {
return p, nil
}
v1PodList, err := k.getPodsUncached(ns, key, val)
if err != nil {
return nil, err
}
k.mutex.Lock()
k.podCache[fmt.Sprintf("%v_%v_%v", ns, key, val)] = v1PodList
k.mutex.Unlock()
return v1PodList, nil
}
// GetPod gets a pod by namespace and name
func (k *Scenario) GetPod(ns string, name string) (*v1.Pod, error) {
kubePod, err := k.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "unable to get pod %s/%s", ns, name) return nil, errors.Wrapf(err, "unable to get pod %s/%s", ns, name)
} }
return kubePod, nil return kubePod, nil
} }
// Probe execs into a pod and checks its connectivity to another pod. // probeConnectivity execs into a pod and checks its connectivity to another pod..
func (k *Scenario) Probe(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int) (bool, string, error) { func (k *kubeManager) probeConnectivity(nsFrom string, podFrom string, containerFrom string, addrTo string, protocol v1.Protocol, toPort int) (bool, string, error) {
fromPods, err := k.GetPodsFromCacheByKeyVal(nsFrom, "pod", podFrom)
if err != nil {
return false, "", err
}
if len(fromPods) == 0 {
return false, "", errors.Errorf("pod %s/%s not found", nsFrom, podFrom)
}
fromPod := fromPods[0]
var cmd []string var cmd []string
switch protocol { switch protocol {
case v1.ProtocolSCTP: case v1.ProtocolSCTP:
@ -174,8 +121,8 @@ func (k *Scenario) Probe(nsFrom string, podFrom string, containerFrom string, ad
framework.Failf("protocol %s not supported", protocol) framework.Failf("protocol %s not supported", protocol)
} }
commandDebugString := fmt.Sprintf("kubectl exec %s -c %s -n %s -- %s", fromPod.Name, containerFrom, fromPod.Namespace, strings.Join(cmd, " ")) commandDebugString := fmt.Sprintf("kubectl exec %s -c %s -n %s -- %s", podFrom, containerFrom, nsFrom, strings.Join(cmd, " "))
stdout, stderr, err := k.ExecuteRemoteCommand(fromPod, containerFrom, cmd) stdout, stderr, err := k.executeRemoteCommand(nsFrom, podFrom, containerFrom, cmd)
if err != nil { if err != nil {
framework.Logf("%s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", nsFrom, podFrom, addrTo, err, stdout, stderr) framework.Logf("%s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", nsFrom, podFrom, addrTo, err, stdout, stderr)
return false, commandDebugString, nil return false, commandDebugString, nil
@ -183,65 +130,64 @@ func (k *Scenario) Probe(nsFrom string, podFrom string, containerFrom string, ad
return true, commandDebugString, nil return true, commandDebugString, nil
} }
// ExecuteRemoteCommand executes a remote shell command on the given pod // executeRemoteCommand executes a remote shell command on the given pod.
func (k *Scenario) ExecuteRemoteCommand(pod v1.Pod, containerName string, command []string) (string, string, error) { func (k *kubeManager) executeRemoteCommand(namespace string, pod string, containerName string, command []string) (string, string, error) {
return k.framework.ExecWithOptions(framework.ExecOptions{ return k.framework.ExecWithOptions(framework.ExecOptions{
Command: command, Command: command,
Namespace: pod.Namespace, Namespace: namespace,
PodName: pod.Name, PodName: pod,
ContainerName: containerName, ContainerName: containerName,
Stdin: nil, Stdin: nil,
CaptureStdout: true, CaptureStdout: true,
CaptureStderr: true, CaptureStderr: true,
PreserveWhitespace: false, PreserveWhitespace: false,
}) })
} }
// CreateNamespace is a convenience function for namespace setup // createNamespace is a convenience function for namespace setup.
func (k *Scenario) CreateNamespace(ns *v1.Namespace) (*v1.Namespace, error) { func (k *kubeManager) createNamespace(ns *v1.Namespace) (*v1.Namespace, error) {
createdNamespace, err := k.ClientSet.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) createdNamespace, err := k.clientSet.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "unable to update namespace %s", ns.Name) return nil, errors.Wrapf(err, "unable to update namespace %s", ns.Name)
} }
return createdNamespace, nil return createdNamespace, nil
} }
// CreateService is a convenience function for service setup // createService is a convenience function for service setup.
func (k *Scenario) CreateService(service *v1.Service) (*v1.Service, error) { func (k *kubeManager) createService(service *v1.Service) (*v1.Service, error) {
ns := service.Namespace ns := service.Namespace
name := service.Name name := service.Name
createdService, err := k.ClientSet.CoreV1().Services(ns).Create(context.TODO(), service, metav1.CreateOptions{}) createdService, err := k.clientSet.CoreV1().Services(ns).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "unable to create service %s/%s", ns, name) return nil, errors.Wrapf(err, "unable to create service %s/%s", ns, name)
} }
return createdService, nil return createdService, nil
} }
// CreatePod is a convenience function for pod setup // createPod is a convenience function for pod setup.
func (k *Scenario) CreatePod(pod *v1.Pod) (*v1.Pod, error) { func (k *kubeManager) createPod(pod *v1.Pod) (*v1.Pod, error) {
ns := pod.Namespace ns := pod.Namespace
framework.Logf("creating pod %s/%s", ns, pod.Name) framework.Logf("creating pod %s/%s", ns, pod.Name)
createdPod, err := k.ClientSet.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{}) createdPod, err := k.clientSet.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "unable to update pod %s/%s", ns, pod.Name) return nil, errors.Wrapf(err, "unable to update pod %s/%s", ns, pod.Name)
} }
return createdPod, nil return createdPod, nil
} }
// CleanNetworkPolicies is a convenience function for deleting network policies before startup of any new test. // cleanNetworkPolicies is a convenience function for deleting network policies before startup of any new test.
func (k *Scenario) CleanNetworkPolicies(namespaces []string) error { func (k *kubeManager) cleanNetworkPolicies(namespaces []string) error {
for _, ns := range namespaces { for _, ns := range namespaces {
framework.Logf("deleting policies in %s ..........", ns) framework.Logf("deleting policies in %s ..........", ns)
l, err := k.ClientSet.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), metav1.ListOptions{}) l, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil { if err != nil {
return errors.Wrapf(err, "unable to list network policies in ns %s", ns) return errors.Wrapf(err, "unable to list network policies in ns %s", ns)
} }
for _, np := range l.Items { for _, np := range l.Items {
framework.Logf("deleting network policy %s/%s", ns, np.Name) framework.Logf("deleting network policy %s/%s", ns, np.Name)
err = k.ClientSet.NetworkingV1().NetworkPolicies(ns).Delete(context.TODO(), np.Name, metav1.DeleteOptions{}) err = k.clientSet.NetworkingV1().NetworkPolicies(ns).Delete(context.TODO(), np.Name, metav1.DeleteOptions{})
if err != nil { if err != nil {
return errors.Wrapf(err, "unable to delete network policy %s/%s", ns, np.Name) return errors.Wrapf(err, "unable to delete network policy %s/%s", ns, np.Name)
} }
@ -250,58 +196,52 @@ func (k *Scenario) CleanNetworkPolicies(namespaces []string) error {
return nil return nil
} }
// ClearCache clears the kube pod cache // createNetworkPolicy is a convenience function for creating network policies.
func (k *Scenario) ClearCache() { func (k *kubeManager) createNetworkPolicy(ns string, netpol *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
framework.Logf("Clearing pod cache")
k.mutex.Lock()
k.podCache = map[string][]v1.Pod{}
k.mutex.Unlock()
framework.Logf("Pod cache successfully cleared")
}
// CreateNetworkPolicy is a convenience function for creating netpols
func (k *Scenario) CreateNetworkPolicy(ns string, netpol *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
framework.Logf("creating network policy %s/%s", ns, netpol.Name) framework.Logf("creating network policy %s/%s", ns, netpol.Name)
netpol.ObjectMeta.Namespace = ns netpol.ObjectMeta.Namespace = ns
np, err := k.ClientSet.NetworkingV1().NetworkPolicies(ns).Create(context.TODO(), netpol, metav1.CreateOptions{}) np, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).Create(context.TODO(), netpol, metav1.CreateOptions{})
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "unable to create network policy %s/%s", ns, netpol.Name) return nil, errors.Wrapf(err, "unable to create network policy %s/%s", ns, netpol.Name)
} }
return np, nil return np, nil
} }
// UpdateNetworkPolicy is a convenience function for updating netpols // updateNetworkPolicy is a convenience function for updating network policies.
func (k *Scenario) UpdateNetworkPolicy(ns string, netpol *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) { func (k *kubeManager) updateNetworkPolicy(ns string, netpol *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
framework.Logf("updating network policy %s/%s", ns, netpol.Name) framework.Logf("updating network policy %s/%s", ns, netpol.Name)
netpol.ObjectMeta.Namespace = ns netpol.ObjectMeta.Namespace = ns
np, err := k.ClientSet.NetworkingV1().NetworkPolicies(ns).Update(context.TODO(), netpol, metav1.UpdateOptions{}) np, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).Update(context.TODO(), netpol, metav1.UpdateOptions{})
if err != nil { if err != nil {
return np, errors.Wrapf(err, "unable to update network policy %s/%s", ns, netpol.Name) return np, errors.Wrapf(err, "unable to update network policy %s/%s", ns, netpol.Name)
} }
return np, nil return np, nil
} }
func (k *Scenario) getNamespace(ns string) (*v1.Namespace, error) { // getNamespace gets a namespace object from kubernetes.
selectedNameSpace, err := k.ClientSet.CoreV1().Namespaces().Get(context.TODO(), ns, metav1.GetOptions{}) func (k *kubeManager) getNamespace(ns string) (*v1.Namespace, error) {
selectedNameSpace, err := k.clientSet.CoreV1().Namespaces().Get(context.TODO(), ns, metav1.GetOptions{})
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "unable to get namespace %s", ns) return nil, errors.Wrapf(err, "unable to get namespace %s", ns)
} }
return selectedNameSpace, nil return selectedNameSpace, nil
} }
func (k *Scenario) setNamespaceLabels(ns string, labels map[string]string) error { // setNamespaceLabels sets the labels for a namespace object in kubernetes.
func (k *kubeManager) setNamespaceLabels(ns string, labels map[string]string) error {
selectedNameSpace, err := k.getNamespace(ns) selectedNameSpace, err := k.getNamespace(ns)
if err != nil { if err != nil {
return err return err
} }
selectedNameSpace.ObjectMeta.Labels = labels selectedNameSpace.ObjectMeta.Labels = labels
_, err = k.ClientSet.CoreV1().Namespaces().Update(context.TODO(), selectedNameSpace, metav1.UpdateOptions{}) _, err = k.clientSet.CoreV1().Namespaces().Update(context.TODO(), selectedNameSpace, metav1.UpdateOptions{})
return errors.Wrapf(err, "unable to update namespace %s", ns) return errors.Wrapf(err, "unable to update namespace %s", ns)
} }
func (k *Scenario) deleteNamespaces(namespaces []string) error { // deleteNamespaces removes a namespace from kubernetes.
func (k *kubeManager) deleteNamespaces(namespaces []string) error {
for _, ns := range namespaces { for _, ns := range namespaces {
err := k.ClientSet.CoreV1().Namespaces().Delete(context.TODO(), ns, metav1.DeleteOptions{}) err := k.clientSet.CoreV1().Namespaces().Delete(context.TODO(), ns, metav1.DeleteOptions{})
if err != nil { if err != nil {
return errors.Wrapf(err, "unable to delete namespace %s", ns) return errors.Wrapf(err, "unable to delete namespace %s", ns)
} }
@ -310,7 +250,7 @@ func (k *Scenario) deleteNamespaces(namespaces []string) error {
} }
// waitForHTTPServers waits for all webservers to be up, on all protocols, and then validates them using the same probe logic as the rest of the suite. // waitForHTTPServers waits for all webservers to be up, on all protocols, and then validates them using the same probe logic as the rest of the suite.
func (k *Scenario) waitForHTTPServers(model *Model) error { func (k *kubeManager) waitForHTTPServers(model *Model) error {
const maxTries = 10 const maxTries = 10
framework.Logf("waiting for HTTP servers (ports 80 and 81) to become ready") framework.Logf("waiting for HTTP servers (ports 80 and 81) to become ready")

View File

@ -122,10 +122,10 @@ var _ = SIGDescribeCopy("Netpol [LinuxOnly]", func() {
_ = initializeResources(f) _ = initializeResources(f)
_, _, _, model, k8s := getK8SModel(f) _, _, _, model, k8s := getK8SModel(f)
framework.ExpectNoError(k8s.CleanNetworkPolicies(model.NamespaceNames), "unable to clean network policies") framework.ExpectNoError(k8s.cleanNetworkPolicies(model.NamespaceNames), "unable to clean network policies")
err := wait.Poll(waitInterval, waitTimeout, func() (done bool, err error) { err := wait.Poll(waitInterval, waitTimeout, func() (done bool, err error) {
for _, ns := range model.NamespaceNames { for _, ns := range model.NamespaceNames {
netpols, err := k8s.ClientSet.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), metav1.ListOptions{}) netpols, err := k8s.clientSet.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err, "get network policies from ns %s", ns) framework.ExpectNoError(err, "get network policies from ns %s", ns)
if len(netpols.Items) > 0 { if len(netpols.Items) > 0 {
return false, nil return false, nil
@ -698,7 +698,7 @@ var _ = SIGDescribeCopy("Netpol [LinuxOnly]", func() {
reachability.ExpectPeer(&Peer{}, &Peer{Namespace: nsX}, false) reachability.ExpectPeer(&Peer{}, &Peer{Namespace: nsX}, false)
ValidateOrFail(k8s, model, &TestCase{FromPort: 81, ToPort: 80, Protocol: v1.ProtocolTCP, Reachability: reachability}) ValidateOrFail(k8s, model, &TestCase{FromPort: 81, ToPort: 80, Protocol: v1.ProtocolTCP, Reachability: reachability})
err := k8s.CleanNetworkPolicies(model.NamespaceNames) err := k8s.cleanNetworkPolicies(model.NamespaceNames)
time.Sleep(3 * time.Second) // TODO we can remove this eventually, its just a hack to keep CI stable. time.Sleep(3 * time.Second) // TODO we can remove this eventually, its just a hack to keep CI stable.
framework.ExpectNoError(err, "unable to clean network policies") framework.ExpectNoError(err, "unable to clean network policies")
@ -880,10 +880,10 @@ func defaultModel(namespaces []string, dnsDomain string) *Model {
return NewModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, protocols, dnsDomain) return NewModel(namespaces, []string{"a", "b", "c"}, []int32{80, 81}, protocols, dnsDomain)
} }
// getK8sModel uses the e2e framework to create all necessary namespace resources, and returns the default probing model used // getK8sModel generates a network policy model using the framework's root namespace and cluster DNS domain.
// in the scaffold of this test. // This function is deterministic and has no side effects, so may be safely called multiple times.
func getK8SModel(f *framework.Framework) (string, string, string, *Model, *Scenario) { func getK8SModel(f *framework.Framework) (string, string, string, *Model, *kubeManager) {
k8s := NewScenario(f) k8s := newKubeManager(f)
rootNs := f.Namespace.GetName() rootNs := f.Namespace.GetName()
nsX, nsY, nsZ, namespaces := getNamespaces(rootNs) nsX, nsY, nsZ, namespaces := getNamespaces(rootNs)
@ -892,13 +892,15 @@ func getK8SModel(f *framework.Framework) (string, string, string, *Model, *Scena
return nsX, nsY, nsZ, model, k8s return nsX, nsY, nsZ, model, k8s
} }
// initializeResources generates a model and then waits for the model to be up-and-running (i.e. all pods are ready and running in their namespaces). // initializeResources uses the e2e framework to create all necessary namespace resources, based on the network policy
// model derived from the framework. It then waits for the resources described by the model to be up and running
// (i.e. all pods are ready and running in their namespaces).
func initializeResources(f *framework.Framework) error { func initializeResources(f *framework.Framework) error {
_, _, _, model, k8s := getK8SModel(f) _, _, _, model, k8s := getK8SModel(f)
framework.Logf("initializing cluster: ensuring namespaces, deployments, and pods exist and are ready") framework.Logf("initializing cluster: ensuring namespaces, deployments, and pods exist and are ready")
err := k8s.InitializeCluster(model) err := k8s.initializeCluster(model)
if err != nil { if err != nil {
return err return err
} }

View File

@ -41,8 +41,7 @@ type ProbeJobResults struct {
} }
// ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability` // ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability`
func ProbePodToPodConnectivity(k8s *Scenario, model *Model, testCase *TestCase) { func ProbePodToPodConnectivity(k8s *kubeManager, model *Model, testCase *TestCase) {
k8s.ClearCache()
numberOfWorkers := 30 numberOfWorkers := 30
allPods := model.AllPods() allPods := model.AllPods()
size := len(allPods) * len(allPods) size := len(allPods) * len(allPods)
@ -87,7 +86,7 @@ func ProbePodToPodConnectivity(k8s *Scenario, model *Model, testCase *TestCase)
// probeWorker continues polling a pod connectivity status, until the incoming "jobs" channel is closed, and writes results back out to the "results" channel. // probeWorker continues polling a pod connectivity status, until the incoming "jobs" channel is closed, and writes results back out to the "results" channel.
// it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine. // it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine.
func probeWorker(k8s *Scenario, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) { func probeWorker(k8s *kubeManager, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults) {
defer ginkgo.GinkgoRecover() defer ginkgo.GinkgoRecover()
for job := range jobs { for job := range jobs {
podFrom := job.PodFrom podFrom := job.PodFrom
@ -103,7 +102,7 @@ func probeWorker(k8s *Scenario, jobs <-chan *ProbeJob, results chan<- *ProbeJobR
results <- result results <- result
} else { } else {
// 2) real test runs here... // 2) real test runs here...
connected, command, err := k8s.Probe(podFrom.Namespace, podFrom.Name, containerFrom.Name(), job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain), job.Protocol, job.ToPort) connected, command, err := k8s.probeConnectivity(podFrom.Namespace, podFrom.Name, containerFrom.Name(), job.PodTo.QualifiedServiceAddress(job.ToPodDNSDomain), job.Protocol, job.ToPort)
result := &ProbeJobResults{ result := &ProbeJobResults{
Job: job, Job: job,
IsConnected: connected, IsConnected: connected,

View File

@ -41,31 +41,31 @@ func prettyPrint(policy *networkingv1.NetworkPolicy) string {
} }
// CreatePolicy creates a policy in the given namespace // CreatePolicy creates a policy in the given namespace
func CreatePolicy(k8s *Scenario, policy *networkingv1.NetworkPolicy, namespace string) { func CreatePolicy(k8s *kubeManager, policy *networkingv1.NetworkPolicy, namespace string) {
if isVerbose { if isVerbose {
framework.Logf("****************************************************************") framework.Logf("****************************************************************")
framework.Logf("Network Policy creating %s/%s \n%s", namespace, policy.Name, prettyPrint(policy)) framework.Logf("Network Policy creating %s/%s \n%s", namespace, policy.Name, prettyPrint(policy))
framework.Logf("****************************************************************") framework.Logf("****************************************************************")
} }
_, err := k8s.CreateNetworkPolicy(namespace, policy) _, err := k8s.createNetworkPolicy(namespace, policy)
framework.ExpectNoError(err, "Unable to create netpol %s/%s", namespace, policy.Name) framework.ExpectNoError(err, "Unable to create netpol %s/%s", namespace, policy.Name)
} }
// UpdatePolicy updates a networkpolicy // UpdatePolicy updates a networkpolicy
func UpdatePolicy(k8s *Scenario, policy *networkingv1.NetworkPolicy, namespace string) { func UpdatePolicy(k8s *kubeManager, policy *networkingv1.NetworkPolicy, namespace string) {
if isVerbose { if isVerbose {
framework.Logf("****************************************************************") framework.Logf("****************************************************************")
framework.Logf("Network Policy updating %s/%s \n%s", namespace, policy.Name, prettyPrint(policy)) framework.Logf("Network Policy updating %s/%s \n%s", namespace, policy.Name, prettyPrint(policy))
framework.Logf("****************************************************************") framework.Logf("****************************************************************")
} }
_, err := k8s.UpdateNetworkPolicy(namespace, policy) _, err := k8s.updateNetworkPolicy(namespace, policy)
framework.ExpectNoError(err, "Unable to update netpol %s/%s", namespace, policy.Name) framework.ExpectNoError(err, "Unable to update netpol %s/%s", namespace, policy.Name)
} }
// ValidateOrFail validates connectivity // ValidateOrFail validates connectivity
func ValidateOrFail(k8s *Scenario, model *Model, testCase *TestCase) { func ValidateOrFail(k8s *kubeManager, model *Model, testCase *TestCase) {
ginkgo.By("Validating reachability matrix...") ginkgo.By("Validating reachability matrix...")
// 1st try // 1st try
@ -89,7 +89,7 @@ func ValidateOrFail(k8s *Scenario, model *Model, testCase *TestCase) {
} }
// UpdateNamespaceLabels sets the labels for a namespace // UpdateNamespaceLabels sets the labels for a namespace
func UpdateNamespaceLabels(k8s *Scenario, ns string, newNsLabel map[string]string) { func UpdateNamespaceLabels(k8s *kubeManager, ns string, newNsLabel map[string]string) {
err := k8s.setNamespaceLabels(ns, newNsLabel) err := k8s.setNamespaceLabels(ns, newNsLabel)
framework.ExpectNoError(err, "Update namespace %s labels", ns) framework.ExpectNoError(err, "Update namespace %s labels", ns)
err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) { err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
@ -108,8 +108,8 @@ func UpdateNamespaceLabels(k8s *Scenario, ns string, newNsLabel map[string]strin
} }
// AddPodLabels adds new labels to a deployment's template // AddPodLabels adds new labels to a deployment's template
func AddPodLabels(k8s *Scenario, pod *Pod, newPodLabels map[string]string) { func AddPodLabels(k8s *kubeManager, pod *Pod, newPodLabels map[string]string) {
kubePod, err := k8s.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) kubePod, err := k8s.clientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "Unable to get pod %s/%s", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to get pod %s/%s", pod.Namespace, pod.Name)
if kubePod.Labels == nil { if kubePod.Labels == nil {
kubePod.Labels = map[string]string{} kubePod.Labels = map[string]string{}
@ -117,11 +117,11 @@ func AddPodLabels(k8s *Scenario, pod *Pod, newPodLabels map[string]string) {
for key, val := range newPodLabels { for key, val := range newPodLabels {
kubePod.Labels[key] = val kubePod.Labels[key] = val
} }
_, err = k8s.ClientSet.CoreV1().Pods(pod.Namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{}) _, err = k8s.clientSet.CoreV1().Pods(pod.Namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{})
framework.ExpectNoError(err, "Unable to add pod %s/%s labels", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to add pod %s/%s labels", pod.Namespace, pod.Name)
err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) { err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
waitForPod, err := k8s.GetPod(pod.Namespace, pod.Name) waitForPod, err := k8s.getPod(pod.Namespace, pod.Name)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -136,20 +136,20 @@ func AddPodLabels(k8s *Scenario, pod *Pod, newPodLabels map[string]string) {
} }
// ResetNamespaceLabels resets the labels for a namespace // ResetNamespaceLabels resets the labels for a namespace
func ResetNamespaceLabels(k8s *Scenario, ns string) { func ResetNamespaceLabels(k8s *kubeManager, ns string) {
UpdateNamespaceLabels(k8s, ns, (&Namespace{Name: ns}).LabelSelector()) UpdateNamespaceLabels(k8s, ns, (&Namespace{Name: ns}).LabelSelector())
} }
// ResetPodLabels resets the labels for a deployment's template // ResetPodLabels resets the labels for a deployment's template
func ResetPodLabels(k8s *Scenario, pod *Pod) { func ResetPodLabels(k8s *kubeManager, pod *Pod) {
kubePod, err := k8s.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) kubePod, err := k8s.clientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "Unable to get pod %s/%s", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to get pod %s/%s", pod.Namespace, pod.Name)
kubePod.Labels = pod.LabelSelector() kubePod.Labels = pod.LabelSelector()
_, err = k8s.ClientSet.CoreV1().Pods(pod.Namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{}) _, err = k8s.clientSet.CoreV1().Pods(pod.Namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{})
framework.ExpectNoError(err, "Unable to add pod %s/%s labels", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to add pod %s/%s labels", pod.Namespace, pod.Name)
err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) { err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
waitForPod, err := k8s.GetPod(pod.Namespace, pod.Name) waitForPod, err := k8s.getPod(pod.Namespace, pod.Name)
if err != nil { if err != nil {
return false, nil return false, nil
} }