Merge pull request #36197 from jianhuiz/federation-job-controller

Automatic merge from submit-queue

Federated Job controller implementation

Note that job re-balancing is not implemented yet, as it is difficult to honor the job deadline.

requires #35945 and #35943
fixes #34261
@quinton-hoole @nikhiljindal @deepak-vij

**Release note**:
```release-note
Federated Job feature. It is now possible to create a Federated Job
that is automatically deployed to one or more federated clusters
(as Jobs in those clusters). Job parallelism and completions are
spread across clusters according to cluster selection and weighting
preferences. Federated Job status reflects the aggregate status
across all underlying cluster Jobs.
```
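
For illustration, here is a minimal sketch of how a federated Job with weighting preferences might be submitted through the federation clientset. It assumes (hypothetically) that the `federation.kubernetes.io/job-preferences` annotation introduced below accepts a JSON-encoded allocation preference with per-cluster weights, analogous to the planner preferences the controller falls back to; the cluster names and package name are placeholders, and the clientset calls mirror the e2e helpers in this PR.

```go
// Sketch only: submit a Job to the federation apiserver so the federated job
// controller in this PR can spread it across member clusters. The annotation
// payload below is an assumed JSON encoding of per-cluster weights.
package fedjobexample

import (
	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
)

// CreateFederatedJob creates a Job against the federation apiserver named by
// kubeconfig. Parallelism and completions are then split across clusters by
// the federated job controller, weighted 2:1 between the hypothetical
// clusters k8s-1 and k8s-2 if the preferences annotation is honored as assumed.
func CreateFederatedJob(kubeconfig string) error {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return err
	}
	client := fedclientset.NewForConfigOrDie(cfg)

	parallelism, completions := int32(6), int32(12)
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "demo-job",
			Namespace: metav1.NamespaceDefault,
			Annotations: map[string]string{
				// Assumed payload format; see fedJobPreferencesAnnotation in the controller below.
				"federation.kubernetes.io/job-preferences": `{"clusters":{"k8s-1":{"weight":2},"k8s-2":{"weight":1}}}`,
			},
		},
		Spec: batchv1.JobSpec{
			Parallelism: &parallelism,
			Completions: &completions,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"name": "demo-job"}},
				Spec: v1.PodSpec{
					Containers:    []v1.Container{{Name: "sleep", Image: "gcr.io/google_containers/busybox:1.24", Command: []string{"sleep", "1"}}},
					RestartPolicy: v1.RestartPolicyNever,
				},
			},
		},
	}
	_, err = client.Batch().Jobs(job.Namespace).Create(job)
	return err
}
```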
This commit is contained in:
Kubernetes Submit Queue 2017-08-07 16:15:01 -07:00 committed by GitHub
commit 942f030ff0
11 changed files with 1236 additions and 0 deletions

View File

@@ -24,6 +24,7 @@ go_library(
"//federation/pkg/federatedtypes:go_default_library",
"//federation/pkg/federation-controller/cluster:go_default_library",
"//federation/pkg/federation-controller/ingress:go_default_library",
"//federation/pkg/federation-controller/job:go_default_library",
"//federation/pkg/federation-controller/service:go_default_library",
"//federation/pkg/federation-controller/service/dns:go_default_library",
"//federation/pkg/federation-controller/sync:go_default_library",

View File

@@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/federation/pkg/federatedtypes"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress"
jobcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/job"
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
servicednscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service/dns"
synccontroller "k8s.io/kubernetes/federation/pkg/federation-controller/sync"
@@ -155,6 +156,14 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
}
}
if controllerEnabled(s.Controllers, serverResources, jobcontroller.ControllerName, jobcontroller.RequiredResources, true) {
glog.V(3).Infof("Loading client config for job controller %q", jobcontroller.UserAgentName)
jobClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, jobcontroller.UserAgentName))
jobController := jobcontroller.NewJobController(jobClientset)
glog.V(3).Infof("Running job controller")
go jobController.Run(s.ConcurrentJobSyncs, wait.NeverStop)
}
if controllerEnabled(s.Controllers, serverResources, ingresscontroller.ControllerName, ingresscontroller.RequiredResources, true) {
glog.V(3).Infof("Loading client config for ingress controller %q", ingresscontroller.UserAgentName)
ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, ingresscontroller.UserAgentName))

View File

@@ -56,6 +56,10 @@ type ControllerManagerConfiguration struct {
// allowed to sync concurrently. Larger number = more responsive service
// management, but more CPU (and network) load.
ConcurrentReplicaSetSyncs int `json:"concurrentReplicaSetSyncs"`
// concurrentJobSyncs is the number of Jobs that are
// allowed to sync concurrently. Larger number = more responsive job
// management, but more CPU (and network) load.
ConcurrentJobSyncs int `json:"concurrentJobSyncs"`
// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
ClusterMonitorPeriod metav1.Duration `json:"clusterMonitorPeriod"`
// APIServerQPS is the QPS to use while talking with federation apiserver.
@@ -96,6 +100,7 @@ func NewCMServer() *CMServer {
ConcurrentServiceSyncs: 10,
ConcurrentReplicaSetSyncs: 10,
ClusterMonitorPeriod: metav1.Duration{Duration: 40 * time.Second},
ConcurrentJobSyncs: 10,
APIServerQPS: 20.0,
APIServerBurst: 30,
LeaderElection: leaderelectionconfig.DefaultLeaderElectionConfiguration(),
@@ -115,6 +120,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ServiceDnsSuffix, "service-dns-suffix", s.ServiceDnsSuffix, "DNS Suffix to use when publishing federated service names. Defaults to zone-name")
fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.IntVar(&s.ConcurrentReplicaSetSyncs, "concurrent-replicaset-syncs", s.ConcurrentReplicaSetSyncs, "The number of ReplicaSets syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.IntVar(&s.ConcurrentJobSyncs, "concurrent-job-syncs", s.ConcurrentJobSyncs, "The number of Job syncing operations that will be done concurrently. Larger number = faster job updating, but more CPU (and network) load")
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&s.EnableContentionProfiling, "contention-profiling", false, "Enable lock contention profiling, if profiling is enabled")

View File

@@ -26,6 +26,7 @@ filegroup(
":package-srcs",
"//federation/pkg/federation-controller/cluster:all-srcs",
"//federation/pkg/federation-controller/ingress:all-srcs",
"//federation/pkg/federation-controller/job:all-srcs",
"//federation/pkg/federation-controller/service:all-srcs",
"//federation/pkg/federation-controller/sync:all-srcs",
"//federation/pkg/federation-controller/util:all-srcs",

View File

@@ -0,0 +1,80 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["jobcontroller.go"],
tags = ["automanaged"],
deps = [
"//federation/apis/federation:go_default_library",
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
"//pkg/api:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/batch/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["jobcontroller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/finalizers:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/apis/batch/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/batch/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,561 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"reflect"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
batchv1 "k8s.io/api/batch/v1"
clientv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
fed "k8s.io/kubernetes/federation/apis/federation"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/controller"
)
const (
fedJobPreferencesAnnotation = "federation.kubernetes.io/job-preferences"
allClustersKey = "THE_ALL_CLUSTER_KEY"
// UserAgentName is the user agent used in the federation client
UserAgentName = "Federation-Job-Controller"
// ControllerName is name of this controller
ControllerName = "jobs"
)
var (
// RequiredResources is the resource group version of the type this controller manages
RequiredResources = []schema.GroupVersionResource{batchv1.SchemeGroupVersion.WithResource("jobs")}
jobReviewDelay = 10 * time.Second
clusterAvailableDelay = 20 * time.Second
clusterUnavailableDelay = 60 * time.Second
updateTimeout = 30 * time.Second
backoffInitial = 5 * time.Second
backoffMax = 1 * time.Minute
)
// FederationJobController synchronizes the state of a federated job object
// to clusters that are members of the federation.
type FederationJobController struct {
fedClient fedclientset.Interface
jobController cache.Controller
jobStore cache.Store
fedJobInformer fedutil.FederatedInformer
jobDeliverer *fedutil.DelayingDeliverer
clusterDeliverer *fedutil.DelayingDeliverer
jobWorkQueue workqueue.Interface
// For updating members of federation.
fedUpdater fedutil.FederatedUpdater
jobBackoff *flowcontrol.Backoff
// For events
eventRecorder record.EventRecorder
defaultPlanner *planner.Planner
deletionHelper *deletionhelper.DeletionHelper
}
// NewJobController creates a new federation job controller
func NewJobController(fedClient fedclientset.Interface) *FederationJobController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(fedClient))
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-job-controller"})
fjc := &FederationJobController{
fedClient: fedClient,
jobDeliverer: fedutil.NewDelayingDeliverer(),
clusterDeliverer: fedutil.NewDelayingDeliverer(),
jobWorkQueue: workqueue.New(),
jobBackoff: flowcontrol.NewBackOff(backoffInitial, backoffMax),
defaultPlanner: planner.NewPlanner(&fed.ReplicaAllocationPreferences{
Clusters: map[string]fed.ClusterPreferences{
"*": {Weight: 1},
},
}),
eventRecorder: recorder,
}
jobFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.BatchV1().Jobs(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.BatchV1().Jobs(metav1.NamespaceAll).Watch(options)
},
},
&batchv1.Job{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnAllChanges(
func(obj runtime.Object) { fjc.deliverLocalJob(obj, jobReviewDelay) },
),
)
}
clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *fedv1.Cluster) {
fjc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
},
ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) {
fjc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
},
}
fjc.fedJobInformer = fedutil.NewFederatedInformer(fedClient, jobFedInformerFactory, &clusterLifecycle)
fjc.jobStore, fjc.jobController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return fjc.fedClient.BatchV1().Jobs(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fjc.fedClient.BatchV1().Jobs(metav1.NamespaceAll).Watch(options)
},
},
&batchv1.Job{},
controller.NoResyncPeriodFunc(),
fedutil.NewTriggerOnMetaAndSpecChanges(
func(obj runtime.Object) { fjc.deliverFedJobObj(obj, 0) },
),
)
fjc.fedUpdater = fedutil.NewFederatedUpdater(fjc.fedJobInformer, "job", updateTimeout, fjc.eventRecorder,
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*batchv1.Job)
_, err := client.BatchV1().Jobs(rs.Namespace).Create(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*batchv1.Job)
_, err := client.BatchV1().Jobs(rs.Namespace).Update(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*batchv1.Job)
err := client.BatchV1().Jobs(rs.Namespace).Delete(rs.Name, &metav1.DeleteOptions{})
return err
})
fjc.deletionHelper = deletionhelper.NewDeletionHelper(
fjc.updateJob,
// objNameFunc
func(obj runtime.Object) string {
job := obj.(*batchv1.Job)
return job.Name
},
fjc.fedJobInformer,
fjc.fedUpdater,
)
return fjc
}
// Sends the given updated object to apiserver.
// Assumes that the given object is a job.
func (fjc *FederationJobController) updateJob(obj runtime.Object) (runtime.Object, error) {
job := obj.(*batchv1.Job)
return fjc.fedClient.BatchV1().Jobs(job.Namespace).Update(job)
}
// Run starts the syncing of federation jobs to the clusters.
func (fjc *FederationJobController) Run(workers int, stopCh <-chan struct{}) {
go fjc.jobController.Run(stopCh)
fjc.fedJobInformer.Start()
fjc.jobDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
fjc.jobWorkQueue.Add(item.Key)
})
fjc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
fjc.reconcileJobsOnClusterChange()
})
for !fjc.isSynced() {
time.Sleep(5 * time.Millisecond)
}
for i := 0; i < workers; i++ {
go wait.Until(fjc.worker, time.Second, stopCh)
}
fedutil.StartBackoffGC(fjc.jobBackoff, stopCh)
<-stopCh
glog.Infof("Shutting down FederationJobController")
fjc.jobDeliverer.Stop()
fjc.clusterDeliverer.Stop()
fjc.jobWorkQueue.ShutDown()
fjc.fedJobInformer.Stop()
}
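// isSynced reports whether the cluster list, the per-cluster job stores, and the federated job informer have all synced.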
func (fjc *FederationJobController) isSynced() bool {
if !fjc.fedJobInformer.ClustersSynced() {
glog.V(3).Infof("Cluster list not synced")
return false
}
clusters, err := fjc.fedJobInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
if !fjc.fedJobInformer.GetTargetStore().ClustersSynced(clusters) {
glog.V(2).Infof("cluster job list not synced")
return false
}
if !fjc.jobController.HasSynced() {
glog.V(2).Infof("federation job list not synced")
return false
}
return true
}
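// deliverLocalJob requeues a job observed in a member cluster, but only if a federated job with the same key exists.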
func (fjc *FederationJobController) deliverLocalJob(obj interface{}, duration time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %v: %v", obj, err)
return
}
_, exists, err := fjc.jobStore.GetByKey(key)
if err != nil {
glog.Errorf("Couldn't get federated job %v: %v", key, err)
return
}
if exists { // ignore jobs that exist only in the local cluster
fjc.deliverJobByKey(key, duration, false)
}
}
func (fjc *FederationJobController) deliverFedJobObj(obj interface{}, delay time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
fjc.deliverJobByKey(key, delay, false)
}
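// deliverJobByKey enqueues the job key after the given delay, adding backoff when the previous reconciliation failed.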
func (fjc *FederationJobController) deliverJobByKey(key string, delay time.Duration, failed bool) {
if failed {
fjc.jobBackoff.Next(key, time.Now())
delay = delay + fjc.jobBackoff.Get(key)
} else {
fjc.jobBackoff.Reset(key)
}
fjc.jobDeliverer.DeliverAfter(key, nil, delay)
}
type reconciliationStatus string
const (
statusAllOk = reconciliationStatus("ALL_OK")
statusNeedRecheck = reconciliationStatus("RECHECK")
statusError = reconciliationStatus("ERROR")
statusNotSynced = reconciliationStatus("NOSYNC")
)
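// worker pulls job keys off the work queue, reconciles them, and requeues them according to the returned status (with backoff on errors).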
func (fjc *FederationJobController) worker() {
for {
item, quit := fjc.jobWorkQueue.Get()
if quit {
return
}
key := item.(string)
status, err := fjc.reconcileJob(key)
fjc.jobWorkQueue.Done(item)
if err != nil {
glog.Errorf("Error syncing job controller: %v", err)
fjc.deliverJobByKey(key, 0, true)
} else {
switch status {
case statusAllOk:
break
case statusError:
fjc.deliverJobByKey(key, 0, true)
case statusNeedRecheck:
fjc.deliverJobByKey(key, jobReviewDelay, false)
case statusNotSynced:
fjc.deliverJobByKey(key, clusterAvailableDelay, false)
default:
glog.Errorf("Unhandled reconciliation status: %s", status)
fjc.deliverJobByKey(key, jobReviewDelay, false)
}
}
}
}
type scheduleResult struct {
Parallelism *int32
Completions *int32
}
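// schedule splits the federated job's parallelism across the ready clusters using the planner (honoring the job-preferences annotation when present), then splits completions across the clusters present in the parallelism result.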
func (fjc *FederationJobController) schedule(fjob *batchv1.Job, clusters []*fedv1.Cluster) map[string]scheduleResult {
plnr := fjc.defaultPlanner
frsPref, err := replicapreferences.GetAllocationPreferences(fjob, fedJobPreferencesAnnotation)
if err != nil {
glog.Warningf("Invalid job specific preference, use default. rs: %v, err: %v", fjob, err)
}
if frsPref != nil { // create a new planner if user specified a preference
plnr = planner.NewPlanner(frsPref)
}
parallelism := int64(*fjob.Spec.Parallelism)
var clusterNames []string
for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name)
}
parallelismResult, _ := plnr.Plan(parallelism, clusterNames, nil, nil, fjob.Namespace+"/"+fjob.Name)
if frsPref != nil {
for _, clusterPref := range frsPref.Clusters {
clusterPref.MinReplicas = 0
clusterPref.MaxReplicas = nil
}
plnr = planner.NewPlanner(frsPref)
}
clusterNames = nil
for clusterName := range parallelismResult {
clusterNames = append(clusterNames, clusterName)
}
completionsResult := make(map[string]int64)
if fjob.Spec.Completions != nil {
completionsResult, _ = plnr.Plan(int64(*fjob.Spec.Completions), clusterNames, nil, nil, fjob.Namespace+"/"+fjob.Name)
}
results := make(map[string]scheduleResult)
for _, clusterName := range clusterNames {
paralle := int32(parallelismResult[clusterName])
complet := int32(completionsResult[clusterName])
result := scheduleResult{
Parallelism: &paralle,
}
if fjob.Spec.Completions != nil {
result.Completions = &complet
}
results[clusterName] = result
}
return results
}
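// reconcileJob brings the cluster-local jobs in line with the federated job identified by key: it handles deletion and finalizers, schedules parallelism and completions per cluster, creates or updates local jobs as needed, and aggregates their status back into the federated job.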
func (fjc *FederationJobController) reconcileJob(key string) (reconciliationStatus, error) {
if !fjc.isSynced() {
return statusNotSynced, nil
}
glog.V(4).Infof("Start reconcile job %q", key)
startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile job %q (%v)", key, time.Now().Sub(startTime))
objFromStore, exists, err := fjc.jobStore.GetByKey(key)
if err != nil {
return statusError, err
}
if !exists {
// the federated job was deleted; nothing to do
return statusAllOk, nil
}
// Create a copy before modifying the obj to prevent race condition with other readers of obj from store.
obj, err := api.Scheme.DeepCopy(objFromStore)
fjob, ok := obj.(*batchv1.Job)
if err != nil || !ok {
return statusError, err
}
// delete job
if fjob.DeletionTimestamp != nil {
if err := fjc.delete(fjob); err != nil {
fjc.eventRecorder.Eventf(fjob, api.EventTypeNormal, "DeleteFailed", "Job delete failed: %v", err)
return statusError, err
}
return statusAllOk, nil
}
glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for job: %s\n", key)
// Add the required finalizers before creating a job in underlying clusters.
updatedJobObj, err := fjc.deletionHelper.EnsureFinalizers(fjob)
if err != nil {
return statusError, err
}
fjob = updatedJobObj.(*batchv1.Job)
clusters, err := fjc.fedJobInformer.GetReadyClusters()
if err != nil {
return statusError, err
}
scheduleResult := fjc.schedule(fjob, clusters)
glog.V(3).Infof("Start syncing local job %s: %s\n", key, spew.Sprintf("%v", scheduleResult))
fedStatus := batchv1.JobStatus{}
var fedStatusFailedCondition *batchv1.JobCondition
var fedStatusCompleteCondition *batchv1.JobCondition
var operations []fedutil.FederatedOperation
for clusterName, result := range scheduleResult {
ljobObj, exists, err := fjc.fedJobInformer.GetTargetStore().GetByKey(clusterName, key)
if err != nil {
return statusError, err
}
ljob := &batchv1.Job{
ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fjob.ObjectMeta),
Spec: *fedutil.DeepCopyApiTypeOrPanic(&fjob.Spec).(*batchv1.JobSpec),
}
// use selector generated at federation level, or user specified value
manualSelector := true
ljob.Spec.ManualSelector = &manualSelector
ljob.Spec.Parallelism = result.Parallelism
ljob.Spec.Completions = result.Completions
if !exists {
if *ljob.Spec.Parallelism > 0 {
fjc.eventRecorder.Eventf(fjob, api.EventTypeNormal, "CreateInCluster", "Creating job in cluster %s", clusterName)
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: ljob,
ClusterName: clusterName,
})
}
} else {
currentLjob := ljobObj.(*batchv1.Job)
// Update existing job, if needed.
if !fedutil.ObjectMetaAndSpecEquivalent(ljob, currentLjob) {
fjc.eventRecorder.Eventf(fjob, api.EventTypeNormal, "UpdateInCluster", "Updating job in cluster %s", clusterName)
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: ljob,
ClusterName: clusterName,
})
}
// collect local job status
for _, condition := range currentLjob.Status.Conditions {
if condition.Type == batchv1.JobComplete {
if fedStatusCompleteCondition == nil ||
fedStatusCompleteCondition.LastTransitionTime.Before(condition.LastTransitionTime) {
fedStatusCompleteCondition = &condition
}
} else if condition.Type == batchv1.JobFailed {
if fedStatusFailedCondition == nil ||
fedStatusFailedCondition.LastTransitionTime.Before(condition.LastTransitionTime) {
fedStatusFailedCondition = &condition
}
}
}
if currentLjob.Status.StartTime != nil {
if fedStatus.StartTime == nil || fedStatus.StartTime.After(currentLjob.Status.StartTime.Time) {
fedStatus.StartTime = currentLjob.Status.StartTime
}
}
if currentLjob.Status.CompletionTime != nil {
if fedStatus.CompletionTime == nil || fedStatus.CompletionTime.Before(*currentLjob.Status.CompletionTime) {
fedStatus.CompletionTime = currentLjob.Status.CompletionTime
}
}
fedStatus.Active += currentLjob.Status.Active
fedStatus.Succeeded += currentLjob.Status.Succeeded
fedStatus.Failed += currentLjob.Status.Failed
}
}
// the federated job fails if any local job fails
if fedStatusFailedCondition != nil {
fedStatus.Conditions = append(fedStatus.Conditions, *fedStatusFailedCondition)
} else if fedStatusCompleteCondition != nil {
fedStatus.Conditions = append(fedStatus.Conditions, *fedStatusCompleteCondition)
}
if !reflect.DeepEqual(fedStatus, fjob.Status) {
fjob.Status = fedStatus
_, err = fjc.fedClient.BatchV1().Jobs(fjob.Namespace).UpdateStatus(fjob)
if err != nil {
return statusError, err
}
}
if len(operations) == 0 {
// Everything is in order
return statusAllOk, nil
}
if glog.V(4) {
for i, op := range operations {
job := op.Obj.(*batchv1.Job)
glog.V(4).Infof("operation[%d]: %s, %s/%s/%s, %d", i, op.Type, op.ClusterName, job.Namespace, job.Name, *job.Spec.Parallelism)
}
}
err = fjc.fedUpdater.Update(operations)
if err != nil {
return statusError, err
}
// Some operations were made, reconcile after a while.
return statusNeedRecheck, nil
}
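// reconcileJobsOnClusterChange requeues all federated jobs when cluster membership or readiness changes.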
func (fjc *FederationJobController) reconcileJobsOnClusterChange() {
if !fjc.isSynced() {
fjc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
}
jobs := fjc.jobStore.List()
for _, job := range jobs {
key, _ := controller.KeyFunc(job)
fjc.deliverJobByKey(key, 0, false)
}
}
// delete deletes the given job or returns error if the deletion was not complete.
func (fjc *FederationJobController) delete(job *batchv1.Job) error {
glog.V(3).Infof("Handling deletion of job: %s/%s\n", job.Namespace, job.Name)
_, err := fjc.deletionHelper.HandleObjectInUnderlyingClusters(job)
if err != nil {
return err
}
err = fjc.fedClient.BatchV1().Jobs(job.Namespace).Delete(job.Name, nil)
if err != nil {
// It's all good if the error is a NotFound error. That means it is deleted already and we do not have to do anything.
// This is expected when we are processing an update as a result of job finalizer deletion.
// The process that deleted the last finalizer is also going to delete the job and we do not have to do anything.
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete job: %s/%s, %v", job.Namespace, job.Name, err)
}
}
return nil
}

View File

@@ -0,0 +1,282 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"flag"
"fmt"
"testing"
"time"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
kubeclientset "k8s.io/client-go/kubernetes"
kubeclientfake "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientfake "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
finalizersutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/finalizers"
testutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
batchv1internal "k8s.io/kubernetes/pkg/apis/batch/v1"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/sets"
"reflect"
"strings"
)
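// installWatchReactor wires create/update/delete reactors into the fake clientset so that every mutation of the given resource is pushed to a fake watch and to the returned channel.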
func installWatchReactor(fakeClient *core.Fake, resource string) chan runtime.Object {
objChan := make(chan runtime.Object, 100)
fakeWatch := watch.NewRaceFreeFake()
fakeClient.PrependWatchReactor(resource, core.DefaultWatchReactor(fakeWatch, nil))
fakeClient.PrependReactor("create", resource, func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(core.CreateAction).GetObject()
batchv1internal.SetDefaults_Job(obj.(*batchv1.Job))
fakeWatch.Add(obj)
objChan <- obj
return false, nil, nil
})
fakeClien.PrependReactor("update", resource, func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(core.UpdateAction).GetObject()
fakeWatch.Modify(obj)
objChan <- obj
return false, nil, nil
})
fakeClien.PrependReactor("delete", resource, func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: action.(core.DeleteAction).GetName(),
Namespace: action.GetNamespace(),
},
}
fakeWatch.Delete(obj)
objChan <- obj
return false, nil, nil
})
return objChan
}
func TestJobController(t *testing.T) {
flag.Set("logtostderr", "true")
flag.Set("v", "5")
flag.Parse()
jobReviewDelay = 50 * time.Millisecond
clusterAvailableDelay = 200 * time.Millisecond
clusterUnavailableDelay = 200 * time.Millisecond
fedclientset := fedclientfake.NewSimpleClientset()
fedChan := installWatchReactor(&fedclientset.Fake, "jobs")
fedclientset.Federation().Clusters().Create(testutil.NewCluster("k8s-1", apiv1.ConditionTrue))
fedclientset.Federation().Clusters().Create(testutil.NewCluster("k8s-2", apiv1.ConditionTrue))
kube1clientset := kubeclientfake.NewSimpleClientset()
kube1Chan := installWatchReactor(&kube1clientset.Fake, "jobs")
kube2clientset := kubeclientfake.NewSimpleClientset()
kube2Chan := installWatchReactor(&kube2clientset.Fake, "jobs")
fedInformerClientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case "k8s-1":
return kube1clientset, nil
case "k8s-2":
return kube2clientset, nil
default:
return nil, fmt.Errorf("Unknown cluster: %v", cluster.Name)
}
}
jobController := NewJobController(fedclientset)
fedjobinformer := testutil.ToFederatedInformerForTestOnly(jobController.fedJobInformer)
fedjobinformer.SetClientFactory(fedInformerClientFactory)
stopChan := make(chan struct{})
defer close(stopChan)
go jobController.Run(5, stopChan)
test := func(job *batchv1.Job, parallelism1, parallelism2, completions1, completions2 int32) {
job, _ = fedclientset.Batch().Jobs(metav1.NamespaceDefault).Create(job)
joinErrors := func(errors []error) error {
if len(errors) == 0 {
return nil
}
errorStrings := []string{}
for _, err := range errors {
errorStrings = append(errorStrings, err.Error())
}
return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
}
// check local jobs are created with correct spec
checkLocalJob := func(parallelism, completions int32) testutil.CheckingFunction {
return func(obj runtime.Object) error {
errors := []error{}
ljob := obj.(*batchv1.Job)
if !fedutil.ObjectMetaEquivalent(job.ObjectMeta, ljob.ObjectMeta) {
errors = append(errors, fmt.Errorf("Job meta un-equivalent: %#v (expected) != %#v (actual)", job.ObjectMeta, ljob.ObjectMeta))
}
if err := checkEqual(t, *ljob.Spec.Parallelism, parallelism, "Spec.Parallelism"); err != nil {
errors = append(errors, err)
}
if ljob.Spec.Completions != nil {
if err := checkEqual(t, *ljob.Spec.Completions, completions, "Spec.Completions"); err != nil {
errors = append(errors, err)
}
}
return joinErrors(errors)
}
}
checkFedJob := func(obj runtime.Object) error {
errors := []error{}
return joinErrors(errors)
}
assert.NoError(t, testutil.CheckObjectFromChan(kube1Chan, checkLocalJob(parallelism1, completions1)))
assert.NoError(t, testutil.CheckObjectFromChan(kube2Chan, checkLocalJob(parallelism2, completions2)))
assert.NoError(t, testutil.CheckObjectFromChan(fedChan, checkFedJob))
// finish local jobs
job1, _ := kube1clientset.Batch().Jobs(metav1.NamespaceDefault).Get(job.Name, metav1.GetOptions{})
finishJob(job1, 100*time.Millisecond)
job1, _ = kube1clientset.Batch().Jobs(metav1.NamespaceDefault).UpdateStatus(job1)
job2, _ := kube2clientset.Batch().Jobs(metav1.NamespaceDefault).Get(job.Name, metav1.GetOptions{})
finishJob(job2, 100*time.Millisecond)
job2, _ = kube2clientset.Batch().Jobs(metav1.NamespaceDefault).UpdateStatus(job2)
// check fed job status updated
assert.NoError(t, testutil.CheckObjectFromChan(fedChan, func(obj runtime.Object) error {
errors := []error{}
job := obj.(*batchv1.Job)
if err := checkEqual(t, *job.Spec.Parallelism, *job1.Spec.Parallelism+*job2.Spec.Parallelism, "Spec.Parallelism"); err != nil {
errors = append(errors, err)
}
if job.Spec.Completions != nil {
if err := checkEqual(t, *job.Spec.Completions, *job1.Spec.Completions+*job2.Spec.Completions, "Spec.Completions"); err != nil {
errors = append(errors, err)
}
}
if err := checkEqual(t, job.Status.Succeeded, job1.Status.Succeeded+job2.Status.Succeeded, "Status.Succeeded"); err != nil {
errors = append(errors, err)
}
return joinErrors(errors)
}))
// delete fed job by set deletion time, and remove orphan finalizer
job, _ = fedclientset.Batch().Jobs(metav1.NamespaceDefault).Get(job.Name, metav1.GetOptions{})
deletionTimestamp := metav1.Now()
job.DeletionTimestamp = &deletionTimestamp
finalizersutil.RemoveFinalizers(job, sets.NewString(metav1.FinalizerOrphanDependents))
fedclientset.Batch().Jobs(metav1.NamespaceDefault).Update(job)
// check jobs are deleted
checkDeleted := func(obj runtime.Object) error {
djob := obj.(*batchv1.Job)
deletedJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: djob.Name,
Namespace: djob.Namespace,
},
}
if !reflect.DeepEqual(djob, deletedJob) {
return fmt.Errorf("%s/%s should be deleted", djob.Namespace, djob.Name)
}
return nil
}
assert.NoError(t, testutil.CheckObjectFromChan(kube1Chan, checkDeleted))
assert.NoError(t, testutil.CheckObjectFromChan(kube2Chan, checkDeleted))
assert.NoError(t, testutil.CheckObjectFromChan(fedChan, checkDeleted))
}
test(newJob("job1", 2, 7), 1, 1, 4, 3)
test(newJob("job2", 2, -1), 1, 1, -1, -1)
test(newJob("job3", 7, 2), 4, 3, 1, 1)
test(newJob("job4", 7, 1), 4, 3, 1, 0)
}
func checkEqual(_ *testing.T, expected, actual interface{}, msg string) error {
if !assert.ObjectsAreEqual(expected, actual) {
return fmt.Errorf("%s not equal: %#v (expected) != %#v (actual)", msg, expected, actual)
}
return nil
}
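// newJob returns a job template with the given parallelism and completions; a negative value leaves the corresponding field unset.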
func newJob(name string, parallelism int32, completions int32) *batchv1.Job {
job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceDefault,
SelfLink: "/api/v1/namespaces/default/jobs/name",
},
Spec: batchv1.JobSpec{
Parallelism: &parallelism,
Completions: &completions,
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": name,
},
},
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{Image: "foo/bar"},
},
RestartPolicy: apiv1.RestartPolicyNever,
},
},
},
}
if parallelism < 0 {
job.Spec.Parallelism = nil
}
if completions < 0 {
job.Spec.Completions = nil
}
batchv1internal.SetDefaults_Job(&job)
return &job
}
func newCondition(conditionType batchv1.JobConditionType, reason, message string) batchv1.JobCondition {
return batchv1.JobCondition{
Type: conditionType,
Status: apiv1.ConditionTrue,
LastProbeTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
}
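// finishJob marks the job complete, setting the succeeded count plus start and completion times separated by the given duration.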
func finishJob(job *batchv1.Job, duration time.Duration) {
job.Status.Conditions = append(job.Status.Conditions, newCondition(batchv1.JobComplete, "", ""))
if job.Spec.Completions == nil {
job.Status.Succeeded = 1
} else {
job.Status.Succeeded = *job.Spec.Completions
}
now := metav1.Now()
job.Status.StartTime = &now
time.Sleep(duration)
now = metav1.Now()
job.Status.CompletionTime = &now
}

View File

@@ -71,6 +71,7 @@ func NewTriggerOnMetaAndSpecChanges(triggerFunc func(pkgruntime.Object)) *cache.
oldMeta := getFieldOrPanic(old, "ObjectMeta").(metav1.ObjectMeta)
curMeta := getFieldOrPanic(cur, "ObjectMeta").(metav1.ObjectMeta)
if !ObjectMetaEquivalent(oldMeta, curMeta) ||
!reflect.DeepEqual(oldMeta.DeletionTimestamp, curMeta.DeletionTimestamp) ||
!reflect.DeepEqual(getFieldOrPanic(old, "Spec"), getFieldOrPanic(cur, "Spec")) {
triggerFunc(curObj)
}

View File

@@ -119,6 +119,7 @@ cni-conf-dir
concurrent-deployment-syncs
concurrent-endpoint-syncs
concurrent-gc-syncs
concurrent-job-syncs
concurrent-namespace-syncs
concurrent-replicaset-syncs
concurrent-resource-quota-syncs

View File

@@ -15,6 +15,7 @@ go_library(
"crud.go",
"event.go",
"ingress.go",
"job.go",
"namespace.go",
"replicaset.go",
"service.go",
@@ -29,6 +30,7 @@ go_library(
"//federation/client/clientset_generated/federation_clientset/typed/core/v1:go_default_library",
"//federation/pkg/federatedtypes:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//test/e2e/chaosmonkey:go_default_library",
@@ -38,6 +40,7 @@ go_library(
"//test/e2e_federation/upgrades:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/k8s.io/api/batch/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",

test/e2e_federation/job.go (new file, 291 lines)
View File

@@ -0,0 +1,291 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_federation
import (
"fmt"
"strings"
"time"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/test/e2e/framework"
fedframework "k8s.io/kubernetes/test/e2e_federation/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/kubernetes/pkg/api"
)
const (
FederationJobName = "federation-job"
)
var _ = framework.KubeDescribe("Federation jobs [Feature:Federation]", func() {
f := fedframework.NewDefaultFederatedFramework("federation-job")
Describe("Job objects [NoCluster]", func() {
AfterEach(func() {
fedframework.SkipUnlessFederated(f.ClientSet)
// Delete all jobs.
nsName := f.FederationNamespace.Name
deleteAllJobsOrFail(f.FederationClientset, nsName)
})
It("should be created and deleted successfully", func() {
fedframework.SkipUnlessFederated(f.ClientSet)
nsName := f.FederationNamespace.Name
job := createJobOrFail(f.FederationClientset, nsName)
By(fmt.Sprintf("Creation of job %q in namespace %q succeeded. Deleting job.", job.Name, nsName))
// Cleanup
err := f.FederationClientset.Batch().Jobs(nsName).Delete(job.Name, &metav1.DeleteOptions{})
framework.ExpectNoError(err, "Error deleting job %q in namespace %q", job.Name, job.Namespace)
By(fmt.Sprintf("Deletion of job %q in namespace %q succeeded.", job.Name, nsName))
})
})
// e2e cases for federated job controller
Describe("Federated Job", func() {
var (
clusters fedframework.ClusterSlice
)
BeforeEach(func() {
fedframework.SkipUnlessFederated(f.ClientSet)
clusters = f.GetRegisteredClusters()
})
AfterEach(func() {
nsName := f.FederationNamespace.Name
deleteAllJobsOrFail(f.FederationClientset, nsName)
})
It("should create and update matching jobs in underlying clusters", func() {
nsName := f.FederationNamespace.Name
job := createJobOrFail(f.FederationClientset, nsName)
defer func() {
// cleanup. deletion of jobs is not supported for underlying clusters
By(fmt.Sprintf("Deleting job %q/%q", nsName, job.Name))
waitForJobOrFail(f.FederationClientset, nsName, job.Name, clusters)
f.FederationClientset.Batch().Jobs(nsName).Delete(job.Name, &metav1.DeleteOptions{})
}()
waitForJobOrFail(f.FederationClientset, nsName, job.Name, clusters)
By(fmt.Sprintf("Successfuly created and synced job %q/%q to clusters", nsName, job.Name))
})
It("should be deleted from underlying clusters when OrphanDependents is false", func() {
fedframework.SkipUnlessFederated(f.ClientSet)
nsName := f.FederationNamespace.Name
orphanDependents := false
verifyCascadingDeletionForJob(f.FederationClientset, clusters, &orphanDependents, nsName)
By(fmt.Sprintf("Verified that jobs were deleted from underlying clusters"))
})
It("should not be deleted from underlying clusters when OrphanDependents is true", func() {
fedframework.SkipUnlessFederated(f.ClientSet)
nsName := f.FederationNamespace.Name
orphanDependents := true
verifyCascadingDeletionForJob(f.FederationClientset, clusters, &orphanDependents, nsName)
By(fmt.Sprintf("Verified that jobs were not deleted from underlying clusters"))
})
It("should not be deleted from underlying clusters when OrphanDependents is nil", func() {
fedframework.SkipUnlessFederated(f.ClientSet)
nsName := f.FederationNamespace.Name
verifyCascadingDeletionForJob(f.FederationClientset, clusters, nil, nsName)
By(fmt.Sprintf("Verified that jobs were not deleted from underlying clusters"))
})
})
})
// deleteAllJobsOrFail deletes all jobs in the given namespace name.
func deleteAllJobsOrFail(clientset *fedclientset.Clientset, nsName string) {
jobList, err := clientset.Batch().Jobs(nsName).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
orphanDependents := false
for _, job := range jobList.Items {
deleteJobOrFail(clientset, nsName, job.Name, &orphanDependents)
}
}
// verifyCascadingDeletionForJob verifies that jobs are deleted
// from underlying clusters when orphan dependents is false and they are not
// deleted when orphan dependents is true.
func verifyCascadingDeletionForJob(clientset *fedclientset.Clientset, clusters fedframework.ClusterSlice, orphanDependents *bool, nsName string) {
job := createJobOrFail(clientset, nsName)
jobName := job.Name
// Check subclusters if the job was created there.
By(fmt.Sprintf("Waiting for job %s to be created in all underlying clusters", jobName))
err := wait.Poll(5*time.Second, 2*time.Minute, func() (bool, error) {
for _, cluster := range clusters {
_, err := cluster.Batch().Jobs(nsName).Get(jobName, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
}
return true, nil
})
framework.ExpectNoError(err, "Not all jobs created")
By(fmt.Sprintf("Deleting job %s", jobName))
deleteJobOrFail(clientset, nsName, jobName, orphanDependents)
By(fmt.Sprintf("Verifying job %s in underlying clusters", jobName))
errMessages := []string{}
// job should be present in underlying clusters unless orphanDependents is false.
shouldExist := orphanDependents == nil || *orphanDependents == true
for _, cluster := range clusters {
clusterName := cluster.Name
_, err := cluster.Batch().Jobs(nsName).Get(jobName, metav1.GetOptions{})
if shouldExist && errors.IsNotFound(err) {
errMessages = append(errMessages, fmt.Sprintf("unexpected NotFound error for job %s in cluster %s, expected job to exist", jobName, clusterName))
} else if !shouldExist && !errors.IsNotFound(err) {
errMessages = append(errMessages, fmt.Sprintf("expected NotFound error for job %s in cluster %s, got error: %v", jobName, clusterName, err))
}
}
if len(errMessages) != 0 {
framework.Failf("%s", strings.Join(errMessages, "; "))
}
}
func waitForJobOrFail(c *fedclientset.Clientset, namespace string, jobName string, clusters fedframework.ClusterSlice) {
err := waitForJob(c, namespace, jobName, clusters)
framework.ExpectNoError(err, "Failed to verify job %q/%q, err: %v", namespace, jobName, err)
}
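// waitForJob polls until each underlying cluster's job matches the federated job and the aggregated succeeded count matches the federated job's status and completions.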
func waitForJob(c *fedclientset.Clientset, namespace string, jobName string, clusters fedframework.ClusterSlice) error {
err := wait.Poll(10*time.Second, fedframework.FederatedDefaultTestTimeout, func() (bool, error) {
fjob, err := c.Batch().Jobs(namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
}
succeeded := int32(0)
for _, cluster := range clusters {
job, err := cluster.Batch().Jobs(namespace).Get(jobName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
By(fmt.Sprintf("Failed getting job: %q/%q/%q, err: %v", cluster.Name, namespace, jobName, err))
return false, err
}
if err == nil {
if !verifyJob(fjob, job) {
By(fmt.Sprintf("Job meta or spec not match for cluster %q:\n federation: %v\n cluster: %v", cluster.Name, fjob, job))
return false, nil
}
succeeded += job.Status.Succeeded
}
}
if succeeded == fjob.Status.Succeeded &&
(fjob.Spec.Completions != nil && succeeded == *fjob.Spec.Completions) {
return true, nil
}
By(fmt.Sprintf("Job statuses not match, federation succeeded: %v/%v, clusters succeeded: %v\n",
fjob.Status.Succeeded, func(p *int32) int32 {
if p != nil {
return *p
} else {
return -1
}
}(fjob.Spec.Completions), succeeded))
return false, nil
})
return err
}
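// verifyJob reports whether a cluster-local job is equivalent to the federated job, ignoring the fields the controller rewrites (manual selector, parallelism, completions).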
func verifyJob(fedJob, localJob *batchv1.Job) bool {
localJobObj, _ := api.Scheme.DeepCopy(localJob)
localJob = localJobObj.(*batchv1.Job)
localJob.Spec.ManualSelector = fedJob.Spec.ManualSelector
localJob.Spec.Completions = fedJob.Spec.Completions
localJob.Spec.Parallelism = fedJob.Spec.Parallelism
return fedutil.ObjectMetaAndSpecEquivalent(fedJob, localJob)
}
func createJobOrFail(clientset *fedclientset.Clientset, namespace string) *batchv1.Job {
if clientset == nil || len(namespace) == 0 {
Fail(fmt.Sprintf("Internal error: invalid parameters passed to createJobOrFail: clientset: %v, namespace: %v", clientset, namespace))
}
By(fmt.Sprintf("Creating federation job %q in namespace %q", FederationJobName, namespace))
job := newJobForFed(namespace, FederationJobName, 5, 5)
_, err := clientset.Batch().Jobs(namespace).Create(job)
framework.ExpectNoError(err, "Creating job %q in namespace %q", job.Name, namespace)
By(fmt.Sprintf("Successfully created federation job %q in namespace %q", FederationJobName, namespace))
return job
}
func deleteJobOrFail(clientset *fedclientset.Clientset, nsName string, jobName string, orphanDependents *bool) {
By(fmt.Sprintf("Deleting job %q in namespace %q", jobName, nsName))
err := clientset.Batch().Jobs(nsName).Delete(jobName, &metav1.DeleteOptions{OrphanDependents: orphanDependents})
if err != nil && !errors.IsNotFound(err) {
framework.ExpectNoError(err, "Error deleting job %q in namespace %q", jobName, nsName)
}
// Wait for the job to be deleted.
err = wait.Poll(10*time.Second, fedframework.FederatedDefaultTestTimeout, func() (bool, error) {
_, err := clientset.Batch().Jobs(nsName).Get(jobName, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
return true, nil
}
return false, err
})
if err != nil {
framework.Failf("Error in deleting job %s: %v", jobName, err)
}
}
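// newJobForFed returns a simple sleep job used by the federation e2e tests.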
func newJobForFed(namespace string, name string, completions int32, parallelism int32) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: batchv1.JobSpec{
Parallelism: &parallelism,
Completions: &completions,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"name": "fjob"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "sleep",
Image: "gcr.io/google_containers/busybox:1.24",
Command: []string{"sleep", "1"},
},
},
RestartPolicy: v1.RestartPolicyNever,
},
},
},
}
}