/* Copyright 2014 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // OWNER = sig/network package network import ( "bytes" "context" "encoding/json" "fmt" "math" "net/http" "strings" "sync" "time" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/transport" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2erc "k8s.io/kubernetes/test/e2e/framework/rc" "k8s.io/kubernetes/test/e2e/network/common" testutils "k8s.io/kubernetes/test/utils" imageutils "k8s.io/kubernetes/test/utils/image" admissionapi "k8s.io/pod-security-admission/api" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" ) const ( // Try all the proxy tests this many times (to catch even rare flakes). proxyAttempts = 20 // Only print this many characters of the response (to keep the logs // legible). maxDisplayBodyLen = 100 // We have seen one of these calls take just over 15 seconds, so putting this at 30. proxyHTTPCallTimeout = 30 * time.Second requestRetryPeriod = 10 * time.Millisecond requestRetryTimeout = 1 * time.Minute ) type jsonResponse struct { Method string Body string } var _ = common.SIGDescribe("Proxy", func() { version := "v1" ginkgo.Context("version "+version, func() { options := framework.Options{ ClientQPS: -1.0, } f := framework.NewFramework("proxy", options, nil) f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline prefix := "/api/" + version /* Test for Proxy, logs port endpoint Select any node in the cluster to invoke /proxy/nodes/:10250/logs endpoint. This endpoint MUST be reachable. */ ginkgo.It("should proxy logs on node with explicit kubelet port using proxy subresource ", func(ctx context.Context) { nodeProxyTest(ctx, f, prefix+"/nodes/", ":10250/proxy/logs/") }) /* Test for Proxy, logs endpoint Select any node in the cluster to invoke /proxy/nodes///logs endpoint. This endpoint MUST be reachable. */ ginkgo.It("should proxy logs on node using proxy subresource ", func(ctx context.Context) { nodeProxyTest(ctx, f, prefix+"/nodes/", "/proxy/logs/") }) // using the porter image to serve content, access the content // (of multiple pods?) from multiple (endpoints/services?) /* Release: v1.9 Testname: Proxy, logs service endpoint Description: Select any node in the cluster to invoke /logs endpoint using the /nodes/proxy subresource from the kubelet port. This endpoint MUST be reachable. */ framework.ConformanceIt("should proxy through a service and a pod ", func(ctx context.Context) { start := time.Now() labels := map[string]string{"proxy-service-target": "true"} service, err := f.ClientSet.CoreV1().Services(f.Namespace.Name).Create(ctx, &v1.Service{ ObjectMeta: metav1.ObjectMeta{ GenerateName: "proxy-service-", }, Spec: v1.ServiceSpec{ Selector: labels, Ports: []v1.ServicePort{ { Name: "portname1", Port: 80, TargetPort: intstr.FromString("dest1"), }, { Name: "portname2", Port: 81, TargetPort: intstr.FromInt(162), }, { Name: "tlsportname1", Port: 443, TargetPort: intstr.FromString("tlsdest1"), }, { Name: "tlsportname2", Port: 444, TargetPort: intstr.FromInt(462), }, }, }, }, metav1.CreateOptions{}) framework.ExpectNoError(err) // Make an RC with a single pod. The 'porter' image is // a simple server which serves the values of the // environmental variables below. ginkgo.By("starting an echo server on multiple ports") pods := []*v1.Pod{} cfg := testutils.RCConfig{ Client: f.ClientSet, Image: imageutils.GetE2EImage(imageutils.Agnhost), Command: []string{"/agnhost", "porter"}, Name: service.Name, Namespace: f.Namespace.Name, Replicas: 1, PollInterval: time.Second, Env: map[string]string{ "SERVE_PORT_80": `test`, "SERVE_PORT_1080": `test`, "SERVE_PORT_160": "foo", "SERVE_PORT_162": "bar", "SERVE_TLS_PORT_443": `test`, "SERVE_TLS_PORT_460": `tls baz`, "SERVE_TLS_PORT_462": `tls qux`, }, Ports: map[string]int{ "dest1": 160, "dest2": 162, "tlsdest1": 460, "tlsdest2": 462, }, ReadinessProbe: &v1.Probe{ ProbeHandler: v1.ProbeHandler{ HTTPGet: &v1.HTTPGetAction{ Port: intstr.FromInt(80), }, }, InitialDelaySeconds: 1, TimeoutSeconds: 5, PeriodSeconds: 10, }, Labels: labels, CreatedPods: &pods, } err = e2erc.RunRC(ctx, cfg) framework.ExpectNoError(err) ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, cfg.Name) err = waitForEndpoint(ctx, f.ClientSet, f.Namespace.Name, service.Name) framework.ExpectNoError(err) // table constructors // Try proxying through the service and directly to through the pod. subresourceServiceProxyURL := func(scheme, port string) string { return prefix + "/namespaces/" + f.Namespace.Name + "/services/" + net.JoinSchemeNamePort(scheme, service.Name, port) + "/proxy" } subresourcePodProxyURL := func(scheme, port string) string { return prefix + "/namespaces/" + f.Namespace.Name + "/pods/" + net.JoinSchemeNamePort(scheme, pods[0].Name, port) + "/proxy" } // construct the table expectations := map[string]string{ subresourceServiceProxyURL("", "portname1") + "/": "foo", subresourceServiceProxyURL("http", "portname1") + "/": "foo", subresourceServiceProxyURL("", "portname2") + "/": "bar", subresourceServiceProxyURL("http", "portname2") + "/": "bar", subresourceServiceProxyURL("https", "tlsportname1") + "/": "tls baz", subresourceServiceProxyURL("https", "tlsportname2") + "/": "tls qux", subresourcePodProxyURL("", "") + "/": `test`, subresourcePodProxyURL("", "1080") + "/": `test`, subresourcePodProxyURL("http", "1080") + "/": `test`, subresourcePodProxyURL("", "160") + "/": "foo", subresourcePodProxyURL("http", "160") + "/": "foo", subresourcePodProxyURL("", "162") + "/": "bar", subresourcePodProxyURL("http", "162") + "/": "bar", subresourcePodProxyURL("https", "443") + "/": `test`, subresourcePodProxyURL("https", "460") + "/": "tls baz", subresourcePodProxyURL("https", "462") + "/": "tls qux", // TODO: below entries don't work, but I believe we should make them work. // podPrefix + ":dest1": "foo", // podPrefix + ":dest2": "bar", } wg := sync.WaitGroup{} errs := []string{} errLock := sync.Mutex{} recordError := func(s string) { errLock.Lock() defer errLock.Unlock() errs = append(errs, s) } d := time.Since(start) framework.Logf("setup took %v, starting test cases", d) numberTestCases := len(expectations) totalAttempts := numberTestCases * proxyAttempts ginkgo.By(fmt.Sprintf("running %v cases, %v attempts per case, %v total attempts", numberTestCases, proxyAttempts, totalAttempts)) for i := 0; i < proxyAttempts; i++ { wg.Add(numberTestCases) for path, val := range expectations { go func(i int, path, val string) { defer wg.Done() // this runs the test case body, status, d, err := doProxy(ctx, f, path, i) if err != nil { if serr, ok := err.(*apierrors.StatusError); ok { recordError(fmt.Sprintf("%v (%v; %v): path %v gave status error: %+v", i, status, d, path, serr.Status())) } else { recordError(fmt.Sprintf("%v: path %v gave error: %v", i, path, err)) } return } if status != http.StatusOK { recordError(fmt.Sprintf("%v: path %v gave status: %v", i, path, status)) } if e, a := val, string(body); e != a { recordError(fmt.Sprintf("%v: path %v: wanted %v, got %v", i, path, e, a)) } if d > proxyHTTPCallTimeout { recordError(fmt.Sprintf("%v: path %v took %v > %v", i, path, d, proxyHTTPCallTimeout)) } }(i, path, val) } wg.Wait() } if len(errs) != 0 { body, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).GetLogs(pods[0].Name, &v1.PodLogOptions{}).Do(ctx).Raw() if err != nil { framework.Logf("Error getting logs for pod %s: %v", pods[0].Name, err) } else { framework.Logf("Pod %s has the following error logs: %s", pods[0].Name, body) } framework.Failf(strings.Join(errs, "\n")) } }) /* Release: v1.21 Testname: Proxy, validate ProxyWithPath responses Description: Attempt to create a pod and a service. A set of pod and service endpoints MUST be accessed via ProxyWithPath using a list of http methods. A valid response MUST be returned for each endpoint. */ framework.ConformanceIt("A set of valid responses are returned for both pod and service ProxyWithPath", func(ctx context.Context) { ns := f.Namespace.Name msg := "foo" testSvcName := "test-service" testSvcLabels := map[string]string{"test": "response"} framework.Logf("Creating pod...") pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "agnhost", Namespace: ns, Labels: map[string]string{ "test": "response"}, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Image: imageutils.GetE2EImage(imageutils.Agnhost), Name: "agnhost", Command: []string{"/agnhost", "porter", "--json-response"}, Env: []v1.EnvVar{{ Name: "SERVE_PORT_80", Value: msg, }}, }}, RestartPolicy: v1.RestartPolicyNever, }} _, err := f.ClientSet.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}) framework.ExpectNoError(err, "failed to create pod") framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "Pod didn't start within time out period") framework.Logf("Creating service...") _, err = f.ClientSet.CoreV1().Services(ns).Create(ctx, &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: testSvcName, Namespace: ns, Labels: testSvcLabels, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{ Port: 80, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, }}, Selector: map[string]string{ "test": "response", }, }}, metav1.CreateOptions{}) framework.ExpectNoError(err, "Failed to create the service") transportCfg, err := f.ClientConfig().TransportConfig() framework.ExpectNoError(err, "Error creating transportCfg") restTransport, err := transport.New(transportCfg) framework.ExpectNoError(err, "Error creating restTransport") client := &http.Client{ CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse }, Transport: restTransport, } // All methods for Pod ProxyWithPath return 200 // For all methods other than HEAD the response body returns 'foo' with the received http method httpVerbs := []string{"DELETE", "GET", "HEAD", "OPTIONS", "PATCH", "POST", "PUT"} for _, httpVerb := range httpVerbs { urlString := strings.TrimRight(f.ClientConfig().Host, "/") + "/api/v1/namespaces/" + ns + "/pods/agnhost/proxy/some/path/with/" + httpVerb framework.Logf("Starting http.Client for %s", urlString) pollErr := wait.PollImmediate(requestRetryPeriod, requestRetryTimeout, validateProxyVerbRequest(client, urlString, httpVerb, msg)) framework.ExpectNoError(err, "Service didn't start within time out period. %v", pollErr) } // All methods for Service ProxyWithPath return 200 // For all methods other than HEAD the response body returns 'foo' with the received http method for _, httpVerb := range httpVerbs { urlString := strings.TrimRight(f.ClientConfig().Host, "/") + "/api/v1/namespaces/" + ns + "/services/test-service/proxy/some/path/with/" + httpVerb framework.Logf("Starting http.Client for %s", urlString) pollErr := wait.PollImmediate(requestRetryPeriod, requestRetryTimeout, validateProxyVerbRequest(client, urlString, httpVerb, msg)) framework.ExpectNoError(err, "Service didn't start within time out period. %v", pollErr) } }) /* Release: v1.24 Testname: Proxy, validate Proxy responses Description: Attempt to create a pod and a service. A set of pod and service endpoints MUST be accessed via Proxy using a list of http methods. A valid response MUST be returned for each endpoint. */ framework.ConformanceIt("A set of valid responses are returned for both pod and service Proxy", func(ctx context.Context) { ns := f.Namespace.Name msg := "foo" testSvcName := "e2e-proxy-test-service" testSvcLabels := map[string]string{"e2e-test": "proxy-endpoints"} framework.Logf("Creating pod...") pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "agnhost", Namespace: ns, Labels: map[string]string{ "e2e-test": "proxy-endpoints"}, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ Image: imageutils.GetE2EImage(imageutils.Agnhost), Name: "agnhost", Command: []string{"/agnhost", "porter", "--json-response"}, Env: []v1.EnvVar{{ Name: "SERVE_PORT_80", Value: msg, }}, }}, RestartPolicy: v1.RestartPolicyNever, }} _, err := f.ClientSet.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}) framework.ExpectNoError(err, "failed to create pod") framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "Pod didn't start within time out period") framework.Logf("Creating service...") _, err = f.ClientSet.CoreV1().Services(ns).Create(ctx, &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: testSvcName, Namespace: ns, Labels: testSvcLabels, }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{{ Port: 80, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP, }}, Selector: map[string]string{ "e2e-test": "proxy-endpoints", }, }}, metav1.CreateOptions{}) framework.ExpectNoError(err, "Failed to create the service") transportCfg, err := f.ClientConfig().TransportConfig() framework.ExpectNoError(err, "Error creating transportCfg") restTransport, err := transport.New(transportCfg) framework.ExpectNoError(err, "Error creating restTransport") client := &http.Client{ CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse }, Transport: restTransport, } // All methods for Pod Proxy return 200 // The response body returns 'foo' with the received http method httpVerbs := []string{"DELETE", "OPTIONS", "PATCH", "POST", "PUT"} for _, httpVerb := range httpVerbs { urlString := strings.TrimRight(f.ClientConfig().Host, "/") + "/api/v1/namespaces/" + ns + "/pods/agnhost/proxy?method=" + httpVerb framework.Logf("Starting http.Client for %s", urlString) pollErr := wait.PollImmediate(requestRetryPeriod, requestRetryTimeout, validateProxyVerbRequest(client, urlString, httpVerb, msg)) framework.ExpectNoError(pollErr, "Pod didn't start within time out period. %v", pollErr) } // All methods for Service Proxy return 200 // The response body returns 'foo' with the received http method for _, httpVerb := range httpVerbs { urlString := strings.TrimRight(f.ClientConfig().Host, "/") + "/api/v1/namespaces/" + ns + "/services/" + testSvcName + "/proxy?method=" + httpVerb framework.Logf("Starting http.Client for %s", urlString) pollErr := wait.PollImmediate(requestRetryPeriod, requestRetryTimeout, validateProxyVerbRequest(client, urlString, httpVerb, msg)) framework.ExpectNoError(pollErr, "Service didn't start within time out period. %v", pollErr) } // Test that each method returns 301 for both pod and service endpoints redirectVerbs := []string{"GET", "HEAD"} for _, redirectVerb := range redirectVerbs { urlString := strings.TrimRight(f.ClientConfig().Host, "/") + "/api/v1/namespaces/" + ns + "/pods/agnhost/proxy?method=" + redirectVerb validateRedirectRequest(client, redirectVerb, urlString) urlString = strings.TrimRight(f.ClientConfig().Host, "/") + "/api/v1/namespaces/" + ns + "/services/" + testSvcName + "/proxy?method=" + redirectVerb validateRedirectRequest(client, redirectVerb, urlString) } }) }) }) func validateRedirectRequest(client *http.Client, redirectVerb string, urlString string) { framework.Logf("Starting http.Client for %s", urlString) request, err := http.NewRequest(redirectVerb, urlString, nil) framework.ExpectNoError(err, "processing request") resp, err := client.Do(request) framework.ExpectNoError(err, "processing response") defer resp.Body.Close() framework.Logf("http.Client request:%s StatusCode:%d", redirectVerb, resp.StatusCode) framework.ExpectEqual(resp.StatusCode, 301, "The resp.StatusCode returned: %d", resp.StatusCode) } // validateProxyVerbRequest checks that a http request to a pod // or service was valid for any http verb. Requires agnhost image // with porter --json-response func validateProxyVerbRequest(client *http.Client, urlString string, httpVerb string, msg string) func() (bool, error) { return func() (bool, error) { var err error request, err := http.NewRequest(httpVerb, urlString, nil) if err != nil { framework.Logf("Failed to get a new request. %v", err) return false, nil } resp, err := client.Do(request) if err != nil { framework.Logf("Failed to get a response. %v", err) return false, nil } defer resp.Body.Close() buf := new(bytes.Buffer) buf.ReadFrom(resp.Body) response := buf.String() switch httpVerb { case "HEAD": framework.Logf("http.Client request:%s | StatusCode:%d", httpVerb, resp.StatusCode) if resp.StatusCode != 200 { return false, nil } return true, nil default: var jr *jsonResponse err = json.Unmarshal([]byte(response), &jr) if err != nil { framework.Logf("Failed to process jsonResponse. %v", err) return false, nil } framework.Logf("http.Client request:%s | StatusCode:%d | Response:%s | Method:%s", httpVerb, resp.StatusCode, jr.Body, jr.Method) if resp.StatusCode != 200 { return false, nil } if msg != jr.Body { return false, nil } if httpVerb != jr.Method { return false, nil } return true, nil } } } func doProxy(ctx context.Context, f *framework.Framework, path string, i int) (body []byte, statusCode int, d time.Duration, err error) { // About all of the proxy accesses in this file: // * AbsPath is used because it preserves the trailing '/'. // * Do().Raw() is used (instead of DoRaw()) because it will turn an // error from apiserver proxy into an actual error, and there is no // chance of the things we are talking to being confused for an error // that apiserver would have emitted. start := time.Now() body, err = f.ClientSet.CoreV1().RESTClient().Get().AbsPath(path).Do(ctx).StatusCode(&statusCode).Raw() d = time.Since(start) if len(body) > 0 { framework.Logf("(%v) %v: %s (%v; %v)", i, path, truncate(body, maxDisplayBodyLen), statusCode, d) } else { framework.Logf("%v: %s (%v; %v)", path, "no body", statusCode, d) } return } func truncate(b []byte, maxLen int) []byte { if len(b) <= maxLen-3 { return b } b2 := append([]byte(nil), b[:maxLen-3]...) b2 = append(b2, '.', '.', '.') return b2 } func nodeProxyTest(ctx context.Context, f *framework.Framework, prefix, nodeDest string) { // TODO: investigate why it doesn't work on master Node. node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet) framework.ExpectNoError(err) // TODO: Change it to test whether all requests succeeded when requests // not reaching Kubelet issue is debugged. serviceUnavailableErrors := 0 for i := 0; i < proxyAttempts; i++ { _, status, d, err := doProxy(ctx, f, prefix+node.Name+nodeDest, i) if status == http.StatusServiceUnavailable { framework.Logf("ginkgo.Failed proxying node logs due to service unavailable: %v", err) time.Sleep(time.Second) serviceUnavailableErrors++ } else { framework.ExpectNoError(err) framework.ExpectEqual(status, http.StatusOK) gomega.Expect(d).To(gomega.BeNumerically("<", proxyHTTPCallTimeout)) } } if serviceUnavailableErrors > 0 { framework.Logf("error: %d requests to proxy node logs failed", serviceUnavailableErrors) } maxFailures := int(math.Floor(0.1 * float64(proxyAttempts))) gomega.Expect(serviceUnavailableErrors).To(gomega.BeNumerically("<", maxFailures)) } // waitForEndpoint waits for the specified endpoint to be ready. func waitForEndpoint(ctx context.Context, c clientset.Interface, ns, name string) error { // registerTimeout is how long to wait for an endpoint to be registered. registerTimeout := time.Minute for t := time.Now(); time.Since(t) < registerTimeout; time.Sleep(framework.Poll) { endpoint, err := c.CoreV1().Endpoints(ns).Get(ctx, name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { framework.Logf("Endpoint %s/%s is not ready yet", ns, name) continue } framework.ExpectNoError(err, "Failed to get endpoints for %s/%s", ns, name) if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 { framework.Logf("Endpoint %s/%s is not ready yet", ns, name) continue } return nil } return fmt.Errorf("failed to get endpoints for %s/%s", ns, name) }