Job status updates.

Added status REST storage.
Added validation for Status Updates.
Changed job controller to update status rather than just job
(which ignores status updates).
This commit is contained in:
Eric Tune 2015-09-22 01:05:54 -07:00
parent b113b5ac2a
commit 95a44be2ef
6 changed files with 80 additions and 15 deletions

View File

@ -312,6 +312,21 @@ func ValidateJobSpec(spec *experimental.JobSpec) errs.ValidationErrorList {
return allErrs
}
func ValidateJobStatus(status *experimental.JobStatus) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
if status.Active < 0 {
allErrs = append(allErrs, errs.NewFieldInvalid("active", status.Active, isNegativeErrorMsg))
}
if status.Successful < 0 {
allErrs = append(allErrs, errs.NewFieldInvalid("successful", status.Successful, isNegativeErrorMsg))
}
if status.Unsuccessful < 0 {
allErrs = append(allErrs, errs.NewFieldInvalid("unsuccessful", status.Unsuccessful, isNegativeErrorMsg))
}
return allErrs
}
func ValidateJobUpdate(oldJob, job *experimental.Job) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, apivalidation.ValidateObjectMetaUpdate(&oldJob.ObjectMeta, &job.ObjectMeta).Prefix("metadata")...)
@ -319,6 +334,13 @@ func ValidateJobUpdate(oldJob, job *experimental.Job) errs.ValidationErrorList {
return allErrs
}
func ValidateJobUpdateStatus(oldJob, job *experimental.Job) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, apivalidation.ValidateObjectMetaUpdate(&oldJob.ObjectMeta, &job.ObjectMeta).Prefix("metadata")...)
allErrs = append(allErrs, ValidateJobStatusUpdate(oldJob.Status, job.Status).Prefix("status")...)
return allErrs
}
func ValidateJobSpecUpdate(oldSpec, spec experimental.JobSpec) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, ValidateJobSpec(&spec)...)
@ -333,3 +355,9 @@ func ValidateJobSpecUpdate(oldSpec, spec experimental.JobSpec) errs.ValidationEr
}
return allErrs
}
func ValidateJobStatusUpdate(oldStatus, status experimental.JobStatus) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allErrs = append(allErrs, ValidateJobStatus(&status)...)
return allErrs
}

View File

@ -44,7 +44,7 @@ type JobController struct {
kubeClient client.Interface
podControl controller.PodControlInterface
// To allow injection of updateJob for testing.
// To allow injection of updateJobStatus for testing.
updateHandler func(job *experimental.Job) error
syncHandler func(jobKey string) error
// podStoreSynced returns true if the pod store has been synced at least once.
@ -127,7 +127,7 @@ func NewJobController(kubeClient client.Interface) *JobController {
},
)
jm.updateHandler = jm.updateJob
jm.updateHandler = jm.updateJobStatus
jm.syncHandler = jm.syncJob
jm.podStoreSynced = jm.podController.HasSynced
return jm
@ -433,8 +433,8 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf
return active
}
func (jm *JobController) updateJob(job *experimental.Job) error {
_, err := jm.kubeClient.Experimental().Jobs(job.Namespace).Update(job)
func (jm *JobController) updateJobStatus(job *experimental.Job) error {
_, err := jm.kubeClient.Experimental().Jobs(job.Namespace).UpdateStatus(job)
return err
}

View File

@ -960,7 +960,7 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(c.ExpDatabaseStorage)
daemonSetStorage := daemonetcd.NewREST(c.ExpDatabaseStorage)
deploymentStorage := deploymentetcd.NewStorage(c.ExpDatabaseStorage)
jobStorage := jobetcd.NewREST(c.ExpDatabaseStorage)
jobStorage, jobStatusStorage := jobetcd.NewREST(c.ExpDatabaseStorage)
thirdPartyControl := ThirdPartyController{
master: m,
@ -982,6 +982,7 @@ func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
strings.ToLower("deployments"): deploymentStorage.Deployment,
strings.ToLower("deployments/scale"): deploymentStorage.Scale,
strings.ToLower("jobs"): jobStorage,
strings.ToLower("jobs/status"): jobStatusStorage,
}
return &apiserver.APIGroupVersion{

View File

@ -38,7 +38,7 @@ type REST struct {
var jobPrefix = "/jobs"
// NewREST returns a RESTStorage object that will work against Jobs.
func NewREST(s storage.Interface) *REST {
func NewREST(s storage.Interface) (*REST, *StatusREST) {
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &experimental.Job{} },
@ -73,5 +73,22 @@ func NewREST(s storage.Interface) *REST {
Storage: s,
}
return &REST{store}
statusStore := *store
statusStore.UpdateStrategy = job.StatusStrategy
return &REST{store}, &StatusREST{store: &statusStore}
}
// StatusREST implements the REST endpoint for changing the status of a resourcequota.
type StatusREST struct {
store *etcdgeneric.Etcd
}
func (r *StatusREST) New() runtime.Object {
return &experimental.Job{}
}
// Update alters the status subset of an object.
func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
return r.store.Update(ctx, obj)
}

View File

@ -30,9 +30,10 @@ import (
"k8s.io/kubernetes/pkg/tools"
)
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "experimental")
return NewREST(etcdStorage), fakeClient
storage, statusStorage := NewREST(etcdStorage)
return storage, statusStorage, fakeClient
}
func validNewJob() *experimental.Job {
@ -68,7 +69,7 @@ func validNewJob() *experimental.Job {
}
func TestCreate(t *testing.T) {
storage, fakeClient := newStorage(t)
storage, _, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
validJob := validNewJob()
validJob.ObjectMeta = api.ObjectMeta{}
@ -87,7 +88,7 @@ func TestCreate(t *testing.T) {
}
func TestUpdate(t *testing.T) {
storage, fakeClient := newStorage(t)
storage, _, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
two := 2
test.TestUpdate(
@ -114,25 +115,25 @@ func TestUpdate(t *testing.T) {
}
func TestDelete(t *testing.T) {
storage, fakeClient := newStorage(t)
storage, _, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestDelete(validNewJob())
}
func TestGet(t *testing.T) {
storage, fakeClient := newStorage(t)
storage, _, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestGet(validNewJob())
}
func TestList(t *testing.T) {
storage, fakeClient := newStorage(t)
storage, _, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestList(validNewJob())
}
func TestWatch(t *testing.T) {
storage, fakeClient := newStorage(t)
storage, _, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestWatch(
validNewJob(),
@ -151,3 +152,5 @@ func TestWatch(t *testing.T) {
},
)
}
// TODO: test update /status

View File

@ -79,6 +79,22 @@ func (jobStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fiel
return append(validationErrorList, updateErrorList...)
}
type jobStatusStrategy struct {
jobStrategy
}
var StatusStrategy = jobStatusStrategy{Strategy}
func (jobStatusStrategy) PrepareForUpdate(obj, old runtime.Object) {
newJob := obj.(*experimental.Job)
oldJob := old.(*experimental.Job)
newJob.Spec = oldJob.Spec
}
func (jobStatusStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielderrors.ValidationErrorList {
return validation.ValidateJobUpdateStatus(obj.(*experimental.Job), old.(*experimental.Job))
}
// JobSelectableFields returns a field set that represents the object for matching purposes.
func JobToSelectableFields(job *experimental.Job) fields.Set {
return fields.Set{