Refactored, expanded and fixed federated-services e2e tests.
This commit is contained in:
parent 13bb931b60
commit 28fab489ea
@@ -19,6 +19,8 @@ package e2e
import (
	"fmt"
	"os"
	"reflect"
	"strconv"
	"time"

	"k8s.io/kubernetes/federation/apis/federation"
@@ -47,239 +49,315 @@ const (

	FederatedServiceTimeout = 60 * time.Second

	FederatedServiceName = "federated-service"
	FederatedServicePod  = "federated-service-test-pod"
	FederatedServiceName    = "federated-service"
	FederatedServicePodName = "federated-service-test-pod"

	DefaultFederationName = "federation"

	// We use this to decide how long to wait for our DNS probes to succeed.
	DNSTTL = 180 * time.Second
	DNSTTL = 180 * time.Second // TODO: make k8s.io/kubernetes/federation/pkg/federation-controller/service.minDnsTtl exported, and import it here.
)

var FederatedServiceLabels = map[string]string{
	"foo": "bar",
}

var _ = framework.KubeDescribe("[Feature:Federation] Federated Services", func() {
var _ = framework.KubeDescribe("[Feature:Federation]", func() {
	f := framework.NewDefaultFederatedFramework("federated-service")
	var clusterClientSets []*release_1_3.Clientset
	var clusterNamespaceCreated []bool // Did we need to create a new namespace in each of the above clusters? If so, we should delete it.
	var federationName string
	f := framework.NewDefaultFederatedFramework("service")

	BeforeEach(func() {
		framework.SkipUnlessFederated(f.Client)

		// TODO: Federation API server should be able to answer this.
		if federationName = os.Getenv("FEDERATION_NAME"); federationName == "" {
			federationName = DefaultFederationName
		}

		contexts := f.GetUnderlyingFederatedContexts()

		for _, context := range contexts {
			createClusterObjectOrFail(f, &context)
		}

		var clusterList *federation.ClusterList
		By("Obtaining a list of all the clusters")
		if err := wait.PollImmediate(framework.Poll, FederatedServiceTimeout, func() (bool, error) {
			var err error
			clusterList, err = f.FederationClientset.Federation().Clusters().List(api.ListOptions{})
			if err != nil {
				return false, err
			}
			framework.Logf("%d clusters registered, waiting for %d", len(clusterList.Items), len(contexts))
			if len(clusterList.Items) == len(contexts) {
				return true, nil
			}
			return false, nil
		}); err != nil {
			framework.Failf("Failed to list registered clusters: %+v", err)
		}

		framework.Logf("Checking that %d clusters are Ready", len(contexts))
		for _, context := range contexts {
			clusterIsReadyOrFail(f, &context)
		}
		framework.Logf("%d clusters are Ready", len(contexts))

		for _, cluster := range clusterList.Items {
			framework.Logf("Creating a clientset for the cluster %s", cluster.Name)

			Expect(framework.TestContext.KubeConfig).ToNot(Equal(""), "KubeConfig must be specified to load clusters' client config")
			kubecfg, err := clientcmd.LoadFromFile(framework.TestContext.KubeConfig)
			framework.ExpectNoError(err, "error loading KubeConfig: %v", err)

			cfgOverride := &clientcmd.ConfigOverrides{
				ClusterInfo: clientcmdapi.Cluster{
					Server: cluster.Spec.ServerAddressByClientCIDRs[0].ServerAddress,
				},
			}
			ccfg := clientcmd.NewNonInteractiveClientConfig(*kubecfg, cluster.Name, cfgOverride, clientcmd.NewDefaultClientConfigLoadingRules())
			cfg, err := ccfg.ClientConfig()
			Expect(err).NotTo(HaveOccurred())

			cfg.QPS = KubeAPIQPS
			cfg.Burst = KubeAPIBurst
			clset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(cfg, UserAgentName))
			clusterClientSets = append(clusterClientSets, clset)
		}

		for i, cs := range clusterClientSets {
			if _, err := cs.Core().Namespaces().Get(f.Namespace.Name); errors.IsNotFound(err) {
				ns := &v1.Namespace{
					ObjectMeta: v1.ObjectMeta{
						Name: f.Namespace.Name,
					},
				}
				if _, err := cs.Core().Namespaces().Create(ns); err != nil {
					framework.Logf("Couldn't create the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err)
				}
				framework.Logf("Namespace %s created in cluster [%d]", f.Namespace.Name, i)
			} else if err != nil {
				framework.Logf("Couldn't create the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err)
			}
		}
	})
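
The cluster registration check above leans on wait.PollImmediate: the condition runs once right away, then retries every framework.Poll until it succeeds, returns a hard error, or FederatedServiceTimeout expires. A minimal, runnable stdlib sketch of that contract (the pollImmediate helper and the sample values below are illustrative, not the real k8s.io wait package):

package main

import (
	"errors"
	"fmt"
	"time"
)

// pollImmediate mirrors the wait.PollImmediate contract used above:
// try the condition immediately, then on every tick until it returns
// true, returns an error, or the timeout budget is spent.
func pollImmediate(interval, timeout time.Duration, condition func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := condition()
		if err != nil {
			return err // a hard error aborts the poll, as in the test above
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for the condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	registered, want := 0, 3
	err := pollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
		registered++ // stands in for listing clusters and counting them
		fmt.Printf("%d clusters registered, waiting for %d\n", registered, want)
		return registered == want, nil
	})
	fmt.Println("err:", err)
}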

	Describe("DNS", func() {
		AfterEach(func() {
	var _ = Describe("Federated Services", func() {
		BeforeEach(func() {
			framework.SkipUnlessFederated(f.Client)

			// TODO(mml): replace with calls to framework.DeleteNamespaces and
			// framework.WaitForNamespacesDeleted. But first we need to re-write
			// them to expect versioned clients.
			// ALSO TODO(mml): Utility functions like these should [optionally?]
			// accept a list of clients/clusters to act upon, to increase
			// re-usability.
			for i, cs := range clusterClientSets {
				if err := cs.Core().Namespaces().Delete(f.Namespace.Name, api.NewDeleteOptions(0)); err != nil {
					framework.Failf("Couldn't delete the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err)
			// TODO: Federation API server should be able to answer this.
			if federationName = os.Getenv("FEDERATION_NAME"); federationName == "" {
				federationName = DefaultFederationName
			}

			contexts := f.GetUnderlyingFederatedContexts()

			for _, context := range contexts {
				createClusterObjectOrFail(f, &context)
			}

			var clusterList *federation.ClusterList
			By("Obtaining a list of all the clusters")
			if err := wait.PollImmediate(framework.Poll, FederatedServiceTimeout, func() (bool, error) {
				var err error
				clusterList, err = f.FederationClientset.Federation().Clusters().List(api.ListOptions{})
				if err != nil {
					return false, err
				}
				framework.Logf("%d clusters registered, waiting for %d", len(clusterList.Items), len(contexts))
				if len(clusterList.Items) == len(contexts) {
					return true, nil
				}
				return false, nil
			}); err != nil {
				framework.Failf("Failed to list registered clusters: %+v", err)
			}

			framework.Logf("Checking that %d clusters are Ready", len(contexts))
			for _, context := range contexts {
				clusterIsReadyOrFail(f, &context)
			}
			framework.Logf("%d clusters are Ready", len(contexts))

			for i, cluster := range clusterList.Items {
				framework.Logf("Creating a clientset for the cluster %s", cluster.Name)

				Expect(framework.TestContext.KubeConfig).ToNot(Equal(""), "KubeConfig must be specified to load clusters' client config")
				kubecfg, err := clientcmd.LoadFromFile(framework.TestContext.KubeConfig)
				framework.ExpectNoError(err, "error loading KubeConfig: %v", err)

				cfgOverride := &clientcmd.ConfigOverrides{
					ClusterInfo: clientcmdapi.Cluster{
						Server: cluster.Spec.ServerAddressByClientCIDRs[0].ServerAddress,
					},
				}
				ccfg := clientcmd.NewNonInteractiveClientConfig(*kubecfg, cluster.Name, cfgOverride, clientcmd.NewDefaultClientConfigLoadingRules())
				cfg, err := ccfg.ClientConfig()
				framework.ExpectNoError(err, "Error creating client config in cluster #%d", i)

				cfg.QPS = KubeAPIQPS
				cfg.Burst = KubeAPIBurst
				clset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(cfg, UserAgentName))
				clusterClientSets = append(clusterClientSets, clset)
			}

			clusterNamespaceCreated = make([]bool, len(clusterClientSets))
			for i, cs := range clusterClientSets {
				// The e2e Framework created the required namespace in one of the clusters, but we need to create it in all the others, if it doesn't yet exist.
				if _, err := cs.Core().Namespaces().Get(f.Namespace.Name); errors.IsNotFound(err) {
					ns := &v1.Namespace{
						ObjectMeta: v1.ObjectMeta{
							Name: f.Namespace.Name,
						},
					}
					_, err := cs.Core().Namespaces().Create(ns)
					if err == nil {
						clusterNamespaceCreated[i] = true
					}
					framework.ExpectNoError(err, "Couldn't create the namespace %s in cluster [%d]", f.Namespace.Name, i)
					framework.Logf("Namespace %s created in cluster [%d]", f.Namespace.Name, i)
				} else if err != nil {
					framework.Logf("Couldn't create the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err)
				}
			}
		})

		AfterEach(func() {
			for i, cs := range clusterClientSets {
				if clusterNamespaceCreated[i] {
					if _, err := cs.Core().Namespaces().Get(f.Namespace.Name); !errors.IsNotFound(err) {
						err := cs.Core().Namespaces().Delete(f.Namespace.Name, &api.DeleteOptions{})
						framework.ExpectNoError(err, "Couldn't delete the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err)
					}
					framework.Logf("Namespace %s deleted in cluster [%d]", f.Namespace.Name, i)
				}
				framework.Logf("Namespace %s deleted in cluster [%d]", f.Namespace.Name, i)
			}

			// Delete the registered clusters in the federation API server.
			clusterList, err := f.FederationClientset.Federation().Clusters().List(api.ListOptions{})
			Expect(err).NotTo(HaveOccurred())
			framework.ExpectNoError(err, "Error listing clusters")
			for _, cluster := range clusterList.Items {
				err := f.FederationClientset.Federation().Clusters().Delete(cluster.Name, &api.DeleteOptions{})
				Expect(err).NotTo(HaveOccurred())
				framework.ExpectNoError(err, "Error deleting cluster %q", cluster.Name)
			}
		})
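
The clusterNamespaceCreated bookkeeping above is the usual create-only-if-missing, delete-only-what-you-created teardown pattern: the framework owns one namespace, the test owns the copies it made in the other clusters. A small self-contained sketch of the same idea (fakeCluster and the sample namespace name are made up for illustration):

package main

import "fmt"

// fakeCluster stands in for a per-cluster clientset; it just records
// which namespaces exist.
type fakeCluster struct{ namespaces map[string]bool }

func main() {
	clusters := []*fakeCluster{
		{namespaces: map[string]bool{"e2e-tests-service-abcde": true}}, // the framework already created it here
		{namespaces: map[string]bool{}},
		{namespaces: map[string]bool{}},
	}
	const ns = "e2e-tests-service-abcde"

	// Setup: create the namespace only where it is missing, and remember doing so.
	created := make([]bool, len(clusters))
	for i, c := range clusters {
		if !c.namespaces[ns] {
			c.namespaces[ns] = true
			created[i] = true
			fmt.Printf("namespace %s created in cluster [%d]\n", ns, i)
		}
	}

	// Teardown: delete only the namespaces this test created, leaving the
	// framework-owned one for the framework to clean up.
	for i, c := range clusters {
		if created[i] {
			delete(c.namespaces, ns)
			fmt.Printf("namespace %s deleted in cluster [%d]\n", ns, i)
		}
	}
}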

		BeforeEach(func() {
			framework.SkipUnlessFederated(f.Client)
			createBackendPods(clusterClientSets, f.Namespace.Name)
			createService(f.FederationClientset_1_3, clusterClientSets, f.Namespace.Name)
		})

		It("should be able to discover a federated service", func() {
			framework.SkipUnlessFederated(f.Client)

			svcDNSNames := []string{
				FederatedServiceName,
				fmt.Sprintf("%s.%s", FederatedServiceName, f.Namespace.Name),
				fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name),
				fmt.Sprintf("%s.%s.%s", FederatedServiceName, f.Namespace.Name, federationName),
				fmt.Sprintf("%s.%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name, federationName),
			}
			// TODO(mml): This could be much faster. We can launch all the test
			// pods, perhaps in the BeforeEach, and then just poll until we get
			// successes/failures from them all.
			for _, name := range svcDNSNames {
				discoverService(f, name, true)
			}
		})

		Context("non-local federated service", func() {
		Describe("Service creation", func() {
			BeforeEach(func() {
				framework.SkipUnlessFederated(f.Client)

				// Delete a federated service shard in the default e2e Kubernetes cluster.
				// TODO(mml): This should not work: #27623. We should use a load
				// balancer with actual back-ends, some of which we delete or disable.
				err := f.Clientset_1_3.Core().Services(f.Namespace.Name).Delete(FederatedServiceName, &api.DeleteOptions{})
				Expect(err).NotTo(HaveOccurred())
				waitForFederatedServiceShard(f.Clientset_1_3, f.Namespace.Name, nil, 0)
				// Placeholder
			})

			It("should be able to discover a non-local federated service", func() {
			AfterEach(func() {
				framework.SkipUnlessFederated(f.Client)
				// Placeholder
			})

			It("should succeed", func() {
				framework.SkipUnlessFederated(f.Client)
				service := createServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name)
				By(fmt.Sprintf("Creation of service %q in namespace %q succeeded. Deleting service.", service.Name, f.Namespace.Name))
				// Cleanup
				err := f.FederationClientset_1_3.Services(f.Namespace.Name).Delete(service.Name, &api.DeleteOptions{})
				framework.ExpectNoError(err, "Error deleting service %q in namespace %q", service.Name, service.Namespace)
				By(fmt.Sprintf("Deletion of service %q in namespace %q succeeded.", service.Name, f.Namespace.Name))
			})

			It("should create matching services in underlying clusters", func() {
				framework.SkipUnlessFederated(f.Client)
				service := createServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name)
				defer func() { // Cleanup
					By(fmt.Sprintf("Deleting service %q in namespace %q", service.Name, f.Namespace.Name))
					err := f.FederationClientset_1_3.Services(f.Namespace.Name).Delete(service.Name, &api.DeleteOptions{})
					framework.ExpectNoError(err, "Error deleting service %q in namespace %q", service.Name, f.Namespace.Name)
				}()
				waitForServiceShardsOrFail(f.Namespace.Name, service, clusterClientSets, nil)
			})
		})

		var _ = Describe("DNS", func() {

			var (
				service     *v1.Service
				backendPods []*v1.Pod
			)

			BeforeEach(func() {
				framework.SkipUnlessFederated(f.Client)
				backendPods = createBackendPodsOrFail(clusterClientSets, f.Namespace.Name, FederatedServicePodName)
				service = createServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name)
				waitForServiceShardsOrFail(f.Namespace.Name, service, clusterClientSets, nil)
			})

			AfterEach(func() {
				framework.SkipUnlessFederated(f.Client)
				if backendPods != nil {
					deleteBackendPodsOrFail(clusterClientSets, f.Namespace.Name, backendPods)
					backendPods = nil
				}
				if service != nil {
					deleteServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name, service.Name)
					service = nil
				}
			})

			It("should be able to discover a federated service", func() {
				framework.SkipUnlessFederated(f.Client)

				svcDNSNames := []string{
					FederatedServiceName,
					fmt.Sprintf("%s.%s", FederatedServiceName, f.Namespace.Name),
					fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name),
					fmt.Sprintf("%s.%s.%s", FederatedServiceName, f.Namespace.Name, federationName),
					fmt.Sprintf("%s.%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name, federationName),
				}
				for _, name := range svcDNSNames {
					discoverService(f, name, true)
				// TODO(mml): This could be much faster. We can launch all the test
				// pods, perhaps in the BeforeEach, and then just poll until we get
				// successes/failures from them all.
				for i, DNSName := range svcDNSNames {
					discoverService(f, DNSName, true, "federated-service-e2e-discovery-pod-"+strconv.Itoa(i))
				}
			})
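
The five DNS names probed above follow the federated service naming scheme: the bare service name, the namespace-qualified and fully qualified cluster-local forms, then the federation-qualified equivalents. A runnable sketch of how those names compose, mirroring the fmt.Sprintf calls in the test (federatedDNSNames is an illustrative helper and the sample namespace value is made up):

package main

import "fmt"

// federatedDNSNames reproduces the five name forms probed by the test above.
func federatedDNSNames(service, namespace, federation string) []string {
	return []string{
		service,
		fmt.Sprintf("%s.%s", service, namespace),
		fmt.Sprintf("%s.%s.svc.cluster.local.", service, namespace),
		fmt.Sprintf("%s.%s.%s", service, namespace, federation),
		fmt.Sprintf("%s.%s.%s.svc.cluster.local.", service, namespace, federation),
	}
}

func main() {
	for _, name := range federatedDNSNames("federated-service", "e2e-tests-service-abcde", "federation") {
		fmt.Println(name)
	}
}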

			// TODO(mml): This currently takes 9 minutes. Consider reducing the
			// TTL and/or running the pods in parallel.
			Context("[Slow] missing local service", func() {
				It("should never find DNS entries for a missing local service", func() {
			Context("non-local federated service", func() {
				BeforeEach(func() {
					framework.SkipUnlessFederated(f.Client)

					localSvcDNSNames := []string{
						FederatedServiceName,
						fmt.Sprintf("%s.%s", FederatedServiceName, f.Namespace.Name),
						fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name),
					// Delete all the backend pods from the shard which is local to the discovery pod.
					deleteBackendPodsOrFail([]*release_1_3.Clientset{f.Clientset_1_3}, f.Namespace.Name, []*v1.Pod{backendPods[0]})
					backendPods[0] = nil // So we don't try to delete it again in an outer AfterEach
				})

				It("should be able to discover a non-local federated service", func() {
					framework.SkipUnlessFederated(f.Client)

					svcDNSNames := []string{
						fmt.Sprintf("%s.%s.%s", FederatedServiceName, f.Namespace.Name, federationName),
						fmt.Sprintf("%s.%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name, federationName),
					}
					for _, name := range localSvcDNSNames {
						discoverService(f, name, false)
					for i, name := range svcDNSNames {
						discoverService(f, name, true, "federated-service-e2e-discovery-pod-"+strconv.Itoa(i))
					}
				})

				// TODO(mml): This currently takes 9 minutes. Consider reducing the
				// TTL and/or running the pods in parallel.
				Context("[Slow] missing local service", func() {
					It("should never find DNS entries for a missing local service", func() {
						framework.SkipUnlessFederated(f.Client)

						localSvcDNSNames := []string{
							FederatedServiceName,
							fmt.Sprintf("%s.%s", FederatedServiceName, f.Namespace.Name),
							fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedServiceName, f.Namespace.Name),
						}
						for i, name := range localSvcDNSNames {
							discoverService(f, name, false, FederatedServicePodName+strconv.Itoa(i))
						}
					})
				})
			})
		})
	})
})
})
})

// waitForFederatedServiceShard waits until the number of shards of a given federated
// service reaches the expected value, i.e. numSvcs in the given individual Kubernetes
// cluster. If the shard count, i.e. numSvcs is expected to be at least one, then
// it also checks if the first shard's name and spec matches that of the given service.
func waitForFederatedServiceShard(cs *release_1_3.Clientset, namespace string, service *v1.Service, numSvcs int) {
	By("Fetching a federated service shard")
	var clSvcList *v1.ServiceList
	if err := wait.PollImmediate(framework.Poll, FederatedServiceTimeout, func() (bool, error) {
		var err error
		clSvcList, err = cs.Core().Services(namespace).List(api.ListOptions{})
		if err != nil {
/*
   equivalent returns true if the two services are equivalent. Fields which are expected to differ between
   federated services and the underlying cluster services (e.g. ClusterIP, LoadBalancerIP etc) are ignored.
*/
func equivalent(federationService, clusterService v1.Service) bool {
	// TODO: I think that we need a DeepCopy here to avoid clobbering our parameters.
	clusterService.Spec.ClusterIP = federationService.Spec.ClusterIP
	clusterService.Spec.ExternalIPs = federationService.Spec.ExternalIPs
	clusterService.Spec.DeprecatedPublicIPs = federationService.Spec.DeprecatedPublicIPs
	clusterService.Spec.LoadBalancerIP = federationService.Spec.LoadBalancerIP
	clusterService.Spec.LoadBalancerSourceRanges = federationService.Spec.LoadBalancerSourceRanges
	// N.B. We cannot iterate over the port objects directly, as their values
	// only get copied and our updates will get lost.
	for i := range clusterService.Spec.Ports {
		clusterService.Spec.Ports[i].NodePort = federationService.Spec.Ports[i].NodePort
	}
	return reflect.DeepEqual(clusterService.Spec, federationService.Spec)
}
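
equivalent compares two service specs only after overwriting the fields that legitimately differ per cluster; any remaining difference means the shard has drifted from the federated object. The same normalize-then-DeepEqual shape on a toy struct, stdlib only (port and spec are simplified stand-ins, not the real v1.ServiceSpec; the slice copy addresses the DeepCopy TODO noted above, since a by-value struct copy still shares the Ports backing array):

package main

import (
	"fmt"
	"reflect"
)

// Toy stand-ins for the handful of spec fields being compared.
type port struct{ Port, NodePort int32 }
type spec struct {
	ClusterIP string
	Ports     []port
}

// equivalentSpecs normalizes the per-cluster fields, then requires deep
// equality on everything else, the same shape as equivalent() above.
func equivalentSpecs(federation, cluster spec) bool {
	cluster.ClusterIP = federation.ClusterIP // expected to differ per cluster
	ports := make([]port, len(cluster.Ports))
	copy(ports, cluster.Ports) // avoid clobbering the caller's slice
	cluster.Ports = ports
	for i := range cluster.Ports {
		cluster.Ports[i].NodePort = federation.Ports[i].NodePort
	}
	return reflect.DeepEqual(cluster, federation)
}

func main() {
	fed := spec{ClusterIP: "10.0.0.1", Ports: []port{{Port: 80, NodePort: 0}}}
	shard := spec{ClusterIP: "10.200.3.7", Ports: []port{{Port: 80, NodePort: 30080}}}
	fmt.Println(equivalentSpecs(fed, shard)) // true: only ignored fields differ

	shard.Ports[0].Port = 443
	fmt.Println(equivalentSpecs(fed, shard)) // false: a real drift
}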

/*
   waitForServiceOrFail waits until a service is either present or absent in the cluster specified by clientset.
   If the condition is not met within timeout, it fails the calling test.
*/
func waitForServiceOrFail(clientset *release_1_3.Clientset, namespace string, service *v1.Service, present bool, timeout time.Duration) {
	By(fmt.Sprintf("Fetching a federated service shard of service %q in namespace %q from cluster", service.Name, namespace))
	var clusterService *v1.Service
	err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
		clusterService, err := clientset.Services(namespace).Get(service.Name)
		if err != nil && !errors.IsNotFound(err) {
			return false, err
		}
		n := len(clSvcList.Items)
		if n == numSvcs {
		if (clusterService != nil && err == nil && present) || (clusterService == nil && errors.IsNotFound(err) && !present) {
			By(fmt.Sprintf("Success: federated service shard of service %q in namespace %q in cluster: %v", service.Name, namespace, present))
			return true, nil
		}
		framework.Logf("%d services found, waiting for %d, trying again in %s", n, numSvcs, framework.Poll)
		By(fmt.Sprintf("Service found: %v, waiting for service found: %v, trying again in %s", clusterService != nil, present, framework.Poll))
		return false, nil
	}); err != nil {
		framework.Failf("Failed to list registered clusters: %+v", err)
	}
	})
	framework.ExpectNoError(err, "Failed to get service %q in namespace %q", service.Name, namespace)

	if numSvcs > 0 && service != nil {
		// Renaming for clarity/readability
		clSvc := clSvcList.Items[0]

		Expect(clSvc.Name).To(Equal(service.Name))
		// Some fields are expected to be different, so make them the same before checking equality.
		clSvc.Spec.ClusterIP = service.Spec.ClusterIP
		clSvc.Spec.ExternalIPs = service.Spec.ExternalIPs
		clSvc.Spec.DeprecatedPublicIPs = service.Spec.DeprecatedPublicIPs
		clSvc.Spec.LoadBalancerIP = service.Spec.LoadBalancerIP
		clSvc.Spec.LoadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
		// N.B. We cannot iterate over the port objects directly, as their values
		// only get copied and our updates will get lost.
		for i := range clSvc.Spec.Ports {
			clSvc.Spec.Ports[i].NodePort = service.Spec.Ports[i].NodePort
		}
		Expect(clSvc.Spec).To(Equal(service.Spec))
	if present && clusterService != nil {
		Expect(equivalent(*clusterService, *service))
	}
	}

func createService(fcs *federation_release_1_3.Clientset, clusterClientSets []*release_1_3.Clientset, namespace string) {
/*
   waitForServiceShardsOrFail waits for the service to appear (or disappear) in the clientsets specified in presentInCluster (or all if presentInCluster is nil).
   If presentInCluster[n] is true, then wait for the service shard to exist in the cluster specified in clientsets[n].
   If presentInCluster[n] is false, then wait for the service shard to not exist in the cluster specified in clientsets[n].
*/
func waitForServiceShardsOrFail(namespace string, service *v1.Service, clientsets []*release_1_3.Clientset, presentInCluster []bool) {
	if presentInCluster != nil {
		Expect(len(presentInCluster)).To(Equal(len(clientsets)), "Internal error: Number of presence flags does not equal number of clients/clusters")
	}
	framework.Logf("Waiting for service %q in %d clusters", service.Name, len(clientsets))
	for i, clientset := range clientsets {
		var present bool // Should the service be present or absent in this cluster?
		if presentInCluster == nil {
			present = true
		} else {
			present = presentInCluster[i]
		}
		waitForServiceOrFail(clientset, namespace, service, present, FederatedServiceTimeout)
	}
}

func createServiceOrFail(clientset *federation_release_1_3.Clientset, namespace string) *v1.Service {
	if clientset == nil || len(namespace) == 0 {
		Fail(fmt.Sprintf("Internal error: invalid parameters passed to createServiceOrFail: clientset: %v, namespace: %v", clientset, namespace))
	}
	By(fmt.Sprintf("Creating federated service %q in namespace %q", FederatedServiceName, namespace))

	service := &v1.Service{
@@ -298,12 +376,19 @@ func createService(fcs *federation_release_1_3.Clientset, clusterClientSets []*release_1_3.Clientset, namespace string) {
			},
		},
	}
	nservice, err := fcs.Core().Services(namespace).Create(service)
	framework.Logf("Trying to create service %q in namespace %q", service.ObjectMeta.Name, service.ObjectMeta.Namespace)
	Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("creating service %s: %+v", service.Name, err))
	for _, cs := range clusterClientSets {
		waitForFederatedServiceShard(cs, namespace, nservice, 1)
	By(fmt.Sprintf("Trying to create service %q in namespace %q", service.Name, namespace))
	_, err := clientset.Services(namespace).Create(service)
	framework.ExpectNoError(err, "Creating service %q in namespace %q", service.Name, namespace)
	By(fmt.Sprintf("Successfully created federated service %q in namespace %q", FederatedServiceName, namespace))
	return service
}

func deleteServiceOrFail(clientset *federation_release_1_3.Clientset, namespace string, serviceName string) {
	if clientset == nil || len(namespace) == 0 || len(serviceName) == 0 {
		Fail(fmt.Sprintf("Internal error: invalid parameters passed to deleteServiceOrFail: clientset: %v, namespace: %v, service: %v", clientset, namespace, serviceName))
	}
	err := clientset.Services(namespace).Delete(serviceName, api.NewDeleteOptions(0))
	framework.ExpectNoError(err, "Error deleting service %q from namespace %q", serviceName, namespace)
}

func podExitCodeDetector(f *framework.Framework, name string, code int32) func() error {
@@ -347,14 +432,13 @@ func podExitCodeDetector(f *framework.Framework, name string, code int32) func() error {
	}
}

func discoverService(f *framework.Framework, name string, exists bool) {
func discoverService(f *framework.Framework, name string, exists bool, podName string) {
	command := []string{"sh", "-c", fmt.Sprintf("until nslookup '%s'; do sleep 10; done", name)}
	By(fmt.Sprintf("Looking up %q", name))

	pod := &api.Pod{
		ObjectMeta: api.ObjectMeta{
			Name:   FederatedServicePod,
			Labels: map[string]string{"name": FederatedServicePod},
			Name: podName,
		},
		Spec: api.PodSpec{
			Containers: []api.Container{
@@ -369,22 +453,24 @@ func discoverService(f *framework.Framework, name string, exists bool) {
	}

	_, err := f.Client.Pods(f.Namespace.Name).Create(pod)
	Expect(err).NotTo(HaveOccurred(), "Trying to create pod to run %q", command)
	defer f.Client.Pods(f.Namespace.Name).Delete(FederatedServicePod, api.NewDeleteOptions(0))
	framework.ExpectNoError(err, "Trying to create pod to run %q", command)
	defer func() { f.Client.Pods(f.Namespace.Name).Delete(podName, api.NewDeleteOptions(0)) }()

	if exists {
		// TODO(mml): Eventually check the IP address is correct, too.
		Eventually(podExitCodeDetector(f, FederatedServicePod, 0), 10*DNSTTL, time.Second*2).
		Eventually(podExitCodeDetector(f, podName, 0), 3*DNSTTL, time.Second*2).
			Should(BeNil(), "%q should exit 0, but it never did", command)
	} else {
		Consistently(podExitCodeDetector(f, FederatedServicePod, 0), 10*DNSTTL, time.Second*2).
		Consistently(podExitCodeDetector(f, podName, 0), 3*DNSTTL, time.Second*2).
			ShouldNot(BeNil(), "%q should never exit 0, but it did", command)
	}
}
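
The positive and negative DNS cases above hinge on Gomega's Eventually and Consistently: the probe pod exits 0 only once its nslookup loop succeeds, so "exists" means the detector must eventually return nil, while "must not exist" means it may never do so within the watch window. A stdlib sketch of those two polling assertions (eventually and consistently below are simplified stand-ins for the Gomega matchers, not their real implementation):

package main

import (
	"errors"
	"fmt"
	"time"
)

// eventually polls until check returns nil or the timeout expires.
func eventually(check func() error, timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if check() == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return errors.New("condition never succeeded")
}

// consistently requires check to keep failing for the whole window;
// a single success (e.g. a name that should not resolve, resolving)
// fails the assertion immediately.
func consistently(check func() error, window, interval time.Duration) error {
	deadline := time.Now().Add(window)
	for time.Now().Before(deadline) {
		if check() == nil {
			return errors.New("condition unexpectedly succeeded")
		}
		time.Sleep(interval)
	}
	return nil
}

func main() {
	start := time.Now()
	resolves := func() error { // stands in for podExitCodeDetector
		if time.Since(start) > 30*time.Millisecond {
			return nil // "nslookup finally succeeded; the probe pod exited 0"
		}
		return errors.New("probe pod still running")
	}
	fmt.Println("eventually:", eventually(resolves, 100*time.Millisecond, 5*time.Millisecond))
	fmt.Println("consistently:", consistently(resolves, 100*time.Millisecond, 5*time.Millisecond))
}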

func createBackendPods(clusterClientSets []*release_1_3.Clientset, namespace string) {
	name := "backend"

/*
   createBackendPodsOrFail creates one pod in each cluster, and returns the created pods (in the same order as clusterClientSets).
   If creation of any pod fails, the test fails (possibly with a partially created set of pods). No retries are attempted.
*/
func createBackendPodsOrFail(clusterClientSets []*release_1_3.Clientset, namespace string, name string) []*v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: v1.ObjectMeta{
			Name: name,
@@ -394,16 +480,34 @@ func createBackendPods(clusterClientSets []*release_1_3.Clientset, namespace string) {
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "backend",
					Name:  name,
					Image: "gcr.io/google_containers/echoserver:1.4",
				},
			},
			RestartPolicy: v1.RestartPolicyAlways,
		},
	}
	pods := make([]*v1.Pod, len(clusterClientSets))
	for i, client := range clusterClientSets {
		createdPod, err := client.Core().Pods(namespace).Create(pod)
		framework.ExpectNoError(err, "Creating pod %q in namespace %q in cluster %d", name, namespace, i)
		pods[i] = createdPod
	}
	return pods
}

	for _, client := range clusterClientSets {
		_, err := client.Core().Pods(namespace).Create(pod)
		Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Creating pod %q/%q", namespace, name))
/*
   deleteBackendPodsOrFail deletes one pod from each cluster (unless pods[n] is nil for that cluster).
   If deletion of any pod fails, the test fails (possibly with a partially deleted set of pods). No retries are attempted.
*/
func deleteBackendPodsOrFail(clusterClientSets []*release_1_3.Clientset, namespace string, pods []*v1.Pod) {
	if len(clusterClientSets) != len(pods) {
		Fail(fmt.Sprintf("Internal error: number of clients (%d) does not equal number of pods (%d). One pod per client please.", len(clusterClientSets), len(pods)))
	}
	for i, client := range clusterClientSets {
		if pods[i] != nil {
			err := client.Core().Pods(namespace).Delete(pods[i].Name, api.NewDeleteOptions(0))
			framework.ExpectNoError(err, "Deleting pod %q in namespace %q from cluster %d", pods[i].Name, namespace, i)
		}
	}
}
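
createBackendPodsOrFail and deleteBackendPodsOrFail keep the pods slice index-aligned with clusterClientSets, and a nil slot marks a pod already deleted elsewhere, as the non-local DNS test does with backendPods[0]. A compact stdlib sketch of that pairing (the cluster and pod names are illustrative):

package main

import "fmt"

type pod struct{ name string }

func main() {
	clusters := []string{"cluster-0", "cluster-1", "cluster-2"}

	// Create: one pod per cluster, in the same order as the client list.
	pods := make([]*pod, len(clusters))
	for i := range clusters {
		pods[i] = &pod{name: fmt.Sprintf("federated-service-test-pod-%d", i)}
	}

	// A test deletes the shard local to cluster-0 itself, then marks the
	// slot nil so the shared teardown does not try to delete it twice.
	pods[0] = nil

	// Teardown: lengths must match, nil slots are skipped.
	if len(clusters) != len(pods) {
		panic("one pod per client please")
	}
	for i, c := range clusters {
		if pods[i] != nil {
			fmt.Printf("deleting pod %q from %s\n", pods[i].name, c)
		}
	}
}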