Merge pull request #25569 from soltysh/scheduledjob_storage

Automatic merge from submit-queue

Scheduledjob storage

This builds on top of #25475 so only last commit is significant. There's still one small problem with conversions, I'm currently working, but the biggest is still multi-version client tests. Read on...

@erictune unfortunately the multi-version tests are biting again, this time in `pkg/registry/scheduledjob/etcd`. If I run the tests with just `batch/v2alpha1` then these are working correctly, but we can't have only `batch/v2alpha1` enabled since `batch/v1` is the preferred version for the entire `batch/` group. We either going to skip the tests here like we did with the scheduledjob client (I'm talking about `pkg/registry/scheduledjob/etcd/etcd_test.go`) or fix it. 

I've talked with @deads2k about how we can plumb the tests so it just passes, but there's no simple solution we can come up with 😞 (other than the one from #25566), unless @caesarxuchao can chime in and propose something.

```release-note
Introducing ScheduledJobs as described in [the proposal](https://github.com/kubernetes/kubernetes/blob/master/docs/proposals/scheduledjob.md) as part of `batch/v2alpha1` version (experimental feature).
```
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/.github/PULL_REQUEST_TEMPLATE.md?pixel)]()
This commit is contained in:
k8s-merge-robot 2016-05-22 01:45:22 -07:00
commit 3b74a6b0ef
6 changed files with 442 additions and 9 deletions

View File

