From 9d75c958cedbf354668bba5a6f89393ac661e858 Mon Sep 17 00:00:00 2001 From: Laszlo Janosi Date: Thu, 3 Nov 2022 10:54:14 +0200 Subject: [PATCH] Fix review comments. Implement endpoint port validation that verifies the protocol, too. --- test/e2e/framework/endpoints/ports.go | 24 +++++ test/e2e/framework/endpointslice/ports.go | 29 +++++ test/e2e/framework/service/jig.go | 20 +--- test/e2e/network/service.go | 123 ++++++++++++++++++++-- 4 files changed, 171 insertions(+), 25 deletions(-) diff --git a/test/e2e/framework/endpoints/ports.go b/test/e2e/framework/endpoints/ports.go index efd960d93f4..8c116b031a5 100644 --- a/test/e2e/framework/endpoints/ports.go +++ b/test/e2e/framework/endpoints/ports.go @@ -24,6 +24,9 @@ import ( // PortsByPodUID is a map that maps pod UID to container ports. type PortsByPodUID map[types.UID][]int +// FullPortsByPodUID is a map that maps pod UID to container ports. +type FullPortsByPodUID map[types.UID][]v1.ContainerPort + // GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { m := PortsByPodUID{} @@ -40,3 +43,24 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { } return m } + +// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoints with all the port data. +func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID { + m := FullPortsByPodUID{} + for _, ss := range ep.Subsets { + for _, port := range ss.Ports { + containerPort := v1.ContainerPort{ + Name: port.Name, + ContainerPort: port.Port, + Protocol: port.Protocol, + } + for _, addr := range ss.Addresses { + if _, ok := m[addr.TargetRef.UID]; !ok { + m[addr.TargetRef.UID] = make([]v1.ContainerPort, 0) + } + m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], containerPort) + } + } + } + return m +} diff --git a/test/e2e/framework/endpointslice/ports.go b/test/e2e/framework/endpointslice/ports.go index c61230c7d2a..f51162b0083 100644 --- a/test/e2e/framework/endpointslice/ports.go +++ b/test/e2e/framework/endpointslice/ports.go @@ -17,6 +17,7 @@ limitations under the License. package endpointslice import ( + v1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" ) @@ -24,6 +25,9 @@ import ( // PortsByPodUID is a map that maps pod UID to container ports. type PortsByPodUID map[types.UID][]int +// FullPortsByPodUID is a map that maps pod UID to container ports. +type FullPortsByPodUID map[types.UID][]v1.ContainerPort + // GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. func GetContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) PortsByPodUID { m := PortsByPodUID{} @@ -44,3 +48,28 @@ func GetContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) PortsByPodUID { } return m } + +// GetFullContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. +func GetFullContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) FullPortsByPodUID { + m := FullPortsByPodUID{} + + for _, es := range eps { + for _, port := range es.Ports { + if port.Port == nil { + continue + } + containerPort := v1.ContainerPort{ + Name: *port.Name, + ContainerPort: *port.Port, + Protocol: *port.Protocol, + } + for _, ep := range es.Endpoints { + if _, ok := m[ep.TargetRef.UID]; !ok { + m[ep.TargetRef.UID] = make([]v1.ContainerPort, 0) + } + m[ep.TargetRef.UID] = append(m[ep.TargetRef.UID], containerPort) + } + } + } + return m +} diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index 5e7b5ec7eb0..202c872ae12 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -1070,7 +1070,7 @@ func (j *TestJig) CreateSCTPServiceWithPort(tweak func(svc *v1.Service), port in // CreateLoadBalancerServiceWaitForClusterIPOnly creates a loadbalancer service and waits // for it to acquire a cluster IP -func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(timeout time.Duration, tweak func(svc *v1.Service)) (*v1.Service, error) { +func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(tweak func(svc *v1.Service)) (*v1.Service, error) { ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer") svc := j.newServiceTemplate(v1.ProtocolTCP, 80) svc.Spec.Type = v1.ServiceTypeLoadBalancer @@ -1079,24 +1079,10 @@ func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(timeout time.Dur if tweak != nil { tweak(svc) } - _, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{}) + result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %v", svc.Name, err) } - ginkgo.By("waiting for cluster IP for loadbalancer service " + j.Namespace + "/" + j.Name) - return j.WaitForLoadBalancerClusterIP(timeout) -} - -// WaitForLoadBalancerClusterIP waits the given LoadBalancer service to have a ClusterIP, or returns an error after the given timeout -func (j *TestJig) WaitForLoadBalancerClusterIP(timeout time.Duration) (*v1.Service, error) { - framework.Logf("Waiting up to %v for LoadBalancer service %q to have a ClusterIP", timeout, j.Name) - service, err := j.waitForCondition(timeout, "have a ClusterIP", func(svc *v1.Service) bool { - return len(svc.Spec.ClusterIP) > 0 - }) - if err != nil { - return nil, err - } - - return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer) + return j.sanityCheckService(result, v1.ServiceTypeLoadBalancer) } diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index bebe2265af2..1a0ef2ff601 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -125,6 +125,12 @@ type portsByPodName map[string][]int // portsByPodUID is a map that maps pod name to container ports. type portsByPodUID map[types.UID][]int +// fullPortsByPodName is a map that maps pod name to container ports including their protocols. +type fullPortsByPodName map[string][]v1.ContainerPort + +// fullPortsByPodUID is a map that maps pod name to container ports. +type fullPortsByPodUID map[types.UID][]v1.ContainerPort + // affinityCheckFromPod returns interval, timeout and function pinging the service and // returning pinged hosts for pinging the service from execPod. func affinityCheckFromPod(execPod *v1.Pod, serviceIP string, servicePort int) (time.Duration, time.Duration, func() []string) { @@ -3750,7 +3756,7 @@ var _ = common.SIGDescribe("Services", func() { svc2port := "svc2" ginkgo.By("creating service " + serviceName + " in namespace " + ns) - svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(2*time.Minute, func(service *v1.Service) { + svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(service *v1.Service) { service.Spec.Ports = []v1.ServicePort{ { Name: "portname1", @@ -3760,7 +3766,7 @@ var _ = common.SIGDescribe("Services", func() { }, { Name: "portname2", - Port: 81, + Port: 80, TargetPort: intstr.FromString(svc2port), Protocol: v1.ProtocolUDP, }, @@ -3768,8 +3774,7 @@ var _ = common.SIGDescribe("Services", func() { }) framework.ExpectNoError(err) - port1 := 100 - port2 := 101 + containerPort := 100 names := map[string]bool{} defer func() { @@ -3782,20 +3787,20 @@ var _ = common.SIGDescribe("Services", func() { containerPorts := []v1.ContainerPort{ { Name: svc1port, - ContainerPort: int32(port1), + ContainerPort: int32(containerPort), Protocol: v1.ProtocolTCP, }, { Name: svc2port, - ContainerPort: int32(port2), + ContainerPort: int32(containerPort), Protocol: v1.ProtocolUDP, }, } podname1 := "pod1" - createPodOrFail(f, ns, podname1, jig.Labels, containerPorts, "netexec", "--http-port", strconv.Itoa(port1), "--udp-port", strconv.Itoa(port2)) - validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podname1: {port1, port2}}) + createPodOrFail(f, ns, podname1, jig.Labels, containerPorts, "netexec", "--http-port", strconv.Itoa(containerPort), "--udp-port", strconv.Itoa(containerPort)) + validateEndpointsPortsWithProtocolsOrFail(cs, ns, serviceName, fullPortsByPodName{podname1: containerPorts}) ginkgo.By("Checking if the Service forwards traffic to pods") execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil) @@ -4236,6 +4241,108 @@ func restartComponent(cs clientset.Interface, cName, ns string, matchLabels map[ return err } +// validateEndpointsPortsWithProtocolsOrFail validates that the given service exists and is served by the given expectedEndpoints. +func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints fullPortsByPodName) { + ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints)) + expectedPortsByPodUID, err := translatePortsByPodNameToPortsByPodUID(c, namespace, expectedEndpoints) + framework.ExpectNoError(err, "failed to translate pod name to UID, ns:%s, expectedEndpoints:%v", namespace, expectedEndpoints) + + var ( + pollErr error + i = 0 + ) + if pollErr = wait.PollImmediate(time.Second, framework.ServiceStartTimeout, func() (bool, error) { + i++ + + ep, err := c.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) + if err != nil { + framework.Logf("Failed go get Endpoints object: %v", err) + // Retry the error + return false, nil + } + portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep)) + if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil { + if i%5 == 0 { + framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints) + } + return false, nil + } + + // If EndpointSlice API is enabled, then validate if appropriate EndpointSlice objects + // were also create/updated/deleted. + if _, err := c.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String()); err == nil { + opts := metav1.ListOptions{ + LabelSelector: "kubernetes.io/service-name=" + serviceName, + } + es, err := c.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), opts) + if err != nil { + framework.Logf("Failed go list EndpointSlice objects: %v", err) + // Retry the error + return false, nil + } + portsByUID = fullPortsByPodUID(e2eendpointslice.GetFullContainerPortsByPodUID(es.Items)) + if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil { + if i%5 == 0 { + framework.Logf("Unexpected endpoint slices: found %v, expected %v, will retry", portsByUID, expectedEndpoints) + } + return false, nil + } + } + framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v", + serviceName, namespace, expectedEndpoints) + return true, nil + }); pollErr != nil { + if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{}); err == nil { + for _, pod := range pods.Items { + framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp) + } + } else { + framework.Logf("Can't list pod debug info: %v", err) + } + } + framework.ExpectNoError(pollErr, "error waithing for service %s in namespace %s to expose endpoints %v: %v", serviceName, namespace, expectedEndpoints) +} + +func translatePortsByPodNameToPortsByPodUID(c clientset.Interface, ns string, expectedEndpoints fullPortsByPodName) (fullPortsByPodUID, error) { + portsByUID := make(fullPortsByPodUID) + for name, portList := range expectedEndpoints { + pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err) + } + portsByUID[pod.ObjectMeta.UID] = portList + } + return portsByUID, nil +} + +func validatePortsAndProtocols(ep, expectedEndpoints fullPortsByPodUID) error { + if len(ep) != len(expectedEndpoints) { + // should not happen because we check this condition before + return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints) + } + for podUID := range expectedEndpoints { + if _, ok := ep[podUID]; !ok { + return fmt.Errorf("endpoint %v not found", podUID) + } + if len(ep[podUID]) != len(expectedEndpoints[podUID]) { + return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID]) + } + var match bool + for _, epPort := range ep[podUID] { + match = false + for _, expectedPort := range expectedEndpoints[podUID] { + if epPort.ContainerPort == expectedPort.ContainerPort && epPort.Protocol == expectedPort.Protocol { + match = true + } + } + if !match { + return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID]) + } + } + } + return nil +} + var _ = common.SIGDescribe("SCTP [LinuxOnly]", func() { f := framework.NewDefaultFramework("sctp") f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged