diff --git a/federation/cmd/federation-controller-manager/app/BUILD b/federation/cmd/federation-controller-manager/app/BUILD
index a4fffac2a2b..0d7514b4aef 100644
--- a/federation/cmd/federation-controller-manager/app/BUILD
+++ b/federation/cmd/federation-controller-manager/app/BUILD
@@ -24,6 +24,7 @@ go_library(
         "//federation/pkg/federatedtypes:go_default_library",
         "//federation/pkg/federation-controller/cluster:go_default_library",
         "//federation/pkg/federation-controller/ingress:go_default_library",
+        "//federation/pkg/federation-controller/job:go_default_library",
         "//federation/pkg/federation-controller/service:go_default_library",
         "//federation/pkg/federation-controller/service/dns:go_default_library",
         "//federation/pkg/federation-controller/sync:go_default_library",
diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go
index 9144eca97f4..d4abefe200c 100644
--- a/federation/cmd/federation-controller-manager/app/controllermanager.go
+++ b/federation/cmd/federation-controller-manager/app/controllermanager.go
@@ -37,6 +37,7 @@ import (
 	"k8s.io/kubernetes/federation/pkg/federatedtypes"
 	clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
 	ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress"
+	jobcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/job"
 	servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
 	servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns"
 	synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync"
@@ -155,6 +156,14 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
 		}
 	}
 
+	if controllerEnabled(s.Controllers, serverResources, jobcontroller.ControllerName, jobcontroller.RequiredResources, true) {
+		glog.V(3).Infof("Loading client config for job controller %q", jobcontroller.UserAgentName)
+		jobClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, jobcontroller.UserAgentName))
+		jobController := jobcontroller.NewJobController(jobClientset)
+		glog.V(3).Infof("Running job controller")
+		go jobController.Run(s.ConcurrentJobSyncs, wait.NeverStop)
+	}
+
 	if controllerEnabled(s.Controllers, serverResources, ingresscontroller.ControllerName, ingresscontroller.RequiredResources, true) {
 		glog.V(3).Infof("Loading client config for ingress controller %q", ingresscontroller.UserAgentName)
 		ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, ingresscontroller.UserAgentName))
diff --git a/federation/cmd/federation-controller-manager/app/options/options.go b/federation/cmd/federation-controller-manager/app/options/options.go
index e6659c36977..5d6efca5a10 100644
--- a/federation/cmd/federation-controller-manager/app/options/options.go
+++ b/federation/cmd/federation-controller-manager/app/options/options.go
@@ -56,6 +56,10 @@ type ControllerManagerConfiguration struct {
 	// allowed to sync concurrently. Larger number = more responsive service
 	// management, but more CPU (and network) load.
 	ConcurrentReplicaSetSyncs int `json:"concurrentReplicaSetSyncs"`
+	// concurrentJobSyncs is the number of Jobs that are
+	// allowed to sync concurrently. Larger number = more responsive job
+	// management, but more CPU (and network) load.
+	ConcurrentJobSyncs int `json:"concurrentJobSyncs"`
 	// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
 	ClusterMonitorPeriod metav1.Duration `json:"clusterMonitorPeriod"`
 	// APIServerQPS is the QPS to use while talking with federation apiserver.
@@ -96,6 +100,7 @@ func NewCMServer() *CMServer {
 		ConcurrentServiceSyncs:    10,
 		ConcurrentReplicaSetSyncs: 10,
 		ClusterMonitorPeriod:      metav1.Duration{Duration: 40 * time.Second},
+		ConcurrentJobSyncs:        10,
 		APIServerQPS:              20.0,
 		APIServerBurst:            30,
 		LeaderElection:            leaderelectionconfig.DefaultLeaderElectionConfiguration(),
@@ -115,6 +120,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.ServiceDnsSuffix, "service-dns-suffix", s.ServiceDnsSuffix, "DNS Suffix to use when publishing federated service names. Defaults to zone-name")
 	fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
 	fs.IntVar(&s.ConcurrentReplicaSetSyncs, "concurrent-replicaset-syncs", s.ConcurrentReplicaSetSyncs, "The number of ReplicaSets syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
+	fs.IntVar(&s.ConcurrentJobSyncs, "concurrent-job-syncs", s.ConcurrentJobSyncs, "The number of Job syncing operations that will be done concurrently. Larger number = faster job updating, but more CPU (and network) load")
 	fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
 	fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
 	fs.BoolVar(&s.EnableContentionProfiling, "contention-profiling", false, "Enable lock contention profiling, if profiling is enabled")
diff --git a/federation/pkg/federation-controller/BUILD b/federation/pkg/federation-controller/BUILD
index 28057fc52ea..951da1b7786 100644
--- a/federation/pkg/federation-controller/BUILD
+++ b/federation/pkg/federation-controller/BUILD
@@ -26,6 +26,7 @@ filegroup(
         ":package-srcs",
         "//federation/pkg/federation-controller/cluster:all-srcs",
         "//federation/pkg/federation-controller/ingress:all-srcs",
+        "//federation/pkg/federation-controller/job:all-srcs",
         "//federation/pkg/federation-controller/service:all-srcs",
         "//federation/pkg/federation-controller/sync:all-srcs",
         "//federation/pkg/federation-controller/util:all-srcs",
diff --git a/federation/pkg/federation-controller/job/BUILD b/federation/pkg/federation-controller/job/BUILD
new file mode 100644
index 00000000000..da81dc92077
--- /dev/null
+++ b/federation/pkg/federation-controller/job/BUILD
@@ -0,0 +1,80 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+    "go_test",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = ["jobcontroller.go"],
+    tags = ["automanaged"],
+    deps = [
+        "//federation/apis/federation:go_default_library",
+        "//federation/apis/federation/v1beta1:go_default_library",
+        "//federation/client/clientset_generated/federation_clientset:go_default_library",
+        "//federation/pkg/federation-controller/util:go_default_library",
+        "//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
+        "//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library", + "//federation/pkg/federation-controller/util/replicapreferences:go_default_library", + "//pkg/api:go_default_library", + "//pkg/controller:go_default_library", + "//vendor/github.com/davecgh/go-spew/spew:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/batch/v1:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["jobcontroller_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//federation/apis/federation/v1beta1:go_default_library", + "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", + "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/finalizers:go_default_library", + "//federation/pkg/federation-controller/util/test:go_default_library", + "//pkg/apis/batch/v1:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/api/batch/v1:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/federation/pkg/federation-controller/job/jobcontroller.go b/federation/pkg/federation-controller/job/jobcontroller.go new file mode 100644 index 00000000000..d3977182fc1 --- /dev/null +++ b/federation/pkg/federation-controller/job/jobcontroller.go @@ -0,0 +1,561 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package job + +import ( + "fmt" + "reflect" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/golang/glog" + + batchv1 "k8s.io/api/batch/v1" + clientv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/client-go/util/workqueue" + fed "k8s.io/kubernetes/federation/apis/federation" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/planner" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/controller" +) + +const ( + fedJobPreferencesAnnotation = "federation.kubernetes.io/job-preferences" + allClustersKey = "THE_ALL_CLUSTER_KEY" + // UserAgentName is the user agent used in the federation client + UserAgentName = "Federation-Job-Controller" + // ControllerName is name of this controller + ControllerName = "jobs" +) + +var ( + // RequiredResources is the resource group version of the type this controller manages + RequiredResources = []schema.GroupVersionResource{batchv1.SchemeGroupVersion.WithResource("jobs")} + jobReviewDelay = 10 * time.Second + clusterAvailableDelay = 20 * time.Second + clusterUnavailableDelay = 60 * time.Second + updateTimeout = 30 * time.Second + backoffInitial = 5 * time.Second + backoffMax = 1 * time.Minute +) + +// FederationJobController synchronizes the state of a federated job object +// to clusters that are members of the federation. +type FederationJobController struct { + fedClient fedclientset.Interface + + jobController cache.Controller + jobStore cache.Store + + fedJobInformer fedutil.FederatedInformer + + jobDeliverer *fedutil.DelayingDeliverer + clusterDeliverer *fedutil.DelayingDeliverer + jobWorkQueue workqueue.Interface + // For updating members of federation. 
+ fedUpdater fedutil.FederatedUpdater + + jobBackoff *flowcontrol.Backoff + // For events + eventRecorder record.EventRecorder + + defaultPlanner *planner.Planner + deletionHelper *deletionhelper.DeletionHelper +} + +// NewJobController creates a new federation job controller +func NewJobController(fedClient fedclientset.Interface) *FederationJobController { + broadcaster := record.NewBroadcaster() + broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(fedClient)) + recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-job-controller"}) + fjc := &FederationJobController{ + fedClient: fedClient, + jobDeliverer: fedutil.NewDelayingDeliverer(), + clusterDeliverer: fedutil.NewDelayingDeliverer(), + jobWorkQueue: workqueue.New(), + jobBackoff: flowcontrol.NewBackOff(backoffInitial, backoffMax), + defaultPlanner: planner.NewPlanner(&fed.ReplicaAllocationPreferences{ + Clusters: map[string]fed.ClusterPreferences{ + "*": {Weight: 1}, + }, + }), + eventRecorder: recorder, + } + + jobFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { + return cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return clientset.BatchV1().Jobs(metav1.NamespaceAll).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return clientset.BatchV1().Jobs(metav1.NamespaceAll).Watch(options) + }, + }, + &batchv1.Job{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnAllChanges( + func(obj runtime.Object) { fjc.deliverLocalJob(obj, jobReviewDelay) }, + ), + ) + } + clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *fedv1.Cluster) { + fjc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) + }, + ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) { + fjc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) + }, + } + fjc.fedJobInformer = fedutil.NewFederatedInformer(fedClient, jobFedInformerFactory, &clusterLifecycle) + + fjc.jobStore, fjc.jobController = cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return fjc.fedClient.BatchV1().Jobs(metav1.NamespaceAll).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fjc.fedClient.BatchV1().Jobs(metav1.NamespaceAll).Watch(options) + }, + }, + &batchv1.Job{}, + controller.NoResyncPeriodFunc(), + fedutil.NewTriggerOnMetaAndSpecChanges( + func(obj runtime.Object) { fjc.deliverFedJobObj(obj, 0) }, + ), + ) + + fjc.fedUpdater = fedutil.NewFederatedUpdater(fjc.fedJobInformer, "job", updateTimeout, fjc.eventRecorder, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*batchv1.Job) + _, err := client.BatchV1().Jobs(rs.Namespace).Create(rs) + return err + }, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*batchv1.Job) + _, err := client.BatchV1().Jobs(rs.Namespace).Update(rs) + return err + }, + func(client kubeclientset.Interface, obj runtime.Object) error { + rs := obj.(*batchv1.Job) + err := client.BatchV1().Jobs(rs.Namespace).Delete(rs.Name, &metav1.DeleteOptions{}) + return err + }) + + fjc.deletionHelper = deletionhelper.NewDeletionHelper( + fjc.updateJob, + // objNameFunc + func(obj runtime.Object) string { + job := obj.(*batchv1.Job) + return job.Name + }, + fjc.fedJobInformer, + 
fjc.fedUpdater, + ) + + return fjc +} + +// Sends the given updated object to apiserver. +// Assumes that the given object is a job. +func (fjc *FederationJobController) updateJob(obj runtime.Object) (runtime.Object, error) { + job := obj.(*batchv1.Job) + return fjc.fedClient.BatchV1().Jobs(job.Namespace).Update(job) +} + +// Run starts the syncing of federation jobs to the clusters. +func (fjc *FederationJobController) Run(workers int, stopCh <-chan struct{}) { + go fjc.jobController.Run(stopCh) + fjc.fedJobInformer.Start() + + fjc.jobDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { + fjc.jobWorkQueue.Add(item.Key) + }) + fjc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { + fjc.reconcileJobsOnClusterChange() + }) + + for !fjc.isSynced() { + time.Sleep(5 * time.Millisecond) + } + + for i := 0; i < workers; i++ { + go wait.Until(fjc.worker, time.Second, stopCh) + } + + fedutil.StartBackoffGC(fjc.jobBackoff, stopCh) + + <-stopCh + glog.Infof("Shutting down FederationJobController") + fjc.jobDeliverer.Stop() + fjc.clusterDeliverer.Stop() + fjc.jobWorkQueue.ShutDown() + fjc.fedJobInformer.Stop() +} + +func (fjc *FederationJobController) isSynced() bool { + if !fjc.fedJobInformer.ClustersSynced() { + glog.V(3).Infof("Cluster list not synced") + return false + } + clusters, err := fjc.fedJobInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !fjc.fedJobInformer.GetTargetStore().ClustersSynced(clusters) { + glog.V(2).Infof("cluster job list not synced") + return false + } + + if !fjc.jobController.HasSynced() { + glog.V(2).Infof("federation job list not synced") + return false + } + return true +} + +func (fjc *FederationJobController) deliverLocalJob(obj interface{}, duration time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %v: %v", obj, err) + return + } + _, exists, err := fjc.jobStore.GetByKey(key) + if err != nil { + glog.Errorf("Couldn't get federated job %v: %v", key, err) + return + } + if exists { // ignore jobs exists only in local k8s + fjc.deliverJobByKey(key, duration, false) + } +} + +func (fjc *FederationJobController) deliverFedJobObj(obj interface{}, delay time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + fjc.deliverJobByKey(key, delay, false) +} + +func (fjc *FederationJobController) deliverJobByKey(key string, delay time.Duration, failed bool) { + if failed { + fjc.jobBackoff.Next(key, time.Now()) + delay = delay + fjc.jobBackoff.Get(key) + } else { + fjc.jobBackoff.Reset(key) + } + fjc.jobDeliverer.DeliverAfter(key, nil, delay) +} + +type reconciliationStatus string + +const ( + statusAllOk = reconciliationStatus("ALL_OK") + statusNeedRecheck = reconciliationStatus("RECHECK") + statusError = reconciliationStatus("ERROR") + statusNotSynced = reconciliationStatus("NOSYNC") +) + +func (fjc *FederationJobController) worker() { + for { + item, quit := fjc.jobWorkQueue.Get() + if quit { + return + } + key := item.(string) + status, err := fjc.reconcileJob(key) + fjc.jobWorkQueue.Done(item) + if err != nil { + glog.Errorf("Error syncing job controller: %v", err) + fjc.deliverJobByKey(key, 0, true) + } else { + switch status { + case statusAllOk: + break + case statusError: + fjc.deliverJobByKey(key, 0, true) + case statusNeedRecheck: + fjc.deliverJobByKey(key, jobReviewDelay, false) + case 
statusNotSynced: + fjc.deliverJobByKey(key, clusterAvailableDelay, false) + default: + glog.Errorf("Unhandled reconciliation status: %s", status) + fjc.deliverJobByKey(key, jobReviewDelay, false) + } + } + } +} + +type scheduleResult struct { + Parallelism *int32 + Completions *int32 +} + +func (fjc *FederationJobController) schedule(fjob *batchv1.Job, clusters []*fedv1.Cluster) map[string]scheduleResult { + plnr := fjc.defaultPlanner + frsPref, err := replicapreferences.GetAllocationPreferences(fjob, fedJobPreferencesAnnotation) + if err != nil { + glog.Warningf("Invalid job specific preference, use default. rs: %v, err: %v", fjob, err) + } + if frsPref != nil { // create a new planner if user specified a preference + plnr = planner.NewPlanner(frsPref) + } + + parallelism := int64(*fjob.Spec.Parallelism) + var clusterNames []string + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name) + } + parallelismResult, _ := plnr.Plan(parallelism, clusterNames, nil, nil, fjob.Namespace+"/"+fjob.Name) + + if frsPref != nil { + for _, clusterPref := range frsPref.Clusters { + clusterPref.MinReplicas = 0 + clusterPref.MaxReplicas = nil + } + plnr = planner.NewPlanner(frsPref) + } + clusterNames = nil + for clusterName := range parallelismResult { + clusterNames = append(clusterNames, clusterName) + } + completionsResult := make(map[string]int64) + if fjob.Spec.Completions != nil { + completionsResult, _ = plnr.Plan(int64(*fjob.Spec.Completions), clusterNames, nil, nil, fjob.Namespace+"/"+fjob.Name) + } + + results := make(map[string]scheduleResult) + for _, clusterName := range clusterNames { + paralle := int32(parallelismResult[clusterName]) + complet := int32(completionsResult[clusterName]) + result := scheduleResult{ + Parallelism: ¶lle, + } + if fjob.Spec.Completions != nil { + result.Completions = &complet + } + results[clusterName] = result + } + + return results +} + +func (fjc *FederationJobController) reconcileJob(key string) (reconciliationStatus, error) { + if !fjc.isSynced() { + return statusNotSynced, nil + } + + glog.V(4).Infof("Start reconcile job %q", key) + startTime := time.Now() + defer glog.V(4).Infof("Finished reconcile job %q (%v)", key, time.Now().Sub(startTime)) + + objFromStore, exists, err := fjc.jobStore.GetByKey(key) + if err != nil { + return statusError, err + } + if !exists { + // deleted federated job, nothing need to do + return statusAllOk, nil + } + + // Create a copy before modifying the obj to prevent race condition with other readers of obj from store. + obj, err := api.Scheme.DeepCopy(objFromStore) + fjob, ok := obj.(*batchv1.Job) + if err != nil || !ok { + return statusError, err + } + + // delete job + if fjob.DeletionTimestamp != nil { + if err := fjc.delete(fjob); err != nil { + fjc.eventRecorder.Eventf(fjob, api.EventTypeNormal, "DeleteFailed", "Job delete failed: %v", err) + return statusError, err + } + return statusAllOk, nil + } + + glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for job: %s\n", key) + // Add the required finalizers before creating a job in underlying clusters. 
+	updatedJobObj, err := fjc.deletionHelper.EnsureFinalizers(fjob)
+	if err != nil {
+		return statusError, err
+	}
+	fjob = updatedJobObj.(*batchv1.Job)
+
+	clusters, err := fjc.fedJobInformer.GetReadyClusters()
+	if err != nil {
+		return statusError, err
+	}
+
+	scheduleResult := fjc.schedule(fjob, clusters)
+	glog.V(3).Infof("Start syncing local job %s: %s\n", key, spew.Sprintf("%v", scheduleResult))
+
+	fedStatus := batchv1.JobStatus{}
+	var fedStatusFailedCondition *batchv1.JobCondition
+	var fedStatusCompleteCondition *batchv1.JobCondition
+	var operations []fedutil.FederatedOperation
+	for clusterName, result := range scheduleResult {
+		ljobObj, exists, err := fjc.fedJobInformer.GetTargetStore().GetByKey(clusterName, key)
+		if err != nil {
+			return statusError, err
+		}
+		ljob := &batchv1.Job{
+			ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fjob.ObjectMeta),
+			Spec:       *fedutil.DeepCopyApiTypeOrPanic(&fjob.Spec).(*batchv1.JobSpec),
+		}
+		// use selector generated at federation level, or user specified value
+		manualSelector := true
+		ljob.Spec.ManualSelector = &manualSelector
+		ljob.Spec.Parallelism = result.Parallelism
+		ljob.Spec.Completions = result.Completions
+
+		if !exists {
+			if *ljob.Spec.Parallelism > 0 {
+				fjc.eventRecorder.Eventf(fjob, api.EventTypeNormal, "CreateInCluster", "Creating job in cluster %s", clusterName)
+				operations = append(operations, fedutil.FederatedOperation{
+					Type:        fedutil.OperationTypeAdd,
+					Obj:         ljob,
+					ClusterName: clusterName,
+				})
+			}
+		} else {
+			currentLjob := ljobObj.(*batchv1.Job)
+
+			// Update existing job, if needed.
+			if !fedutil.ObjectMetaAndSpecEquivalent(ljob, currentLjob) {
+				fjc.eventRecorder.Eventf(fjob, api.EventTypeNormal, "UpdateInCluster", "Updating job in cluster %s", clusterName)
+				operations = append(operations, fedutil.FederatedOperation{
+					Type:        fedutil.OperationTypeUpdate,
+					Obj:         ljob,
+					ClusterName: clusterName,
+				})
+			}
+
+			// collect local job status
+			for _, condition := range currentLjob.Status.Conditions {
+				// shadow the range variable so the pointers taken below do not
+				// end up aliasing whatever condition is iterated last
+				condition := condition
+				if condition.Type == batchv1.JobComplete {
+					if fedStatusCompleteCondition == nil ||
+						fedStatusCompleteCondition.LastTransitionTime.Before(condition.LastTransitionTime) {
+						fedStatusCompleteCondition = &condition
+					}
+				} else if condition.Type == batchv1.JobFailed {
+					if fedStatusFailedCondition == nil ||
+						fedStatusFailedCondition.LastTransitionTime.Before(condition.LastTransitionTime) {
+						fedStatusFailedCondition = &condition
+					}
+				}
+			}
+			if currentLjob.Status.StartTime != nil {
+				if fedStatus.StartTime == nil || fedStatus.StartTime.After(currentLjob.Status.StartTime.Time) {
+					fedStatus.StartTime = currentLjob.Status.StartTime
+				}
+			}
+			if currentLjob.Status.CompletionTime != nil {
+				if fedStatus.CompletionTime == nil || fedStatus.CompletionTime.Before(*currentLjob.Status.CompletionTime) {
+					fedStatus.CompletionTime = currentLjob.Status.CompletionTime
+				}
+			}
+			fedStatus.Active += currentLjob.Status.Active
+			fedStatus.Succeeded += currentLjob.Status.Succeeded
+			fedStatus.Failed += currentLjob.Status.Failed
+		}
+	}
+
+	// federated job fails if any local job fails
+	if fedStatusFailedCondition != nil {
+		fedStatus.Conditions = append(fedStatus.Conditions, *fedStatusFailedCondition)
+	} else if fedStatusCompleteCondition != nil {
+		fedStatus.Conditions = append(fedStatus.Conditions, *fedStatusCompleteCondition)
+	}
+	if !reflect.DeepEqual(fedStatus, fjob.Status) {
+		fjob.Status = fedStatus
+		_, err = fjc.fedClient.BatchV1().Jobs(fjob.Namespace).UpdateStatus(fjob)
+		if err != nil {
+			return statusError, err
+		}
+	}
+
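+	// At this point fedStatus is the aggregated view of the job across the
+	// clusters: Active, Succeeded and Failed are summed, StartTime is the
+	// earliest local start, CompletionTime the latest local completion, and a
+	// single Failed or Complete condition is propagated, with failure taking
+	// precedence.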
+ if len(operations) == 0 { + // Everything is in order + return statusAllOk, nil + } + + if glog.V(4) { + for i, op := range operations { + job := op.Obj.(*batchv1.Job) + glog.V(4).Infof("operation[%d]: %s, %s/%s/%s, %d", i, op.Type, op.ClusterName, job.Namespace, job.Name, *job.Spec.Parallelism) + } + } + err = fjc.fedUpdater.Update(operations) + if err != nil { + return statusError, err + } + + // Some operations were made, reconcile after a while. + return statusNeedRecheck, nil + +} + +func (fjc *FederationJobController) reconcileJobsOnClusterChange() { + if !fjc.isSynced() { + fjc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) + } + jobs := fjc.jobStore.List() + for _, job := range jobs { + key, _ := controller.KeyFunc(job) + fjc.deliverJobByKey(key, 0, false) + } +} + +// delete deletes the given job or returns error if the deletion was not complete. +func (fjc *FederationJobController) delete(job *batchv1.Job) error { + glog.V(3).Infof("Handling deletion of job: %s/%s\n", job.Namespace, job.Name) + _, err := fjc.deletionHelper.HandleObjectInUnderlyingClusters(job) + if err != nil { + return err + } + + err = fjc.fedClient.BatchV1().Jobs(job.Namespace).Delete(job.Name, nil) + if err != nil { + // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. + // This is expected when we are processing an update as a result of job finalizer deletion. + // The process that deleted the last finalizer is also going to delete the job and we do not have to do anything. + if !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete job: %s/%s, %v", job.Namespace, job.Name, err) + } + } + return nil +} diff --git a/federation/pkg/federation-controller/job/jobcontroller_test.go b/federation/pkg/federation-controller/job/jobcontroller_test.go new file mode 100644 index 00000000000..65a869baa4a --- /dev/null +++ b/federation/pkg/federation-controller/job/jobcontroller_test.go @@ -0,0 +1,282 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package job + +import ( + "flag" + "fmt" + "testing" + "time" + + batchv1 "k8s.io/api/batch/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + kubeclientset "k8s.io/client-go/kubernetes" + kubeclientfake "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" + fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + finalizersutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/finalizers" + testutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" + batchv1internal "k8s.io/kubernetes/pkg/apis/batch/v1" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + "reflect" + "strings" +) + +func installWatchReactor(fakeClien *core.Fake, resource string) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + + fakeWatch := watch.NewRaceFreeFake() + fakeClien.PrependWatchReactor(resource, core.DefaultWatchReactor(fakeWatch, nil)) + fakeClien.PrependReactor("create", resource, func(action core.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(core.CreateAction).GetObject() + batchv1internal.SetDefaults_Job(obj.(*batchv1.Job)) + fakeWatch.Add(obj) + objChan <- obj + return false, nil, nil + }) + fakeClien.PrependReactor("update", resource, func(action core.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(core.UpdateAction).GetObject() + fakeWatch.Modify(obj) + objChan <- obj + return false, nil, nil + }) + fakeClien.PrependReactor("delete", resource, func(action core.Action) (handled bool, ret runtime.Object, err error) { + obj := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: action.(core.DeleteAction).GetName(), + Namespace: action.GetNamespace(), + }, + } + fakeWatch.Delete(obj) + objChan <- obj + return false, nil, nil + }) + + return objChan +} + +func TestJobController(t *testing.T) { + flag.Set("logtostderr", "true") + flag.Set("v", "5") + flag.Parse() + + jobReviewDelay = 50 * time.Millisecond + clusterAvailableDelay = 200 * time.Millisecond + clusterUnavailableDelay = 200 * time.Millisecond + + fedclientset := fedclientfake.NewSimpleClientset() + fedChan := installWatchReactor(&fedclientset.Fake, "jobs") + + fedclientset.Federation().Clusters().Create(testutil.NewCluster("k8s-1", apiv1.ConditionTrue)) + fedclientset.Federation().Clusters().Create(testutil.NewCluster("k8s-2", apiv1.ConditionTrue)) + + kube1clientset := kubeclientfake.NewSimpleClientset() + kube1Chan := installWatchReactor(&kube1clientset.Fake, "jobs") + kube2clientset := kubeclientfake.NewSimpleClientset() + kube2Chan := installWatchReactor(&kube2clientset.Fake, "jobs") + + fedInformerClientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) { + switch cluster.Name { + case "k8s-1": + return kube1clientset, nil + case "k8s-2": + return kube2clientset, nil + default: + return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name) + } + } + jobController := NewJobController(fedclientset) + fedjobinformer := testutil.ToFederatedInformerForTestOnly(jobController.fedJobInformer) + fedjobinformer.SetClientFactory(fedInformerClientFactory) + + stopChan := make(chan struct{}) + defer close(stopChan) + go jobController.Run(5, stopChan) + + test := func(job *batchv1.Job, parallelism1, parallelism2, 
completions1, completions2 int32) { + job, _ = fedclientset.Batch().Jobs(metav1.NamespaceDefault).Create(job) + + joinErrors := func(errors []error) error { + if len(errors) == 0 { + return nil + } + errorStrings := []string{} + for _, err := range errors { + errorStrings = append(errorStrings, err.Error()) + } + return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) + } + + // check local jobs are created with correct spec + checkLocalJob := func(parallelism, completions int32) testutil.CheckingFunction { + return func(obj runtime.Object) error { + errors := []error{} + ljob := obj.(*batchv1.Job) + if !fedutil.ObjectMetaEquivalent(job.ObjectMeta, ljob.ObjectMeta) { + errors = append(errors, fmt.Errorf("Job meta un-equivalent: %#v (expected) != %#v (actual)", job.ObjectMeta, ljob.ObjectMeta)) + } + if err := checkEqual(t, *ljob.Spec.Parallelism, parallelism, "Spec.Parallelism"); err != nil { + errors = append(errors, err) + } + if ljob.Spec.Completions != nil { + if err := checkEqual(t, *ljob.Spec.Completions, completions, "Spec.Completions"); err != nil { + errors = append(errors, err) + } + } + return joinErrors(errors) + } + } + checkFedJob := func(obj runtime.Object) error { + errors := []error{} + return joinErrors(errors) + } + assert.NoError(t, testutil.CheckObjectFromChan(kube1Chan, checkLocalJob(parallelism1, completions1))) + assert.NoError(t, testutil.CheckObjectFromChan(kube2Chan, checkLocalJob(parallelism2, completions2))) + assert.NoError(t, testutil.CheckObjectFromChan(fedChan, checkFedJob)) + + // finish local jobs + job1, _ := kube1clientset.Batch().Jobs(metav1.NamespaceDefault).Get(job.Name, metav1.GetOptions{}) + finishJob(job1, 100*time.Millisecond) + job1, _ = kube1clientset.Batch().Jobs(metav1.NamespaceDefault).UpdateStatus(job1) + job2, _ := kube2clientset.Batch().Jobs(metav1.NamespaceDefault).Get(job.Name, metav1.GetOptions{}) + finishJob(job2, 100*time.Millisecond) + job2, _ = kube2clientset.Batch().Jobs(metav1.NamespaceDefault).UpdateStatus(job2) + + // check fed job status updated + assert.NoError(t, testutil.CheckObjectFromChan(fedChan, func(obj runtime.Object) error { + errors := []error{} + job := obj.(*batchv1.Job) + if err := checkEqual(t, *job.Spec.Parallelism, *job1.Spec.Parallelism+*job2.Spec.Parallelism, "Spec.Parallelism"); err != nil { + errors = append(errors, err) + } + if job.Spec.Completions != nil { + if err := checkEqual(t, *job.Spec.Completions, *job1.Spec.Completions+*job2.Spec.Completions, "Spec.Completions"); err != nil { + errors = append(errors, err) + } + } + if err := checkEqual(t, job.Status.Succeeded, job1.Status.Succeeded+job2.Status.Succeeded, "Status.Succeeded"); err != nil { + errors = append(errors, err) + } + return joinErrors(errors) + })) + + // delete fed job by set deletion time, and remove orphan finalizer + job, _ = fedclientset.Batch().Jobs(metav1.NamespaceDefault).Get(job.Name, metav1.GetOptions{}) + deletionTimestamp := metav1.Now() + job.DeletionTimestamp = &deletionTimestamp + finalizersutil.RemoveFinalizers(job, sets.NewString(metav1.FinalizerOrphanDependents)) + fedclientset.Batch().Jobs(metav1.NamespaceDefault).Update(job) + + // check jobs are deleted + checkDeleted := func(obj runtime.Object) error { + djob := obj.(*batchv1.Job) + deletedJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: djob.Name, + Namespace: djob.Namespace, + }, + } + if !reflect.DeepEqual(djob, deletedJob) { + return fmt.Errorf("%s/%s should be deleted", djob.Namespace, djob.Name) + } + return nil + } + assert.NoError(t, 
testutil.CheckObjectFromChan(kube1Chan, checkDeleted)) + assert.NoError(t, testutil.CheckObjectFromChan(kube2Chan, checkDeleted)) + assert.NoError(t, testutil.CheckObjectFromChan(fedChan, checkDeleted)) + } + + test(newJob("job1", 2, 7), 1, 1, 4, 3) + test(newJob("job2", 2, -1), 1, 1, -1, -1) + test(newJob("job3", 7, 2), 4, 3, 1, 1) + test(newJob("job4", 7, 1), 4, 3, 1, 0) +} + +func checkEqual(_ *testing.T, expected, actual interface{}, msg string) error { + if !assert.ObjectsAreEqual(expected, actual) { + return fmt.Errorf("%s not equal: %#v (expected) != %#v (actual)", msg, expected, actual) + } + return nil +} + +func newJob(name string, parallelism int32, completions int32) *batchv1.Job { + job := batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + SelfLink: "/api/v1/namespaces/default/jobs/name", + }, + Spec: batchv1.JobSpec{ + Parallelism: ¶llelism, + Completions: &completions, + Template: apiv1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": name, + }, + }, + Spec: apiv1.PodSpec{ + Containers: []apiv1.Container{ + {Image: "foo/bar"}, + }, + RestartPolicy: apiv1.RestartPolicyNever, + }, + }, + }, + } + if parallelism < 0 { + job.Spec.Parallelism = nil + } + if completions < 0 { + job.Spec.Completions = nil + } + + batchv1internal.SetDefaults_Job(&job) + return &job +} + +func newCondition(conditionType batchv1.JobConditionType, reason, message string) batchv1.JobCondition { + return batchv1.JobCondition{ + Type: conditionType, + Status: apiv1.ConditionTrue, + LastProbeTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, + } +} + +func finishJob(job *batchv1.Job, duration time.Duration) { + job.Status.Conditions = append(job.Status.Conditions, newCondition(batchv1.JobComplete, "", "")) + if job.Spec.Completions == nil { + job.Status.Succeeded = 1 + } else { + job.Status.Succeeded = *job.Spec.Completions + } + now := metav1.Now() + job.Status.StartTime = &now + time.Sleep(duration) + now = metav1.Now() + job.Status.CompletionTime = &now +} diff --git a/federation/pkg/federation-controller/util/handlers.go b/federation/pkg/federation-controller/util/handlers.go index 0e2dec5bf58..406d5ca3161 100644 --- a/federation/pkg/federation-controller/util/handlers.go +++ b/federation/pkg/federation-controller/util/handlers.go @@ -71,6 +71,7 @@ func NewTriggerOnMetaAndSpecChanges(triggerFunc func(pkgruntime.Object)) *cache. 
oldMeta := getFieldOrPanic(old, "ObjectMeta").(metav1.ObjectMeta) curMeta := getFieldOrPanic(cur, "ObjectMeta").(metav1.ObjectMeta) if !ObjectMetaEquivalent(oldMeta, curMeta) || + !reflect.DeepEqual(oldMeta.DeletionTimestamp, curMeta.DeletionTimestamp) || !reflect.DeepEqual(getFieldOrPanic(old, "Spec"), getFieldOrPanic(cur, "Spec")) { triggerFunc(curObj) } diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index a3d243b78bd..453b36fb402 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -119,6 +119,7 @@ cni-conf-dir concurrent-deployment-syncs concurrent-endpoint-syncs concurrent-gc-syncs +concurrent-job-syncs concurrent-namespace-syncs concurrent-replicaset-syncs concurrent-resource-quota-syncs diff --git a/test/e2e_federation/BUILD b/test/e2e_federation/BUILD index 274a930df30..181a626ba59 100644 --- a/test/e2e_federation/BUILD +++ b/test/e2e_federation/BUILD @@ -15,6 +15,7 @@ go_library( "crud.go", "event.go", "ingress.go", + "job.go", "namespace.go", "replicaset.go", "service.go", @@ -29,6 +30,7 @@ go_library( "//federation/client/clientset_generated/federation_clientset/typed/core/v1:go_default_library", "//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", + "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/cloudprovider:go_default_library", "//test/e2e/chaosmonkey:go_default_library", @@ -38,6 +40,7 @@ go_library( "//test/e2e_federation/upgrades:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", "//vendor/github.com/onsi/gomega:go_default_library", + "//vendor/k8s.io/api/batch/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", diff --git a/test/e2e_federation/job.go b/test/e2e_federation/job.go new file mode 100644 index 00000000000..71ad152ba4b --- /dev/null +++ b/test/e2e_federation/job.go @@ -0,0 +1,291 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e_federation + +import ( + "fmt" + "strings" + "time" + + batchv1 "k8s.io/api/batch/v1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/test/e2e/framework" + fedframework "k8s.io/kubernetes/test/e2e_federation/framework" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/kubernetes/pkg/api" +) + +const ( + FederationJobName = "federation-job" +) + +var _ = framework.KubeDescribe("Federation jobs [Feature:Federation]", func() { + + f := fedframework.NewDefaultFederatedFramework("federation-job") + + Describe("Job objects [NoCluster]", func() { + AfterEach(func() { + fedframework.SkipUnlessFederated(f.ClientSet) + + // Delete all jobs. + nsName := f.FederationNamespace.Name + deleteAllJobsOrFail(f.FederationClientset, nsName) + }) + + It("should be created and deleted successfully", func() { + fedframework.SkipUnlessFederated(f.ClientSet) + + nsName := f.FederationNamespace.Name + job := createJobOrFail(f.FederationClientset, nsName) + By(fmt.Sprintf("Creation of job %q in namespace %q succeeded. Deleting job.", job.Name, nsName)) + // Cleanup + err := f.FederationClientset.Batch().Jobs(nsName).Delete(job.Name, &metav1.DeleteOptions{}) + framework.ExpectNoError(err, "Error deleting job %q in namespace %q", job.Name, job.Namespace) + By(fmt.Sprintf("Deletion of job %q in namespace %q succeeded.", job.Name, nsName)) + }) + + }) + + // e2e cases for federated job controller + Describe("Federated Job", func() { + var ( + clusters fedframework.ClusterSlice + ) + BeforeEach(func() { + fedframework.SkipUnlessFederated(f.ClientSet) + clusters = f.GetRegisteredClusters() + }) + + AfterEach(func() { + nsName := f.FederationNamespace.Name + deleteAllJobsOrFail(f.FederationClientset, nsName) + }) + + It("should create and update matching jobs in underlying clusters", func() { + nsName := f.FederationNamespace.Name + job := createJobOrFail(f.FederationClientset, nsName) + defer func() { + // cleanup. deletion of jobs is not supported for underlying clusters + By(fmt.Sprintf("Deleting job %q/%q", nsName, job.Name)) + waitForJobOrFail(f.FederationClientset, nsName, job.Name, clusters) + f.FederationClientset.Batch().Jobs(nsName).Delete(job.Name, &metav1.DeleteOptions{}) + }() + + waitForJobOrFail(f.FederationClientset, nsName, job.Name, clusters) + By(fmt.Sprintf("Successfuly created and synced job %q/%q to clusters", nsName, job.Name)) + }) + + It("should be deleted from underlying clusters when OrphanDependents is false", func() { + fedframework.SkipUnlessFederated(f.ClientSet) + nsName := f.FederationNamespace.Name + orphanDependents := false + verifyCascadingDeletionForJob(f.FederationClientset, clusters, &orphanDependents, nsName) + By(fmt.Sprintf("Verified that jobs were deleted from underlying clusters")) + }) + + It("should not be deleted from underlying clusters when OrphanDependents is true", func() { + fedframework.SkipUnlessFederated(f.ClientSet) + nsName := f.FederationNamespace.Name + orphanDependents := true + verifyCascadingDeletionForJob(f.FederationClientset, clusters, &orphanDependents, nsName) + By(fmt.Sprintf("Verified that jobs were not deleted from underlying clusters")) + }) + + It("should not be deleted from underlying clusters when OrphanDependents is nil", func() { + fedframework.SkipUnlessFederated(f.ClientSet) + nsName := f.FederationNamespace.Name + verifyCascadingDeletionForJob(f.FederationClientset, clusters, nil, nsName) + By(fmt.Sprintf("Verified that jobs were not deleted from underlying clusters")) + }) + + }) +}) + +// deleteAllJobsOrFail deletes all jobs in the given namespace name. 
+func deleteAllJobsOrFail(clientset *fedclientset.Clientset, nsName string) { + jobList, err := clientset.Batch().Jobs(nsName).List(metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + orphanDependents := false + for _, job := range jobList.Items { + deleteJobOrFail(clientset, nsName, job.Name, &orphanDependents) + } +} + +// verifyCascadingDeletionForJob verifies that job are deleted +// from underlying clusters when orphan dependents is false and they are not +// deleted when orphan dependents is true. +func verifyCascadingDeletionForJob(clientset *fedclientset.Clientset, clusters fedframework.ClusterSlice, orphanDependents *bool, nsName string) { + job := createJobOrFail(clientset, nsName) + jobName := job.Name + // Check subclusters if the job was created there. + By(fmt.Sprintf("Waiting for job %s to be created in all underlying clusters", jobName)) + err := wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) { + for _, cluster := range clusters { + _, err := cluster.Batch().Jobs(nsName).Get(jobName, metav1.GetOptions{}) + if err != nil && errors.IsNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + } + return true, nil + }) + framework.ExpectNoError(err, "Not all jobs created") + + By(fmt.Sprintf("Deleting job %s", jobName)) + deleteJobOrFail(clientset, nsName, jobName, orphanDependents) + + By(fmt.Sprintf("Verifying job %s in underlying clusters", jobName)) + errMessages := []string{} + // job should be present in underlying clusters unless orphanDependents is false. + shouldExist := orphanDependents == nil || *orphanDependents == true + for _, cluster := range clusters { + clusterName := cluster.Name + _, err := cluster.Batch().Jobs(nsName).Get(jobName, metav1.GetOptions{}) + if shouldExist && errors.IsNotFound(err) { + errMessages = append(errMessages, fmt.Sprintf("unexpected NotFound error for job %s in cluster %s, expected job to exist", jobName, clusterName)) + } else if !shouldExist && !errors.IsNotFound(err) { + errMessages = append(errMessages, fmt.Sprintf("expected NotFound error for job %s in cluster %s, got error: %v", jobName, clusterName, err)) + } + } + if len(errMessages) != 0 { + framework.Failf("%s", strings.Join(errMessages, "; ")) + } +} + +func waitForJobOrFail(c *fedclientset.Clientset, namespace string, jobName string, clusters fedframework.ClusterSlice) { + err := waitForJob(c, namespace, jobName, clusters) + framework.ExpectNoError(err, "Failed to verify job %q/%q, err: %v", namespace, jobName, err) +} + +func waitForJob(c *fedclientset.Clientset, namespace string, jobName string, clusters fedframework.ClusterSlice) error { + err := wait.Poll(10*time.Second, fedframework.FederatedDefaultTestTimeout, func() (bool, error) { + fjob, err := c.Batch().Jobs(namespace).Get(jobName, metav1.GetOptions{}) + if err != nil { + return false, err + } + succeeded := int32(0) + for _, cluster := range clusters { + job, err := cluster.Batch().Jobs(namespace).Get(jobName, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + By(fmt.Sprintf("Failed getting job: %q/%q/%q, err: %v", cluster.Name, namespace, jobName, err)) + return false, err + } + if err == nil { + if !verifyJob(fjob, job) { + By(fmt.Sprintf("Job meta or spec not match for cluster %q:\n federation: %v\n cluster: %v", cluster.Name, fjob, job)) + return false, nil + } + succeeded += job.Status.Succeeded + } + } + if succeeded == fjob.Status.Succeeded && + (fjob.Spec.Completions != nil && succeeded == *fjob.Spec.Completions) { + return true, nil + } + 
By(fmt.Sprintf("Job statuses not match, federation succeeded: %v/%v, clusters succeeded: %v\n", + fjob.Status.Succeeded, func(p *int32) int32 { + if p != nil { + return *p + } else { + return -1 + } + }(fjob.Spec.Completions), succeeded)) + return false, nil + }) + + return err +} + +func verifyJob(fedJob, localJob *batchv1.Job) bool { + localJobObj, _ := api.Scheme.DeepCopy(localJob) + localJob = localJobObj.(*batchv1.Job) + localJob.Spec.ManualSelector = fedJob.Spec.ManualSelector + localJob.Spec.Completions = fedJob.Spec.Completions + localJob.Spec.Parallelism = fedJob.Spec.Parallelism + return fedutil.ObjectMetaAndSpecEquivalent(fedJob, localJob) +} + +func createJobOrFail(clientset *fedclientset.Clientset, namespace string) *batchv1.Job { + if clientset == nil || len(namespace) == 0 { + Fail(fmt.Sprintf("Internal error: invalid parameters passed to createJobOrFail: clientset: %v, namespace: %v", clientset, namespace)) + } + By(fmt.Sprintf("Creating federation job %q in namespace %q", FederationJobName, namespace)) + + job := newJobForFed(namespace, FederationJobName, 5, 5) + + _, err := clientset.Batch().Jobs(namespace).Create(job) + framework.ExpectNoError(err, "Creating job %q in namespace %q", job.Name, namespace) + By(fmt.Sprintf("Successfully created federation job %q in namespace %q", FederationJobName, namespace)) + return job +} + +func deleteJobOrFail(clientset *fedclientset.Clientset, nsName string, jobName string, orphanDependents *bool) { + By(fmt.Sprintf("Deleting job %q in namespace %q", jobName, nsName)) + err := clientset.Batch().Jobs(nsName).Delete(jobName, &metav1.DeleteOptions{OrphanDependents: orphanDependents}) + if err != nil && !errors.IsNotFound(err) { + framework.ExpectNoError(err, "Error deleting job %q in namespace %q", jobName, nsName) + } + + // Wait for the job to be deleted. + err = wait.Poll(10*time.Second, fedframework.FederatedDefaultTestTimeout, func() (bool, error) { + _, err := clientset.Batch().Jobs(nsName).Get(jobName, metav1.GetOptions{}) + if err != nil && errors.IsNotFound(err) { + return true, nil + } + return false, err + }) + if err != nil { + framework.Failf("Error in deleting job %s: %v", jobName, err) + } +} + +func newJobForFed(namespace string, name string, completions int32, parallelism int32) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: batchv1.JobSpec{ + Parallelism: ¶llelism, + Completions: &completions, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"name": "fjob"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "sleep", + Image: "gcr.io/google_containers/busybox:1.24", + Command: []string{"sleep", "1"}, + }, + }, + RestartPolicy: v1.RestartPolicyNever, + }, + }, + }, + } +}
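
Note on scheduling: with the controller's default preference of weight 1 for every cluster ("*": {Weight: 1}), schedule() divides Spec.Parallelism and Spec.Completions as evenly as possible across the ready clusters, which is what the expectations in TestJobController encode (for example, completions of 7 over two clusters become 4 and 3). The standalone Go sketch below only illustrates that split; evenSplit is a hypothetical helper, not part of this patch, and the real controller delegates the math (weights, minimums, caps and tie-breaking order) to the planner package, so the exact cluster that receives the remainder may differ.

package main

import "fmt"

// evenSplit divides total across clusters as evenly as possible, handing the
// remainder out one unit at a time in slice order. This mirrors the
// equal-weight default preference of the federated job controller; the real
// tie-breaking order is decided by the planner package and may differ.
func evenSplit(total int32, clusters []string) map[string]int32 {
	result := make(map[string]int32, len(clusters))
	if len(clusters) == 0 {
		return result
	}
	n := int32(len(clusters))
	base, rem := total/n, total%n
	for i, name := range clusters {
		share := base
		if int32(i) < rem {
			share++
		}
		result[name] = share
	}
	return result
}

func main() {
	clusters := []string{"k8s-1", "k8s-2"}
	// "job1" from jobcontroller_test.go: parallelism=2, completions=7.
	fmt.Println(evenSplit(2, clusters)) // map[k8s-1:1 k8s-2:1]
	fmt.Println(evenSplit(7, clusters)) // map[k8s-1:4 k8s-2:3]
}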