mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Retry job update after failure to prevent modification conflict
This commit is contained in:
parent
3b43ce8e5c
commit
b253c20d80
@ -204,8 +204,9 @@ var _ = framework.KubeDescribe("V1Job", func() {
|
||||
// the job stabilized and won't be synced until modification or full
|
||||
// resync happens, we don't want to wait for the latter so we force
|
||||
// sync modifying it
|
||||
job.Spec.Parallelism = &completions
|
||||
job, err = updateV1Job(f.ClientSet, f.Namespace.Name, job)
|
||||
_, err = framework.UpdateJobWithRetries(f.ClientSet, f.Namespace.Name, job.Name, func(update *batch.Job) {
|
||||
update.Spec.Parallelism = &completions
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
err = waitForV1JobFail(f.ClientSet, f.Namespace.Name, job.Name, v1JobTimeout)
|
||||
}
|
||||
|
@ -39,6 +39,7 @@ go_library(
|
||||
"//pkg/api/validation:go_default_library",
|
||||
"//pkg/apimachinery/registered:go_default_library",
|
||||
"//pkg/apis/apps:go_default_library",
|
||||
"//pkg/apis/batch:go_default_library",
|
||||
"//pkg/apis/componentconfig:go_default_library",
|
||||
"//pkg/apis/extensions:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset:go_default_library",
|
||||
|
@ -49,6 +49,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/apis/apps"
|
||||
"k8s.io/kubernetes/pkg/apis/batch"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
|
||||
@ -3388,6 +3389,30 @@ func UpdateStatefulSetWithRetries(c clientset.Interface, namespace, name string,
|
||||
return statefulSet, pollErr
|
||||
}
|
||||
|
||||
type updateJobFunc func(*batch.Job)
|
||||
|
||||
func UpdateJobWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateJobFunc) (job *batch.Job, err error) {
|
||||
jobs := c.Batch().Jobs(namespace)
|
||||
var updateErr error
|
||||
pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
||||
if job, err = jobs.Get(name); err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(job)
|
||||
if job, err = jobs.Update(job); err == nil {
|
||||
Logf("Updating job %s", name)
|
||||
return true, nil
|
||||
}
|
||||
updateErr = err
|
||||
return false, nil
|
||||
})
|
||||
if pollErr == wait.ErrWaitTimeout {
|
||||
pollErr = fmt.Errorf("couldn't apply the provided updated to job %q: %v", name, updateErr)
|
||||
}
|
||||
return job, pollErr
|
||||
}
|
||||
|
||||
// NodeAddresses returns the first address of the given type of each node.
|
||||
func NodeAddresses(nodelist *api.NodeList, addrType api.NodeAddressType) []string {
|
||||
hosts := []string{}
|
||||
|
@ -193,8 +193,9 @@ var _ = framework.KubeDescribe("Job", func() {
|
||||
// the job stabilized and won't be synced until modification or full
|
||||
// resync happens, we don't want to wait for the latter so we force
|
||||
// sync modifying it
|
||||
job.Spec.Parallelism = &completions
|
||||
job, err = updateJob(f.ClientSet, f.Namespace.Name, job)
|
||||
_, err = framework.UpdateJobWithRetries(f.ClientSet, f.Namespace.Name, job.Name, func(update *batch.Job) {
|
||||
update.Spec.Parallelism = &completions
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
err = waitForJobFail(f.ClientSet, f.Namespace.Name, job.Name, jobTimeout)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user