Merge pull request #127153 from Anurag252/master

Add a static NodePort allocator for e2e tests
This commit is contained in:
Kubernetes Prow Robot 2025-01-02 16:56:14 +01:00 committed by GitHub
commit 6746df77f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 157 additions and 10 deletions

View File

@ -20,9 +20,11 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/onsi/ginkgo/v2"
@ -58,6 +60,33 @@ var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
// It is copied from "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
var errAllocated = errors.New("provided port is already allocated")
// staticPortRange implements port allocation model described here
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/3668-reserved-service-nodeport-range
type staticPortRange struct {
sync.Mutex
baseport int32
length int32
reservedPorts sets.Set[int32]
}
func calculateRange(size int32) int32 {
var minPort int32 = 16
var step int32 = 32
var maxPort int32 = 128
return min(max(minPort, size/step), maxPort)
}
var staticPortAllocator *staticPortRange
// Initialize only once per test
func init() {
staticPortAllocator = &staticPortRange{
baseport: int32(NodePortRange.Base),
length: calculateRange(int32(NodePortRange.Size)),
reservedPorts: sets.New[int32](),
}
}
// TestJig is a test jig to help service testing.
type TestJig struct {
Client clientset.Interface
@ -82,6 +111,73 @@ func NewTestJig(client clientset.Interface, namespace, name string) *TestJig {
return j
}
// reservePort reserves the port provided as input.
// If an invalid port was provided or if the port is already reserved, it returns false
func (s *staticPortRange) reservePort(port int32) bool {
s.Lock()
defer s.Unlock()
if port < s.baseport || port > s.baseport+s.length || s.reservedPorts.Has(port) {
return false
}
s.reservedPorts.Insert(port)
return true
}
// getUnusedPort returns a free port from the range and returns its number and nil value
// the port is not allocated so the consumer should allocate it explicitly calling allocatePort()
// if none is available then it returns -1 and error
func (s *staticPortRange) getUnusedPort() (int32, error) {
s.Lock()
defer s.Unlock()
// start in a random offset
start := rand.Int31n(s.length)
for i := int32(0); i < s.length; i++ {
port := s.baseport + (start+i)%(s.length)
if !s.reservedPorts.Has(port) {
return port, nil
}
}
return -1, fmt.Errorf("no free ports were found")
}
// releasePort releases the port passed as an argument
func (s *staticPortRange) releasePort(port int32) {
s.Lock()
defer s.Unlock()
s.reservedPorts.Delete(port)
}
// GetUnusedStaticNodePort returns a free port in static range and a nil value
// If no port in static range is available it returns -1 and an error value
// Note that it is not guaranteed that the returned port is actually available on the apiserver;
// You must allocate a port, then attempt to create the service, then call
// ReserveStaticNodePort.
func GetUnusedStaticNodePort() (int32, error) {
return staticPortAllocator.getUnusedPort()
}
// ReserveStaticNodePort reserves the port provided as input. It is guaranteed
// that no other test will receive this port from GetUnusedStaticNodePort until
// after you call ReleaseStaticNodePort.
//
// port must have been previously allocated by GetUnusedStaticNodePort, and
// then successfully used as a NodePort or HealthCheckNodePort when creating
// a service. Trying to reserve a port that was not allocated by
// GetUnusedStaticNodePort, or reserving it before creating the associated service
// may cause other e2e tests to fail.
//
// If an invalid port was provided or if the port is already reserved, it returns false
func ReserveStaticNodePort(port int32) bool {
return staticPortAllocator.reservePort(port)
}
// ReleaseStaticNodePort releases the specified port.
// The corresponding service should have already been deleted, to ensure that the
// port allocator doesn't try to reuse it before the apiserver considers it available.
func ReleaseStaticNodePort(port int32) {
staticPortAllocator.releasePort(port)
}
// newServiceTemplate returns the default v1.Service template for this j, but
// does not actually create the Service. The default Service has the same name
// as the j and exposes the given port.

View File

@ -1682,10 +1682,37 @@ var _ = common.SIGDescribe("Services", func() {
service := t.BuildServiceSpec()
service.Spec.Type = v1.ServiceTypeNodePort
numberOfRetries := 5
ginkgo.By("creating service " + serviceName + " with type NodePort in namespace " + ns)
service, err := t.CreateService(service)
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
var err error
for i := 0; i < numberOfRetries; i++ {
port, err := e2eservice.GetUnusedStaticNodePort()
framework.ExpectNoError(err, "Static node port allocator was not able to find a free nodeport.")
service.Spec.Ports[0].NodePort = port
service, err = t.CreateService(service)
// We will later delete this service and then recreate it with same nodeport. We need to ensure that
// another e2e test doesn't start listening on our old nodeport and conflicts re-creation of service
// hence we use ReserveStaticNodePort.
if err == nil {
nodePort := service.Spec.Ports[0].NodePort
ok := e2eservice.ReserveStaticNodePort(nodePort)
if !ok {
// We could not reserve the allocated port which means the port was either invalid or was reserved by another test.
// This indicates a problem in code and we have a log message to debug it.
framework.Failf("Static node port allocator was not able to reserve nodeport: %d", nodePort)
}
break
}
if apierrors.IsConflict(err) {
framework.Logf("node port %d is already allocated to other service, retrying ... : %v", port, err)
continue
}
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
}
nodePort := service.Spec.Ports[0].NodePort
defer e2eservice.ReleaseStaticNodePort(nodePort)
if service.Spec.Type != v1.ServiceTypeNodePort {
framework.Failf("got unexpected Spec.Type for new service: %v", service)
@ -1700,7 +1727,6 @@ var _ = common.SIGDescribe("Services", func() {
if !e2eservice.NodePortRange.Contains(int(port.NodePort)) {
framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
}
nodePort := port.NodePort
ginkgo.By("deleting original service " + serviceName)
err = t.DeleteService(serviceName)
@ -3931,10 +3957,37 @@ var _ = common.SIGDescribe("Services", func() {
}
ginkgo.By("creating the service")
svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
})
framework.ExpectNoError(err, "creating the service")
var svc *v1.Service
numberOfRetries := 5
for i := 0; i < numberOfRetries; i++ {
port, err := e2eservice.GetUnusedStaticNodePort()
framework.ExpectNoError(err, "Static node port allocator was not able to find a free nodeport.")
svc, err = jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
svc.Spec.HealthCheckNodePort = port
})
// We will later convert this service to Cluster traffic policy, but we need to ensure that
// another e2e test doesn't start listening on our old HealthCheckNodePort when we
// do that, so we use ReserveStaticNodePort.
if err == nil {
staticHealthCheckPort := svc.Spec.HealthCheckNodePort
ok := e2eservice.ReserveStaticNodePort(staticHealthCheckPort)
if !ok {
// We could not reserve the allocated port which means the port was either invalid or was reserved by another test.
// This indicates a problem in code and we have a log message to debug it.
framework.Failf("Static node port allocator was not able to reserve healthcheck nodeport: %d", staticHealthCheckPort)
}
break
}
if apierrors.IsConflict(err) {
framework.Logf("node port %d is already allocated to other service, retrying ... : %v", port, err)
continue
}
framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, namespace)
}
defer e2eservice.ReleaseStaticNodePort(svc.Spec.HealthCheckNodePort)
nodePortStr := fmt.Sprintf("%d", svc.Spec.Ports[0].NodePort)
hcNodePortStr := fmt.Sprintf("%d", svc.Spec.HealthCheckNodePort)
framework.Logf("NodePort is %s, HealthCheckNodePort is %s", nodePortStr, hcNodePortStr)
@ -4043,7 +4096,6 @@ var _ = common.SIGDescribe("Services", func() {
}
deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout)
// FIXME: this is racy; we need to use a reserved HCNP here.
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the endpoint node when ExternalTrafficPolicy is Cluster")
checkOneHealthCheck(endpointNodeIP, false, "", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the execpod node when ExternalTrafficPolicy is Cluster")
@ -4062,7 +4114,6 @@ var _ = common.SIGDescribe("Services", func() {
_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
// Request the same healthCheckNodePort as before, to test the user-requested allocation path
// FIXME: we need to use a reserved HCNP here.
svc.Spec.HealthCheckNodePort = oldHealthCheckNodePort
})
framework.ExpectNoError(err, "updating ExternalTrafficPolicy and HealthCheckNodePort")