[Federation] Add a SchedulingAdapter that can extend the FederatedTypeAdapter and that provides hooks for scheduling objects into clusters.

This commit is contained in:
Jonathan MacMillan
2017-05-26 12:04:13 -07:00
parent 657c01c695
commit 1130b368eb
8 changed files with 128 additions and 16 deletions

View File

@@ -356,8 +356,8 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
return statusError
}
operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object) ([]util.FederatedOperation, error) {
operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, func(clusterName string) (interface{}, bool, error) {
operationsAccessor := func(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, schedulingInfo *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error) {
operations, err := clusterOperations(adapter, selectedClusters, unselectedClusters, obj, key, schedulingInfo, func(clusterName string) (interface{}, bool, error) {
return s.informer.GetTargetStore().GetByKey(clusterName, key)
})
if err != nil {
@@ -372,6 +372,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
selectedClusters,
s.updater.Update,
s.adapter,
s.informer,
obj,
)
}
@@ -423,12 +424,12 @@ func (s *FederationSyncController) delete(obj pkgruntime.Object, kind string, na
}
type clustersAccessorFunc func() ([]*federationapi.Cluster, error)
type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object) ([]util.FederatedOperation, error)
type operationsFunc func(federatedtypes.FederatedTypeAdapter, []*federationapi.Cluster, []*federationapi.Cluster, pkgruntime.Object, *federatedtypes.SchedulingInfo) ([]util.FederatedOperation, error)
type clusterSelectorFunc func(*metav1.ObjectMeta, func(map[string]string, map[string]string) (bool, error), []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error)
type executionFunc func([]util.FederatedOperation) error
// syncToClusters ensures that the state of the given object is synchronized to member clusters.
func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, obj pkgruntime.Object) reconciliationStatus {
func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor operationsFunc, selector clusterSelectorFunc, execute executionFunc, adapter federatedtypes.FederatedTypeAdapter, informer util.FederatedInformer, obj pkgruntime.Object) reconciliationStatus {
kind := adapter.Kind()
key := federatedtypes.ObjectKey(adapter, obj)
@@ -445,10 +446,36 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
return statusError
}
operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj)
var schedulingInfo *federatedtypes.SchedulingInfo
if adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
if !ok {
glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind)
}
schedulingInfo, err = schedulingAdapter.GetSchedule(obj, key, selectedClusters, informer)
if err != nil {
runtime.HandleError(fmt.Errorf("adapter.GetSchedule() failed on adapter for %s %q: %v", kind, key, err))
return statusError
}
}
operations, err := operationsAccessor(adapter, selectedClusters, unselectedClusters, obj, schedulingInfo)
if err != nil {
return statusError
}
if adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
if !ok {
glog.Fatalf("Adapter for kind %q does not properly implement SchedulingAdapter.", kind)
}
err = schedulingAdapter.UpdateFederatedStatus(obj, schedulingInfo.Status)
if err != nil {
runtime.HandleError(fmt.Errorf("adapter.UpdateFinished() failed on adapter for %s %q: %v", kind, key, err))
return statusError
}
}
if len(operations) == 0 {
return statusAllOK
}
@@ -459,7 +486,7 @@ func syncToClusters(clustersAccessor clustersAccessorFunc, operationsAccessor op
return statusError
}
// Evertyhing is in order but let's be double sure
// Everything is in order but let's be double sure
return statusNeedsRecheck
}
@@ -484,26 +511,46 @@ func selectedClusters(objMeta *metav1.ObjectMeta, selector func(map[string]strin
type clusterObjectAccessorFunc func(clusterName string) (interface{}, bool, error)
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) {
// The data should not be modified.
desiredObj := adapter.Copy(obj)
func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClusters []*federationapi.Cluster, unselectedClusters []*federationapi.Cluster, obj pkgruntime.Object, key string, schedulingInfo *federatedtypes.SchedulingInfo, accessor clusterObjectAccessorFunc) ([]util.FederatedOperation, error) {
operations := make([]util.FederatedOperation, 0)
kind := adapter.Kind()
for _, cluster := range selectedClusters {
// The data should not be modified.
desiredObj := adapter.Copy(obj)
clusterObj, found, err := accessor(cluster.Name)
if err != nil {
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err)
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", kind, key, cluster.Name, err)
runtime.HandleError(wrappedErr)
return nil, wrappedErr
}
shouldCreateIfNeeded := true
if adapter.IsSchedulingAdapter() {
schedulingAdapter, ok := adapter.(federatedtypes.SchedulingAdapter)
if !ok {
err = fmt.Errorf("adapter for kind %s does not properly implement SchedulingAdapter.", kind)
glog.Fatalf("Error: %v", err)
}
var clusterTypedObj pkgruntime.Object = nil
if clusterObj != nil {
clusterTypedObj = clusterObj.(pkgruntime.Object)
}
desiredObj, shouldCreateIfNeeded, err = schedulingAdapter.ScheduleObject(cluster, clusterTypedObj, desiredObj, schedulingInfo)
if err != nil {
runtime.HandleError(err)
return nil, err
}
}
var operationType util.FederatedOperationType = ""
if found {
clusterObj := clusterObj.(pkgruntime.Object)
if !adapter.Equivalent(desiredObj, clusterObj) {
operationType = util.OperationTypeUpdate
}
} else {
} else if shouldCreateIfNeeded {
operationType = util.OperationTypeAdd
}
@@ -518,16 +565,16 @@ func clusterOperations(adapter federatedtypes.FederatedTypeAdapter, selectedClus
}
for _, cluster := range unselectedClusters {
_, found, err := accessor(cluster.Name)
clusterObj, found, err := accessor(cluster.Name)
if err != nil {
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", adapter.Kind(), key, cluster.Name, err)
wrappedErr := fmt.Errorf("Failed to get %s %q from cluster %q: %v", kind, key, cluster.Name, err)
runtime.HandleError(wrappedErr)
return nil, wrappedErr
}
if found {
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeDelete,
Obj: desiredObj,
Obj: clusterObj.(pkgruntime.Object),
ClusterName: cluster.Name,
Key: key,
})