From b253c20d80f307db608abbfcce58fb6495f48e5a Mon Sep 17 00:00:00 2001 From: Maciej Szulik Date: Fri, 18 Nov 2016 11:47:02 +0100 Subject: [PATCH] Retry job update after failure to prevent modification conflict --- test/e2e/batch_v1_jobs.go | 5 +++-- test/e2e/framework/BUILD | 1 + test/e2e/framework/util.go | 25 +++++++++++++++++++++++++ test/e2e/job.go | 5 +++-- 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/test/e2e/batch_v1_jobs.go b/test/e2e/batch_v1_jobs.go index 4e3d4b18c41..71d582f1191 100644 --- a/test/e2e/batch_v1_jobs.go +++ b/test/e2e/batch_v1_jobs.go @@ -204,8 +204,9 @@ var _ = framework.KubeDescribe("V1Job", func() { // the job stabilized and won't be synced until modification or full // resync happens, we don't want to wait for the latter so we force // sync modifying it - job.Spec.Parallelism = &completions - job, err = updateV1Job(f.ClientSet, f.Namespace.Name, job) + _, err = framework.UpdateJobWithRetries(f.ClientSet, f.Namespace.Name, job.Name, func(update *batch.Job) { + update.Spec.Parallelism = &completions + }) Expect(err).NotTo(HaveOccurred()) err = waitForV1JobFail(f.ClientSet, f.Namespace.Name, job.Name, v1JobTimeout) } diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index a918386d0da..519bf11fc8e 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -39,6 +39,7 @@ go_library( "//pkg/api/validation:go_default_library", "//pkg/apimachinery/registered:go_default_library", "//pkg/apis/apps:go_default_library", + "//pkg/apis/batch:go_default_library", "//pkg/apis/componentconfig:go_default_library", "//pkg/apis/extensions:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index a950abd34d0..891b355dabe 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -49,6 +49,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" @@ -3388,6 +3389,30 @@ func UpdateStatefulSetWithRetries(c clientset.Interface, namespace, name string, return statefulSet, pollErr } +type updateJobFunc func(*batch.Job) + +func UpdateJobWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateJobFunc) (job *batch.Job, err error) { + jobs := c.Batch().Jobs(namespace) + var updateErr error + pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) { + if job, err = jobs.Get(name); err != nil { + return false, err + } + // Apply the update, then attempt to push it to the apiserver. + applyUpdate(job) + if job, err = jobs.Update(job); err == nil { + Logf("Updating job %s", name) + return true, nil + } + updateErr = err + return false, nil + }) + if pollErr == wait.ErrWaitTimeout { + pollErr = fmt.Errorf("couldn't apply the provided updated to job %q: %v", name, updateErr) + } + return job, pollErr +} + // NodeAddresses returns the first address of the given type of each node. func NodeAddresses(nodelist *api.NodeList, addrType api.NodeAddressType) []string { hosts := []string{} diff --git a/test/e2e/job.go b/test/e2e/job.go index 7f2192085ad..dac326b43af 100644 --- a/test/e2e/job.go +++ b/test/e2e/job.go @@ -193,8 +193,9 @@ var _ = framework.KubeDescribe("Job", func() { // the job stabilized and won't be synced until modification or full // resync happens, we don't want to wait for the latter so we force // sync modifying it - job.Spec.Parallelism = &completions - job, err = updateJob(f.ClientSet, f.Namespace.Name, job) + _, err = framework.UpdateJobWithRetries(f.ClientSet, f.Namespace.Name, job.Name, func(update *batch.Job) { + update.Spec.Parallelism = &completions + }) Expect(err).NotTo(HaveOccurred()) err = waitForJobFail(f.ClientSet, f.Namespace.Name, job.Name, jobTimeout) }