@ -39,15 +39,18 @@ func addConversionFuncs(scheme *runtime.Scheme) {
panic(err) panic(err)
} }
err = api.Scheme.AddFieldLabelConversionFunc("batch/v2alpha1", "Job", // Add field label conversions for kinds having selectable nothing but ObjectMeta fields.
func(label, value string) (string, string, error) { for _, kind := range []string{"Job", "JobTemplate", "ScheduledJob"} {
switch label { err = api.Scheme.AddFieldLabelConversionFunc("batch/v2alpha1", kind,
case "metadata.name", "metadata.namespace", "status.successful": func(label, value string) (string, string, error) {
return label, value, nil switch label {
default: case "metadata.name", "metadata.namespace", "status.successful":
return "", "", fmt.Errorf("field label not supported: %s", label) return label, value, nil
} default:
}) return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
}
if err != nil { if err != nil {
// If one of the conversion functions is malformed, detect it immediately. // If one of the conversion functions is malformed, detect it immediately.
panic(err) panic(err)

View File

@ -47,6 +47,7 @@ const (
PodTemplates Resource = "podtemplates" PodTemplates Resource = "podtemplates"
Replicasets Resource = "replicasets" Replicasets Resource = "replicasets"
ResourceQuotas Resource = "resourcequotas" ResourceQuotas Resource = "resourcequotas"
ScheduledJobs Resource = "scheduledjobs"
Secrets Resource = "secrets" Secrets Resource = "secrets"
ServiceAccounts Resource = "serviceaccounts" ServiceAccounts Resource = "serviceaccounts"
Services Resource = "services" Services Resource = "services"
@ -75,6 +76,7 @@ func init() {
watchCacheSizes[PodTemplates] = 100 watchCacheSizes[PodTemplates] = 100
watchCacheSizes[Replicasets] = 100 watchCacheSizes[Replicasets] = 100
watchCacheSizes[ResourceQuotas] = 100 watchCacheSizes[ResourceQuotas] = 100
watchCacheSizes[ScheduledJobs] = 100
watchCacheSizes[Secrets] = 100 watchCacheSizes[Secrets] = 100
watchCacheSizes[ServiceAccounts] = 100 watchCacheSizes[ServiceAccounts] = 100
watchCacheSizes[Services] = 100 watchCacheSizes[Services] = 100

View File

@ -0,0 +1,19 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package scheduledjob provides Registry interface and it's RESTStorage
// implementation for storing ScheduledJob api objects.
package scheduledjob

View File

@ -0,0 +1,98 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/scheduledjob"
"k8s.io/kubernetes/pkg/runtime"
)
// REST implements a RESTStorage for scheduled jobs against etcd
type REST struct {
*registry.Store
}
// NewREST returns a RESTStorage object that will work against ScheduledJobs.
func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/scheduledjobs"
newListFunc := func() runtime.Object { return &batch.ScheduledJobList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.ScheduledJobs), &batch.ScheduledJob{}, prefix, scheduledjob.Strategy, newListFunc)
store := &registry.Store{
NewFunc: func() runtime.Object { return &batch.ScheduledJob{} },
// NewListFunc returns an object capable of storing results of an etcd list.
NewListFunc: newListFunc,
// Produces a path that etcd understands, to the root of the resource
// by combining the namespace in the context with the given prefix
KeyRootFunc: func(ctx api.Context) string {
return registry.NamespaceKeyRootFunc(ctx, prefix)
},
// Produces a path that etcd understands, to the resource by combining
// the namespace in the context with the given prefix
KeyFunc: func(ctx api.Context, name string) (string, error) {
return registry.NamespaceKeyFunc(ctx, prefix, name)
},
// Retrieve the name field of a scheduled job
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*batch.ScheduledJob).Name, nil
},
// Used to match objects based on labels/fields for list and watch
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return scheduledjob.MatchScheduledJob(label, field)
},
QualifiedResource: batch.Resource("scheduledjobs"),
DeleteCollectionWorkers: opts.DeleteCollectionWorkers,
// Used to validate scheduled job creation
CreateStrategy: scheduledjob.Strategy,
// Used to validate scheduled job updates
UpdateStrategy: scheduledjob.Strategy,
DeleteStrategy: scheduledjob.Strategy,
Storage: storageInterface,
}
statusStore := *store
statusStore.UpdateStrategy = scheduledjob.StatusStrategy
return &REST{store}, &StatusREST{store: &statusStore}
}
// StatusREST implements the REST endpoint for changing the status of a resourcequota.
type StatusREST struct {
store *registry.Store
}
func (r *StatusREST) New() runtime.Object {
return &batch.ScheduledJob{}
}
// Update alters the status subset of an object.
func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
return r.store.Update(ctx, obj)
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/batch/validation"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/validation/field"
)
// scheduledJobStrategy implements verification logic for Replication Controllers.
type scheduledJobStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// Strategy is the default logic that applies when creating and updating ScheduledJob objects.
var Strategy = scheduledJobStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped returns true because all scheduled jobs need to be within a namespace.
func (scheduledJobStrategy) NamespaceScoped() bool {
return true
}
// PrepareForCreate clears the status of a scheduled job before creation.
func (scheduledJobStrategy) PrepareForCreate(obj runtime.Object) {
scheduledJob := obj.(*batch.ScheduledJob)
scheduledJob.Status = batch.ScheduledJobStatus{}
}
// PrepareForUpdate clears fields that are not allowed to be set by end users on update.
func (scheduledJobStrategy) PrepareForUpdate(obj, old runtime.Object) {
newScheduledJob := obj.(*batch.ScheduledJob)
oldScheduledJob := old.(*batch.ScheduledJob)
newScheduledJob.Status = oldScheduledJob.Status
}
// Validate validates a new scheduled job.
func (scheduledJobStrategy) Validate(ctx api.Context, obj runtime.Object) field.ErrorList {
scheduledJob := obj.(*batch.ScheduledJob)
return validation.ValidateScheduledJob(scheduledJob)
}
// Canonicalize normalizes the object after validation.
func (scheduledJobStrategy) Canonicalize(obj runtime.Object) {
}
func (scheduledJobStrategy) AllowUnconditionalUpdate() bool {
return true
}
// AllowCreateOnUpdate is false for scheduled jobs; this means a POST is needed to create one.
func (scheduledJobStrategy) AllowCreateOnUpdate() bool {
return false
}
// ValidateUpdate is the default update validation for an end user.
func (scheduledJobStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) field.ErrorList {
return validation.ValidateScheduledJob(obj.(*batch.ScheduledJob))
}
type scheduledJobStatusStrategy struct {
scheduledJobStrategy
}
var StatusStrategy = scheduledJobStatusStrategy{Strategy}
func (scheduledJobStatusStrategy) PrepareForUpdate(obj, old runtime.Object) {
newJob := obj.(*batch.ScheduledJob)
oldJob := old.(*batch.ScheduledJob)
newJob.Spec = oldJob.Spec
}
func (scheduledJobStatusStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) field.ErrorList {
return field.ErrorList{}
}
// ScheduledJobToSelectableFields returns a field set that represents the object for matching purposes.
func ScheduledJobToSelectableFields(scheduledJob *batch.ScheduledJob) fields.Set {
return generic.ObjectMetaFieldsSet(scheduledJob.ObjectMeta, true)
}
// MatchScheduledJob is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchScheduledJob(label labels.Selector, field fields.Selector) generic.Matcher {
return &generic.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
scheduledJob, ok := obj.(*batch.ScheduledJob)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a scheduled job.")
}
return labels.Set(scheduledJob.ObjectMeta.Labels), ScheduledJobToSelectableFields(scheduledJob), nil
},
}
}

View File

