Merge pull request #111789 from antoninbas/update-netpol-e2e-tests

Update Netpol e2e tests to use framework CreateNamespace
This commit is contained in:
Kubernetes Prow Robot 2022-08-23 19:00:44 -07:00 committed by GitHub
commit a19935ac8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 463 additions and 609 deletions

View File

@ -31,7 +31,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
admissionapi "k8s.io/pod-security-admission/api"
) )
// probeConnectivityArgs is set of arguments for a probeConnectivity // probeConnectivityArgs is set of arguments for a probeConnectivity
@ -45,65 +44,80 @@ type probeConnectivityArgs struct {
timeoutSeconds int timeoutSeconds int
} }
// TestPod represents an actual running pod. For each Pod defined by the model,
// there will be a corresponding TestPod. TestPod includes some runtime info
// (namespace name, service IP) which is not available in the model.
type TestPod struct {
Namespace string
Name string
ContainerName string
ServiceIP string
}
func (pod TestPod) PodString() PodString {
return NewPodString(pod.Namespace, pod.Name)
}
// kubeManager provides a convenience interface to kube functionality that we leverage for polling NetworkPolicy connections. // kubeManager provides a convenience interface to kube functionality that we leverage for polling NetworkPolicy connections.
// Its responsibilities are: // Its responsibilities are:
// - creating resources (pods, deployments, namespaces, services, network policies) // - creating resources (pods, deployments, namespaces, services, network policies)
// - modifying and cleaning up resources // - modifying and cleaning up resources
type kubeManager struct { type kubeManager struct {
framework *framework.Framework framework *framework.Framework
clientSet clientset.Interface clientSet clientset.Interface
namespaceNames []string
allPods []TestPod
allPodStrings []PodString
dnsDomain string
} }
// newKubeManager is a utility function that wraps creation of the kubeManager instance. // newKubeManager is a utility function that wraps creation of the kubeManager instance.
func newKubeManager(framework *framework.Framework) *kubeManager { func newKubeManager(framework *framework.Framework, dnsDomain string) *kubeManager {
return &kubeManager{ return &kubeManager{
framework: framework, framework: framework,
clientSet: framework.ClientSet, clientSet: framework.ClientSet,
dnsDomain: dnsDomain,
} }
} }
// initializeCluster checks the state of the cluster, creating or updating namespaces and deployments as needed. // initializeCluster initialized the cluster, creating namespaces pods and services as needed.
func (k *kubeManager) initializeCluster(model *Model) error { func (k *kubeManager) initializeClusterFromModel(model *Model) error {
var createdPods []*v1.Pod var createdPods []*v1.Pod
for _, ns := range model.Namespaces { for _, ns := range model.Namespaces {
_, err := k.createNamespace(ns.Spec()) // no labels needed, we just need the default kubernetes.io/metadata.name label
namespace, err := k.framework.CreateNamespace(ns.BaseName, nil)
if err != nil { if err != nil {
return err return err
} }
namespaceName := namespace.Name
k.namespaceNames = append(k.namespaceNames, namespaceName)
for _, pod := range ns.Pods { for _, pod := range ns.Pods {
framework.Logf("creating/updating pod %s/%s", ns.Name, pod.Name) framework.Logf("creating pod %s/%s with matching service", namespaceName, pod.Name)
// note that we defer the logic of pod (i.e. node selector) specifics to the model // note that we defer the logic of pod (i.e. node selector) specifics to the model
// which is aware of linux vs windows pods // which is aware of linux vs windows pods
kubePod, err := k.createPod(pod.KubePod()) kubePod, err := k.createPod(pod.KubePod(namespaceName))
if err != nil { if err != nil {
return err return err
} }
createdPods = append(createdPods, kubePod) createdPods = append(createdPods, kubePod)
svc, err := k.createService(pod.Service()) svc, err := k.createService(pod.Service(namespaceName))
if err != nil { if err != nil {
return err return err
} }
if netutils.ParseIPSloppy(svc.Spec.ClusterIP) == nil { if netutils.ParseIPSloppy(svc.Spec.ClusterIP) == nil {
return fmt.Errorf("empty IP address found for service %s/%s", svc.Namespace, svc.Name) return fmt.Errorf("empty IP address found for service %s/%s", svc.Namespace, svc.Name)
} }
pod.ServiceIP = svc.Spec.ClusterIP
}
}
for _, podString := range model.AllPodStrings() { k.allPods = append(k.allPods, TestPod{
k8sPod, err := k.getPod(podString.Namespace(), podString.PodName()) Namespace: kubePod.Namespace,
if err != nil { Name: kubePod.Name,
return err ContainerName: pod.Containers[0].Name(),
} ServiceIP: svc.Spec.ClusterIP,
if k8sPod == nil { })
return fmt.Errorf("unable to find pod in ns %s with key/val pod=%s", podString.Namespace(), podString.PodName()) k.allPodStrings = append(k.allPodStrings, NewPodString(kubePod.Namespace, kubePod.Name))
}
err = e2epod.WaitForPodNameRunningInNamespace(k.clientSet, k8sPod.Name, k8sPod.Namespace)
if err != nil {
return fmt.Errorf("unable to wait for pod %s/%s: %w", podString.Namespace(), podString.PodName(), err)
} }
} }
@ -117,6 +131,22 @@ func (k *kubeManager) initializeCluster(model *Model) error {
return nil return nil
} }
func (k *kubeManager) AllPods() []TestPod {
return k.allPods
}
func (k *kubeManager) AllPodStrings() []PodString {
return k.allPodStrings
}
func (k *kubeManager) DNSDomain() string {
return k.dnsDomain
}
func (k *kubeManager) NamespaceNames() []string {
return k.namespaceNames
}
// getPod gets a pod by namespace and name. // getPod gets a pod by namespace and name.
func (k *kubeManager) getPod(ns string, name string) (*v1.Pod, error) { func (k *kubeManager) getPod(ns string, name string) (*v1.Pod, error) {
kubePod, err := k.clientSet.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{}) kubePod, err := k.clientSet.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
@ -174,16 +204,6 @@ func (k *kubeManager) executeRemoteCommand(namespace string, pod string, contain
}) })
} }
// createNamespace is a convenience function for namespace setup.
func (k *kubeManager) createNamespace(ns *v1.Namespace) (*v1.Namespace, error) {
enforcePodSecurityBaseline(ns)
createdNamespace, err := k.clientSet.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("unable to create namespace %s: %w", ns.Name, err)
}
return createdNamespace, nil
}
// createService is a convenience function for service setup. // createService is a convenience function for service setup.
func (k *kubeManager) createService(service *v1.Service) (*v1.Service, error) { func (k *kubeManager) createService(service *v1.Service) (*v1.Service, error) {
ns := service.Namespace ns := service.Namespace
@ -209,8 +229,8 @@ func (k *kubeManager) createPod(pod *v1.Pod) (*v1.Pod, error) {
} }
// cleanNetworkPolicies is a convenience function for deleting network policies before startup of any new test. // cleanNetworkPolicies is a convenience function for deleting network policies before startup of any new test.
func (k *kubeManager) cleanNetworkPolicies(namespaces []string) error { func (k *kubeManager) cleanNetworkPolicies() error {
for _, ns := range namespaces { for _, ns := range k.namespaceNames {
framework.Logf("deleting policies in %s ..........", ns) framework.Logf("deleting policies in %s ..........", ns)
l, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), metav1.ListOptions{}) l, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil { if err != nil {
@ -258,36 +278,16 @@ func (k *kubeManager) getNamespace(ns string) (*v1.Namespace, error) {
return selectedNameSpace, nil return selectedNameSpace, nil
} }
// setNamespaceLabels sets the labels for a namespace object in kubernetes. // getProbeTimeoutSeconds returns a timeout for how long the probe should work before failing a check, and takes windows heuristics into account, where requests can take longer sometimes.
func (k *kubeManager) setNamespaceLabels(ns string, labels map[string]string) error { func getProbeTimeoutSeconds() int {
selectedNameSpace, err := k.getNamespace(ns) timeoutSeconds := 1
if err != nil { if framework.NodeOSDistroIs("windows") {
return err timeoutSeconds = 3
} }
selectedNameSpace.ObjectMeta.Labels = labels return timeoutSeconds
enforcePodSecurityBaseline(selectedNameSpace)
_, err = k.clientSet.CoreV1().Namespaces().Update(context.TODO(), selectedNameSpace, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("unable to update namespace %s: %w", ns, err)
}
return nil
} }
// deleteNamespaces removes a namespace from kubernetes. // getWorkers returns the number of workers suggested to run when testing.
func (k *kubeManager) deleteNamespaces(namespaces []string) error { func getWorkers() int {
for _, ns := range namespaces { return 3
err := k.clientSet.CoreV1().Namespaces().Delete(context.TODO(), ns, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("unable to delete namespace %s: %w", ns, err)
}
}
return nil
}
func enforcePodSecurityBaseline(ns *v1.Namespace) {
if len(ns.ObjectMeta.Labels) == 0 {
ns.ObjectMeta.Labels = make(map[string]string)
}
// TODO(https://github.com/kubernetes/kubernetes/issues/108298): route namespace creation via framework.Framework.CreateNamespace
ns.ObjectMeta.Labels[admissionapi.EnforceLevelLabel] = string(admissionapi.LevelBaseline)
} }

View File

@ -29,43 +29,35 @@ import (
// Model defines the namespaces, deployments, services, pods, containers and associated // Model defines the namespaces, deployments, services, pods, containers and associated
// data for network policy test cases and provides the source of truth // data for network policy test cases and provides the source of truth
type Model struct { type Model struct {
Namespaces []*Namespace Namespaces []*Namespace
allPodStrings *[]PodString PodNames []string
allPods *[]*Pod Ports []int32
// the raw data Protocols []v1.Protocol
NamespaceNames []string
PodNames []string
Ports []int32
Protocols []v1.Protocol
DNSDomain string
} }
// NewWindowsModel returns a model specific to windows testing. // NewWindowsModel returns a model specific to windows testing.
func NewWindowsModel(namespaces []string, podNames []string, ports []int32, dnsDomain string) *Model { func NewWindowsModel(namespaceBaseNames []string, podNames []string, ports []int32) *Model {
return NewModel(namespaces, podNames, ports, []v1.Protocol{v1.ProtocolTCP, v1.ProtocolUDP}, dnsDomain) return NewModel(namespaceBaseNames, podNames, ports, []v1.Protocol{v1.ProtocolTCP, v1.ProtocolUDP})
} }
// NewModel instantiates a model based on: // NewModel instantiates a model based on:
// - namespaces // - namespaceBaseNames
// - pods // - pods
// - ports to listen on // - ports to listen on
// - protocols to listen on // - protocols to listen on
// The total number of pods is the number of namespaces x the number of pods per namespace. // The total number of pods is the number of namespaces x the number of pods per namespace.
// The number of containers per pod is the number of ports x the number of protocols. // The number of containers per pod is the number of ports x the number of protocols.
// The *total* number of containers is namespaces x pods x ports x protocols. // The *total* number of containers is namespaces x pods x ports x protocols.
func NewModel(namespaces []string, podNames []string, ports []int32, protocols []v1.Protocol, dnsDomain string) *Model { func NewModel(namespaceBaseNames []string, podNames []string, ports []int32, protocols []v1.Protocol) *Model {
model := &Model{ model := &Model{
NamespaceNames: namespaces, PodNames: podNames,
PodNames: podNames, Ports: ports,
Ports: ports, Protocols: protocols,
Protocols: protocols,
DNSDomain: dnsDomain,
} }
framework.Logf("DnsDomain %v", model.DNSDomain)
// build the entire "model" for the overall test, which means, building // build the entire "model" for the overall test, which means, building
// namespaces, pods, containers for each protocol. // namespaces, pods, containers for each protocol.
for _, ns := range namespaces { for _, ns := range namespaceBaseNames {
var pods []*Pod var pods []*Pod
for _, podName := range podNames { for _, podName := range podNames {
var containers []*Container var containers []*Container
@ -78,112 +70,30 @@ func NewModel(namespaces []string, podNames []string, ports []int32, protocols [
} }
} }
pods = append(pods, &Pod{ pods = append(pods, &Pod{
Namespace: ns,
Name: podName, Name: podName,
Containers: containers, Containers: containers,
}) })
} }
model.Namespaces = append(model.Namespaces, &Namespace{Name: ns, Pods: pods}) model.Namespaces = append(model.Namespaces, &Namespace{
BaseName: ns,
Pods: pods,
})
} }
return model return model
} }
// GetProbeTimeoutSeconds returns a timeout for how long the probe should work before failing a check, and takes windows heuristics into account, where requests can take longer sometimes.
func (m *Model) GetProbeTimeoutSeconds() int {
timeoutSeconds := 1
if framework.NodeOSDistroIs("windows") {
timeoutSeconds = 3
}
return timeoutSeconds
}
// GetWorkers returns the number of workers suggested to run when testing.
func (m *Model) GetWorkers() int {
return 3
}
// NewReachability instantiates a default-true reachability from the model's pods
func (m *Model) NewReachability() *Reachability {
return NewReachability(m.AllPods(), true)
}
// AllPodStrings returns a slice of all pod strings
func (m *Model) AllPodStrings() []PodString {
if m.allPodStrings == nil {
var pods []PodString
for _, ns := range m.Namespaces {
for _, pod := range ns.Pods {
pods = append(pods, pod.PodString())
}
}
m.allPodStrings = &pods
}
return *m.allPodStrings
}
// AllPods returns a slice of all pods
func (m *Model) AllPods() []*Pod {
if m.allPods == nil {
var pods []*Pod
for _, ns := range m.Namespaces {
for _, pod := range ns.Pods {
pods = append(pods, pod)
}
}
m.allPods = &pods
}
return *m.allPods
}
// FindPod returns the pod of matching namespace and name, or an error
func (m *Model) FindPod(ns string, name string) (*Pod, error) {
for _, namespace := range m.Namespaces {
for _, pod := range namespace.Pods {
if namespace.Name == ns && pod.Name == name {
return pod, nil
}
}
}
return nil, fmt.Errorf("unable to find pod %s/%s", ns, name)
}
// Namespace is the abstract representation of what matters to network policy // Namespace is the abstract representation of what matters to network policy
// tests for a namespace; i.e. it ignores kube implementation details // tests for a namespace; i.e. it ignores kube implementation details
type Namespace struct { type Namespace struct {
Name string BaseName string
Pods []*Pod Pods []*Pod
}
// Spec builds a kubernetes namespace spec
func (ns *Namespace) Spec() *v1.Namespace {
return &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns.Name,
Labels: ns.LabelSelector(),
},
}
}
// LabelSelector returns the default labels that should be placed on a namespace
// in order for it to be uniquely selectable by label selectors
func (ns *Namespace) LabelSelector() map[string]string {
return map[string]string{
"ns": ns.Name,
}
} }
// Pod is the abstract representation of what matters to network policy tests for // Pod is the abstract representation of what matters to network policy tests for
// a pod; i.e. it ignores kube implementation details // a pod; i.e. it ignores kube implementation details
type Pod struct { type Pod struct {
Namespace string
Name string Name string
Containers []*Container Containers []*Container
ServiceIP string
}
// PodString returns a corresponding pod string
func (p *Pod) PodString() PodString {
return NewPodString(p.Namespace, p.Name)
} }
// ContainerSpecs builds kubernetes container specs for the pod // ContainerSpecs builds kubernetes container specs for the pod
@ -195,31 +105,27 @@ func (p *Pod) ContainerSpecs() []v1.Container {
return containers return containers
} }
func (p *Pod) labelSelectorKey() string { func podNameLabelKey() string {
return "pod" return "pod"
} }
func (p *Pod) labelSelectorValue() string { // Labels returns the default labels that should be placed on a pod/deployment
return p.Name
}
// LabelSelector returns the default labels that should be placed on a pod/deployment
// in order for it to be uniquely selectable by label selectors // in order for it to be uniquely selectable by label selectors
func (p *Pod) LabelSelector() map[string]string { func (p *Pod) Labels() map[string]string {
return map[string]string{ return map[string]string{
p.labelSelectorKey(): p.labelSelectorValue(), podNameLabelKey(): p.Name,
} }
} }
// KubePod returns the kube pod (will add label selectors for windows if needed). // KubePod returns the kube pod (will add label selectors for windows if needed).
func (p *Pod) KubePod() *v1.Pod { func (p *Pod) KubePod(namespace string) *v1.Pod {
zero := int64(0) zero := int64(0)
thePod := &v1.Pod{ thePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: p.Name, Name: p.Name,
Labels: p.LabelSelector(), Labels: p.Labels(),
Namespace: p.Namespace, Namespace: namespace,
}, },
Spec: v1.PodSpec{ Spec: v1.PodSpec{
TerminationGracePeriodSeconds: &zero, TerminationGracePeriodSeconds: &zero,
@ -235,26 +141,25 @@ func (p *Pod) KubePod() *v1.Pod {
return thePod return thePod
} }
// QualifiedServiceAddress returns the address that can be used to hit a service from // QualifiedServiceAddress returns the address that can be used to access the service
// any namespace in the cluster func (p *Pod) QualifiedServiceAddress(namespace string, dnsDomain string) string {
func (p *Pod) QualifiedServiceAddress(dnsDomain string) string { return fmt.Sprintf("%s.%s.svc.%s", p.ServiceName(namespace), namespace, dnsDomain)
return fmt.Sprintf("%s.%s.svc.%s", p.ServiceName(), p.Namespace, dnsDomain)
} }
// ServiceName returns the unqualified service name // ServiceName returns the unqualified service name
func (p *Pod) ServiceName() string { func (p *Pod) ServiceName(namespace string) string {
return fmt.Sprintf("s-%s-%s", p.Namespace, p.Name) return fmt.Sprintf("s-%s-%s", namespace, p.Name)
} }
// Service returns a kube service spec // Service returns a kube service spec
func (p *Pod) Service() *v1.Service { func (p *Pod) Service(namespace string) *v1.Service {
service := &v1.Service{ service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: p.ServiceName(), Name: p.ServiceName(namespace),
Namespace: p.Namespace, Namespace: namespace,
}, },
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
Selector: p.LabelSelector(), Selector: p.Labels(),
}, },
} }
for _, container := range p.Containers { for _, container := range p.Containers {

File diff suppressed because it is too large Load Diff

View File

@ -32,8 +32,9 @@ type Prober interface {
// ProbeJob packages the data for the input of a pod->pod connectivity probe // ProbeJob packages the data for the input of a pod->pod connectivity probe
type ProbeJob struct { type ProbeJob struct {
PodFrom *Pod PodFrom TestPod
PodTo *Pod PodTo TestPod
PodToServiceIP string
ToPort int ToPort int
ToPodDNSDomain string ToPodDNSDomain string
Protocol v1.Protocol Protocol v1.Protocol
@ -48,13 +49,12 @@ type ProbeJobResults struct {
} }
// ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability` // ProbePodToPodConnectivity runs a series of probes in kube, and records the results in `testCase.Reachability`
func ProbePodToPodConnectivity(prober Prober, model *Model, testCase *TestCase) { func ProbePodToPodConnectivity(prober Prober, allPods []TestPod, dnsDomain string, testCase *TestCase) {
allPods := model.AllPods()
size := len(allPods) * len(allPods) size := len(allPods) * len(allPods)
jobs := make(chan *ProbeJob, size) jobs := make(chan *ProbeJob, size)
results := make(chan *ProbeJobResults, size) results := make(chan *ProbeJobResults, size)
for i := 0; i < model.GetWorkers(); i++ { for i := 0; i < getWorkers(); i++ {
go probeWorker(prober, jobs, results, model.GetProbeTimeoutSeconds()) go probeWorker(prober, jobs, results, getProbeTimeoutSeconds())
} }
for _, podFrom := range allPods { for _, podFrom := range allPods {
for _, podTo := range allPods { for _, podTo := range allPods {
@ -62,7 +62,7 @@ func ProbePodToPodConnectivity(prober Prober, model *Model, testCase *TestCase)
PodFrom: podFrom, PodFrom: podFrom,
PodTo: podTo, PodTo: podTo,
ToPort: testCase.ToPort, ToPort: testCase.ToPort,
ToPodDNSDomain: model.DNSDomain, ToPodDNSDomain: dnsDomain,
Protocol: testCase.Protocol, Protocol: testCase.Protocol,
} }
} }
@ -95,6 +95,7 @@ func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobR
defer ginkgo.GinkgoRecover() defer ginkgo.GinkgoRecover()
for job := range jobs { for job := range jobs {
podFrom := job.PodFrom podFrom := job.PodFrom
// defensive programming: this should not be possible as we already check in initializeClusterFromModel
if netutils.ParseIPSloppy(job.PodTo.ServiceIP) == nil { if netutils.ParseIPSloppy(job.PodTo.ServiceIP) == nil {
results <- &ProbeJobResults{ results <- &ProbeJobResults{
Job: job, Job: job,
@ -111,7 +112,7 @@ func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobR
connected, command, err := prober.probeConnectivity(&probeConnectivityArgs{ connected, command, err := prober.probeConnectivity(&probeConnectivityArgs{
nsFrom: podFrom.Namespace, nsFrom: podFrom.Namespace,
podFrom: podFrom.Name, podFrom: podFrom.Name,
containerFrom: podFrom.Containers[0].Name(), containerFrom: podFrom.ContainerName,
addrTo: job.PodTo.ServiceIP, addrTo: job.PodTo.ServiceIP,
protocol: job.Protocol, protocol: job.Protocol,
toPort: job.ToPort, toPort: job.ToPort,

View File

@ -79,21 +79,21 @@ func (p *Peer) Matches(pod PodString) bool {
// Reachability packages the data for a cluster-wide connectivity probe // Reachability packages the data for a cluster-wide connectivity probe
type Reachability struct { type Reachability struct {
Expected *TruthTable Expected *TruthTable
Observed *TruthTable Observed *TruthTable
Pods []*Pod PodStrings []PodString
} }
// NewReachability instantiates a reachability // NewReachability instantiates a reachability
func NewReachability(pods []*Pod, defaultExpectation bool) *Reachability { func NewReachability(podStrings []PodString, defaultExpectation bool) *Reachability {
var podNames []string var podNames []string
for _, pod := range pods { for _, podString := range podStrings {
podNames = append(podNames, pod.PodString().String()) podNames = append(podNames, podString.String())
} }
r := &Reachability{ r := &Reachability{
Expected: NewTruthTableFromItems(podNames, &defaultExpectation), Expected: NewTruthTableFromItems(podNames, &defaultExpectation),
Observed: NewTruthTableFromItems(podNames, nil), Observed: NewTruthTableFromItems(podNames, nil),
Pods: pods, PodStrings: podStrings,
} }
return r return r
} }
@ -101,8 +101,8 @@ func NewReachability(pods []*Pod, defaultExpectation bool) *Reachability {
// AllowLoopback expects all communication from a pod to itself to be allowed. // AllowLoopback expects all communication from a pod to itself to be allowed.
// In general, call it after setting up any other rules since loopback logic follows no policy. // In general, call it after setting up any other rules since loopback logic follows no policy.
func (r *Reachability) AllowLoopback() { func (r *Reachability) AllowLoopback() {
for _, pod := range r.Pods { for _, podString := range r.PodStrings {
podName := pod.PodString().String() podName := podString.String()
r.Expected.Set(podName, podName, true) r.Expected.Set(podName, podName, true)
} }
} }
@ -130,11 +130,11 @@ func (r *Reachability) ExpectAllEgress(pod PodString, connected bool) {
// ExpectPeer sets expected values using Peer matchers // ExpectPeer sets expected values using Peer matchers
func (r *Reachability) ExpectPeer(from *Peer, to *Peer, connected bool) { func (r *Reachability) ExpectPeer(from *Peer, to *Peer, connected bool) {
for _, fromPod := range r.Pods { for _, fromPod := range r.PodStrings {
if from.Matches(fromPod.PodString()) { if from.Matches(fromPod) {
for _, toPod := range r.Pods { for _, toPod := range r.PodStrings {
if to.Matches(toPod.PodString()) { if to.Matches(toPod) {
r.Expected.Set(string(fromPod.PodString()), string(toPod.PodString()), connected) r.Expected.Set(fromPod.String(), toPod.String(), connected)
} }
} }
} }
@ -143,7 +143,7 @@ func (r *Reachability) ExpectPeer(from *Peer, to *Peer, connected bool) {
// Observe records a single connectivity observation // Observe records a single connectivity observation
func (r *Reachability) Observe(fromPod PodString, toPod PodString, isConnected bool) { func (r *Reachability) Observe(fromPod PodString, toPod PodString, isConnected bool) {
r.Observed.Set(string(fromPod), string(toPod), isConnected) r.Observed.Set(fromPod.String(), toPod.String(), isConnected)
} }
// Summary produces a useful summary of expected and observed data // Summary produces a useful summary of expected and observed data

View File

@ -87,9 +87,9 @@ func waitForHTTPServers(k *kubeManager, model *Model) error {
for i := 0; i < maxTries; i++ { for i := 0; i < maxTries; i++ {
for caseName, testCase := range testCases { for caseName, testCase := range testCases {
if notReady[caseName] { if notReady[caseName] {
reachability := NewReachability(model.AllPods(), true) reachability := NewReachability(k.AllPodStrings(), true)
testCase.Reachability = reachability testCase.Reachability = reachability
ProbePodToPodConnectivity(k, model, testCase) ProbePodToPodConnectivity(k, k.AllPods(), k.DNSDomain(), testCase)
_, wrong, _, _ := reachability.Summary(ignoreLoopback) _, wrong, _, _ := reachability.Summary(ignoreLoopback)
if wrong == 0 { if wrong == 0 {
framework.Logf("server %s is ready", caseName) framework.Logf("server %s is ready", caseName)
@ -108,16 +108,16 @@ func waitForHTTPServers(k *kubeManager, model *Model) error {
} }
// ValidateOrFail validates connectivity // ValidateOrFail validates connectivity
func ValidateOrFail(k8s *kubeManager, model *Model, testCase *TestCase) { func ValidateOrFail(k8s *kubeManager, testCase *TestCase) {
ginkgo.By("Validating reachability matrix...") ginkgo.By("Validating reachability matrix...")
// 1st try // 1st try
ginkgo.By("Validating reachability matrix... (FIRST TRY)") ginkgo.By("Validating reachability matrix... (FIRST TRY)")
ProbePodToPodConnectivity(k8s, model, testCase) ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
// 2nd try, in case first one failed // 2nd try, in case first one failed
if _, wrong, _, _ := testCase.Reachability.Summary(ignoreLoopback); wrong != 0 { if _, wrong, _, _ := testCase.Reachability.Summary(ignoreLoopback); wrong != 0 {
framework.Logf("failed first probe %d wrong results ... retrying (SECOND TRY)", wrong) framework.Logf("failed first probe %d wrong results ... retrying (SECOND TRY)", wrong)
ProbePodToPodConnectivity(k8s, model, testCase) ProbePodToPodConnectivity(k8s, k8s.AllPods(), k8s.DNSDomain(), testCase)
} }
// at this point we know if we passed or failed, print final matrix and pass/fail the test. // at this point we know if we passed or failed, print final matrix and pass/fail the test.
@ -131,40 +131,43 @@ func ValidateOrFail(k8s *kubeManager, model *Model, testCase *TestCase) {
framework.Logf("VALIDATION SUCCESSFUL") framework.Logf("VALIDATION SUCCESSFUL")
} }
// UpdateNamespaceLabels sets the labels for a namespace // AddNamespaceLabels adds a new label to a namespace
func UpdateNamespaceLabels(k8s *kubeManager, ns string, newNsLabel map[string]string) { func AddNamespaceLabel(k8s *kubeManager, name string, key string, val string) {
err := k8s.setNamespaceLabels(ns, newNsLabel) ns, err := k8s.getNamespace(name)
framework.ExpectNoError(err, "Update namespace %s labels", ns) framework.ExpectNoError(err, "Unable to get namespace %s", name)
err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) { ns.Labels[key] = val
namespace, err := k8s.getNamespace(ns) _, err = k8s.clientSet.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{})
if err != nil { framework.ExpectNoError(err, "Unable to update namespace %s", name)
return false, err
}
for key, expected := range newNsLabel {
if actual, ok := namespace.Labels[key]; !ok || (expected != actual) {
return false, nil
}
}
return true, nil
})
framework.ExpectNoError(err, "Unable to wait for ns %s to update labels", ns)
} }
// AddPodLabels adds new labels to a deployment's template // DeleteNamespaceLabel deletes a label from a namespace (if present)
func AddPodLabels(k8s *kubeManager, pod *Pod, newPodLabels map[string]string) { func DeleteNamespaceLabel(k8s *kubeManager, name string, key string) {
kubePod, err := k8s.clientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) ns, err := k8s.getNamespace(name)
framework.ExpectNoError(err, "Unable to get pod %s/%s", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to get namespace %s", name)
if _, ok := ns.Labels[key]; !ok {
// nothing to do if the label is not present
return
}
delete(ns.Labels, key)
_, err = k8s.clientSet.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{})
framework.ExpectNoError(err, "Unable to update namespace %s", name)
}
// AddPodLabels adds new labels to a running pod
func AddPodLabels(k8s *kubeManager, namespace string, name string, newPodLabels map[string]string) {
kubePod, err := k8s.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
framework.ExpectNoError(err, "Unable to get pod %s/%s", namespace, name)
if kubePod.Labels == nil { if kubePod.Labels == nil {
kubePod.Labels = map[string]string{} kubePod.Labels = map[string]string{}
} }
for key, val := range newPodLabels { for key, val := range newPodLabels {
kubePod.Labels[key] = val kubePod.Labels[key] = val
} }
_, err = k8s.clientSet.CoreV1().Pods(pod.Namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{}) _, err = k8s.clientSet.CoreV1().Pods(namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{})
framework.ExpectNoError(err, "Unable to add pod %s/%s labels", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to add pod %s/%s labels", namespace, name)
err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) { err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
waitForPod, err := k8s.getPod(pod.Namespace, pod.Name) waitForPod, err := k8s.getPod(namespace, name)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -175,33 +178,31 @@ func AddPodLabels(k8s *kubeManager, pod *Pod, newPodLabels map[string]string) {
} }
return true, nil return true, nil
}) })
framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", namespace, name)
}
// ResetNamespaceLabels resets the labels for a namespace
func ResetNamespaceLabels(k8s *kubeManager, ns string) {
UpdateNamespaceLabels(k8s, ns, (&Namespace{Name: ns}).LabelSelector())
} }
// ResetPodLabels resets the labels for a deployment's template // ResetPodLabels resets the labels for a deployment's template
func ResetPodLabels(k8s *kubeManager, pod *Pod) { func ResetPodLabels(k8s *kubeManager, namespace string, name string) {
kubePod, err := k8s.clientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) kubePod, err := k8s.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
framework.ExpectNoError(err, "Unable to get pod %s/%s", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to get pod %s/%s", namespace, name)
kubePod.Labels = pod.LabelSelector() labels := map[string]string{
_, err = k8s.clientSet.CoreV1().Pods(pod.Namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{}) podNameLabelKey(): name,
framework.ExpectNoError(err, "Unable to add pod %s/%s labels", pod.Namespace, pod.Name) }
kubePod.Labels = labels
_, err = k8s.clientSet.CoreV1().Pods(namespace).Update(context.TODO(), kubePod, metav1.UpdateOptions{})
framework.ExpectNoError(err, "Unable to add pod %s/%s labels", namespace, name)
err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) { err = wait.PollImmediate(waitInterval, waitTimeout, func() (done bool, err error) {
waitForPod, err := k8s.getPod(pod.Namespace, pod.Name) waitForPod, err := k8s.getPod(namespace, name)
if err != nil { if err != nil {
return false, nil return false, nil
} }
for key, expected := range pod.LabelSelector() { for key, expected := range labels {
if actual, ok := waitForPod.Labels[key]; !ok || (expected != actual) { if actual, ok := waitForPod.Labels[key]; !ok || (expected != actual) {
return false, nil return false, nil
} }
} }
return true, nil return true, nil
}) })
framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", pod.Namespace, pod.Name) framework.ExpectNoError(err, "Unable to wait for pod %s/%s to update labels", namespace, name)
} }