Fix review comments. Implement endpoint port validation that verifies the protocol, too.

Laszlo Janosi 2022-11-03 10:54:14 +02:00
parent 82ce61afc7
commit 9d75c958ce
4 changed files with 171 additions and 25 deletions

View File

@@ -24,6 +24,9 @@ import (
// PortsByPodUID is a map that maps pod UID to container ports.
type PortsByPodUID map[types.UID][]int
// FullPortsByPodUID is a map that maps pod UID to container ports, including port name and protocol.
type FullPortsByPodUID map[types.UID][]v1.ContainerPort
// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
m := PortsByPodUID{}
@@ -40,3 +43,24 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
}
return m
}
// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoints with all the port data.
func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID {
m := FullPortsByPodUID{}
for _, ss := range ep.Subsets {
for _, port := range ss.Ports {
containerPort := v1.ContainerPort{
Name: port.Name,
ContainerPort: port.Port,
Protocol: port.Protocol,
}
for _, addr := range ss.Addresses {
if _, ok := m[addr.TargetRef.UID]; !ok {
m[addr.TargetRef.UID] = make([]v1.ContainerPort, 0)
}
m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], containerPort)
}
}
}
return m
}
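For illustration only, the new helper could be consumed roughly as follows (a minimal sketch, not part of this commit; it assumes a clientset.Interface named client, a namespace ns, a service name serviceName, and the e2eendpoints import alias used by the test file further below):

ep, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
	framework.Failf("failed to get Endpoints %q: %v", serviceName, err)
}
// Each entry carries the full ContainerPort, so name, number, and protocol can all be verified.
for uid, ports := range e2eendpoints.GetFullContainerPortsByPodUID(ep) {
	for _, p := range ports {
		framework.Logf("pod %s exposes %s %d/%s", uid, p.Name, p.ContainerPort, p.Protocol)
	}
}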

View File

@@ -17,6 +17,7 @@ limitations under the License.
package endpointslice
import (
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
)
@@ -24,6 +25,9 @@ import (
// PortsByPodUID is a map that maps pod UID to container ports.
type PortsByPodUID map[types.UID][]int
// FullPortsByPodUID is a map that maps pod UID to container ports, including port name and protocol.
type FullPortsByPodUID map[types.UID][]v1.ContainerPort
// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
func GetContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) PortsByPodUID {
m := PortsByPodUID{}
@@ -44,3 +48,28 @@ func GetContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) PortsByPodUID {
}
return m
}
// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoint slices with all the port data.
func GetFullContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) FullPortsByPodUID {
m := FullPortsByPodUID{}
for _, es := range eps {
for _, port := range es.Ports {
// Name, Port, and Protocol are all pointers in the discovery v1 API; skip ports with missing data.
if port.Name == nil || port.Port == nil || port.Protocol == nil {
continue
}
containerPort := v1.ContainerPort{
Name: *port.Name,
ContainerPort: *port.Port,
Protocol: *port.Protocol,
}
for _, ep := range es.Endpoints {
if _, ok := m[ep.TargetRef.UID]; !ok {
m[ep.TargetRef.UID] = make([]v1.ContainerPort, 0)
}
m[ep.TargetRef.UID] = append(m[ep.TargetRef.UID], containerPort)
}
}
}
return m
}
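As a sketch of how the discovery v1 pointer fields feed into this helper (the slice below is constructed by hand purely for illustration and is not part of this commit; fmt and the API imports are assumed):

name, port, proto := "http", int32(80), v1.ProtocolTCP
slices := []discoveryv1.EndpointSlice{{
	Ports:     []discoveryv1.EndpointPort{{Name: &name, Port: &port, Protocol: &proto}},
	Endpoints: []discoveryv1.Endpoint{{TargetRef: &v1.ObjectReference{UID: "pod-uid-1"}}},
}}
// "pod-uid-1" maps to a single ContainerPort{Name: "http", ContainerPort: 80, Protocol: "TCP"}.
fmt.Printf("%v\n", GetFullContainerPortsByPodUID(slices))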

View File

@@ -1070,7 +1070,7 @@ func (j *TestJig) CreateSCTPServiceWithPort(tweak func(svc *v1.Service), port in
// CreateLoadBalancerServiceWaitForClusterIPOnly creates a loadbalancer service and waits
// for it to acquire a cluster IP
func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(timeout time.Duration, tweak func(svc *v1.Service)) (*v1.Service, error) {
func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(tweak func(svc *v1.Service)) (*v1.Service, error) {
ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
svc.Spec.Type = v1.ServiceTypeLoadBalancer
@@ -1079,24 +1079,10 @@ func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(timeout time.Dur
if tweak != nil {
tweak(svc)
}
_, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %v", svc.Name, err)
}
ginkgo.By("waiting for cluster IP for loadbalancer service " + j.Namespace + "/" + j.Name)
return j.WaitForLoadBalancerClusterIP(timeout)
}
// WaitForLoadBalancerClusterIP waits the given LoadBalancer service to have a ClusterIP, or returns an error after the given timeout
func (j *TestJig) WaitForLoadBalancerClusterIP(timeout time.Duration) (*v1.Service, error) {
framework.Logf("Waiting up to %v for LoadBalancer service %q to have a ClusterIP", timeout, j.Name)
service, err := j.waitForCondition(timeout, "have a ClusterIP", func(svc *v1.Service) bool {
return len(svc.Spec.ClusterIP) > 0
})
if err != nil {
return nil, err
}
return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
return j.sanityCheckService(result, v1.ServiceTypeLoadBalancer)
}
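With the timeout parameter and the extra wait removed, a call site looks roughly like this (an illustrative sketch; the port and name values are hypothetical):

svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) {
	svc.Spec.Ports = []v1.ServicePort{
		{Name: "http", Port: 80, Protocol: v1.ProtocolTCP, TargetPort: intstr.FromString("http")},
	}
})
framework.ExpectNoError(err, "failed to create LoadBalancer Service")
// The cluster IP is assigned synchronously on create, so no separate wait is needed before using svc.
framework.Logf("service %s got cluster IP %s", svc.Name, svc.Spec.ClusterIP)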

View File

@@ -125,6 +125,12 @@ type portsByPodName map[string][]int
// portsByPodUID is a map that maps pod name to container ports.
type portsByPodUID map[types.UID][]int
// fullPortsByPodName is a map that maps pod name to container ports including their protocols.
type fullPortsByPodName map[string][]v1.ContainerPort
// fullPortsByPodUID is a map that maps pod UID to container ports, including their protocols.
type fullPortsByPodUID map[types.UID][]v1.ContainerPort
// affinityCheckFromPod returns interval, timeout and function pinging the service and
// returning pinged hosts for pinging the service from execPod.
func affinityCheckFromPod(execPod *v1.Pod, serviceIP string, servicePort int) (time.Duration, time.Duration, func() []string) {
@@ -3750,7 +3756,7 @@ var _ = common.SIGDescribe("Services", func() {
svc2port := "svc2"
ginkgo.By("creating service " + serviceName + " in namespace " + ns)
svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(2*time.Minute, func(service *v1.Service) {
svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(service *v1.Service) {
service.Spec.Ports = []v1.ServicePort{
{
Name: "portname1",
@@ -3760,7 +3766,7 @@ var _ = common.SIGDescribe("Services", func() {
},
{
Name: "portname2",
Port: 81,
Port: 80,
TargetPort: intstr.FromString(svc2port),
Protocol: v1.ProtocolUDP,
},
@@ -3768,8 +3774,7 @@ var _ = common.SIGDescribe("Services", func() {
})
framework.ExpectNoError(err)
port1 := 100
port2 := 101
containerPort := 100
names := map[string]bool{}
defer func() {
@@ -3782,20 +3787,20 @@ var _ = common.SIGDescribe("Services", func() {
containerPorts := []v1.ContainerPort{
{
Name: svc1port,
ContainerPort: int32(port1),
ContainerPort: int32(containerPort),
Protocol: v1.ProtocolTCP,
},
{
Name: svc2port,
ContainerPort: int32(port2),
ContainerPort: int32(containerPort),
Protocol: v1.ProtocolUDP,
},
}
podname1 := "pod1"
createPodOrFail(f, ns, podname1, jig.Labels, containerPorts, "netexec", "--http-port", strconv.Itoa(port1), "--udp-port", strconv.Itoa(port2))
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podname1: {port1, port2}})
createPodOrFail(f, ns, podname1, jig.Labels, containerPorts, "netexec", "--http-port", strconv.Itoa(containerPort), "--udp-port", strconv.Itoa(containerPort))
validateEndpointsPortsWithProtocolsOrFail(cs, ns, serviceName, fullPortsByPodName{podname1: containerPorts})
ginkgo.By("Checking if the Service forwards traffic to pods")
execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
@@ -4236,6 +4241,108 @@ func restartComponent(cs clientset.Interface, cName, ns string, matchLabels map[
return err
}
// validateEndpointsPortsWithProtocolsOrFail validates that the given service exists and is served by the given expectedEndpoints.
func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints fullPortsByPodName) {
ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
expectedPortsByPodUID, err := translatePortsByPodNameToPortsByPodUID(c, namespace, expectedEndpoints)
framework.ExpectNoError(err, "failed to translate pod name to UID, ns:%s, expectedEndpoints:%v", namespace, expectedEndpoints)
var (
pollErr error
i = 0
)
if pollErr = wait.PollImmediate(time.Second, framework.ServiceStartTimeout, func() (bool, error) {
i++
ep, err := c.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
framework.Logf("Failed go get Endpoints object: %v", err)
// Retry the error
return false, nil
}
portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep))
if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
}
return false, nil
}
// If the EndpointSlice API is enabled, also validate that the corresponding EndpointSlice
// objects were created/updated/deleted.
if _, err := c.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String()); err == nil {
opts := metav1.ListOptions{
LabelSelector: "kubernetes.io/service-name=" + serviceName,
}
es, err := c.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), opts)
if err != nil {
framework.Logf("Failed go list EndpointSlice objects: %v", err)
// Retry the error
return false, nil
}
portsByUID = fullPortsByPodUID(e2eendpointslice.GetFullContainerPortsByPodUID(es.Items))
if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoint slices: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
}
return false, nil
}
}
framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v",
serviceName, namespace, expectedEndpoints)
return true, nil
}); pollErr != nil {
if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{}); err == nil {
for _, pod := range pods.Items {
framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
}
} else {
framework.Logf("Can't list pod debug info: %v", err)
}
}
framework.ExpectNoError(pollErr, "error waithing for service %s in namespace %s to expose endpoints %v: %v", serviceName, namespace, expectedEndpoints)
}
func translatePortsByPodNameToPortsByPodUID(c clientset.Interface, ns string, expectedEndpoints fullPortsByPodName) (fullPortsByPodUID, error) {
portsByUID := make(fullPortsByPodUID)
for name, portList := range expectedEndpoints {
pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
}
portsByUID[pod.ObjectMeta.UID] = portList
}
return portsByUID, nil
}
func validatePortsAndProtocols(ep, expectedEndpoints fullPortsByPodUID) error {
if len(ep) != len(expectedEndpoints) {
// should not happen because we check this condition before
return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints)
}
for podUID := range expectedEndpoints {
if _, ok := ep[podUID]; !ok {
return fmt.Errorf("endpoint %v not found", podUID)
}
if len(ep[podUID]) != len(expectedEndpoints[podUID]) {
return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
}
var match bool
for _, epPort := range ep[podUID] {
match = false
for _, expectedPort := range expectedEndpoints[podUID] {
if epPort.ContainerPort == expectedPort.ContainerPort && epPort.Protocol == expectedPort.Protocol {
match = true
}
}
if !match {
return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
}
}
}
return nil
}
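As a small illustrative check of the comparison semantics (hypothetical UIDs and ports, not part of this commit): the port name is ignored, only container port and protocol have to match, so the two maps below validate successfully.

got := fullPortsByPodUID{"uid-1": {{ContainerPort: 100, Protocol: v1.ProtocolTCP}}}
want := fullPortsByPodUID{"uid-1": {{Name: "svc1", ContainerPort: 100, Protocol: v1.ProtocolTCP}}}
if err := validatePortsAndProtocols(got, want); err != nil {
	framework.Failf("unexpected mismatch: %v", err) // not reached: differing Name fields do not matter
}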
var _ = common.SIGDescribe("SCTP [LinuxOnly]", func() {
f := framework.NewDefaultFramework("sctp")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged