diff --git a/test/e2e/framework.go b/test/e2e/framework.go index c1dd414efe2..d7a42330d5a 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -103,3 +103,49 @@ func (f *Framework) WaitForPodRunning(podName string) error { func (f *Framework) TestContainerOutput(scenarioName string, pod *api.Pod, expectedOutput []string) { testContainerOutputInNamespace(scenarioName, f.Client, pod, expectedOutput, f.Namespace.Name) } + +// WaitForAnEndpoint waits for at least one endpoint to become available in the +// service's corresponding endpoints object. +func (f *Framework) WaitForAnEndpoint(serviceName string) error { + for { + // TODO: Endpoints client should take a field selector so we + // don't have to list everything. + list, err := f.Client.Endpoints(f.Namespace.Name).List(labels.Everything()) + if err != nil { + return err + } + rv := list.ResourceVersion + + isOK := func(e *api.Endpoints) bool { + return e.Name == serviceName && len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 + } + for i := range list.Items { + if isOK(&list.Items[i]) { + return nil + } + } + + w, err := f.Client.Endpoints(f.Namespace.Name).Watch( + labels.Everything(), + fields.Set{"metadata.name": serviceName}.AsSelector(), + rv, + ) + if err != nil { + return err + } + defer w.Stop() + + for { + val, ok := <-w.ResultChan() + if !ok { + // reget and re-watch + break + } + if e, ok := val.Object.(*api.Endpoints); ok { + if isOK(e) { + return nil + } + } + } + } +} diff --git a/test/e2e/proxy.go b/test/e2e/proxy.go new file mode 100644 index 00000000000..8ec1d962dd5 --- /dev/null +++ b/test/e2e/proxy.go @@ -0,0 +1,186 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "strings" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Proxy", func() { + for _, version := range []string{"v1beta3", "v1"} { + Context("version "+version, func() { proxyContext(version) }) + } +}) + +func proxyContext(version string) { + f := NewFramework("proxy") + prefix := "/api/" + version + + It("should proxy logs on node with explicit kubelet port", func() { + node, err := pickNode(f.Client) + Expect(err).NotTo(HaveOccurred()) + // AbsPath preserves the trailing '/'. + body, err := f.Client.Get().AbsPath(prefix + "/proxy/nodes/" + node + ":10250/logs/").Do().Raw() + if len(body) > 0 { + if len(body) > 100 { + body = body[:100] + body = append(body, '.', '.', '.') + } + Logf("Got: %s", body) + } + Expect(err).NotTo(HaveOccurred()) + }) + + It("should proxy logs on node", func() { + node, err := pickNode(f.Client) + Expect(err).NotTo(HaveOccurred()) + body, err := f.Client.Get().AbsPath(prefix + "/proxy/nodes/" + node + "/logs/").Do().Raw() + if len(body) > 0 { + if len(body) > 100 { + body = body[:100] + body = append(body, '.', '.', '.') + } + Logf("Got: %s", body) + } + Expect(err).NotTo(HaveOccurred()) + }) + + It("should proxy to cadvisor", func() { + node, err := pickNode(f.Client) + Expect(err).NotTo(HaveOccurred()) + body, err := f.Client.Get().AbsPath(prefix + "/proxy/nodes/" + node + ":4194/containers/").Do().Raw() + if len(body) > 0 { + if len(body) > 100 { + body = body[:100] + body = append(body, '.', '.', '.') + } + Logf("Got: %s", body) + } + Expect(err).NotTo(HaveOccurred()) + }) + + It("should proxy through a service and a pod", func() { + labels := map[string]string{"proxy-service-target": "true"} + service, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "proxy-service-", + }, + Spec: api.ServiceSpec{ + Selector: labels, + Ports: []api.ServicePort{ + { + Name: "portname1", + Port: 80, + TargetPort: util.NewIntOrStringFromString("dest1"), + }, + { + Name: "portname2", + Port: 81, + TargetPort: util.NewIntOrStringFromInt(162), + }, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + defer func(name string) { + err := f.Client.Services(f.Namespace.Name).Delete(name) + if err != nil { + Logf("Failed deleting service %v: %v", name, err) + } + }(service.Name) + + // Make an RC with a single pod. + pods := []*api.Pod{} + cfg := RCConfig{ + Client: f.Client, + Image: "gcr.io/google_containers/porter:91d46193649807d1340b46797774d8b2", + Name: service.Name, + Namespace: f.Namespace.Name, + Replicas: 1, + PollInterval: time.Second, + Env: map[string]string{ + "SERVE_PORT_80": "not accessible via service", + "SERVE_PORT_160": "foo", + "SERVE_PORT_162": "bar", + }, + Ports: map[string]int{ + "dest1": 160, + "dest2": 162, + }, + Labels: labels, + CreatedPods: &pods, + } + Expect(RunRC(cfg)).NotTo(HaveOccurred()) + defer DeleteRC(f.Client, f.Namespace.Name, cfg.Name) + + Expect(f.WaitForAnEndpoint(service.Name)).NotTo(HaveOccurred()) + + // Try proxying through the service and directly to through the pod. + svcPrefix := prefix + "/proxy/namespaces/" + f.Namespace.Name + "/services/" + service.Name + podPrefix := prefix + "/proxy/namespaces/" + f.Namespace.Name + "/pods/" + pods[0].Name + expectations := map[string]string{ + svcPrefix + ":portname1": "foo", + svcPrefix + ":portname2": "bar", + podPrefix + ":80": "not accessible via service", + podPrefix + ":160": "foo", + podPrefix + ":162": "bar", + // TODO: below entries don't work, but I believe we should make them work. + // svcPrefix + ":80": "foo", + // svcPrefix + ":81": "bar", + // podPrefix + ":dest1": "foo", + // podPrefix + ":dest2": "bar", + } + + errors := []string{} + for path, val := range expectations { + body, err := f.Client.Get().AbsPath(path).Do().Raw() + if err != nil { + errors = append(errors, fmt.Sprintf("path %v gave error %v", path, err)) + continue + } + if e, a := val, string(body); e != a { + errors = append(errors, fmt.Sprintf("path %v: wanted %v, got %v", path, e, a)) + } + } + + if len(errors) != 0 { + Fail(strings.Join(errors, "\n")) + } + }) +} + +func pickNode(c *client.Client) (string, error) { + nodes, err := c.Nodes().List(labels.Everything(), fields.Everything()) + if err != nil { + return "", err + } + if len(nodes.Items) == 0 { + return "", fmt.Errorf("no nodes exist, can't test node proxy") + } + return nodes.Items[0].Name, nil +} diff --git a/test/e2e/util.go b/test/e2e/util.go index 44d2a88e849..5b8dc6acf62 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -158,6 +158,19 @@ type RCConfig struct { PollInterval time.Duration PodStatusFile *os.File Replicas int + + // Env vars, set the same for every pod. + Env map[string]string + + // Extra labels added to every pod. + Labels map[string]string + + // Ports to declare in the container (map of name to containerPort). + Ports map[string]int + + // Pointer to a list of pods; if non-nil, will be set to a list of pods + // created by this RC by RunRC. + CreatedPods *[]*api.Pod } func Logf(format string, a ...interface{}) { @@ -841,6 +854,23 @@ func RunRC(config RCConfig) error { }, }, } + if config.Env != nil { + for k, v := range config.Env { + c := &rc.Spec.Template.Spec.Containers[0] + c.Env = append(c.Env, api.EnvVar{Name: k, Value: v}) + } + } + if config.Labels != nil { + for k, v := range config.Labels { + rc.Spec.Template.ObjectMeta.Labels[k] = v + } + } + if config.Ports != nil { + for k, v := range config.Ports { + c := &rc.Spec.Template.Spec.Containers[0] + c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: v}) + } + } _, err := config.Client.ReplicationControllers(config.Namespace).Create(rc) if err != nil { return fmt.Errorf("Error creating replication controller: %v", err) @@ -866,6 +896,9 @@ func RunRC(config RCConfig) error { inactive := 0 failedContainers := 0 pods := podStore.List() + if config.CreatedPods != nil { + *config.CreatedPods = pods + } for _, p := range pods { if p.Status.Phase == api.PodRunning { running++