diff --git a/test/e2e_federation/BUILD b/test/e2e_federation/BUILD index 1fdeb3e2e22..5684ece4cbe 100644 --- a/test/e2e_federation/BUILD +++ b/test/e2e_federation/BUILD @@ -24,9 +24,11 @@ go_library( ], tags = ["automanaged"], deps = [ + "//federation/apis/federation:go_default_library", "//federation/apis/federation/v1beta1:go_default_library", "//federation/client/clientset_generated/federation_clientset:go_default_library", "//federation/client/clientset_generated/federation_clientset/typed/core/v1:go_default_library", + "//federation/pkg/federation-controller/replicaset:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", diff --git a/test/e2e_federation/federated-replicaset.go b/test/e2e_federation/federated-replicaset.go index 026c7f5b4d7..dc36bc22be5 100644 --- a/test/e2e_federation/federated-replicaset.go +++ b/test/e2e_federation/federated-replicaset.go @@ -17,7 +17,9 @@ limitations under the License. package e2e_federation import ( + "encoding/json" "fmt" + "reflect" "strings" "time" @@ -30,11 +32,11 @@ import ( "k8s.io/kubernetes/test/e2e/framework" fedframework "k8s.io/kubernetes/test/e2e_federation/framework" - "reflect" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/kubernetes/federation/apis/federation" + fedreplicsetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" ) const ( @@ -59,12 +61,12 @@ var _ = framework.KubeDescribe("Federation replicasets [Feature:Federation]", fu fedframework.SkipUnlessFederated(f.ClientSet) nsName := f.FederationNamespace.Name - replicaset := createReplicaSetOrFail(f.FederationClientset, nsName) - By(fmt.Sprintf("Creation of replicaset %q in namespace %q succeeded. Deleting replicaset.", replicaset.Name, nsName)) + rs := createReplicaSetOrFail(f.FederationClientset, newReplicaSet(nsName, FederationReplicaSetName, 5, nil)) + By(fmt.Sprintf("Creation of replicaset %q in namespace %q succeeded. Deleting replicaset.", rs.Name, nsName)) // Cleanup - err := f.FederationClientset.Extensions().ReplicaSets(nsName).Delete(replicaset.Name, &metav1.DeleteOptions{}) - framework.ExpectNoError(err, "Error deleting replicaset %q in namespace %q", replicaset.Name, replicaset.Namespace) - By(fmt.Sprintf("Deletion of replicaset %q in namespace %q succeeded.", replicaset.Name, nsName)) + err := f.FederationClientset.Extensions().ReplicaSets(nsName).Delete(rs.Name, &metav1.DeleteOptions{}) + framework.ExpectNoError(err, "Error deleting replicaset %q in namespace %q", rs.Name, rs.Namespace) + By(fmt.Sprintf("Deletion of replicaset %q in namespace %q succeeded.", rs.Name, nsName)) }) }) @@ -87,22 +89,74 @@ var _ = framework.KubeDescribe("Federation replicasets [Feature:Federation]", fu It("should create and update matching replicasets in underling clusters", func() { nsName := f.FederationNamespace.Name - rs := createReplicaSetOrFail(f.FederationClientset, nsName) - defer func() { - // cleanup. deletion of replicasets is not supported for underlying clusters - By(fmt.Sprintf("Preparing replicaset %q/%q for deletion by setting replicas to zero", nsName, rs.Name)) - replicas := int32(0) - rs.Spec.Replicas = &replicas - f.FederationClientset.ReplicaSets(nsName).Update(rs) - waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters) + cleanupFn := func(rs *v1beta1.ReplicaSet) { + // cleanup. deletion of replicasets is not supported for underling clusters + By(fmt.Sprintf("zero replicas then delete replicaset %q/%q", nsName, rs.Name)) + zeroReplicas := int32(0) + rs.Spec.Replicas = &zeroReplicas + updateReplicaSetOrFail(f.FederationClientset, rs) + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, nil) f.FederationClientset.ReplicaSets(nsName).Delete(rs.Name, &metav1.DeleteOptions{}) + } + + // general test with default replicaset pref + func() { + rs := newReplicaSet(nsName, FederationReplicaSetName, 5, nil) + rs = createReplicaSetOrFail(f.FederationClientset, rs) + defer cleanupFn(rs) + + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, nil) + By(fmt.Sprintf("Successfuly created and synced replicaset %q/%q (%v/%v) to clusters", nsName, rs.Name, *rs.Spec.Replicas, rs.Status.Replicas)) + + rs = newReplicaSet(nsName, FederationReplicaSetName, 15, nil) + updateReplicaSetOrFail(f.FederationClientset, rs) + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, nil) + By(fmt.Sprintf("Successfuly updated and synced replicaset %q/%q (%v/%v) to clusters", nsName, rs.Name, *rs.Spec.Replicas, rs.Status.Replicas)) }() - waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters) - By(fmt.Sprintf("Successfuly created and synced replicaset %q/%q to clusters", nsName, rs.Name)) - updateReplicaSetOrFail(f.FederationClientset, nsName) - waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters) - By(fmt.Sprintf("Successfuly updated and synced replicaset %q/%q to clusters", nsName, rs.Name)) + // test for replicaset prefs with weight, min and max replicas + createAndUpdateFn := func(pref *federation.FederatedReplicaSetPreferences, replicas int32, expect map[string]int32) { + rs := newReplicaSet(nsName, FederationReplicaSetName, replicas, pref) + createReplicaSetOrFail(f.FederationClientset, rs) + defer cleanupFn(rs) + + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, expect) + By(fmt.Sprintf("Successfuly created and synced replicaset %q/%q (%v/%v) to clusters", nsName, rs.Name, *rs.Spec.Replicas, rs.Status.Replicas)) + + rs = newReplicaSet(nsName, FederationReplicaSetName, 0, pref) + updateReplicaSetOrFail(f.FederationClientset, rs) + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, nil) + By(fmt.Sprintf("Successfuly updated and synced replicaset %q/%q (%v/%v) to clusters", nsName, rs.Name, *rs.Spec.Replicas, rs.Status.Replicas)) + + rs = newReplicaSet(nsName, FederationReplicaSetName, replicas, pref) + updateReplicaSetOrFail(f.FederationClientset, rs) + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, expect) + By(fmt.Sprintf("Successfuly updated and synced replicaset %q/%q (%v/%v) to clusters", nsName, rs.Name, *rs.Spec.Replicas, rs.Status.Replicas)) + } + createAndUpdateFn(generageFedRsPrefsWithWeight(clusters)) + createAndUpdateFn(generageFedRsPrefsWithMin(clusters)) + createAndUpdateFn(generageFedRsPrefsWithMax(clusters)) + + // test for rebalancing + func() { + pref1, pref2, replicas, expect1, expect2 := generageFedRsPrefsForRebalancing(clusters) + rs := newReplicaSet(nsName, FederationReplicaSetName, replicas, pref1) + rs = createReplicaSetOrFail(f.FederationClientset, rs) + defer cleanupFn(rs) + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, expect1) + By(fmt.Sprintf("Successfuly created and synced replicaset %q/%q (%v/%v) to clusters", nsName, rs.Name, *rs.Spec.Replicas, rs.Status.Replicas)) + + rs = newReplicaSet(nsName, FederationReplicaSetName, replicas, pref2) + updateReplicaSetOrFail(f.FederationClientset, rs) + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, expect1) + By(fmt.Sprintf("Successfuly updated and synced replicaset %q/%q (%v/%v) to clusters", nsName, rs.Name, *rs.Spec.Replicas, rs.Status.Replicas)) + + pref2 = updateFedRePrefsRebalance(pref2, true) + rs = newReplicaSet(nsName, FederationReplicaSetName, replicas, pref2) + updateReplicaSetOrFail(f.FederationClientset, rs) + waitForReplicaSetOrFail(f.FederationClientset, nsName, rs.Name, clusters, expect2) + By(fmt.Sprintf("Successfuly updated and synced replicaset %q/%q (%v/%v) to clusters", nsName, rs.Name, *rs.Spec.Replicas, rs.Status.Replicas)) + }() }) It("should be deleted from underlying clusters when OrphanDependents is false", func() { @@ -144,7 +198,7 @@ func deleteAllReplicaSetsOrFail(clientset *fedclientset.Clientset, nsName string // from underlying clusters when orphan dependents is false and they are not // deleted when orphan dependents is true. func verifyCascadingDeletionForReplicaSet(clientset *fedclientset.Clientset, clusters map[string]*cluster, orphanDependents *bool, nsName string) { - replicaSet := createReplicaSetOrFail(clientset, nsName) + replicaSet := createReplicaSetOrFail(clientset, newReplicaSet(nsName, FederationReplicaSetName, 5, nil)) replicaSetName := replicaSet.Name // Check subclusters if the replicaSet was created there. By(fmt.Sprintf("Waiting for replica sets %s to be created in all underlying clusters", replicaSetName)) @@ -180,12 +234,114 @@ func verifyCascadingDeletionForReplicaSet(clientset *fedclientset.Clientset, clu } } -func waitForReplicaSetOrFail(c *fedclientset.Clientset, namespace string, replicaSetName string, clusters map[string]*cluster) { - err := waitForReplicaSet(c, namespace, replicaSetName, clusters) +func generageFedRsPrefsWithWeight(clusters map[string]*cluster) (pref *federation.FederatedReplicaSetPreferences, replicas int32, expect map[string]int32) { + clusterNames := extraceClusterNames(clusters) + pref = &federation.FederatedReplicaSetPreferences{ + Clusters: map[string]federation.ClusterReplicaSetPreferences{}, + } + replicas = 0 + expect = map[string]int32{} + + for i, clusterName := range clusterNames { + if i != 0 { // do not set weight for cluster[0] thus it should have no replicas scheduled + clusterRsPref := pref.Clusters[clusterName] + clusterRsPref.Weight = int64(i) + replicas += int32(i) + expect[clusterName] = int32(i) + } + } + return +} + +func generageFedRsPrefsWithMin(clusters map[string]*cluster) (pref *federation.FederatedReplicaSetPreferences, replicas int32, expect map[string]int32) { + clusterNames := extraceClusterNames(clusters) + pref = &federation.FederatedReplicaSetPreferences{ + Clusters: map[string]federation.ClusterReplicaSetPreferences{ + clusterNames[0]: {Weight: 100}, + }, + } + replicas = 0 + expect = map[string]int32{} + + for i, clusterName := range clusterNames { + if i != 0 { // do not set weight and minReplicas for cluster[0] thus it should have no replicas scheduled + clusterRsPref := pref.Clusters[clusterName] + clusterRsPref.Weight = int64(1) + clusterRsPref.MinReplicas = int64(i + 2) + replicas += int32(i + 2) + expect[clusterName] = int32(i + 2) + } + } + // the extra replica goes to cluster[0] which has the highest weight + replicas += 1 + expect[clusterNames[0]] = 1 + return +} + +func generageFedRsPrefsWithMax(clusters map[string]*cluster) (pref *federation.FederatedReplicaSetPreferences, replicas int32, expect map[string]int32) { + clusterNames := extraceClusterNames(clusters) + pref = &federation.FederatedReplicaSetPreferences{ + Clusters: map[string]federation.ClusterReplicaSetPreferences{ + clusterNames[0]: {Weight: 1}, + }, + } + replicas = 0 + expect = map[string]int32{} + + for i, clusterName := range clusterNames { + if i != 0 { // do not set maxReplicas for cluster[0] thus replicas exceeds the total maxReplicas turned to cluster[0] + clusterRsPref := pref.Clusters[clusterName] + clusterRsPref.Weight = int64(100) + maxReplicas := int64(i) + clusterRsPref.MaxReplicas = &maxReplicas + replicas += int32(i) + expect[clusterName] = int32(i) + } + } + // extra replicas go to cluster[0] although it has the lowest weight as others hit the MaxReplicas + replicas += 5 + expect[clusterNames[0]] = 5 + return +} + +func updateFedRePrefsRebalance(pref *federation.FederatedReplicaSetPreferences, rebalance bool) *federation.FederatedReplicaSetPreferences { + pref.Rebalance = rebalance + return pref +} + +func generageFedRsPrefsForRebalancing(clusters map[string]*cluster) (pref1, pref2 *federation.FederatedReplicaSetPreferences, replicas int32, expect1, expect2 map[string]int32) { + clusterNames := extraceClusterNames(clusters) + replicas = 3 + + pref1 = &federation.FederatedReplicaSetPreferences{ + Clusters: map[string]federation.ClusterReplicaSetPreferences{ + clusterNames[0]: {Weight: 1}, + clusterNames[1]: {Weight: 2}, + }, + } + expect1 = map[string]int32{ + clusterNames[0]: 1, + clusterNames[1]: 2, + } + pref2 = &federation.FederatedReplicaSetPreferences{ + Clusters: map[string]federation.ClusterReplicaSetPreferences{ + clusterNames[0]: {Weight: 2}, + clusterNames[1]: {Weight: 1}, + }, + } + expect2 = map[string]int32{ + clusterNames[0]: 2, + clusterNames[1]: 1, + } + return +} + +func waitForReplicaSetOrFail(c *fedclientset.Clientset, namespace string, replicaSetName string, clusters map[string]*cluster, expect map[string]int32) { + err := waitForReplicaSet(c, namespace, replicaSetName, clusters, expect) framework.ExpectNoError(err, "Failed to verify replica set %q/%q, err: %v", namespace, replicaSetName, err) } -func waitForReplicaSet(c *fedclientset.Clientset, namespace string, replicaSetName string, clusters map[string]*cluster) error { +func waitForReplicaSet(c *fedclientset.Clientset, namespace string, replicaSetName string, clusters map[string]*cluster, expect map[string]int32) error { err := wait.Poll(10*time.Second, FederatedReplicaSetTimeout, func() (bool, error) { frs, err := c.ReplicaSets(namespace).Get(replicaSetName, metav1.GetOptions{}) if err != nil { @@ -198,15 +354,28 @@ func waitForReplicaSet(c *fedclientset.Clientset, namespace string, replicaSetNa By(fmt.Sprintf("Failed getting replicaset: %q/%q/%q, err: %v", cluster.name, namespace, replicaSetName, err)) return false, err } - if err == nil { + if errors.IsNotFound(err) { + if expect != nil && expect[cluster.name] > 0 { + By(fmt.Sprintf("Replicaset %q/%q/%q not created replicas: %v", cluster.name, namespace, replicaSetName, expect[cluster.name])) + return false, nil + } + } else { if !equivalentReplicaSet(frs, rs) { By(fmt.Sprintf("Replicaset meta or spec not match for cluster %q:\n federation: %v\n cluster: %v", cluster.name, frs, rs)) return false, nil } + if expect != nil && *rs.Spec.Replicas < expect[cluster.name] { + By(fmt.Sprintf("Replicas not match for %q/%q/%q: expect: >= %v, actual: %v", cluster.name, namespace, replicaSetName, expect[cluster.name], *rs.Spec.Replicas)) + return false, nil + } specReplicas += *rs.Spec.Replicas statusReplicas += rs.Status.Replicas } } + if *frs.Spec.Replicas == 0 && frs.Status.Replicas != 0 { + By(fmt.Sprintf("ReplicaSet %q/%q with zero replicas should match the status as no overflow happens: expected: 0, actual: %v", namespace, replicaSetName, frs.Status.Replicas)) + return false, nil + } if statusReplicas == frs.Status.Replicas && specReplicas >= *frs.Spec.Replicas { return true, nil } @@ -224,14 +393,13 @@ func equivalentReplicaSet(fedReplicaSet, localReplicaSet *v1beta1.ReplicaSet) bo reflect.DeepEqual(fedReplicaSet.Spec, localReplicaSetSpec) } -func createReplicaSetOrFail(clientset *fedclientset.Clientset, namespace string) *v1beta1.ReplicaSet { +func createReplicaSetOrFail(clientset *fedclientset.Clientset, replicaset *v1beta1.ReplicaSet) *v1beta1.ReplicaSet { + namespace := replicaset.Namespace if clientset == nil || len(namespace) == 0 { Fail(fmt.Sprintf("Internal error: invalid parameters passed to createReplicaSetOrFail: clientset: %v, namespace: %v", clientset, namespace)) } By(fmt.Sprintf("Creating federation replicaset %q in namespace %q", FederationReplicaSetName, namespace)) - replicaset := newReplicaSet(namespace, FederationReplicaSetName, 5) - _, err := clientset.Extensions().ReplicaSets(namespace).Create(replicaset) framework.ExpectNoError(err, "Creating replicaset %q in namespace %q", replicaset.Name, namespace) By(fmt.Sprintf("Successfully created federation replicaset %q in namespace %q", FederationReplicaSetName, namespace)) @@ -258,14 +426,13 @@ func deleteReplicaSetOrFail(clientset *fedclientset.Clientset, nsName string, re } } -func updateReplicaSetOrFail(clientset *fedclientset.Clientset, namespace string) *v1beta1.ReplicaSet { +func updateReplicaSetOrFail(clientset *fedclientset.Clientset, replicaset *v1beta1.ReplicaSet) *v1beta1.ReplicaSet { + namespace := replicaset.Namespace if clientset == nil || len(namespace) == 0 { Fail(fmt.Sprintf("Internal error: invalid parameters passed to updateReplicaSetOrFail: clientset: %v, namespace: %v", clientset, namespace)) } By(fmt.Sprintf("Updating federation replicaset %q in namespace %q", FederationReplicaSetName, namespace)) - replicaset := newReplicaSet(namespace, FederationReplicaSetName, 15) - newRs, err := clientset.ReplicaSets(namespace).Update(replicaset) framework.ExpectNoError(err, "Updating replicaset %q in namespace %q", replicaset.Name, namespace) By(fmt.Sprintf("Successfully updated federation replicaset %q in namespace %q", FederationReplicaSetName, namespace)) @@ -273,11 +440,12 @@ func updateReplicaSetOrFail(clientset *fedclientset.Clientset, namespace string) return newRs } -func newReplicaSet(namespace string, name string, replicas int32) *v1beta1.ReplicaSet { - return &v1beta1.ReplicaSet{ +func newReplicaSet(namespace string, name string, replicas int32, pref *federation.FederatedReplicaSetPreferences) *v1beta1.ReplicaSet { + rs := v1beta1.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, + Name: name, + Namespace: namespace, + Annotations: map[string]string{}, }, Spec: v1beta1.ReplicaSetSpec{ Replicas: &replicas, @@ -299,4 +467,18 @@ func newReplicaSet(namespace string, name string, replicas int32) *v1beta1.Repli }, }, } + if pref != nil { + prefBytes, _ := json.Marshal(pref) + prefString := string(prefBytes) + rs.Annotations[fedreplicsetcontroller.FedReplicaSetPreferencesAnnotation] = prefString + } + return &rs +} + +func extraceClusterNames(clusters map[string]*cluster) []string { + clusterNames := make([]string, 0, len(clusters)) + for clusterName := range clusters { + clusterNames = append(clusterNames, clusterName) + } + return clusterNames }