diff --git a/staging/src/k8s.io/endpointslice/utils.go b/staging/src/k8s.io/endpointslice/utils.go index 3aa35ec20c1..2cd4e018318 100644 --- a/staging/src/k8s.io/endpointslice/utils.go +++ b/staging/src/k8s.io/endpointslice/utils.go @@ -392,6 +392,17 @@ func findPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) { } } } + // also support sidecar container (initContainer with restartPolicy=Always) + for _, container := range pod.Spec.InitContainers { + if container.RestartPolicy == nil || *container.RestartPolicy != v1.ContainerRestartPolicyAlways { + continue + } + for _, port := range container.Ports { + if port.Name == name && port.Protocol == svcPort.Protocol { + return int(port.ContainerPort), nil + } + } + } case intstr.Int: return portName.IntValue(), nil } diff --git a/staging/src/k8s.io/endpointslice/utils_test.go b/staging/src/k8s.io/endpointslice/utils_test.go index 43dfac317b4..288c4e26fc3 100644 --- a/staging/src/k8s.io/endpointslice/utils_test.go +++ b/staging/src/k8s.io/endpointslice/utils_test.go @@ -515,6 +515,7 @@ func TestServiceControllerKey(t *testing.T) { func TestGetEndpointPorts(t *testing.T) { protoTCP := v1.ProtocolTCP + restartPolicyAlways := v1.ContainerRestartPolicyAlways testCases := map[string]struct { service *v1.Service @@ -585,6 +586,88 @@ func TestGetEndpointPorts(t *testing.T) { AppProtocol: pointer.String("https"), }}, }, + "service with named port for restartable init container": { + service: &v1.Service{ + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{ + Name: "http-sidecar", + Port: 8080, + TargetPort: intstr.FromInt32(8080), + Protocol: protoTCP, + }, { + Name: "http", + Port: 8090, + TargetPort: intstr.FromString("http"), + Protocol: protoTCP, + }}, + }, + }, + pod: &v1.Pod{ + Spec: v1.PodSpec{ + InitContainers: []v1.Container{{ + Ports: []v1.ContainerPort{{ + Name: "http-sidecar", + ContainerPort: int32(8080), + Protocol: protoTCP, + }}, + RestartPolicy: &restartPolicyAlways, + }}, + Containers: []v1.Container{{ + Ports: []v1.ContainerPort{{ + Name: "http", + ContainerPort: int32(8090), + Protocol: protoTCP, + }}, + }}, + }, + }, + expectedPorts: []*discovery.EndpointPort{{ + Name: pointer.String("http-sidecar"), + Port: pointer.Int32(8080), + Protocol: &protoTCP, + }, { + Name: pointer.String("http"), + Port: pointer.Int32(8090), + Protocol: &protoTCP, + }}, + }, + "service with same named port for regular and restartable init container": { + service: &v1.Service{ + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromString("http"), + Protocol: protoTCP, + }}, + }, + }, + pod: &v1.Pod{ + Spec: v1.PodSpec{ + InitContainers: []v1.Container{{ + Ports: []v1.ContainerPort{{ + Name: "http", + ContainerPort: int32(8080), + Protocol: protoTCP, + }}, + RestartPolicy: &restartPolicyAlways, + }}, + Containers: []v1.Container{{ + Ports: []v1.ContainerPort{{ + Name: "http", + ContainerPort: int32(8090), + Protocol: protoTCP, + }}, + }}, + }, + }, + expectedPorts: []*discovery.EndpointPort{{ + Name: pointer.String("http"), + Port: pointer.Int32(8090), + Protocol: &protoTCP, + }}, + }, } for name, tc := range testCases { diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index fe45f949fc0..b976dd3f201 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -4133,6 +4133,78 @@ var _ = common.SIGDescribe("Services", func() { checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline) checkOneNodePort(thirdNodeIP, false, v1.ServiceExternalTrafficPolicyLocal, deadline) }) + + ginkgo.It("should connect to the ports exposed by restartable init containers", func(ctx context.Context) { + serviceName := "sidecar-with-port" + ns := f.Namespace.Name + + t := NewServerTest(cs, ns, serviceName) + defer func() { + defer ginkgo.GinkgoRecover() + errs := t.Cleanup() + if len(errs) != 0 { + framework.Failf("errors in cleanup: %v", errs) + } + }() + + name := "sidecar-port" + port := int32(8080) + namedPort := "http-sidecar" + + service := t.BuildServiceSpec() + service.Spec.Ports = []v1.ServicePort{ + { + Name: namedPort, + Port: port, + TargetPort: intstr.FromInt(int(port)), + }, + } + ports := []v1.ContainerPort{{Name: namedPort, ContainerPort: port, Protocol: v1.ProtocolTCP}} + args := []string{"netexec", fmt.Sprintf("--http-port=%d", port)} + createPodWithRestartableInitContainerOrFail(ctx, f, ns, name, t.Labels, ports, args...) + + ginkgo.By(fmt.Sprintf("creating Service %v with selectors %v", service.Name, service.Spec.Selector)) + service, err := t.CreateService(service) + framework.ExpectNoError(err) + + checkServiceReachabilityFromExecPod(ctx, f.ClientSet, ns, service.Name, service.Spec.ClusterIP, port) + }) + + ginkgo.It("should connect to the named ports exposed by restartable init containers", func(ctx context.Context) { + serviceName := "sidecar-with-named-port" + ns := f.Namespace.Name + + t := NewServerTest(cs, ns, serviceName) + defer func() { + defer ginkgo.GinkgoRecover() + errs := t.Cleanup() + if len(errs) != 0 { + framework.Failf("errors in cleanup: %v", errs) + } + }() + + name := "sidecar-port" + port := int32(8080) + namedPort := "http-sidecar" + + service := t.BuildServiceSpec() + service.Spec.Ports = []v1.ServicePort{ + { + Name: namedPort, + Port: port, + TargetPort: intstr.FromString(namedPort), + }, + } + ports := []v1.ContainerPort{{Name: namedPort, ContainerPort: port, Protocol: v1.ProtocolTCP}} + args := []string{"netexec", fmt.Sprintf("--http-port=%d", port)} + createPodWithRestartableInitContainerOrFail(ctx, f, ns, name, t.Labels, ports, args...) + + ginkgo.By(fmt.Sprintf("creating Service %v with selectors %v", service.Name, service.Spec.Selector)) + service, err := t.CreateService(service) + framework.ExpectNoError(err) + + checkServiceReachabilityFromExecPod(ctx, f.ClientSet, ns, service.Name, service.Spec.ClusterIP, port) + }) }) // execAffinityTestForSessionAffinityTimeout is a helper function that wrap the logic of @@ -4382,6 +4454,18 @@ func createPodOrFail(ctx context.Context, f *framework.Framework, ns, name strin e2epod.NewPodClient(f).CreateSync(ctx, pod) } +// createPodWithRestartableInitContainerOrFail creates a pod with restartable init containers using the specified containerPorts. +func createPodWithRestartableInitContainerOrFail(ctx context.Context, f *framework.Framework, ns, name string, labels map[string]string, containerPorts []v1.ContainerPort, args ...string) { + ginkgo.By(fmt.Sprintf("Creating pod %s with restartable init containers in namespace %s", name, ns)) + pod := e2epod.NewAgnhostPod(ns, name, nil, nil, nil, "pause") + pod.ObjectMeta.Labels = labels + restartPolicyAlways := v1.ContainerRestartPolicyAlways + init := e2epod.NewAgnhostContainer(name, nil, containerPorts, args...) + init.RestartPolicy = &restartPolicyAlways + pod.Spec.InitContainers = []v1.Container{init} + e2epod.NewPodClient(f).CreateSync(ctx, pod) +} + // launchHostExecPod launches a hostexec pod in the given namespace and waits // until it's Running. If avoidNode is non-nil, it will ensure that the pod doesn't // land on that node. @@ -4432,6 +4516,30 @@ func checkReachabilityFromPod(ctx context.Context, expectToBeReachable bool, tim framework.ExpectNoError(err) } +// checkServiceReachabilityFromExecPod creates a dedicated client pod, executes into it, +// and checks reachability to the specified target host and port. +func checkServiceReachabilityFromExecPod(ctx context.Context, client clientset.Interface, namespace, name, clusterIP string, port int32) { + // We avoid relying on DNS lookup with the service name here because + // we only want to test whether the named port is accessible from the service. + serverHost := net.JoinHostPort(clusterIP, strconv.Itoa(int(port))) + ginkgo.By("creating a dedicated client to send request to the http server " + serverHost) + execPod := e2epod.CreateExecPodOrFail(ctx, client, namespace, "execpod-", nil) + execPodName := execPod.Name + cmd := fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s/", serverHost) + var stdout string + if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) { + var err error + stdout, err = e2eoutput.RunHostCmd(namespace, execPodName, cmd) + if err != nil { + framework.Logf("error trying to connect to service %s: %v, ... retrying", name, err) + return false, nil + } + return true, nil + }); pollErr != nil { + framework.Failf("connection to the Service %v within %v should be succeeded, stdout: %v", name, e2eservice.KubeProxyLagTimeout, stdout) + } +} + func validatePorts(ep, expectedEndpoints portsByPodUID) error { if len(ep) != len(expectedEndpoints) { // should not happen because we check this condition before