diff --git a/federation/pkg/federatedtypes/hpa.go b/federation/pkg/federatedtypes/hpa.go index 66806b35159..f239d6c214e 100644 --- a/federation/pkg/federatedtypes/hpa.go +++ b/federation/pkg/federatedtypes/hpa.go @@ -17,10 +17,12 @@ limitations under the License. package federatedtypes import ( + "fmt" "time" - "fmt" autoscalingv1 "k8s.io/api/autoscaling/v1" + "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -31,6 +33,10 @@ import ( federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + hpautil "k8s.io/kubernetes/federation/pkg/federation-controller/util/hpa" + extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" + + "github.com/golang/glog" ) const ( @@ -165,7 +171,7 @@ func (a *HpaAdapter) NewTestObject(namespace string) pkgruntime.Object { }, Spec: autoscalingv1.HorizontalPodAutoscalerSpec{ ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{ - Kind: "replicaset", + Kind: "ReplicaSet", Name: "myrs", }, MinReplicas: &min, @@ -443,6 +449,10 @@ func (a *HpaAdapter) UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo return fmt.Errorf("Error updating hpa: %s status in federation: %v", fedHpa.Name, err) } } + + if err := a.updateClusterListOnTargetObject(fedHpa, schedulingInfo.(*hpaSchedulingInfo).scheduleState); err != nil { + return fmt.Errorf("Error updating cluster list on object targetted by hpa: %s: %v", fedHpa.Name, err) + } return nil } @@ -936,3 +946,70 @@ func (a *HpaAdapter) minReplicasIncreasable(obj pkgruntime.Object) bool { } return false } + +// updateClusterListOnTargetObject passes the necessary info to the target object, +// so that the corresponding controller can act on that. 
+// This is used because if an hpa is active on a federated object it is supposed +// to control the replicas and presence/absence of target object from federated clusters. +func (a *HpaAdapter) updateClusterListOnTargetObject(fedHpa *autoscalingv1.HorizontalPodAutoscaler, scheduleStatus map[string]*replicaNums) error { + if len(fedHpa.Spec.ScaleTargetRef.Kind) <= 0 || len(fedHpa.Spec.ScaleTargetRef.Name) <= 0 { + // nothing to do + glog.Infof("Fed HPA: cluster list update on target object skipped for target obj: %s, kind: %s", fedHpa.Spec.ScaleTargetRef.Name, fedHpa.Spec.ScaleTargetRef.Kind) + return nil + } + + names := []string{} + for clusterName, replicas := range scheduleStatus { + if replicas != nil { + names = append(names, clusterName) + } + } + clusterNames := hpautil.ClusterNames{Names: names} + qualifiedKind := extensionsinternal.Kind(fedHpa.Spec.ScaleTargetRef.Kind) + targetObj, err := getRuntimeObjectForKind(a.client, qualifiedKind, fedHpa.Namespace, fedHpa.Spec.ScaleTargetRef.Name) + if errors.IsNotFound(err) { + // Nothing to do; the target object does not exist in federation. + glog.Infof("Fed HPA: cluster list update on target object skipped for target obj: %s, kind: %s. Target object missing in federation", fedHpa.Spec.ScaleTargetRef.Name, fedHpa.Spec.ScaleTargetRef.Kind) + return nil + } + if err != nil { + return err + } + + updatedObj := hpautil.SetHpaTargetClusterList(targetObj, clusterNames) + _, err = updateRuntimeObjectForKind(a.client, qualifiedKind, fedHpa.Namespace, updatedObj) + if err != nil { + return err + } + return nil +} + +// getRuntimeObjectForKind gets the hpa targeted object from the federation control plane. +// As of now, federation only supports "ReplicaSets" and "Deployments", which is the reason +// this function only lists these two types. +// TODO: add similar info to the federated hpa documentation. 
+func getRuntimeObjectForKind(c federationclientset.Interface, kind schema.GroupKind, ns, name string) (pkgruntime.Object, error) { + switch kind { + case extensionsinternal.Kind("ReplicaSet"): + return c.ExtensionsV1beta1().ReplicaSets(ns).Get(name, metav1.GetOptions{}) + case extensionsinternal.Kind("Deployment"): + return c.ExtensionsV1beta1().Deployments(ns).Get(name, metav1.GetOptions{}) + default: + return nil, fmt.Errorf("Unsupported federated kind targeted by hpa: %v", kind) + } +} + +// updateRuntimeObjectForKind updates the hpa targeted object in the federation control plane. +// As of now, federation only supports "ReplicaSets" and "Deployments", which is the reason +// this function only lists these two types. +// TODO: add similar info to the federated hpa documentation. +func updateRuntimeObjectForKind(c federationclientset.Interface, kind schema.GroupKind, ns string, obj pkgruntime.Object) (pkgruntime.Object, error) { + switch kind { + case extensionsinternal.Kind("ReplicaSet"): + return c.ExtensionsV1beta1().ReplicaSets(ns).Update(obj.(*v1beta1.ReplicaSet)) + case extensionsinternal.Kind("Deployment"): + return c.ExtensionsV1beta1().Deployments(ns).Update(obj.(*v1beta1.Deployment)) + default: + return nil, fmt.Errorf("Unsupported federated kind targeted by hpa: %v", kind) + } +} diff --git a/federation/pkg/federatedtypes/scheduling.go b/federation/pkg/federatedtypes/scheduling.go index 3cad44bd59e..cdbd3970344 100644 --- a/federation/pkg/federatedtypes/scheduling.go +++ b/federation/pkg/federatedtypes/scheduling.go @@ -30,6 +30,7 @@ import ( fedapi "k8s.io/kubernetes/federation/apis/federation" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + hpautil "k8s.io/kubernetes/federation/pkg/federation-controller/util/hpa" "k8s.io/kubernetes/federation/pkg/federation-controller/util/planner" "k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer" 
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences" @@ -47,9 +48,9 @@ const ( ActionDelete = "delete" ) -// ReplicaSchedulingStatus contains the status of the replica type objects (rs or deployment) -// that are being scheduled into joined clusters. -type ReplicaSchedulingStatus struct { +// ReplicaStatus contains the details of status fields from the cluster objects, +// which need accumulation to update the status of the federated object. +type ReplicaStatus struct { Replicas int32 UpdatedReplicas int32 FullyLabeledReplicas int32 @@ -57,11 +58,18 @@ type ReplicaSchedulingStatus struct { AvailableReplicas int32 } +// ReplicaScheduleState is the result of adapter specific schedule() function, +// which is then used to update objects into clusters. +type ReplicaScheduleState struct { + isSelected bool + replicas int64 +} + // ReplicaSchedulingInfo wraps the information that a replica type (rs or deployment) // SchedulingAdapter needs to update objects per a schedule. type ReplicaSchedulingInfo struct { - Schedule map[string]int64 - Status ReplicaSchedulingStatus + ScheduleState map[string]*ReplicaScheduleState + Status ReplicaStatus } // SchedulingAdapter defines operations for interacting with a @@ -87,6 +95,58 @@ func (a *replicaSchedulingAdapter) IsSchedulingAdapter() bool { return true } +func isSelected(names []string, name string) bool { + for _, val := range names { + if val == name { + return true + } + } + return false +} + +func isObjHpaControlled(fedObj pkgruntime.Object) (bool, error) { + hpaSelectedClusters, error := hpautil.GetHpaTargetClusterList(fedObj) + if error != nil { + return false, error + } + + if hpaSelectedClusters == nil { + return false, nil + } + return true, nil +} + +// initializeScheduleState initializes the schedule state for consumption by schedule +// functions (schedule or simple schedule). 
After this initialization the state would +// already indicate whether only the subset of clusters targeted by hpa, or all clusters, +// need to be considered by the actual scheduling functions. +// The returned bool named hpaControlled tells whether this object is controlled by hpa or not. +func initializeScheduleState(fedObj pkgruntime.Object, clusterNames []string) (map[string]*ReplicaScheduleState, bool, error) { + initialState := make(map[string]*ReplicaScheduleState) + hpaControlled := false + hpaSelectedClusters, error := hpautil.GetHpaTargetClusterList(fedObj) + if error != nil { + return nil, hpaControlled, error + } + + if hpaSelectedClusters != nil { + hpaControlled = true + } + for _, clusterName := range clusterNames { + replicaState := ReplicaScheduleState{ + isSelected: false, + replicas: 0, + } + if hpaControlled { + if isSelected(hpaSelectedClusters.Names, clusterName) { + replicaState.isSelected = true + } + } + initialState[clusterName] = &replicaState + } + return initialState, hpaControlled, nil +} + func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) { var clusterNames []string for _, cluster := range clusters { @@ -113,6 +173,23 @@ func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string } return clientset.Core().Pods(metadata.GetNamespace()).List(metav1.ListOptions{LabelSelector: selector.String()}) } + + initializedState, hpaControlled, err := initializeScheduleState(obj, clusterNames) + if err != nil { + return nil, err + } + + if hpaControlled { + state, err := simpleSchedule(initializedState, key, objectGetter) + if err != nil { + return nil, err + } + return &ReplicaSchedulingInfo{ + ScheduleState: state, + Status: ReplicaStatus{}, + }, nil + } + currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter) if err != nil { return nil, err @@ 
-133,20 +210,14 @@ func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string plnr := planner.NewPlanner(fedPref) return &ReplicaSchedulingInfo{ - Schedule: schedule(plnr, obj, key, clusterNames, currentReplicasPerCluster, estimatedCapacity), - Status: ReplicaSchedulingStatus{}, + ScheduleState: schedule(plnr, obj, key, clusterNames, currentReplicasPerCluster, estimatedCapacity, initializedState), + Status: ReplicaStatus{}, }, nil } func (a *replicaSchedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) { typedSchedulingInfo := schedulingInfo.(*ReplicaSchedulingInfo) - replicas, ok := typedSchedulingInfo.Schedule[cluster.Name] - if !ok { - replicas = 0 - } - - specReplicas := int32(replicas) - reflect.ValueOf(federationObjCopy).Elem().FieldByName("Spec").FieldByName("Replicas").Set(reflect.ValueOf(&specReplicas)) + clusterScheduleState := typedSchedulingInfo.ScheduleState[cluster.Name] if clusterObj != nil { schedulingStatusVal := reflect.ValueOf(typedSchedulingInfo).Elem().FieldByName("Status") @@ -162,10 +233,19 @@ func (a *replicaSchedulingAdapter) ScheduleObject(cluster *federationapi.Cluster } } } + var action ScheduleAction = "" - if replicas > 0 { + specReplicas := int32(0) + // If the cluster has been selected (isSelected = true; for example by hpa) + // and the obj does not get any replicas, then it should create one with + // 0 replicas (which can then be scaled by hpa in that cluster). + // On the other hand we keep the action as "unassigned" if this cluster was + // not selected, and let the sync controller decide what to do. 
+ if clusterScheduleState.isSelected { + specReplicas = int32(clusterScheduleState.replicas) action = ActionAdd } + reflect.ValueOf(federationObjCopy).Elem().FieldByName("Spec").FieldByName("Replicas").Set(reflect.ValueOf(&specReplicas)) return federationObjCopy, action, nil } @@ -173,22 +253,48 @@ func (a *replicaSchedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, return a.updateStatusFunc(obj, schedulingInfo) } -func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { +// simpleSchedule gets replicas from only those clusters which are selected (by hpa scheduler). +// The aim of this is to ensure that this controller does not update objects which are +// targeted by hpa. +func simpleSchedule(scheduleState map[string]*ReplicaScheduleState, key string, objectGetter func(clusterName string, key string) (interface{}, bool, error)) (map[string]*ReplicaScheduleState, error) { + for clusterName, state := range scheduleState { + // Get and consider replicas only for those clusters which are selected by hpa. 
+ if state.isSelected { + obj, exists, err := objectGetter(clusterName, key) + if err != nil { + return nil, err + } + if !exists { + continue + } + state.replicas = reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int() + } + } + + return scheduleState, nil +} + +func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64, initialState map[string]*ReplicaScheduleState) map[string]*ReplicaScheduleState { // TODO: integrate real scheduler replicas := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int() scheduleResult, overflow := planner.Plan(replicas, clusterNames, currentReplicasPerCluster, estimatedCapacity, key) // Ensure that all current clusters end up in the scheduling result. - result := make(map[string]int64) + // initialState is preinitialized with all isSelected set to false. + result := initialState for clusterName := range currentReplicasPerCluster { - result[clusterName] = 0 + // We treat 0 replicas as meaning there is no need to create a new object. + // isSelected remains false in such a case. + result[clusterName].replicas = 0 } for clusterName, replicas := range scheduleResult { - result[clusterName] = replicas + result[clusterName].isSelected = true + result[clusterName].replicas = replicas } for clusterName, replicas := range overflow { - result[clusterName] += replicas + result[clusterName].isSelected = true + result[clusterName].replicas += replicas } if glog.V(4) { diff --git a/federation/pkg/federation-controller/util/hpa/hpa.go b/federation/pkg/federation-controller/util/hpa/hpa.go new file mode 100644 index 00000000000..d7a056f5e28 --- /dev/null +++ b/federation/pkg/federation-controller/util/hpa/hpa.go @@ -0,0 +1,75 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hpa + +import ( + "encoding/json" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" +) + +const ( + // FederatedAnnotationOnHpaTargetObj as key, is used by hpa controller to + // set selected cluster name list as annotation on the target object. + FederatedAnnotationOnHpaTargetObj = "federation.kubernetes.io/hpa-target-cluster-list" +) + +// ClusterNames stores the list of clusters represented by names as appearing on federation +// cluster objects. This is set by federation hpa and used by target objects federation +// controller to restrict that target object to only these clusters. +type ClusterNames struct { + Names []string +} + +func (cn *ClusterNames) String() string { + annotationBytes, _ := json.Marshal(cn) + return string(annotationBytes[:]) +} + +// GetHpaTargetClusterList is used to get the list of clusters from the target object +// annotations. 
+func GetHpaTargetClusterList(obj runtime.Object) (*ClusterNames, error) { + accessor, _ := meta.Accessor(obj) + targetObjAnno := accessor.GetAnnotations() + if targetObjAnno == nil { + return nil, nil + } + targetObjAnnoString, exists := targetObjAnno[FederatedAnnotationOnHpaTargetObj] + if !exists { + return nil, nil + } + + clusterNames := &ClusterNames{} + if err := json.Unmarshal([]byte(targetObjAnnoString), clusterNames); err != nil { + return nil, err + } + return clusterNames, nil +} + +// SetHpaTargetClusterList is used to set the list of clusters on the target object +// annotations. +func SetHpaTargetClusterList(obj runtime.Object, clusterNames ClusterNames) runtime.Object { + accessor, _ := meta.Accessor(obj) + anno := accessor.GetAnnotations() + if anno == nil { + anno = make(map[string]string) + accessor.SetAnnotations(anno) + } + anno[FederatedAnnotationOnHpaTargetObj] = clusterNames.String() + return obj +} diff --git a/federation/pkg/federation-controller/util/hpa/hpa_test.go b/federation/pkg/federation-controller/util/hpa/hpa_test.go new file mode 100644 index 00000000000..d12943bae72 --- /dev/null +++ b/federation/pkg/federation-controller/util/hpa/hpa_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package hpa + +import ( + "testing" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/stretchr/testify/require" +) + +func TestGetHpaTargetClusterList(t *testing.T) { + // Any object is fine for this test. + obj := &autoscalingv1.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myhpa", + Namespace: "myNamespace", + SelfLink: "/api/mylink", + }, + } + + testCases := map[string]struct { + clusterNames *ClusterNames + expectedErr bool + }{ + "Wrong data set on annotations should return unmarshalling error when retrieving": { + expectedErr: true, + }, + "Get clusternames on annotations with 2 clusters, should have same names, which were set": { + clusterNames: &ClusterNames{ + Names: []string{ + "c1", + "c2", + }, + }, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + accessor, _ := meta.Accessor(obj) + anno := accessor.GetAnnotations() + if anno == nil { + anno = make(map[string]string) + accessor.SetAnnotations(anno) + } + if testCase.expectedErr { + anno[FederatedAnnotationOnHpaTargetObj] = "{" //some random string + } else { + anno[FederatedAnnotationOnHpaTargetObj] = testCase.clusterNames.String() + } + + readNames, err := GetHpaTargetClusterList(obj) + + if testCase.expectedErr { + require.Error(t, err, "An error was expected") + } else { + require.Equal(t, testCase.clusterNames, readNames, "Names should have been equal") + } + }) + } +} + +func TestSetHpaTargetClusterList(t *testing.T) { + // Any object is fine for this test. 
+ obj := &autoscalingv1.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myhpa", + Namespace: "myNamespace", + SelfLink: "/api/mylink", + }, + } + + testCases := map[string]struct { + clusterNames ClusterNames + expectedErr bool + }{ + "Get clusternames on annotations with 2 clusters, should have same names, which were set": { + clusterNames: ClusterNames{ + Names: []string{ + "c1", + "c2", + }, + }, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + + SetHpaTargetClusterList(obj, testCase.clusterNames) + readNames, err := GetHpaTargetClusterList(obj) + require.NoError(t, err, "An error should not have happened") + require.Equal(t, &testCase.clusterNames, readNames, "Names should have been equal") + + }) + } +}