@ -0,0 +1,192 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduledjob
import (
"testing"
"k8s.io/kubernetes/pkg/api"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/labels"
)
func newBool(a bool) *bool {
r := new(bool)
*r = a
return r
}
func TestScheduledJobStrategy(t *testing.T) {
ctx := api.NewDefaultContext()
if !Strategy.NamespaceScoped() {
t.Errorf("ScheduledJob must be namespace scoped")
}
if Strategy.AllowCreateOnUpdate() {
t.Errorf("ScheduledJob should not allow create on update")
}
validSelector := &unversioned.LabelSelector{
MatchLabels: map[string]string{"a": "b"},
}
validPodTemplateSpec := api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: validSelector.MatchLabels,
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyOnFailure,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent"}},
},
}
scheduledJob := &batch.ScheduledJob{
ObjectMeta: api.ObjectMeta{
Name: "myscheduledjob",
Namespace: api.NamespaceDefault,
},
Spec: batch.ScheduledJobSpec{
Schedule: "* * * * * ?",
ConcurrencyPolicy: batch.AllowConcurrent,
JobTemplate: batch.JobTemplateSpec{
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: newBool(true),
},
},
},
}
Strategy.PrepareForCreate(scheduledJob)
if len(scheduledJob.Status.Active) != 0 {
t.Errorf("ScheduledJob does not allow setting status on create")
}
errs := Strategy.Validate(ctx, scheduledJob)
if len(errs) != 0 {
t.Errorf("Unexpected error validating %v", errs)
}
now := unversioned.Now()
updatedScheduledJob := &batch.ScheduledJob{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "4"},
Spec: batch.ScheduledJobSpec{
Schedule: "5 5 5 5 * ?",
},
Status: batch.ScheduledJobStatus{
LastScheduleTime: &now,
},
}
// ensure we do not change status
Strategy.PrepareForUpdate(updatedScheduledJob, scheduledJob)
if updatedScheduledJob.Status.Active != nil {
t.Errorf("PrepareForUpdate should have preserved prior version status")
}
errs = Strategy.ValidateUpdate(ctx, updatedScheduledJob, scheduledJob)
if len(errs) == 0 {
t.Errorf("Expected a validation error")
}
}
func TestScheduledJobStatusStrategy(t *testing.T) {
ctx := api.NewDefaultContext()
if !StatusStrategy.NamespaceScoped() {
t.Errorf("ScheduledJob must be namespace scoped")
}
if StatusStrategy.AllowCreateOnUpdate() {
t.Errorf("ScheduledJob should not allow create on update")
}
validSelector := &unversioned.LabelSelector{
MatchLabels: map[string]string{"a": "b"},
}
validPodTemplateSpec := api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: validSelector.MatchLabels,
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyOnFailure,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent"}},
},
}
oldSchedule := "* * * * * ?"
oldScheduledJob := &batch.ScheduledJob{
ObjectMeta: api.ObjectMeta{
Name: "myscheduledjob",
Namespace: api.NamespaceDefault,
ResourceVersion: "10",
},
Spec: batch.ScheduledJobSpec{
Schedule: oldSchedule,
ConcurrencyPolicy: batch.AllowConcurrent,
JobTemplate: batch.JobTemplateSpec{
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: newBool(true),
},
},
},
}
now := unversioned.Now()
newScheduledJob := &batch.ScheduledJob{
ObjectMeta: api.ObjectMeta{
Name: "myscheduledjob",
Namespace: api.NamespaceDefault,
ResourceVersion: "9",
},
Spec: batch.ScheduledJobSpec{
Schedule: "5 5 5 * * ?",
ConcurrencyPolicy: batch.AllowConcurrent,
JobTemplate: batch.JobTemplateSpec{
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: newBool(true),
},
},
},
Status: batch.ScheduledJobStatus{
LastScheduleTime: &now,
},
}
StatusStrategy.PrepareForUpdate(newScheduledJob, oldScheduledJob)
if newScheduledJob.Status.LastScheduleTime == nil {
t.Errorf("ScheduledJob status updates must allow changes to scheduledJob status")
}
if newScheduledJob.Spec.Schedule != oldSchedule {
t.Errorf("ScheduledJob status updates must now allow changes to scheduledJob spec")
}
errs := StatusStrategy.ValidateUpdate(ctx, newScheduledJob, oldScheduledJob)
if len(errs) != 0 {
t.Errorf("Unexpected error %v", errs)
}
if newScheduledJob.ResourceVersion != "9" {
t.Errorf("Incoming resource version on update should not be mutated")
}
}
// FIXME: this is failing conversion.go
func TestSelectableFieldLabelConversions(t *testing.T) {
apitesting.TestSelectableFieldLabelConversionsOfKind(t,
"batch/v2alpha1",
"ScheduledJob",
labels.Set(ScheduledJobToSelectableFields(&batch.ScheduledJob{})),
nil,
)
}