From a9b641464ebcb2179f04450180802e0994cc51fe Mon Sep 17 00:00:00 2001 From: Anurag252 Date: Thu, 5 Sep 2024 15:40:35 +0200 Subject: [PATCH] chore: adding support for staticPortRange Signed-off-by: Anurag252 chore: implementing review comments Signed-off-by: Anurag252 lint: implementing linting suggestions Signed-off-by: Anurag252 chore: review comments implementation Signed-off-by: Anurag252 chore: fixing build errors Signed-off-by: Anurag252 revert: removing empty spaces Signed-off-by: Anurag252 chore: implementing review suggestions Signed-off-by: Anurag252 chore: implementing review comments to get rid of typecasting Signed-off-by: Anurag252 fix: fixing failed tests Signed-off-by: Anurag252 fix: fixing failed tests Signed-off-by: Anurag252 fix: fixing failed tests Signed-off-by: Anurag252 chore: rename func as per code review Signed-off-by: Anurag252 chore: change in comments Signed-off-by: Anurag252 --- test/e2e/framework/service/jig.go | 96 +++++++++++++++++++++++++++++++ test/e2e/network/service.go | 71 +++++++++++++++++++---- 2 files changed, 157 insertions(+), 10 deletions(-) diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index dc565271d5f..5eec1bc7555 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -20,9 +20,11 @@ import ( "context" "errors" "fmt" + "math/rand" "net" "strconv" "strings" + "sync" "time" "github.com/onsi/ginkgo/v2" @@ -58,6 +60,33 @@ var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768} // It is copied from "k8s.io/kubernetes/pkg/registry/core/service/portallocator" var errAllocated = errors.New("provided port is already allocated") +// staticPortRange implements port allocation model described here +// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/3668-reserved-service-nodeport-range +type staticPortRange struct { + sync.Mutex + baseport int32 + length int32 + reservedPorts sets.Set[int32] +} + +func calculateRange(size int32) int32 { + var minPort int32 = 16 + var step int32 = 32 + var maxPort int32 = 128 + return min(max(minPort, size/step), maxPort) +} + +var staticPortAllocator *staticPortRange + +// Initialize only once per test +func init() { + staticPortAllocator = &staticPortRange{ + baseport: int32(NodePortRange.Base), + length: calculateRange(int32(NodePortRange.Size)), + reservedPorts: sets.New[int32](), + } +} + // TestJig is a test jig to help service testing. type TestJig struct { Client clientset.Interface @@ -82,6 +111,73 @@ func NewTestJig(client clientset.Interface, namespace, name string) *TestJig { return j } +// reservePort reserves the port provided as input. +// If an invalid port was provided or if the port is already reserved, it returns false +func (s *staticPortRange) reservePort(port int32) bool { + s.Lock() + defer s.Unlock() + if port < s.baseport || port > s.baseport+s.length || s.reservedPorts.Has(port) { + return false + } + s.reservedPorts.Insert(port) + return true +} + +// getUnusedPort returns a free port from the range and returns its number and nil value +// the port is not allocated so the consumer should allocate it explicitly calling allocatePort() +// if none is available then it returns -1 and error +func (s *staticPortRange) getUnusedPort() (int32, error) { + s.Lock() + defer s.Unlock() + // start in a random offset + start := rand.Int31n(s.length) + for i := int32(0); i < s.length; i++ { + port := s.baseport + (start+i)%(s.length) + if !s.reservedPorts.Has(port) { + return port, nil + } + } + return -1, fmt.Errorf("no free ports were found") +} + +// releasePort releases the port passed as an argument +func (s *staticPortRange) releasePort(port int32) { + s.Lock() + defer s.Unlock() + s.reservedPorts.Delete(port) +} + +// GetUnusedStaticNodePort returns a free port in static range and a nil value +// If no port in static range is available it returns -1 and an error value +// Note that it is not guaranteed that the returned port is actually available on the apiserver; +// You must allocate a port, then attempt to create the service, then call +// ReserveStaticNodePort. +func GetUnusedStaticNodePort() (int32, error) { + return staticPortAllocator.getUnusedPort() +} + +// ReserveStaticNodePort reserves the port provided as input. It is guaranteed +// that no other test will receive this port from GetUnusedStaticNodePort until +// after you call ReleaseStaticNodePort. +// +// port must have been previously allocated by GetUnusedStaticNodePort, and +// then successfully used as a NodePort or HealthCheckNodePort when creating +// a service. Trying to reserve a port that was not allocated by +// GetUnusedStaticNodePort, or reserving it before creating the associated service +// may cause other e2e tests to fail. +// +// If an invalid port was provided or if the port is already reserved, it returns false +func ReserveStaticNodePort(port int32) bool { + return staticPortAllocator.reservePort(port) +} + +// ReleaseStaticNodePort releases the specified port. +// The corresponding service should have already been deleted, to ensure that the +// port allocator doesn't try to reuse it before the apiserver considers it available. +func ReleaseStaticNodePort(port int32) { + staticPortAllocator.releasePort(port) +} + // newServiceTemplate returns the default v1.Service template for this j, but // does not actually create the Service. The default Service has the same name // as the j and exposes the given port. diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 11965b220e8..18064e22387 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -1682,10 +1682,37 @@ var _ = common.SIGDescribe("Services", func() { service := t.BuildServiceSpec() service.Spec.Type = v1.ServiceTypeNodePort - + numberOfRetries := 5 ginkgo.By("creating service " + serviceName + " with type NodePort in namespace " + ns) - service, err := t.CreateService(service) - framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns) + var err error + for i := 0; i < numberOfRetries; i++ { + port, err := e2eservice.GetUnusedStaticNodePort() + framework.ExpectNoError(err, "Static node port allocator was not able to find a free nodeport.") + service.Spec.Ports[0].NodePort = port + service, err = t.CreateService(service) + // We will later delete this service and then recreate it with same nodeport. We need to ensure that + // another e2e test doesn't start listening on our old nodeport and conflicts re-creation of service + // hence we use ReserveStaticNodePort. + if err == nil { + nodePort := service.Spec.Ports[0].NodePort + ok := e2eservice.ReserveStaticNodePort(nodePort) + if !ok { + // We could not reserve the allocated port which means the port was either invalid or was reserved by another test. + // This indicates a problem in code and we have a log message to debug it. + framework.Failf("Static node port allocator was not able to reserve nodeport: %d", nodePort) + } + break + } + if apierrors.IsConflict(err) { + framework.Logf("node port %d is already allocated to other service, retrying ... : %v", port, err) + continue + } + framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns) + + } + + nodePort := service.Spec.Ports[0].NodePort + defer e2eservice.ReleaseStaticNodePort(nodePort) if service.Spec.Type != v1.ServiceTypeNodePort { framework.Failf("got unexpected Spec.Type for new service: %v", service) @@ -1700,7 +1727,6 @@ var _ = common.SIGDescribe("Services", func() { if !e2eservice.NodePortRange.Contains(int(port.NodePort)) { framework.Failf("got unexpected (out-of-range) port for new service: %v", service) } - nodePort := port.NodePort ginkgo.By("deleting original service " + serviceName) err = t.DeleteService(serviceName) @@ -3931,10 +3957,37 @@ var _ = common.SIGDescribe("Services", func() { } ginkgo.By("creating the service") - svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) { - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal - }) - framework.ExpectNoError(err, "creating the service") + var svc *v1.Service + numberOfRetries := 5 + for i := 0; i < numberOfRetries; i++ { + port, err := e2eservice.GetUnusedStaticNodePort() + framework.ExpectNoError(err, "Static node port allocator was not able to find a free nodeport.") + svc, err = jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) { + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + svc.Spec.HealthCheckNodePort = port + }) + // We will later convert this service to Cluster traffic policy, but we need to ensure that + // another e2e test doesn't start listening on our old HealthCheckNodePort when we + // do that, so we use ReserveStaticNodePort. + if err == nil { + staticHealthCheckPort := svc.Spec.HealthCheckNodePort + ok := e2eservice.ReserveStaticNodePort(staticHealthCheckPort) + if !ok { + // We could not reserve the allocated port which means the port was either invalid or was reserved by another test. + // This indicates a problem in code and we have a log message to debug it. + framework.Failf("Static node port allocator was not able to reserve healthcheck nodeport: %d", staticHealthCheckPort) + } + break + } + if apierrors.IsConflict(err) { + framework.Logf("node port %d is already allocated to other service, retrying ... : %v", port, err) + continue + } + framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, namespace) + + } + + defer e2eservice.ReleaseStaticNodePort(svc.Spec.HealthCheckNodePort) nodePortStr := fmt.Sprintf("%d", svc.Spec.Ports[0].NodePort) hcNodePortStr := fmt.Sprintf("%d", svc.Spec.HealthCheckNodePort) framework.Logf("NodePort is %s, HealthCheckNodePort is %s", nodePortStr, hcNodePortStr) @@ -4043,7 +4096,6 @@ var _ = common.SIGDescribe("Services", func() { } deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout) - // FIXME: this is racy; we need to use a reserved HCNP here. ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the endpoint node when ExternalTrafficPolicy is Cluster") checkOneHealthCheck(endpointNodeIP, false, "", deadline) ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the execpod node when ExternalTrafficPolicy is Cluster") @@ -4062,7 +4114,6 @@ var _ = common.SIGDescribe("Services", func() { _, err = jig.UpdateService(ctx, func(svc *v1.Service) { svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal // Request the same healthCheckNodePort as before, to test the user-requested allocation path - // FIXME: we need to use a reserved HCNP here. svc.Spec.HealthCheckNodePort = oldHealthCheckNodePort }) framework.ExpectNoError(err, "updating ExternalTrafficPolicy and HealthCheckNodePort")