mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Merge pull request #28388 from mml/cluster-refactor
Automatic merge from submit-queue Collect cluster state into a single type. Mostly just makes it easier to read and, hopefully, extend.
This commit is contained in:
commit
9075f53dca
@ -62,11 +62,22 @@ var FederatedServiceLabels = map[string]string{
|
||||
"foo": "bar",
|
||||
}
|
||||
|
||||
/*
|
||||
cluster keeps track of the assorted objects and state related to each cluster
|
||||
in the federation
|
||||
*/
|
||||
type cluster struct {
|
||||
name string
|
||||
*release_1_3.Clientset
|
||||
namespaceCreated bool // Did we need to create a new namespace in this cluster? If so, we should delete it.
|
||||
backendPod *v1.Pod // The backend pod, if one's been created.
|
||||
}
|
||||
|
||||
var _ = framework.KubeDescribe("[Feature:Federation]", func() {
|
||||
f := framework.NewDefaultFederatedFramework("federated-service")
|
||||
var clusterClientSets []*release_1_3.Clientset
|
||||
var clusterNamespaceCreated []bool // Did we need to create a new namespace in each of the above clusters? If so, we should delete it.
|
||||
var clusters map[string]*cluster // All clusters, keyed by cluster name
|
||||
var federationName string
|
||||
var primaryClusterName string // The name of the "primary" cluster
|
||||
|
||||
var _ = Describe("Federated Services", func() {
|
||||
BeforeEach(func() {
|
||||
@ -106,9 +117,11 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
|
||||
}
|
||||
framework.Logf("%d clusters are Ready", len(contexts))
|
||||
|
||||
clusterClientSets = make([]*release_1_3.Clientset, len(clusterList.Items))
|
||||
for i, cluster := range clusterList.Items {
|
||||
framework.Logf("Creating a clientset for the cluster %s", cluster.Name)
|
||||
clusters = map[string]*cluster{}
|
||||
primaryClusterName = clusterList.Items[0].Name
|
||||
By(fmt.Sprintf("Labeling %q as the first cluster", primaryClusterName))
|
||||
for i, c := range clusterList.Items {
|
||||
framework.Logf("Creating a clientset for the cluster %s", c.Name)
|
||||
|
||||
Expect(framework.TestContext.KubeConfig).ToNot(Equal(""), "KubeConfig must be specified to load clusters' client config")
|
||||
kubecfg, err := clientcmd.LoadFromFile(framework.TestContext.KubeConfig)
|
||||
@ -116,48 +129,47 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
|
||||
|
||||
cfgOverride := &clientcmd.ConfigOverrides{
|
||||
ClusterInfo: clientcmdapi.Cluster{
|
||||
Server: cluster.Spec.ServerAddressByClientCIDRs[0].ServerAddress,
|
||||
Server: c.Spec.ServerAddressByClientCIDRs[0].ServerAddress,
|
||||
},
|
||||
}
|
||||
ccfg := clientcmd.NewNonInteractiveClientConfig(*kubecfg, cluster.Name, cfgOverride, clientcmd.NewDefaultClientConfigLoadingRules())
|
||||
ccfg := clientcmd.NewNonInteractiveClientConfig(*kubecfg, c.Name, cfgOverride, clientcmd.NewDefaultClientConfigLoadingRules())
|
||||
cfg, err := ccfg.ClientConfig()
|
||||
framework.ExpectNoError(err, "Error creating client config in cluster #%d", i)
|
||||
framework.ExpectNoError(err, "Error creating client config in cluster #%d (%q)", i, c.Name)
|
||||
|
||||
cfg.QPS = KubeAPIQPS
|
||||
cfg.Burst = KubeAPIBurst
|
||||
clset := release_1_3.NewForConfigOrDie(restclient.AddUserAgent(cfg, UserAgentName))
|
||||
clusterClientSets[i] = clset
|
||||
clusters[c.Name] = &cluster{c.Name, clset, false, nil}
|
||||
}
|
||||
|
||||
clusterNamespaceCreated = make([]bool, len(clusterClientSets))
|
||||
for i, cs := range clusterClientSets {
|
||||
for name, c := range clusters {
|
||||
// The e2e Framework created the required namespace in one of the clusters, but we need to create it in all the others, if it doesn't yet exist.
|
||||
if _, err := cs.Core().Namespaces().Get(f.Namespace.Name); errors.IsNotFound(err) {
|
||||
if _, err := c.Clientset.Core().Namespaces().Get(f.Namespace.Name); errors.IsNotFound(err) {
|
||||
ns := &v1.Namespace{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: f.Namespace.Name,
|
||||
},
|
||||
}
|
||||
_, err := cs.Core().Namespaces().Create(ns)
|
||||
_, err := c.Clientset.Core().Namespaces().Create(ns)
|
||||
if err == nil {
|
||||
clusterNamespaceCreated[i] = true
|
||||
c.namespaceCreated = true
|
||||
}
|
||||
framework.ExpectNoError(err, "Couldn't create the namespace %s in cluster [%d]", f.Namespace.Name, i)
|
||||
framework.Logf("Namespace %s created in cluster [%d]", f.Namespace.Name, i)
|
||||
framework.ExpectNoError(err, "Couldn't create the namespace %s in cluster %q", f.Namespace.Name, name)
|
||||
framework.Logf("Namespace %s created in cluster %q", f.Namespace.Name, name)
|
||||
} else if err != nil {
|
||||
framework.Logf("Couldn't create the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err)
|
||||
framework.Logf("Couldn't create the namespace %s in cluster %q: %v", f.Namespace.Name, name, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
for i, cs := range clusterClientSets {
|
||||
if clusterNamespaceCreated[i] {
|
||||
if _, err := cs.Core().Namespaces().Get(f.Namespace.Name); !errors.IsNotFound(err) {
|
||||
err := cs.Core().Namespaces().Delete(f.Namespace.Name, &api.DeleteOptions{})
|
||||
framework.ExpectNoError(err, "Couldn't delete the namespace %s in cluster [%d]: %v", f.Namespace.Name, i, err)
|
||||
for name, c := range clusters {
|
||||
if c.namespaceCreated {
|
||||
if _, err := c.Clientset.Core().Namespaces().Get(f.Namespace.Name); !errors.IsNotFound(err) {
|
||||
err := c.Clientset.Core().Namespaces().Delete(f.Namespace.Name, &api.DeleteOptions{})
|
||||
framework.ExpectNoError(err, "Couldn't delete the namespace %s in cluster %q: %v", f.Namespace.Name, name, err)
|
||||
}
|
||||
framework.Logf("Namespace %s deleted in cluster [%d]", f.Namespace.Name, i)
|
||||
framework.Logf("Namespace %s deleted in cluster %q", f.Namespace.Name, name)
|
||||
}
|
||||
}
|
||||
|
||||
@ -199,32 +211,26 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
|
||||
err := f.FederationClientset_1_3.Services(f.Namespace.Name).Delete(service.Name, &api.DeleteOptions{})
|
||||
framework.ExpectNoError(err, "Error deleting service %q in namespace %q", service.Name, f.Namespace.Name)
|
||||
}()
|
||||
waitForServiceShardsOrFail(f.Namespace.Name, service, clusterClientSets, nil)
|
||||
waitForServiceShardsOrFail(f.Namespace.Name, service, clusters)
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("DNS", func() {
|
||||
|
||||
var (
|
||||
service *v1.Service
|
||||
backendPods []*v1.Pod
|
||||
service *v1.Service
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
framework.SkipUnlessFederated(f.Client)
|
||||
backendPods = createBackendPodsOrFail(clusterClientSets, f.Namespace.Name, FederatedServicePodName)
|
||||
createBackendPodsOrFail(clusters, f.Namespace.Name, FederatedServicePodName)
|
||||
service = createServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name)
|
||||
waitForServiceShardsOrFail(f.Namespace.Name, service, clusterClientSets, nil)
|
||||
waitForServiceShardsOrFail(f.Namespace.Name, service, clusters)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
framework.SkipUnlessFederated(f.Client)
|
||||
if backendPods != nil {
|
||||
deleteBackendPodsOrFail(clusterClientSets, f.Namespace.Name, backendPods)
|
||||
backendPods = nil
|
||||
} else {
|
||||
By("No backend pods to delete. BackendPods is nil.")
|
||||
}
|
||||
deleteBackendPodsOrFail(clusters, f.Namespace.Name)
|
||||
|
||||
if service != nil {
|
||||
deleteServiceOrFail(f.FederationClientset_1_3, f.Namespace.Name, service.Name)
|
||||
@ -257,7 +263,7 @@ var _ = framework.KubeDescribe("[Feature:Federation]", func() {
|
||||
framework.SkipUnlessFederated(f.Client)
|
||||
|
||||
// Delete all the backend pods from the shard which is local to the discovery pod.
|
||||
deleteBackendPodsOrFail([]*release_1_3.Clientset{f.Clientset_1_3}, f.Namespace.Name, []*v1.Pod{backendPods[0]})
|
||||
deleteOneBackendPodOrFail(clusters[primaryClusterName])
|
||||
|
||||
})
|
||||
|
||||
@ -341,23 +347,12 @@ func waitForServiceOrFail(clientset *release_1_3.Clientset, namespace string, se
|
||||
}
|
||||
|
||||
/*
|
||||
waitForServiceShardsOrFail waits for the service to appear (or disappear) in the clientsets specifed in presentInCluster (or all if presentInCluster is nil).
|
||||
If presentInCluster[n] is true, then wait for service shard to exist in the cluster specifid in clientsets[n]
|
||||
If presentInCluster[n] is false, then wait for service shard to not exist in the cluster specifid in clientsets[n]
|
||||
waitForServiceShardsOrFail waits for the service to appear in all clusters
|
||||
*/
|
||||
func waitForServiceShardsOrFail(namespace string, service *v1.Service, clientsets []*release_1_3.Clientset, presentInCluster []bool) {
|
||||
if presentInCluster != nil {
|
||||
Expect(len(presentInCluster)).To(Equal(len(clientsets)), "Internal error: Number of presence flags does not equal number of clients/clusters")
|
||||
}
|
||||
framework.Logf("Waiting for service %q in %d clusters", service.Name, len(clientsets))
|
||||
for i, clientset := range clientsets {
|
||||
var present bool // Should the service be present or absent in this cluster?
|
||||
if presentInCluster == nil {
|
||||
present = true
|
||||
} else {
|
||||
present = presentInCluster[i]
|
||||
}
|
||||
waitForServiceOrFail(clientset, namespace, service, present, FederatedServiceTimeout)
|
||||
func waitForServiceShardsOrFail(namespace string, service *v1.Service, clusters map[string]*cluster) {
|
||||
framework.Logf("Waiting for service %q in %d clusters", service.Name, len(clusters))
|
||||
for _, c := range clusters {
|
||||
waitForServiceOrFail(c.Clientset, namespace, service, true, FederatedServiceTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
@ -484,7 +479,7 @@ func discoverService(f *framework.Framework, name string, exists bool, podName s
|
||||
createBackendPodsOrFail creates one pod in each cluster, and returns the created pods (in the same order as clusterClientSets).
|
||||
If creation of any pod fails, the test fails (possibly with a partially created set of pods). No retries are attempted.
|
||||
*/
|
||||
func createBackendPodsOrFail(clusterClientSets []*release_1_3.Clientset, namespace string, name string) []*v1.Pod {
|
||||
func createBackendPodsOrFail(clusters map[string]*cluster, namespace string, name string) {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: name,
|
||||
@ -501,36 +496,42 @@ func createBackendPodsOrFail(clusterClientSets []*release_1_3.Clientset, namespa
|
||||
RestartPolicy: v1.RestartPolicyAlways,
|
||||
},
|
||||
}
|
||||
pods := make([]*v1.Pod, len(clusterClientSets))
|
||||
for i, client := range clusterClientSets {
|
||||
By(fmt.Sprintf("Creating pod %q in namespace %q in cluster %d", pod.Name, namespace, i))
|
||||
createdPod, err := client.Core().Pods(namespace).Create(pod)
|
||||
framework.ExpectNoError(err, "Creating pod %q in namespace %q in cluster %d", name, namespace, i)
|
||||
By(fmt.Sprintf("Successfully created pod %q in namespace %q in cluster %d: %v", pod.Name, namespace, i, *createdPod))
|
||||
pods[i] = createdPod
|
||||
for name, c := range clusters {
|
||||
By(fmt.Sprintf("Creating pod %q in namespace %q in cluster %q", pod.Name, namespace, name))
|
||||
createdPod, err := c.Clientset.Core().Pods(namespace).Create(pod)
|
||||
framework.ExpectNoError(err, "Creating pod %q in namespace %q in cluster %q", name, namespace, name)
|
||||
By(fmt.Sprintf("Successfully created pod %q in namespace %q in cluster %q: %v", pod.Name, namespace, name, *createdPod))
|
||||
c.backendPod = createdPod
|
||||
}
|
||||
return pods
|
||||
}
|
||||
|
||||
/*
|
||||
deleteBackendPodsOrFail deletes one pod from each cluster (unless pods[n] is nil for that cluster)
|
||||
If deletion of any pod fails, the test fails (possibly with a partially deleted set of pods). No retries are attempted.
|
||||
deleteOneBackendPodOrFail deletes exactly one backend pod which must not be nil
|
||||
The test fails if there are any errors.
|
||||
*/
|
||||
func deleteBackendPodsOrFail(clusterClientSets []*release_1_3.Clientset, namespace string, pods []*v1.Pod) {
|
||||
if len(clusterClientSets) != len(pods) {
|
||||
Fail(fmt.Sprintf("Internal error: number of clients (%d) does not equal number of pods (%d). One pod per client please.", len(clusterClientSets), len(pods)))
|
||||
func deleteOneBackendPodOrFail(c *cluster) {
|
||||
pod := c.backendPod
|
||||
Expect(pod).ToNot(BeNil())
|
||||
err := c.Clientset.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0))
|
||||
if errors.IsNotFound(err) {
|
||||
By(fmt.Sprintf("Pod %q in namespace %q in cluster %q does not exist. No need to delete it.", pod.Name, pod.Namespace, c.name))
|
||||
} else {
|
||||
framework.ExpectNoError(err, "Deleting pod %q in namespace %q from cluster %q", pod.Name, pod.Namespace, c.name)
|
||||
}
|
||||
for i, client := range clusterClientSets {
|
||||
if pods[i] != nil {
|
||||
err := client.Core().Pods(namespace).Delete(pods[i].Name, api.NewDeleteOptions(0))
|
||||
if errors.IsNotFound(err) {
|
||||
By(fmt.Sprintf("Pod %q in namespace %q in cluster %d does not exist. No need to delete it.", pods[i].Name, namespace, i))
|
||||
} else {
|
||||
framework.ExpectNoError(err, "Deleting pod %q in namespace %q from cluster %d", pods[i].Name, namespace, i)
|
||||
}
|
||||
By(fmt.Sprintf("Backend pod %q in namespace %q in cluster %d deleted or does not exist", pods[i].Name, namespace, i))
|
||||
By(fmt.Sprintf("Backend pod %q in namespace %q in cluster %q deleted or does not exist", pod.Name, pod.Namespace, c.name))
|
||||
}
|
||||
|
||||
/*
|
||||
deleteBackendPodsOrFail deletes one pod from each cluster that has one.
|
||||
If deletion of any pod fails, the test fails (possibly with a partially deleted set of pods). No retries are attempted.
|
||||
*/
|
||||
func deleteBackendPodsOrFail(clusters map[string]*cluster, namespace string) {
|
||||
for name, c := range clusters {
|
||||
if c.backendPod != nil {
|
||||
deleteOneBackendPodOrFail(c)
|
||||
c.backendPod = nil
|
||||
} else {
|
||||
By(fmt.Sprintf("No backend pod to delete for cluster %d", i))
|
||||
By(fmt.Sprintf("No backend pod to delete for cluster %q", name))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user