Update Netpol e2e tests to use framework CreateNamespace

The main purpose of this change is to update the e2e Netpol tests to use
the standard CreateNamespace function from the Framework. Before this
change, a custom Namespace creation function was used, with the
following consequences:

* Pod security admission settings had to be enforced locally (not using
  the centralized mechanism)
* the custom function was brittle, not waiting for default Namespace
  ServiceAccount creation, causing tests to fail in some infrastructures
* tests were not benefiting from standard framework capabilities:
  Namespace name generation, automatic Namespace deletion, etc.

As part of this change, we also do the following:

* clearly decouple responsibilities between the Model, which defines the
  K8s objects to be created, and the KubeManager, which has access to
  runtime information (actual Namespace names after their creation by
  the framework, Service IPs, etc.); see the sketch after this list
* simplify / clean up tests and remove as much unneeded logic / functions
  as possible for easier long-term maintenance
* remove the useFixedNamespaces compile-time constant switch, which
  aimed at re-using existing K8s resources across test cases. The
  reasons: a) it is currently broken as setting it to true causes most
  tests to panic on the master branch, b) it is not a good idea to have
  some switch like this which changes the behavior of the tests and is
  never exercised in CI, c) it cannot possibly work as different test
  cases have different Model requirements (e.g., the protocols list can
  differ) and hence different K8s resource requirements.
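
To illustrate the new split, here is a minimal sketch of how a test case
would drive these pieces. The helper name getNetpolSuite and the concrete
base names, ports and protocols are hypothetical; Model, kubeManager,
NewModel, newKubeManager and initializeClusterFromModel are the types and
functions touched by this change, and framework.TestContext.ClusterDNSDomain
is assumed as the source of the DNS domain:

    // Minimal sketch, not the actual test code. The Model holds only static
    // intent (namespace *base* names, pod names, ports, protocols); all
    // runtime data (generated namespace names, service IPs) lives in the
    // kubeManager.
    func getNetpolSuite(f *framework.Framework) (*kubeManager, *Model) {
        model := NewModel([]string{"x", "y", "z"}, []string{"a", "b", "c"},
            []int32{80, 81}, []v1.Protocol{v1.ProtocolTCP})
        k8s := newKubeManager(f, framework.TestContext.ClusterDNSDomain)
        // Namespaces are created via f.CreateNamespace: random names,
        // automatic deletion, centralized pod security admission labels.
        framework.ExpectNoError(k8s.initializeClusterFromModel(model))
        return k8s, model
    }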

For #108298

Signed-off-by: Antonin Bas <abas@vmware.com>
Author: Antonin Bas <abas@vmware.com>
Date:   2022-08-09 16:26:35 -07:00
Commit: 2e282e8e02 (parent 42a5eb4818)
6 changed files with 463 additions and 609 deletions


@@ -31,7 +31,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
-	admissionapi "k8s.io/pod-security-admission/api"
 )
 
 // probeConnectivityArgs is set of arguments for a probeConnectivity
@@ -45,6 +44,20 @@ type probeConnectivityArgs struct {
 	timeoutSeconds int
 }
 
+// TestPod represents an actual running pod. For each Pod defined by the model,
+// there will be a corresponding TestPod. TestPod includes some runtime info
+// (namespace name, service IP) which is not available in the model.
+type TestPod struct {
+	Namespace     string
+	Name          string
+	ContainerName string
+	ServiceIP     string
+}
+
+func (pod TestPod) PodString() PodString {
+	return NewPodString(pod.Namespace, pod.Name)
+}
+
 // kubeManager provides a convenience interface to kube functionality that we leverage for polling NetworkPolicy connections.
 // Its responsibilities are:
 // - creating resources (pods, deployments, namespaces, services, network policies)
@@ -52,58 +65,59 @@ type probeConnectivityArgs struct {
 type kubeManager struct {
-	framework *framework.Framework
-	clientSet clientset.Interface
+	framework      *framework.Framework
+	clientSet      clientset.Interface
+	namespaceNames []string
+	allPods        []TestPod
+	allPodStrings  []PodString
+	dnsDomain      string
 }
 
 // newKubeManager is a utility function that wraps creation of the kubeManager instance.
-func newKubeManager(framework *framework.Framework) *kubeManager {
+func newKubeManager(framework *framework.Framework, dnsDomain string) *kubeManager {
 	return &kubeManager{
 		framework: framework,
 		clientSet: framework.ClientSet,
+		dnsDomain: dnsDomain,
 	}
 }
 
-// initializeCluster checks the state of the cluster, creating or updating namespaces and deployments as needed.
-func (k *kubeManager) initializeCluster(model *Model) error {
+// initializeClusterFromModel initializes the cluster, creating namespaces, pods and services as needed.
+func (k *kubeManager) initializeClusterFromModel(model *Model) error {
 	var createdPods []*v1.Pod
 	for _, ns := range model.Namespaces {
-		_, err := k.createNamespace(ns.Spec())
+		// no labels needed, we just need the default kubernetes.io/metadata.name label
+		namespace, err := k.framework.CreateNamespace(ns.BaseName, nil)
 		if err != nil {
 			return err
 		}
+		namespaceName := namespace.Name
+		k.namespaceNames = append(k.namespaceNames, namespaceName)
 
 		for _, pod := range ns.Pods {
-			framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name)
+			framework.Logf("creating pod %s/%s with matching service", namespaceName, pod.Name)
 			// note that we defer the logic of pod (i.e. node selector) specifics to the model
 			// which is aware of linux vs windows pods
-			kubePod, err := k.createPod(pod.KubePod())
+			kubePod, err := k.createPod(pod.KubePod(namespaceName))
 			if err != nil {
 				return err
 			}
 			createdPods = append(createdPods, kubePod)
-			svc, err := k.createService(pod.Service())
+			svc, err := k.createService(pod.Service(namespaceName))
 			if err != nil {
 				return err
 			}
 			if netutils.ParseIPSloppy(svc.Spec.ClusterIP) == nil {
 				return fmt.Errorf("empty IP address found for service %s/%s", svc.Namespace, svc.Name)
 			}
-			pod.ServiceIP = svc.Spec.ClusterIP
-		}
-	}
 
-	for _, podString := range model.AllPodStrings() {
-		k8sPod, err := k.getPod(podString.Namespace(), podString.PodName())
-		if err != nil {
-			return err
-		}
-		if k8sPod == nil {
-			return fmt.Errorf("unable to find pod in ns %s with key/val pod=%s", podString.Namespace(), podString.PodName())
-		}
-		err = e2epod.WaitForPodNameRunningInNamespace(k.clientSet, k8sPod.Name, k8sPod.Namespace)
-		if err != nil {
-			return fmt.Errorf("unable to wait for pod %s/%s: %w", podString.Namespace(), podString.PodName(), err)
+			k.allPods = append(k.allPods, TestPod{
+				Namespace:     kubePod.Namespace,
+				Name:          kubePod.Name,
+				ContainerName: pod.Containers[0].Name(),
+				ServiceIP:     svc.Spec.ClusterIP,
+			})
+			k.allPodStrings = append(k.allPodStrings, NewPodString(kubePod.Namespace, kubePod.Name))
 		}
 	}
@@ -117,6 +131,22 @@ func (k *kubeManager) initializeCluster(model *Model) error {
 	return nil
 }
 
+func (k *kubeManager) AllPods() []TestPod {
+	return k.allPods
+}
+
+func (k *kubeManager) AllPodStrings() []PodString {
+	return k.allPodStrings
+}
+
+func (k *kubeManager) DNSDomain() string {
+	return k.dnsDomain
+}
+
+func (k *kubeManager) NamespaceNames() []string {
+	return k.namespaceNames
+}
+
 // getPod gets a pod by namespace and name.
 func (k *kubeManager) getPod(ns string, name string) (*v1.Pod, error) {
 	kubePod, err := k.clientSet.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
@@ -174,16 +204,6 @@ func (k *kubeManager) executeRemoteCommand(namespace string, pod string, containerName string, command []string) (string, string, error) {
 	})
 }
 
-// createNamespace is a convenience function for namespace setup.
-func (k *kubeManager) createNamespace(ns *v1.Namespace) (*v1.Namespace, error) {
-	enforcePodSecurityBaseline(ns)
-	createdNamespace, err := k.clientSet.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
-	if err != nil {
-		return nil, fmt.Errorf("unable to create namespace %s: %w", ns.Name, err)
-	}
-	return createdNamespace, nil
-}
-
 // createService is a convenience function for service setup.
 func (k *kubeManager) createService(service *v1.Service) (*v1.Service, error) {
 	ns := service.Namespace
@@ -209,8 +229,8 @@ func (k *kubeManager) createPod(pod *v1.Pod) (*v1.Pod, error) {
 }
 
 // cleanNetworkPolicies is a convenience function for deleting network policies before startup of any new test.
-func (k *kubeManager) cleanNetworkPolicies(namespaces []string) error {
-	for _, ns := range namespaces {
+func (k *kubeManager) cleanNetworkPolicies() error {
+	for _, ns := range k.namespaceNames {
 		framework.Logf("deleting policies in %s ..........", ns)
 		l, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), metav1.ListOptions{})
 		if err != nil {
@@ -258,36 +278,16 @@ func (k *kubeManager) getNamespace(ns string) (*v1.Namespace, error) {
 	return selectedNameSpace, nil
 }
 
-// setNamespaceLabels sets the labels for a namespace object in kubernetes.
-func (k *kubeManager) setNamespaceLabels(ns string, labels map[string]string) error {
-	selectedNameSpace, err := k.getNamespace(ns)
-	if err != nil {
-		return err
-	}
-	selectedNameSpace.ObjectMeta.Labels = labels
-	enforcePodSecurityBaseline(selectedNameSpace)
-	_, err = k.clientSet.CoreV1().Namespaces().Update(context.TODO(), selectedNameSpace, metav1.UpdateOptions{})
-	if err != nil {
-		return fmt.Errorf("unable to update namespace %s: %w", ns, err)
-	}
-	return nil
-}
-
-// deleteNamespaces removes a namespace from kubernetes.
-func (k *kubeManager) deleteNamespaces(namespaces []string) error {
-	for _, ns := range namespaces {
-		err := k.clientSet.CoreV1().Namespaces().Delete(context.TODO(), ns, metav1.DeleteOptions{})
-		if err != nil {
-			return fmt.Errorf("unable to delete namespace %s: %w", ns, err)
-		}
-	}
-	return nil
-}
-
-func enforcePodSecurityBaseline(ns *v1.Namespace) {
-	if len(ns.ObjectMeta.Labels) == 0 {
-		ns.ObjectMeta.Labels = make(map[string]string)
-	}
-	// TODO(https://github.com/kubernetes/kubernetes/issues/108298): route namespace creation via framework.Framework.CreateNamespace
-	ns.ObjectMeta.Labels[admissionapi.EnforceLevelLabel] = string(admissionapi.LevelBaseline)
+// getProbeTimeoutSeconds returns a timeout for how long the probe should work before failing a check, and takes windows heuristics into account, where requests can take longer sometimes.
+func getProbeTimeoutSeconds() int {
+	timeoutSeconds := 1
+	if framework.NodeOSDistroIs("windows") {
+		timeoutSeconds = 3
+	}
+	return timeoutSeconds
+}
+
+// getWorkers returns the number of workers suggested to run when testing.
+func getWorkers() int {
+	return 3
 }
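
Taken together, a test now reads runtime data back from the kubeManager
accessors instead of the Model. A minimal sketch of a test body under these
assumptions (k8s comes from newKubeManager as in the earlier sketch; the
namespace index and port are illustrative):

    // Sketch: deny all ingress to the first namespace, then validate.
    nsX := k8s.NamespaceNames()[0]
    reachability := NewReachability(k8s.AllPodStrings(), true)
    reachability.ExpectPeer(&Peer{}, &Peer{Namespace: nsX}, false)
    reachability.AllowLoopback()
    ValidateOrFail(k8s, &TestCase{
        ToPort:       80,
        Protocol:     v1.ProtocolTCP,
        Reachability: reachability,
    })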


@@ -30,42 +30,34 @@ import (
 // data for network policy test cases and provides the source of truth
 type Model struct {
 	Namespaces []*Namespace
-	allPodStrings *[]PodString
-	allPods       *[]*Pod
-
-	// the raw data
-	NamespaceNames []string
 	PodNames  []string
 	Ports     []int32
 	Protocols []v1.Protocol
-	DNSDomain string
 }
 
 // NewWindowsModel returns a model specific to windows testing.
-func NewWindowsModel(namespaces []string, podNames []string, ports []int32, dnsDomain string) *Model {
-	return NewModel(namespaces, podNames, ports, []v1.Protocol{v1.ProtocolTCP, v1.ProtocolUDP}, dnsDomain)
+func NewWindowsModel(namespaceBaseNames []string, podNames []string, ports []int32) *Model {
+	return NewModel(namespaceBaseNames, podNames, ports, []v1.Protocol{v1.ProtocolTCP, v1.ProtocolUDP})
 }
 
 // NewModel instantiates a model based on:
-// - namespaces
+// - namespaceBaseNames
 // - pods
 // - ports to listen on
 // - protocols to listen on
 // The total number of pods is the number of namespaces x the number of pods per namespace.
 // The number of containers per pod is the number of ports x the number of protocols.
 // The *total* number of containers is namespaces x pods x ports x protocols.
-func NewModel(namespaces []string, podNames []string, ports []int32, protocols []v1.Protocol, dnsDomain string) *Model {
+func NewModel(namespaceBaseNames []string, podNames []string, ports []int32, protocols []v1.Protocol) *Model {
 	model := &Model{
-		NamespaceNames: namespaces,
-		PodNames:       podNames,
-		Ports:          ports,
-		Protocols:      protocols,
-		DNSDomain:      dnsDomain,
+		PodNames:  podNames,
+		Ports:     ports,
+		Protocols: protocols,
 	}
-	framework.Logf("DnsDomain %v", model.DNSDomain)
 
 	// build the entire "model" for the overall test, which means, building
 	// namespaces, pods, containers for each protocol.
-	for _, ns := range namespaces {
+	for _, ns := range namespaceBaseNames {
 		var pods []*Pod
 		for _, podName := range podNames {
 			var containers []*Container
@@ -78,112 +70,30 @@ func NewModel(namespaces []string, podNames []string, ports []int32, protocols []v1.Protocol, dnsDomain string) *Model {
 			}
 		}
 		pods = append(pods, &Pod{
-			Namespace:  ns,
 			Name:       podName,
 			Containers: containers,
 		})
 	}
-		model.Namespaces = append(model.Namespaces, &Namespace{Name: ns, Pods: pods})
+		model.Namespaces = append(model.Namespaces, &Namespace{
+			BaseName: ns,
+			Pods:     pods,
+		})
 	}
 	return model
 }
 
-// GetProbeTimeoutSeconds returns a timeout for how long the probe should work before failing a check, and takes windows heuristics into account, where requests can take longer sometimes.
-func (m *Model) GetProbeTimeoutSeconds() int {
-	timeoutSeconds := 1
-	if framework.NodeOSDistroIs("windows") {
-		timeoutSeconds = 3
-	}
-	return timeoutSeconds
-}
-
-// GetWorkers returns the number of workers suggested to run when testing.
-func (m *Model) GetWorkers() int {
-	return 3
-}
-
-// NewReachability instantiates a default-true reachability from the model's pods
-func (m *Model) NewReachability() *Reachability {
-	return NewReachability(m.AllPods(), true)
-}
-
-// AllPodStrings returns a slice of all pod strings
-func (m *Model) AllPodStrings() []PodString {
-	if m.allPodStrings == nil {
-		var pods []PodString
-		for _, ns := range m.Namespaces {
-			for _, pod := range ns.Pods {
-				pods = append(pods, pod.PodString())
-			}
-		}
-		m.allPodStrings = &pods
-	}
-	return *m.allPodStrings
-}
-
-// AllPods returns a slice of all pods
-func (m *Model) AllPods() []*Pod {
-	if m.allPods == nil {
-		var pods []*Pod
-		for _, ns := range m.Namespaces {
-			for _, pod := range ns.Pods {
-				pods = append(pods, pod)
-			}
-		}
-		m.allPods = &pods
-	}
-	return *m.allPods
-}
-
-// FindPod returns the pod of matching namespace and name, or an error
-func (m *Model) FindPod(ns string, name string) (*Pod, error) {
-	for _, namespace := range m.Namespaces {
-		for _, pod := range namespace.Pods {
-			if namespace.Name == ns && pod.Name == name {
-				return pod, nil
-			}
-		}
-	}
-	return nil, fmt.Errorf("unable to find pod %s/%s", ns, name)
-}
-
 // Namespace is the abstract representation of what matters to network policy
 // tests for a namespace; i.e. it ignores kube implementation details
 type Namespace struct {
-	Name string
-	Pods []*Pod
+	BaseName string
+	Pods     []*Pod
 }
 
-// Spec builds a kubernetes namespace spec
-func (ns *Namespace) Spec() *v1.Namespace {
-	return &v1.Namespace{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:   ns.Name,
-			Labels: ns.LabelSelector(),
-		},
-	}
-}
-
-// LabelSelector returns the default labels that should be placed on a namespace
-// in order for it to be uniquely selectable by label selectors
-func (ns *Namespace) LabelSelector() map[string]string {
-	return map[string]string{
-		"ns": ns.Name,
-	}
-}
-
 // Pod is the abstract representation of what matters to network policy tests for
 // a pod; i.e. it ignores kube implementation details
 type Pod struct {
-	Namespace  string
 	Name       string
 	Containers []*Container
-	ServiceIP  string
-}
-
-// PodString returns a corresponding pod string
-func (p *Pod) PodString() PodString {
-	return NewPodString(p.Namespace, p.Name)
 }
 
 // ContainerSpecs builds kubernetes container specs for the pod
@@ -195,31 +105,27 @@ func (p *Pod) ContainerSpecs() []v1.Container {
 	return containers
 }
 
-func (p *Pod) labelSelectorKey() string {
+func podNameLabelKey() string {
 	return "pod"
 }
 
-func (p *Pod) labelSelectorValue() string {
-	return p.Name
-}
-
-// LabelSelector returns the default labels that should be placed on a pod/deployment
+// Labels returns the default labels that should be placed on a pod/deployment
 // in order for it to be uniquely selectable by label selectors
-func (p *Pod) LabelSelector() map[string]string {
+func (p *Pod) Labels() map[string]string {
 	return map[string]string{
-		p.labelSelectorKey(): p.labelSelectorValue(),
+		podNameLabelKey(): p.Name,
 	}
 }
 
 // KubePod returns the kube pod (will add label selectors for windows if needed).
-func (p *Pod) KubePod() *v1.Pod {
+func (p *Pod) KubePod(namespace string) *v1.Pod {
 	zero := int64(0)
 
 	thePod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      p.Name,
-			Labels:    p.LabelSelector(),
-			Namespace: p.Namespace,
+			Labels:    p.Labels(),
+			Namespace: namespace,
 		},
 		Spec: v1.PodSpec{
 			TerminationGracePeriodSeconds: &zero,
@@ -235,26 +141,25 @@ func (p *Pod) KubePod() *v1.Pod {
 	return thePod
 }
 
-// QualifiedServiceAddress returns the address that can be used to hit a service from
-// any namespace in the cluster
-func (p *Pod) QualifiedServiceAddress(dnsDomain string) string {
-	return fmt.Sprintf("%s.%s.svc.%s", p.ServiceName(), p.Namespace, dnsDomain)
+// QualifiedServiceAddress returns the address that can be used to access the service
+func (p *Pod) QualifiedServiceAddress(namespace string, dnsDomain string) string {
+	return fmt.Sprintf("%s.%s.svc.%s", p.ServiceName(namespace), namespace, dnsDomain)
 }
 
 // ServiceName returns the unqualified service name
-func (p *Pod) ServiceName() string {
-	return fmt.Sprintf("s-%s-%s", p.Namespace, p.Name)
+func (p *Pod) ServiceName(namespace string) string {
+	return fmt.Sprintf("s-%s-%s", namespace, p.Name)
 }
 
 // Service returns a kube service spec
-func (p *Pod) Service() *v1.Service {
+func (p *Pod) Service(namespace string) *v1.Service {
 	service := &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      p.ServiceName(),
-			Namespace: p.Namespace,
+			Name:      p.ServiceName(namespace),
+			Namespace: namespace,
 		},
 		Spec: v1.ServiceSpec{
-			Selector: p.LabelSelector(),
+			Selector: p.Labels(),
 		},
 	}
 	for _, container := range p.Containers {
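
As a worked check of the sizing comment in NewModel (values assumed for
illustration): 3 namespace base names x 3 pod names gives 9 pods, and
2 ports x 2 protocols gives 4 containers per pod, i.e. 36 containers in
total. The naming helpers above compose from the runtime namespace name:

    // Worked example (assumed values): the framework generated namespace
    // name "x-abc12" for base name "x"; the DNS domain is "cluster.local".
    p := &Pod{Name: "a"}
    svcName := p.ServiceName("x-abc12")
    // svcName == "s-x-abc12-a"
    addr := p.QualifiedServiceAddress("x-abc12", "cluster.local")
    // addr == "s-x-abc12-a.x-abc12.svc.cluster.local"
    _, _ = svcName, addr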

File diff suppressed because it is too large.


@@ -32,8 +32,9 @@ type Prober interface {
 // ProbeJob packages the data for the input of a pod->pod connectivity probe
 type ProbeJob struct {
-	PodFrom        *Pod
-	PodTo          *Pod
+	PodFrom        TestPod
+	PodTo          TestPod
+	PodToServiceIP string
 	ToPort         int
 	ToPodDNSDomain string
 	Protocol       v1.Protocol
@@ -48,13 +49,12 @@ type ProbeJobResults struct {
 }
 
 // ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability`
-func ProbePodToPodConnectivity(prober Prober, model *Model, testCase *TestCase) {
-	allPods := model.AllPods()
+func ProbePodToPodConnectivity(prober Prober, allPods []TestPod, dnsDomain string, testCase *TestCase) {
 	size := len(allPods) * len(allPods)
 	jobs := make(chan *ProbeJob, size)
 	results := make(chan *ProbeJobResults, size)
-	for i := 0; i < model.GetWorkers(); i++ {
-		go probeWorker(prober, jobs, results, model.GetProbeTimeoutSeconds())
+	for i := 0; i < getWorkers(); i++ {
+		go probeWorker(prober, jobs, results, getProbeTimeoutSeconds())
 	}
 	for _, podFrom := range allPods {
 		for _, podTo := range allPods {
@@ -62,7 +62,7 @@ func ProbePodToPodConnectivity(prober Prober, model *Model, testCase *TestCase) {
 				PodFrom:        podFrom,
 				PodTo:          podTo,
 				ToPort:         testCase.ToPort,
-				ToPodDNSDomain: model.DNSDomain,
+				ToPodDNSDomain: dnsDomain,
 				Protocol:       testCase.Protocol,
 			}
 		}
@@ -95,6 +95,7 @@ func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults, timeoutSeconds int) {
 	defer ginkgo.GinkgoRecover()
 	for job := range jobs {
 		podFrom := job.PodFrom
+		// defensive programming: this should not be possible as we already check in initializeClusterFromModel
 		if netutils.ParseIPSloppy(job.PodTo.ServiceIP) == nil {
 			results <- &ProbeJobResults{
 				Job: job,
@@ -111,7 +112,7 @@ func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobResults, timeoutSeconds int) {
 		connected, command, err := prober.probeConnectivity(&probeConnectivityArgs{
 			nsFrom:        podFrom.Namespace,
 			podFrom:       podFrom.Name,
-			containerFrom: podFrom.Containers[0].Name(),
+			containerFrom: podFrom.ContainerName,
 			addrTo:        job.PodTo.ServiceIP,
 			protocol:      job.Protocol,
 			toPort:        job.ToPort,
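
Note that the probe fan-out is quadratic: for N test pods,
ProbePodToPodConnectivity enqueues N x N jobs, drained by getWorkers()
goroutines, each probe bounded by getProbeTimeoutSeconds(). A rough cost
sketch, assuming the 9-pod model used earlier:

    // Back-of-the-envelope sketch (assumed 9-pod model):
    numPods := 9
    numJobs := numPods * numPods               // 81 pod-to-pod probes per pass
    workers := getWorkers()                    // 3
    timeoutSeconds := getProbeTimeoutSeconds() // 1 on Linux, 3 on Windows
    // worst case, if every probe times out:
    worstCaseSeconds := numJobs / workers * timeoutSeconds // ~27s (~81s on Windows)
    _ = worstCaseSeconds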


@@ -81,19 +81,19 @@ func (p *Peer) Matches(pod PodString) bool {
 type Reachability struct {
 	Expected *TruthTable
 	Observed *TruthTable
-	Pods     []*Pod
+	PodStrings []PodString
 }
 
 // NewReachability instantiates a reachability
-func NewReachability(pods []*Pod, defaultExpectation bool) *Reachability {
+func NewReachability(podStrings []PodString, defaultExpectation bool) *Reachability {
 	var podNames []string
-	for _, pod := range pods {
-		podNames = append(podNames, pod.PodString().String())
+	for _, podString := range podStrings {
+		podNames = append(podNames, podString.String())
 	}
 	r := &Reachability{
 		Expected: NewTruthTableFromItems(podNames, &defaultExpectation),
 		Observed: NewTruthTableFromItems(podNames, nil),
-		Pods:     pods,
+		PodStrings: podStrings,
 	}
 	return r
 }
@@ -101,8 +101,8 @@ func NewReachability(pods []*Pod, defaultExpectation bool) *Reachability {
 // AllowLoopback expects all communication from a pod to itself to be allowed.
 // In general, call it after setting up any other rules since loopback logic follows no policy.
 func (r *Reachability) AllowLoopback() {
-	for _, pod := range r.Pods {
-		podName := pod.PodString().String()
+	for _, podString := range r.PodStrings {
+		podName := podString.String()
 		r.Expected.Set(podName, podName, true)
 	}
 }
@@ -130,11 +130,11 @@ func (r *Reachability) ExpectAllEgress(pod PodString, connected bool) {
 
 // ExpectPeer sets expected values using Peer matchers
 func (r *Reachability) ExpectPeer(from *Peer, to *Peer, connected bool) {
-	for _, fromPod := range r.Pods {
-		if from.Matches(fromPod.PodString()) {
-			for _, toPod := range r.Pods {
-				if to.Matches(toPod.PodString()) {
-					r.Expected.Set(string(fromPod.PodString()), string(toPod.PodString()), connected)
+	for _, fromPod := range r.PodStrings {
+		if from.Matches(fromPod) {
+			for _, toPod := range r.PodStrings {
+				if to.Matches(toPod) {
+					r.Expected.Set(fromPod.String(), toPod.String(), connected)
 				}
 			}
 		}
@@ -143,7 +143,7 @@ func (r *Reachability) ExpectPeer(from *Peer, to *Peer, connected bool) {
 
 // Observe records a single connectivity observation
 func (r *Reachability) Observe(fromPod PodString, toPod PodString, isConnected bool) {
-	r.Observed.Set(string(fromPod), string(toPod), isConnected)
+	r.Observed.Set(fromPod.String(), toPod.String(), isConnected)
 }
 
 // Summary produces a useful summary of expected and observed data
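
Since Reachability now operates purely on PodString values ("namespace/pod"
strings) rather than model Pods, expectations can be written without any
model plumbing. A minimal sketch with assumed pod strings:

    // Assumed pod strings; in the real tests these come from k8s.AllPodStrings().
    pods := []PodString{"x/a", "x/b", "y/a", "y/b"}
    r := NewReachability(pods, true)                    // default-allow matrix
    r.ExpectPeer(&Peer{}, &Peer{Namespace: "y"}, false) // isolate namespace y for ingress
    r.AllowLoopback()                                   // loopback follows no policy; set it last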


@@ -87,9 +87,9 @@ func waitForHTTPServers(k *kubeManager, model *Model) error {
 	for i := 0; i < maxTries; i++ {
 		for caseName, testCase := range testCases {
 			if notReady[caseName] {
-				reachability := NewReachability(model.AllPods(), true)
+				reachability := NewReachability(k.AllPodStrings(), true)
 				testCase.Reachability = reachability
-				ProbePodToPodConnectivity(k, model, testCase)
+				ProbePodToPodConnectivity(k, k.AllPods(), k.DNSDomain(), testCase)
 				_, wrong, _, _ := reachability.Summary(ignoreLoopback)
 				if wrong == 0 {
 					framework.Logf("server %s is ready", caseName)
@@ -108,16 +108,16 @@ func waitForHTTPServers(k *kubeManager, model *Model) error {
 }
 
 // ValidateOrFail validates connectivity
-func ValidateOrFail(k8s *kubeManager, model *Model, testCase *TestCase) {
+func ValidateOrFail(k8s *kubeManager, testCase *TestCase) {
 	ginkgo.By("Validating reachability matrix...")
 
 	// 1st try
 	ginkgo.By("Validating reachability matrix... (FIRST TRY)")
-	ProbePodToPodConnectivity(k8s, model, testCase)
+	ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
 	// 2nd try, in case first one failed
 	if _, wrong, _, _ := testCase.Reachability.Summary(ignoreLoopback); wrong != 0 {
 		framework.Logf("failed first probe %d wrong results ... retrying (SECOND TRY)", wrong)
-		ProbePodToPodConnectivity(k8s, model, testCase)
+		ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
 	}
 
 	// at this point we know if we passed or failed, print final matrix and pass/fail the test.
@@ -131,40 +131,43 @@ func ValidateOrFail(k8s *kubeManager, model *Model, testCase *TestCase) {
 	framework.Logf("VALIDATION SUCCESSFUL")
 }
 
-// UpdateNamespaceLabels sets the labels for a namespace
-func UpdateNamespaceLabels(k8s *kubeManager, ns string, newNsLabel map[string]string) {
-	err := k8s.setNamespaceLabels(ns, newNsLabel)
-	framework.ExpectNoError(err, "Update namespace %s labels", ns)
-	err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
-		namespace, err := k8s.getNamespace(ns)
-		if err != nil {
-			return false, err
-		}
-		for key, expected := range newNsLabel {
-			if actual, ok := namespace.Labels[key]; !ok || (expected != actual) {
-				return false, nil
-			}
-		}
-		return true, nil
-	})
-	framework.ExpectNoError(err, "Unable to wait for ns %s to update labels", ns)
+// AddNamespaceLabel adds a new label to a namespace
+func AddNamespaceLabel(k8s *kubeManager, name string, key string, val string) {
+	ns, err := k8s.getNamespace(name)
+	framework.ExpectNoError(err, "Unable to get namespace %s", name)
+	ns.Labels[key] = val
+	_, err = k8s.clientSet.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{})
+	framework.ExpectNoError(err, "Unable to update namespace %s", name)
+}
+
+// DeleteNamespaceLabel deletes a label from a namespace (if present)
+func DeleteNamespaceLabel(k8s *kubeManager, name string, key string) {
+	ns, err := k8s.getNamespace(name)
+	framework.ExpectNoError(err, "Unable to get namespace %s", name)
+	if _, ok := ns.Labels[key]; !ok {
+		// nothing to do if the label is not present
+		return
+	}
+	delete(ns.Labels, key)
+	_, err = k8s.clientSet.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{})
+	framework.ExpectNoError(err, "Unable to update namespace %s", name)
 }
 
-// AddPodLabels adds new labels to a deployment's template
-func AddPodLabels(k8s *kubeManager, pod *Pod, newPodLabels map[string]string) {
-	kubePod, err := k8s.clientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
-	framework.ExpectNoError(err, "Unable to get pod %s/%s", pod.Namespace, pod.Name)
+// AddPodLabels adds new labels to a running pod
+func AddPodLabels(k8s *kubeManager, namespace string, name string, newPodLabels map[string]string) {
+	kubePod, err := k8s.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	framework.ExpectNoError(err, "Unable to get pod %s/%s", namespace, name)
 	if kubePod.Labels == nil {
 		kubePod.Labels = map[string]string{}
 	}
 	for key, val := range newPodLabels {
 		kubePod.Labels[key] = val
 	}
-	_, err = k8s.clientSet.CoreV1().Pods(pod.Namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{})
-	framework.ExpectNoError(err, "Unable to add pod %s/%s labels", pod.Namespace, pod.Name)
+	_, err = k8s.clientSet.CoreV1().Pods(namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{})
+	framework.ExpectNoError(err, "Unable to add pod %s/%s labels", namespace, name)
 	err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
-		waitForPod, err := k8s.getPod(pod.Namespace, pod.Name)
+		waitForPod, err := k8s.getPod(namespace, name)
 		if err != nil {
 			return false, err
 		}
@@ -175,33 +178,31 @@ func AddPodLabels(k8s *kubeManager, pod *Pod, newPodLabels map[string]string) {
 		}
 		return true, nil
 	})
-	framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", pod.Namespace, pod.Name)
-}
-
-// ResetNamespaceLabels resets the labels for a namespace
-func ResetNamespaceLabels(k8s *kubeManager, ns string) {
-	UpdateNamespaceLabels(k8s, ns, (&Namespace{Name: ns}).LabelSelector())
+	framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", namespace, name)
 }
 
 // ResetPodLabels resets the labels for a deployment's template
-func ResetPodLabels(k8s *kubeManager, pod *Pod) {
-	kubePod, err := k8s.clientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
-	framework.ExpectNoError(err, "Unable to get pod %s/%s", pod.Namespace, pod.Name)
-	kubePod.Labels = pod.LabelSelector()
-	_, err = k8s.clientSet.CoreV1().Pods(pod.Namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{})
-	framework.ExpectNoError(err, "Unable to add pod %s/%s labels", pod.Namespace, pod.Name)
+func ResetPodLabels(k8s *kubeManager, namespace string, name string) {
+	kubePod, err := k8s.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	framework.ExpectNoError(err, "Unable to get pod %s/%s", namespace, name)
+	labels := map[string]string{
+		podNameLabelKey(): name,
+	}
+	kubePod.Labels = labels
+	_, err = k8s.clientSet.CoreV1().Pods(namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{})
+	framework.ExpectNoError(err, "Unable to add pod %s/%s labels", namespace, name)
 	err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
-		waitForPod, err := k8s.getPod(pod.Namespace, pod.Name)
+		waitForPod, err := k8s.getPod(namespace, name)
 		if err != nil {
 			return false, nil
 		}
-		for key, expected := range pod.LabelSelector() {
+		for key, expected := range labels {
 			if actual, ok := waitForPod.Labels[key]; !ok || (expected != actual) {
 				return false, nil
 			}
 		}
 		return true, nil
 	})
-	framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", pod.Namespace, pod.Name)
+	framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", namespace, name)
 }
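
With the pointer-to-model Pod gone from the helpers' signatures, label
mutations are now expressed with plain namespace and pod names. A usage
sketch, assuming the names come from k8s.NamespaceNames() and a pod "a"
from the model:

    nsX, nsY := k8s.NamespaceNames()[0], k8s.NamespaceNames()[1]

    // Temporarily add a namespace label for a policy selector to match.
    AddNamespaceLabel(k8s, nsY, "team", "blue")
    defer DeleteNamespaceLabel(k8s, nsY, "team")

    // Give pod "a" an extra label, then restore the canonical
    // pod-name label (podNameLabelKey) afterwards.
    AddPodLabels(k8s, nsX, "a", map[string]string{"new-key": "new-value"})
    defer ResetPodLabels(k8s, nsX, "a")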