diff --git a/test/e2e/federated-service.go b/test/e2e/federated-service.go index 5e90611b089..f65d62d9719 100644 --- a/test/e2e/federated-service.go +++ b/test/e2e/federated-service.go @@ -19,6 +19,8 @@ package e2e import ( "fmt" "os" + "reflect" + "strconv" "time" "k8s.io/kubernetes/federation/apis/federation" @@ -47,239 +49,315 @@ const ( FederatedServiceTimeout = 60 * time.Second - FederatedServiceName = "federated-service" - FederatedServicePod = "federated-service-test-pod" + FederatedServiceName = "federated-service" + FederatedServicePodName = "federated-service-test-pod" DefaultFederationName = "federation" // We use this to decide how long to wait for our DNS probes to succeed. - DNSTTL = 180 * time.Second + DNSTTL = 180 * time.Second // TODO: make k8s.io/kubernetes/federation/pkg/federation-controller/service.minDnsTtl exported, and import it here. ) var FederatedServiceLabels = map[string]string{ "foo": "bar", } -var _ = framework.KubeDescribe("[Feature:Federation] Federated Services", func() { - +var _ = framework.KubeDescribe("[Feature:Federation]", func() { + f := framework.NewDefaultFederatedFramework("federated-service") var clusterClientSets []*release_1_3.Clientset + var clusterNamespaceCreated []bool // Did we need to create a new namespace in each of the above clusters? If so, we should delete it. var federationName string - f := framework.NewDefaultFederatedFramework("service") - BeforeEach(func() { - framework.SkipUnlessFederated(f.Client) - - // TODO: Federation API server should be able to answer this. - if federationName = os.Getenv("FEDERATION_NAME"); federationName == "" { - federationName = DefaultFederationName - } - - contexts := f.GetUnderlyingFederatedContexts() - - for _, context := range contexts { - createClusterObjectOrFail(f, &context) - } - - var clusterList *federation.ClusterList - By("Obtaining a list of all the clusters") - if err := wait.PollImmediate(framework.Poll, FederatedServiceTimeout, func() (bool, error) { - var err error - clusterList, err = f.FederationClientset.Federation().Clusters().List(api.ListOptions{}) - if err != nil { - return false, err - } - framework.Logf("%d clusters registered, waiting for %d", len(clusterList.Items), len(contexts)) - if len(clusterList.Items) == len(contexts) { - return true, nil - } - return false, nil - }); err != nil { - framework.Failf("Failed to list registered clusters: %+v", err) - } - - framework.Logf("Checking that %d clusters are Ready", len(contexts)) - for _, context := range contexts { - clusterIsReadyOrFail(f, &context) - } - framework.Logf("%d clusters are Ready", len(contexts)) - - for _, cluster := range clusterList.Items { - framework.Logf("Creating a clientset for the cluster %s", cluster.Name) - - Expect(framework.TestContext.KubeConfig).ToNot(Equal(""), "KubeConfig must be specified to load clusters' client config") - kubecfg, err := clientcmd.LoadFromFile(framework.TestContext.KubeConfig) - framework.ExpectNoError(err, "error loading KubeConfig: %v", err) - - cfgOverride := &clientcmd.ConfigOverrides{ - ClusterInfo: clientcmdapi.Cluster{ - Server: cluster.Spec.ServerAddressByClientCIDRs[0].ServerAddress, - }, - } - ccfg := clientcmd.NewNonInteractiveClientConfig(*kubecfg, cluster.Name, cfgOverride, clientcmd.NewDefaultClientConfigLoadingRules()) - cfg, err := ccfg.ClientConfig() - Expect(err).NotTo(HaveOccurred()) - - cfg.QPS = KubeAPIQPS - cfg.Burst = KubeAPIBurst - clset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(cfg, UserAgentName)) - clusterClientSets = append(clusterClientSets, clset) - } - - for i, cs := range clusterClientSets { - if _, err := cs.Core().Namespaces().Get(f.Namespace.Name); errors.IsNotFound(err) { - ns := &v1.Namespace{ - ObjectMeta: v1.ObjectMeta{ - Name: f.Namespace.Name, - }, - } - if _, err := cs.Core().Namespaces().Create(ns); err != nil { - framework.Logf("Couldn't create the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err) - } - framework.Logf("Namespace %s created in cluster [%d]", f.Namespace.Name, i) - } else if err != nil { - framework.Logf("Couldn't create the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err) - } - } - }) - - Describe("DNS", func() { - AfterEach(func() { + var _ = Describe("Federated Services", func() { + BeforeEach(func() { framework.SkipUnlessFederated(f.Client) - // TODO(mml): replace with calls to framework.DeleteNamespaces and - // framework.WaitForNamespacesDeleted. But first we need to re-write - // them to expect versioned clients. - // ALSO TODO(mml): Utility functions like these should [optionally?] - // accept a list of clients/clusters to act upon, to increase - // re-usablity. - for i, cs := range clusterClientSets { - if err := cs.Core().Namespaces().Delete(f.Namespace.Name, api.NewDeleteOptions(0)); err != nil { - framework.Failf("Couldn't delete the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err) + // TODO: Federation API server should be able to answer this. + if federationName = os.Getenv("FEDERATION_NAME"); federationName == "" { + federationName = DefaultFederationName + } + + contexts := f.GetUnderlyingFederatedContexts() + + for _, context := range contexts { + createClusterObjectOrFail(f, &context) + } + + var clusterList *federation.ClusterList + By("Obtaining a list of all the clusters") + if err := wait.PollImmediate(framework.Poll, FederatedServiceTimeout, func() (bool, error) { + var err error + clusterList, err = f.FederationClientset.Federation().Clusters().List(api.ListOptions{}) + if err != nil { + return false, err + } + framework.Logf("%d clusters registered, waiting for %d", len(clusterList.Items), len(contexts)) + if len(clusterList.Items) == len(contexts) { + return true, nil + } + return false, nil + }); err != nil { + framework.Failf("Failed to list registered clusters: %+v", err) + } + + framework.Logf("Checking that %d clusters are Ready", len(contexts)) + for _, context := range contexts { + clusterIsReadyOrFail(f, &context) + } + framework.Logf("%d clusters are Ready", len(contexts)) + + for i, cluster := range clusterList.Items { + framework.Logf("Creating a clientset for the cluster %s", cluster.Name) + + Expect(framework.TestContext.KubeConfig).ToNot(Equal(""), "KubeConfig must be specified to load clusters' client config") + kubecfg, err := clientcmd.LoadFromFile(framework.TestContext.KubeConfig) + framework.ExpectNoError(err, "error loading KubeConfig: %v", err) + + cfgOverride := &clientcmd.ConfigOverrides{ + ClusterInfo: clientcmdapi.Cluster{ + Server: cluster.Spec.ServerAddressByClientCIDRs[0].ServerAddress, + }, + } + ccfg := clientcmd.NewNonInteractiveClientConfig(*kubecfg, cluster.Name, cfgOverride, clientcmd.NewDefaultClientConfigLoadingRules()) + cfg, err := ccfg.ClientConfig() + framework.ExpectNoError(err, "Error creating client config in cluster #%d", i) + + cfg.QPS = KubeAPIQPS + cfg.Burst = KubeAPIBurst + clset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(cfg, UserAgentName)) + clusterClientSets = append(clusterClientSets, clset) + } + + clusterNamespaceCreated = make([]bool, len(clusterClientSets)) + for i, cs := range clusterClientSets { + // The e2e Framework created the required namespace in one of the clusters, but we need to create it in all the others, if it doesn't yet exist. + if _, err := cs.Core().Namespaces().Get(f.Namespace.Name); errors.IsNotFound(err) { + ns := &v1.Namespace{ + ObjectMeta: v1.ObjectMeta{ + Name: f.Namespace.Name, + }, + } + _, err := cs.Core().Namespaces().Create(ns) + if err == nil { + clusterNamespaceCreated[i] = true + } + framework.ExpectNoError(err, "Couldn't create the namespace %s in cluster [%d]", f.Namespace.Name, i) + framework.Logf("Namespace %s created in cluster [%d]", f.Namespace.Name, i) + } else if err != nil { + framework.Logf("Couldn't create the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err) + } + } + }) + + AfterEach(func() { + for i, cs := range clusterClientSets { + if clusterNamespaceCreated[i] { + if _, err := cs.Core().Namespaces().Get(f.Namespace.Name); !errors.IsNotFound(err) { + err := cs.Core().Namespaces().Delete(f.Namespace.Name, &api.DeleteOptions{}) + framework.ExpectNoError(err, "Couldn't delete the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err) + } + framework.Logf("Namespace %s deleted in cluster [%d]", f.Namespace.Name, i) } - framework.Logf("Namespace %s deleted in cluster [%d]", f.Namespace.Name, i) } // Delete the registered clusters in the federation API server. clusterList, err := f.FederationClientset.Federation().Clusters().List(api.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) + framework.ExpectNoError(err, "Error listing clusters") for _, cluster := range clusterList.Items { err := f.FederationClientset.Federation().Clusters().Delete(cluster.Name, &api.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) + framework.ExpectNoError(err, "Error deleting cluster %q", cluster.Name) } }) - BeforeEach(func() { - framework.SkipUnlessFederated(f.Client) - createBackendPods(clusterClientSets, f.Namespace.Name) - createService(f.FederationClientset_1_3, clusterClientSets, f.Namespace.Name) - }) - - It("should be able to discover a federated service", func() { - framework.SkipUnlessFederated(f.Client) - - svcDNSNames := []string{ - FederatedServiceName, - fmt.Sprintf("%s.%s", FederatedServiceName, f.Namespace.Name), - fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name), - fmt.Sprintf("%s.%s.%s", FederatedServiceName, f.Namespace.Name, federationName), - fmt.Sprintf("%s.%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name, federationName), - } - // TODO(mml): This could be much faster. We can launch all the test - // pods, perhaps in the BeforeEach, and then just poll until we get - // successes/failures from them all. - for _, name := range svcDNSNames { - discoverService(f, name, true) - } - }) - - Context("non-local federated service", func() { + Describe("Service creation", func() { BeforeEach(func() { framework.SkipUnlessFederated(f.Client) - - // Delete a federated service shard in the default e2e Kubernetes cluster. - // TODO(mml): This should not work: #27623. We should use a load - // balancer with actual back-ends, some of which we delete or disable. - err := f.Clientset_1_3.Core().Services(f.Namespace.Name).Delete(FederatedServiceName, &api.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) - waitForFederatedServiceShard(f.Clientset_1_3, f.Namespace.Name, nil, 0) + // Placeholder }) - It("should be able to discover a non-local federated service", func() { + AfterEach(func() { + framework.SkipUnlessFederated(f.Client) + // Placeholder + }) + + It("should succeed", func() { + framework.SkipUnlessFederated(f.Client) + service := createServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name) + By(fmt.Sprintf("Creation of service %q in namespace %q succeeded. Deleting service.", service.Name, f.Namespace.Name)) + // Cleanup + err := f.FederationClientset_1_3.Services(f.Namespace.Name).Delete(service.Name, &api.DeleteOptions{}) + framework.ExpectNoError(err, "Error deleting service %q in namespace %q", service.Name, service.Namespace) + By(fmt.Sprintf("Deletion of service %q in namespace %q succeeded.", service.Name, f.Namespace.Name)) + }) + + It("should create matching services in underlying clusters", func() { + framework.SkipUnlessFederated(f.Client) + service := createServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name) + defer func() { // Cleanup + By(fmt.Sprintf("Deleting service %q in namespace %q", service.Name, f.Namespace.Name)) + err := f.FederationClientset_1_3.Services(f.Namespace.Name).Delete(service.Name, &api.DeleteOptions{}) + framework.ExpectNoError(err, "Error deleting service %q in namespace %q", service.Name, f.Namespace.Name) + }() + waitForServiceShardsOrFail(f.Namespace.Name, service, clusterClientSets, nil) + }) + }) + + var _ = Describe("DNS", func() { + + var ( + service *v1.Service + backendPods []*v1.Pod + ) + + BeforeEach(func() { + framework.SkipUnlessFederated(f.Client) + backendPods = createBackendPodsOrFail(clusterClientSets, f.Namespace.Name, FederatedServicePodName) + service = createServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name) + waitForServiceShardsOrFail(f.Namespace.Name, service, clusterClientSets, nil) + }) + + AfterEach(func() { + framework.SkipUnlessFederated(f.Client) + if backendPods != nil { + deleteBackendPodsOrFail(clusterClientSets, f.Namespace.Name, backendPods) + backendPods = nil + } + if service != nil { + deleteServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name, service.Name) + service = nil + } + }) + + It("should be able to discover a federated service", func() { framework.SkipUnlessFederated(f.Client) svcDNSNames := []string{ + FederatedServiceName, + fmt.Sprintf("%s.%s", FederatedServiceName, f.Namespace.Name), + fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name), fmt.Sprintf("%s.%s.%s", FederatedServiceName, f.Namespace.Name, federationName), fmt.Sprintf("%s.%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name, federationName), } - for _, name := range svcDNSNames { - discoverService(f, name, true) + // TODO(mml): This could be much faster. We can launch all the test + // pods, perhaps in the BeforeEach, and then just poll until we get + // successes/failures from them all. + for i, DNSName := range svcDNSNames { + discoverService(f, DNSName, true, "federated-service-e2e-discovery-pod-"+strconv.Itoa(i)) } }) - // TODO(mml): This currently takes 9 minutes. Consider reducing the - // TTL and/or running the pods in parallel. - Context("[Slow] missing local service", func() { - It("should never find DNS entries for a missing local service", func() { + Context("non-local federated service", func() { + BeforeEach(func() { framework.SkipUnlessFederated(f.Client) - localSvcDNSNames := []string{ - FederatedServiceName, - fmt.Sprintf("%s.%s", FederatedServiceName, f.Namespace.Name), - fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name), + // Delete all the backend pods from the shard which is local to the discovery pod. + deleteBackendPodsOrFail([]*release_1_3.Clientset{f.Clientset_1_3}, f.Namespace.Name, []*v1.Pod{backendPods[0]}) + backendPods[0] = nil // So we don't try to delete it again in an outer AfterEach + }) + + It("should be able to discover a non-local federated service", func() { + framework.SkipUnlessFederated(f.Client) + + svcDNSNames := []string{ + fmt.Sprintf("%s.%s.%s", FederatedServiceName, f.Namespace.Name, federationName), + fmt.Sprintf("%s.%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name, federationName), } - for _, name := range localSvcDNSNames { - discoverService(f, name, false) + for i, name := range svcDNSNames { + discoverService(f, name, true, "federated-service-e2e-discovery-pod-"+strconv.Itoa(i)) } }) + + // TODO(mml): This currently takes 9 minutes. Consider reducing the + // TTL and/or running the pods in parallel. + Context("[Slow] missing local service", func() { + It("should never find DNS entries for a missing local service", func() { + framework.SkipUnlessFederated(f.Client) + + localSvcDNSNames := []string{ + FederatedServiceName, + fmt.Sprintf("%s.%s", FederatedServiceName, f.Namespace.Name), + fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name), + } + for i, name := range localSvcDNSNames { + discoverService(f, name, false, FederatedServicePodName+strconv.Itoa(i)) + } + }) + }) }) }) }) }) -// waitForFederatedServiceShard waits until the number of shards of a given federated -// service reaches the expected value, i.e. numSvcs in the given individual Kubernetes -// cluster. If the shard count, i.e. numSvcs is expected to be at least one, then -// it also checks if the first shard's name and spec matches that of the given service. -func waitForFederatedServiceShard(cs *release_1_3.Clientset, namespace string, service *v1.Service, numSvcs int) { - By("Fetching a federated service shard") - var clSvcList *v1.ServiceList - if err := wait.PollImmediate(framework.Poll, FederatedServiceTimeout, func() (bool, error) { - var err error - clSvcList, err = cs.Core().Services(namespace).List(api.ListOptions{}) - if err != nil { +/* + equivalent returns true if the two services are equivalent. Fields which are expected to differ between + federated services and the underlying cluster services (e.g. ClusterIP, LoadBalancerIP etc) are ignored. +*/ +func equivalent(federationService, clusterService v1.Service) bool { + // TODO: I think that we need a DeepCopy here to avoid clobbering our parameters. + clusterService.Spec.ClusterIP = federationService.Spec.ClusterIP + clusterService.Spec.ExternalIPs = federationService.Spec.ExternalIPs + clusterService.Spec.DeprecatedPublicIPs = federationService.Spec.DeprecatedPublicIPs + clusterService.Spec.LoadBalancerIP = federationService.Spec.LoadBalancerIP + clusterService.Spec.LoadBalancerSourceRanges = federationService.Spec.LoadBalancerSourceRanges + // N.B. We cannot iterate over the port objects directly, as their values + // only get copied and our updates will get lost. + for i := range clusterService.Spec.Ports { + clusterService.Spec.Ports[i].NodePort = federationService.Spec.Ports[i].NodePort + } + return reflect.DeepEqual(clusterService.Spec, federationService.Spec) +} + +/* + waitForServiceOrFail waits until a service is either present or absent in the cluster specified by clientset. + If the condition is not met within timout, it fails the calling test. +*/ +func waitForServiceOrFail(clientset *release_1_3.Clientset, namespace string, service *v1.Service, present bool, timeout time.Duration) { + By(fmt.Sprintf("Fetching a federated service shard of service %q in namespace %q from cluster", service.Name, namespace)) + var clusterService *v1.Service + err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { + clusterService, err := clientset.Services(namespace).Get(service.Name) + if err != nil && !errors.IsNotFound(err) { return false, err } - n := len(clSvcList.Items) - if n == numSvcs { + if (clusterService != nil && err == nil && present) || (clusterService == nil && errors.IsNotFound(err) && !present) { + By(fmt.Sprintf("Success: federated service shard of service %q in namespace %q in cluster: %v", service.Name, namespace, present)) return true, nil } - framework.Logf("%d services found, waiting for %d, trying again in %s", n, numSvcs, framework.Poll) + By(fmt.Sprintf("Service found: %v, waiting for service found: %v, trying again in %s", clusterService != nil, present, framework.Poll)) return false, nil - }); err != nil { - framework.Failf("Failed to list registered clusters: %+v", err) - } + }) + framework.ExpectNoError(err, "Failed to get service %q in namespace %q", service.Name, namespace) - if numSvcs > 0 && service != nil { - // Renaming for clarity/readability - clSvc := clSvcList.Items[0] - - Expect(clSvc.Name).To(Equal(service.Name)) - // Some fields are expected to be different, so make them the same before checking equality. - clSvc.Spec.ClusterIP = service.Spec.ClusterIP - clSvc.Spec.ExternalIPs = service.Spec.ExternalIPs - clSvc.Spec.DeprecatedPublicIPs = service.Spec.DeprecatedPublicIPs - clSvc.Spec.LoadBalancerIP = service.Spec.LoadBalancerIP - clSvc.Spec.LoadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges - // N.B. We cannot iterate over the port objects directly, as their values - // only get copied and our updates will get lost. - for i := range clSvc.Spec.Ports { - clSvc.Spec.Ports[i].NodePort = service.Spec.Ports[i].NodePort - } - Expect(clSvc.Spec).To(Equal(service.Spec)) + if present && clusterService != nil { + Expect(equivalent(*clusterService, *service)) } } -func createService(fcs *federation_release_1_3.Clientset, clusterClientSets []*release_1_3.Clientset, namespace string) { +/* + waitForServiceShardsOrFail waits for the service to appear (or disappear) in the clientsets specifed in presentInCluster (or all if presentInCluster is nil). + If presentInCluster[n] is true, then wait for service shard to exist in the cluster specifid in clientsets[n] + If presentInCluster[n] is false, then wait for service shard to not exist in the cluster specifid in clientsets[n] +*/ +func waitForServiceShardsOrFail(namespace string, service *v1.Service, clientsets []*release_1_3.Clientset, presentInCluster []bool) { + if presentInCluster != nil { + Expect(len(presentInCluster)).To(Equal(len(clientsets)), "Internal error: Number of presence flags does not equal number of clients/clusters") + } + framework.Logf("Waiting for service %q in %d clusters", service.Name, len(clientsets)) + for i, clientset := range clientsets { + var present bool // Should the service be present or absent in this cluster? + if presentInCluster == nil { + present = true + } else { + present = presentInCluster[i] + } + waitForServiceOrFail(clientset, namespace, service, present, FederatedServiceTimeout) + } +} + +func createServiceOrFail(clientset *federation_release_1_3.Clientset, namespace string) *v1.Service { + if clientset == nil || len(namespace) == 0 { + Fail(fmt.Sprintf("Internal error: invalid parameters passed to deleteServiceOrFail: clientset: %v, namespace: %v", clientset, namespace)) + } By(fmt.Sprintf("Creating federated service %q in namespace %q", FederatedServiceName, namespace)) service := &v1.Service{ @@ -298,12 +376,19 @@ func createService(fcs *federation_release_1_3.Clientset, clusterClientSets []*r }, }, } - nservice, err := fcs.Core().Services(namespace).Create(service) - framework.Logf("Trying to create service %q in namespace %q", service.ObjectMeta.Name, service.ObjectMeta.Namespace) - Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("creating service %s: %+v", service.Name, err)) - for _, cs := range clusterClientSets { - waitForFederatedServiceShard(cs, namespace, nservice, 1) + By(fmt.Sprintf("Trying to create service %q in namespace %q", service.Name, namespace)) + _, err := clientset.Services(namespace).Create(service) + framework.ExpectNoError(err, "Creating service %q in namespace %q", service.Name, namespace) + By(fmt.Sprintf("Successfully created federated service %q in namespace %q", FederatedServiceName, namespace)) + return service +} + +func deleteServiceOrFail(clientset *federation_release_1_3.Clientset, namespace string, serviceName string) { + if clientset == nil || len(namespace) == 0 || len(serviceName) == 0 { + Fail(fmt.Sprintf("Internal error: invalid parameters passed to deleteServiceOrFail: clientset: %v, namespace: %v, service: %v", clientset, namespace, serviceName)) } + err := clientset.Services(namespace).Delete(serviceName, api.NewDeleteOptions(0)) + framework.ExpectNoError(err, "Error deleting service %q from namespace %q", serviceName, namespace) } func podExitCodeDetector(f *framework.Framework, name string, code int32) func() error { @@ -347,14 +432,13 @@ func podExitCodeDetector(f *framework.Framework, name string, code int32) func() } } -func discoverService(f *framework.Framework, name string, exists bool) { +func discoverService(f *framework.Framework, name string, exists bool, podName string) { command := []string{"sh", "-c", fmt.Sprintf("until nslookup '%s'; do sleep 10; done", name)} By(fmt.Sprintf("Looking up %q", name)) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: FederatedServicePod, - Labels: map[string]string{"name": FederatedServicePod}, + Name: podName, }, Spec: api.PodSpec{ Containers: []api.Container{ @@ -369,22 +453,24 @@ func discoverService(f *framework.Framework, name string, exists bool) { } _, err := f.Client.Pods(f.Namespace.Name).Create(pod) - Expect(err).NotTo(HaveOccurred(), "Trying to create pod to run %q", command) - defer f.Client.Pods(f.Namespace.Name).Delete(FederatedServicePod, api.NewDeleteOptions(0)) + framework.ExpectNoError(err, "Trying to create pod to run %q", command) + defer func() { f.Client.Pods(f.Namespace.Name).Delete(podName, api.NewDeleteOptions(0)) }() if exists { // TODO(mml): Eventually check the IP address is correct, too. - Eventually(podExitCodeDetector(f, FederatedServicePod, 0), 10*DNSTTL, time.Second*2). + Eventually(podExitCodeDetector(f, podName, 0), 3*DNSTTL, time.Second*2). Should(BeNil(), "%q should exit 0, but it never did", command) } else { - Consistently(podExitCodeDetector(f, FederatedServicePod, 0), 10*DNSTTL, time.Second*2). + Consistently(podExitCodeDetector(f, podName, 0), 3*DNSTTL, time.Second*2). ShouldNot(BeNil(), "%q should never exit 0, but it did", command) } } -func createBackendPods(clusterClientSets []*release_1_3.Clientset, namespace string) { - name := "backend" - +/* +createBackendPodsOrFail creates one pod in each cluster, and returns the created pods (in the same order as clusterClientSets). +If creation of any pod fails, the test fails (possibly with a partially created set of pods). No retries are attempted. +*/ +func createBackendPodsOrFail(clusterClientSets []*release_1_3.Clientset, namespace string, name string) []*v1.Pod { pod := &v1.Pod{ ObjectMeta: v1.ObjectMeta{ Name: name, @@ -394,16 +480,34 @@ func createBackendPods(clusterClientSets []*release_1_3.Clientset, namespace str Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "backend", + Name: name, Image: "gcr.io/google_containers/echoserver:1.4", }, }, RestartPolicy: v1.RestartPolicyAlways, }, } + pods := make([]*v1.Pod, len(clusterClientSets)) + for i, client := range clusterClientSets { + createdPod, err := client.Core().Pods(namespace).Create(pod) + framework.ExpectNoError(err, "Creating pod %q in namespace %q in cluster %d", name, namespace, i) + pods[i] = createdPod + } + return pods +} - for _, client := range clusterClientSets { - _, err := client.Core().Pods(namespace).Create(pod) - Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Creating pod %q/%q", namespace, name)) +/* +deleteBackendPodsOrFail deletes one pod from each cluster (unless pods[n] is nil for that cluster) +If deletion of any pod fails, the test fails (possibly with a partially deleted set of pods). No retries are attempted. +*/ +func deleteBackendPodsOrFail(clusterClientSets []*release_1_3.Clientset, namespace string, pods []*v1.Pod) { + if len(clusterClientSets) != len(pods) { + Fail(fmt.Sprintf("Internal error: number of clients (%d) does not equal number of pods (%d). One pod per client please.", len(clusterClientSets), len(pods))) + } + for i, client := range clusterClientSets { + if pods[i] != nil { + err := client.Core().Pods(namespace).Delete(pods[i].Name, api.NewDeleteOptions(0)) + framework.ExpectNoError(err, "Deleting pod %q in namespace %q from cluster %d", pods[i].Name, namespace, i) + } } }