Merge pull request #48200 from irfanurrehman/fed-sched-generic-args

Automatic merge from submit-queue (batch tested with PRs 48043, 48200, 49139, 36238, 49130)

[Federation] Make arguments to scheduling type adapter methods generic

This is in the process of trying to rebase https://github.com/kubernetes/kubernetes/pull/45993 on latest.
cc @marun @perotinus 
@kubernetes/sig-federation-misc 
Hoping I get some attention to this and later PRs soon.

Associated issue https://github.com/kubernetes/kubernetes/issues/49181

**Release note**:

```NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-07-19 01:57:27 -07:00 committed by GitHub
commit cf9f00bb95
5 changed files with 42 additions and 38 deletions

View File

@ -47,15 +47,16 @@ type DeploymentAdapter struct {
func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
schedulingAdapter := schedulingAdapter{ schedulingAdapter := schedulingAdapter{
preferencesAnnotationName: FedDeploymentPreferencesAnnotation, preferencesAnnotationName: FedDeploymentPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error { updateStatusFunc: func(obj pkgruntime.Object, status interface{}) error {
deployment := obj.(*extensionsv1.Deployment) deployment := obj.(*extensionsv1.Deployment)
if status.Replicas != deployment.Status.Replicas || status.UpdatedReplicas != deployment.Status.UpdatedReplicas || typedStatus := status.(ReplicaSchedulingStatus)
status.ReadyReplicas != deployment.Status.ReadyReplicas || status.AvailableReplicas != deployment.Status.AvailableReplicas { if typedStatus.Replicas != deployment.Status.Replicas || typedStatus.UpdatedReplicas != deployment.Status.UpdatedReplicas ||
typedStatus.ReadyReplicas != deployment.Status.ReadyReplicas || typedStatus.AvailableReplicas != deployment.Status.AvailableReplicas {
deployment.Status = extensionsv1.DeploymentStatus{ deployment.Status = extensionsv1.DeploymentStatus{
Replicas: status.Replicas, Replicas: typedStatus.Replicas,
UpdatedReplicas: status.UpdatedReplicas, UpdatedReplicas: typedStatus.UpdatedReplicas,
ReadyReplicas: status.ReadyReplicas, ReadyReplicas: typedStatus.ReadyReplicas,
AvailableReplicas: status.AvailableReplicas, AvailableReplicas: typedStatus.AvailableReplicas,
} }
_, err := client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment) _, err := client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
return err return err

View File

@ -47,15 +47,16 @@ type ReplicaSetAdapter struct {
func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter { func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
schedulingAdapter := schedulingAdapter{ schedulingAdapter := schedulingAdapter{
preferencesAnnotationName: FedReplicaSetPreferencesAnnotation, preferencesAnnotationName: FedReplicaSetPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error { updateStatusFunc: func(obj pkgruntime.Object, status interface{}) error {
rs := obj.(*extensionsv1.ReplicaSet) rs := obj.(*extensionsv1.ReplicaSet)
if status.Replicas != rs.Status.Replicas || status.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas || typedStatus := status.(ReplicaSchedulingStatus)
status.ReadyReplicas != rs.Status.ReadyReplicas || status.AvailableReplicas != rs.Status.AvailableReplicas { if typedStatus.Replicas != rs.Status.Replicas || typedStatus.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas ||
typedStatus.ReadyReplicas != rs.Status.ReadyReplicas || typedStatus.AvailableReplicas != rs.Status.AvailableReplicas {
rs.Status = extensionsv1.ReplicaSetStatus{ rs.Status = extensionsv1.ReplicaSetStatus{
Replicas: status.Replicas, Replicas: typedStatus.Replicas,
FullyLabeledReplicas: status.Replicas, FullyLabeledReplicas: typedStatus.Replicas,
ReadyReplicas: status.ReadyReplicas, ReadyReplicas: typedStatus.ReadyReplicas,
AvailableReplicas: status.AvailableReplicas, AvailableReplicas: typedStatus.AvailableReplicas,
} }
_, err := client.Extensions().ReplicaSets(rs.Namespace).UpdateStatus(rs) _, err := client.Extensions().ReplicaSets(rs.Namespace).UpdateStatus(rs)
return err return err

View File

@ -37,9 +37,9 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// SchedulingStatus contains the status of the objects that are being // ReplicaSchedulingStatus contains the status of the replica type objects (rs or deployment)
// scheduled into joined clusters. // that are being scheduled into joined clusters.
type SchedulingStatus struct { type ReplicaSchedulingStatus struct {
Replicas int32 Replicas int32
UpdatedReplicas int32 UpdatedReplicas int32
FullyLabeledReplicas int32 FullyLabeledReplicas int32
@ -47,19 +47,19 @@ type SchedulingStatus struct {
AvailableReplicas int32 AvailableReplicas int32
} }
// SchedulingInfo wraps the information that a SchedulingAdapter needs // ReplicaSchedulingInfo wraps the information that a replica type (rs or deployment)
// to update objects per a schedule. // SchedulingAdapter needs to update objects per a schedule.
type SchedulingInfo struct { type ReplicaSchedulingInfo struct {
Schedule map[string]int64 Schedule map[string]int64
Status SchedulingStatus Status ReplicaSchedulingStatus
} }
// SchedulingAdapter defines operations for interacting with a // SchedulingAdapter defines operations for interacting with a
// federated type that requires more complex synchronization logic. // federated type that requires more complex synchronization logic.
type SchedulingAdapter interface { type SchedulingAdapter interface {
GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error)
ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, bool, error)
UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error UpdateFederatedStatus(obj pkgruntime.Object, status interface{}) error
// EquivalentIgnoringSchedule returns whether obj1 and obj2 are // EquivalentIgnoringSchedule returns whether obj1 and obj2 are
// equivalent ignoring differences due to scheduling. // equivalent ignoring differences due to scheduling.
@ -70,14 +70,14 @@ type SchedulingAdapter interface {
// workload scheduling. // workload scheduling.
type schedulingAdapter struct { type schedulingAdapter struct {
preferencesAnnotationName string preferencesAnnotationName string
updateStatusFunc func(pkgruntime.Object, SchedulingStatus) error updateStatusFunc func(pkgruntime.Object, interface{}) error
} }
func (a *schedulingAdapter) IsSchedulingAdapter() bool { func (a *schedulingAdapter) IsSchedulingAdapter() bool {
return true return true
} }
func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) { func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
var clusterNames []string var clusterNames []string
for _, cluster := range clusters { for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name) clusterNames = append(clusterNames, cluster.Name)
@ -122,14 +122,15 @@ func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clust
plnr := planner.NewPlanner(fedPref) plnr := planner.NewPlanner(fedPref)
return &SchedulingInfo{ return &ReplicaSchedulingInfo{
Schedule: schedule(plnr, obj, key, clusterNames, currentReplicasPerCluster, estimatedCapacity), Schedule: schedule(plnr, obj, key, clusterNames, currentReplicasPerCluster, estimatedCapacity),
Status: SchedulingStatus{}, Status: ReplicaSchedulingStatus{},
}, nil }, nil
} }
func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) { func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, bool, error) {
replicas, ok := schedulingInfo.Schedule[cluster.Name] typedSchedulingInfo := schedulingInfo.(*ReplicaSchedulingInfo)
replicas, ok := typedSchedulingInfo.Schedule[cluster.Name]
if !ok { if !ok {
replicas = 0 replicas = 0
} }
@ -138,7 +139,7 @@ func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clust
reflect.ValueOf(federationObjCopy).Elem().FieldByName("Spec").FieldByName("Replicas").Set(reflect.ValueOf(&specReplicas)) reflect.ValueOf(federationObjCopy).Elem().FieldByName("Spec").FieldByName("Replicas").Set(reflect.ValueOf(&specReplicas))
if clusterObj != nil { if clusterObj != nil {
schedulingStatusVal := reflect.ValueOf(schedulingInfo).Elem().FieldByName("Status") schedulingStatusVal := reflect.ValueOf(typedSchedulingInfo).Elem().FieldByName("Status")
objStatusVal := reflect.ValueOf(clusterObj).Elem().FieldByName("Status") objStatusVal := reflect.ValueOf(clusterObj).Elem().FieldByName("Status")
for i := 0; i < schedulingStatusVal.NumField(); i++ { for i := 0; i < schedulingStatusVal.NumField(); i++ {
schedulingStatusField := schedulingStatusVal.Field(i) schedulingStatusField := schedulingStatusVal.Field(i)
@ -154,7 +155,7 @@ func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clust
return federationObjCopy, replicas > 0, nil return federationObjCopy, replicas > 0, nil
} }
func (a *schedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error { func (a *schedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status interface{}) error {
return a.updateStatusFunc(obj, status) return a.updateStatusFunc(obj, status)
} }

View File

@ -366,7 +366,7 @@ func (s *FederationSyncController) reconcile(qualifiedName federatedtypes.Qualif
return statusError return statusError
} }
operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, schedulingInfo *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) { operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, schedulingInfo interface{}) ([]util.FederatedOperation, error) {
operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, schedulingInfo, func(clusterName string) (interface{}, bool, error) { operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, schedulingInfo, func(clusterName string) (interface{}, bool, error) {
return s.informer.GetTargetStore().GetByKey(clusterName, key) return s.informer.GetTargetStore().GetByKey(clusterName, key)
}) })
@ -445,7 +445,7 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, qu
} }
type clustersAccessorFunc func() ([]*federationapi.Cluster, error) type clustersAccessorFunc func() ([]*federationapi.Cluster, error)
type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, interface{}) ([]util.FederatedOperation, error)
type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error)
type executionFunc func([]util.FederatedOperation) error type executionFunc func([]util.FederatedOperation) error
@ -467,7 +467,7 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
return statusError return statusError
} }
var schedulingInfo *federatedtypes.SchedulingInfo var schedulingInfo interface{}
if adapter.IsSchedulingAdapter() { if adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter) schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
if !ok { if !ok {
@ -490,7 +490,8 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
if !ok { if !ok {
glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind) glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind)
} }
err = schedulingAdapter.UpdateFederatedStatus(obj, schedulingInfo.Status) typedScheduleInfo := schedulingInfo.(*federatedtypes.ReplicaSchedulingInfo)
err = schedulingAdapter.UpdateFederatedStatus(obj, typedScheduleInfo.Status)
if err != nil { if err != nil {
runtime.HandleError(fmt.Errorf("adapter.UpdateFinished() failed on adapter for %s %q: %v", kind, key, err)) runtime.HandleError(fmt.Errorf("adapter.UpdateFinished() failed on adapter for %s %q: %v", kind, key, err))
return statusError return statusError
@ -532,7 +533,7 @@ func selectedClusters(objMeta *metav1.ObjectMeta, selector func(map[string]strin
type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error) type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error)
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters // clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, schedulingInfo *federatedtypes.SchedulingInfo, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) { func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, schedulingInfo interface{}, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) {
operations := make([]util.FederatedOperation, 0) operations := make([]util.FederatedOperation, 0)
kind := adapter.Kind() kind := adapter.Kind()

View File

@ -75,7 +75,7 @@ func TestSyncToClusters(t *testing.T) {
} }
return nil, nil return nil, nil
}, },
func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) { func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, interface{}) ([]util.FederatedOperation, error) {
if testCase.operationsError { if testCase.operationsError {
return nil, awfulError return nil, awfulError
} }