Promote CronJobs to batch/v1

This commit is contained in:
Maciej Szulik 2021-02-10 12:49:43 +01:00
parent 22b3a9cd21
commit 9364d0fc81
No known key found for this signature in database
GPG Key ID: F15E55D276FA84C4
10 changed files with 232 additions and 12 deletions

View File

@ -81,6 +81,8 @@ func TestDefaulting(t *testing.T) {
{Group: "autoscaling", Version: "v2beta1", Kind: "HorizontalPodAutoscalerList"}: {},
{Group: "autoscaling", Version: "v2beta2", Kind: "HorizontalPodAutoscaler"}: {},
{Group: "autoscaling", Version: "v2beta2", Kind: "HorizontalPodAutoscalerList"}: {},
{Group: "batch", Version: "v1", Kind: "CronJob"}: {},
{Group: "batch", Version: "v1", Kind: "CronJobList"}: {},
{Group: "batch", Version: "v1", Kind: "Job"}: {},
{Group: "batch", Version: "v1", Kind: "JobList"}: {},
{Group: "batch", Version: "v1beta1", Kind: "CronJob"}: {},

View File

@ -32,14 +32,13 @@ func addConversionFuncs(scheme *runtime.Scheme) error {
case "metadata.name", "metadata.namespace", "status.successful":
return label, value, nil
default:
return "", "", fmt.Errorf("field label %q not supported for batchv1.Job", label)
return "", "", fmt.Errorf("field label %q not supported for Job", label)
}
},
)
})
}
// The following functions don't do anything special, but they need to be added
// here due to the dependency of v1beta1 and v2alpha1 on v1.
// here due to the dependency of v1beta1 on v1.
func Convert_batch_JobSpec_To_v1_JobSpec(in *batch.JobSpec, out *v1.JobSpec, s conversion.Scope) error {
return autoConvert_batch_JobSpec_To_v1_JobSpec(in, out, s)

View File

@ -19,6 +19,7 @@ package v1
import (
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/runtime"
utilpointer "k8s.io/utils/pointer"
)
func addDefaultingFuncs(scheme *runtime.Scheme) error {
@ -29,18 +30,14 @@ func SetDefaults_Job(obj *batchv1.Job) {
// For a non-parallel job, you can leave both `.spec.completions` and
// `.spec.parallelism` unset. When both are unset, both are defaulted to 1.
if obj.Spec.Completions == nil && obj.Spec.Parallelism == nil {
obj.Spec.Completions = new(int32)
*obj.Spec.Completions = 1
obj.Spec.Parallelism = new(int32)
*obj.Spec.Parallelism = 1
obj.Spec.Completions = utilpointer.Int32Ptr(1)
obj.Spec.Parallelism = utilpointer.Int32Ptr(1)
}
if obj.Spec.Parallelism == nil {
obj.Spec.Parallelism = new(int32)
*obj.Spec.Parallelism = 1
obj.Spec.Parallelism = utilpointer.Int32Ptr(1)
}
if obj.Spec.BackoffLimit == nil {
obj.Spec.BackoffLimit = new(int32)
*obj.Spec.BackoffLimit = 6
obj.Spec.BackoffLimit = utilpointer.Int32Ptr(6)
}
labels := obj.Spec.Template.Labels
if labels != nil && len(obj.Labels) == 0 {
@ -50,3 +47,18 @@ func SetDefaults_Job(obj *batchv1.Job) {
obj.Spec.CompletionMode = batchv1.NonIndexedCompletion
}
}
func SetDefaults_CronJob(obj *batchv1.CronJob) {
if obj.Spec.ConcurrencyPolicy == "" {
obj.Spec.ConcurrencyPolicy = batchv1.AllowConcurrent
}
if obj.Spec.Suspend == nil {
obj.Spec.Suspend = utilpointer.BoolPtr(false)
}
if obj.Spec.SuccessfulJobsHistoryLimit == nil {
obj.Spec.SuccessfulJobsHistoryLimit = utilpointer.Int32Ptr(3)
}
if obj.Spec.FailedJobsHistoryLimit == nil {
obj.Spec.FailedJobsHistoryLimit = utilpointer.Int32Ptr(1)
}
}

View File

@ -259,3 +259,69 @@ func roundTrip(t *testing.T, obj runtime.Object) runtime.Object {
}
return obj3
}
func TestSetDefaultCronJob(t *testing.T) {
tests := map[string]struct {
original *batchv1.CronJob
expected *batchv1.CronJob
}{
"empty batchv1.CronJob should default batchv1.ConcurrencyPolicy and Suspend": {
original: &batchv1.CronJob{},
expected: &batchv1.CronJob{
Spec: batchv1.CronJobSpec{
ConcurrencyPolicy: batchv1.AllowConcurrent,
Suspend: newBool(false),
SuccessfulJobsHistoryLimit: utilpointer.Int32Ptr(3),
FailedJobsHistoryLimit: utilpointer.Int32Ptr(1),
},
},
},
"set fields should not be defaulted": {
original: &batchv1.CronJob{
Spec: batchv1.CronJobSpec{
ConcurrencyPolicy: batchv1.ForbidConcurrent,
Suspend: newBool(true),
SuccessfulJobsHistoryLimit: utilpointer.Int32Ptr(5),
FailedJobsHistoryLimit: utilpointer.Int32Ptr(5),
},
},
expected: &batchv1.CronJob{
Spec: batchv1.CronJobSpec{
ConcurrencyPolicy: batchv1.ForbidConcurrent,
Suspend: newBool(true),
SuccessfulJobsHistoryLimit: utilpointer.Int32Ptr(5),
FailedJobsHistoryLimit: utilpointer.Int32Ptr(5),
},
},
},
}
for name, test := range tests {
original := test.original
expected := test.expected
obj2 := roundTrip(t, runtime.Object(original))
actual, ok := obj2.(*batchv1.CronJob)
if !ok {
t.Errorf("%s: unexpected object: %v", name, actual)
t.FailNow()
}
if actual.Spec.ConcurrencyPolicy != expected.Spec.ConcurrencyPolicy {
t.Errorf("%s: got different concurrencyPolicy than expected: %v %v", name, actual.Spec.ConcurrencyPolicy, expected.Spec.ConcurrencyPolicy)
}
if *actual.Spec.Suspend != *expected.Spec.Suspend {
t.Errorf("%s: got different suspend than expected: %v %v", name, *actual.Spec.Suspend, *expected.Spec.Suspend)
}
if *actual.Spec.SuccessfulJobsHistoryLimit != *expected.Spec.SuccessfulJobsHistoryLimit {
t.Errorf("%s: got different successfulJobsHistoryLimit than expected: %v %v", name, *actual.Spec.SuccessfulJobsHistoryLimit, *expected.Spec.SuccessfulJobsHistoryLimit)
}
if *actual.Spec.FailedJobsHistoryLimit != *expected.Spec.FailedJobsHistoryLimit {
t.Errorf("%s: got different failedJobsHistoryLimit than expected: %v %v", name, *actual.Spec.FailedJobsHistoryLimit, *expected.Spec.FailedJobsHistoryLimit)
}
}
}
func newBool(val bool) *bool {
p := new(bool)
*p = val
return p
}

View File

@ -61,6 +61,7 @@ var GVRToStorageVersionHash = map[string]string{
"autoscaling/v2beta1/horizontalpodautoscalers": "oQlkt7f5j/A=",
"autoscaling/v2beta2/horizontalpodautoscalers": "oQlkt7f5j/A=",
"batch/v1/jobs": "mudhfqk/qZY=",
"batch/v1/cronjobs": "h/JlFAZkyyY=",
"batch/v1beta1/cronjobs": "h/JlFAZkyyY=",
"certificates.k8s.io/v1/certificatesigningrequests": "95fRKMXA+00=",
"certificates.k8s.io/v1beta1/certificatesigningrequests": "95fRKMXA+00=",

View File

@ -64,6 +64,14 @@ func (p RESTStorageProvider) v1Storage(apiResourceConfigSource serverstorage.API
storage["jobs"] = jobsStorage
storage["jobs/status"] = jobsStatusStorage
// cronjobs
cronJobsStorage, cronJobsStatusStorage, err := cronjobstore.NewREST(restOptionsGetter)
if err != nil {
return storage, err
}
storage["cronjobs"] = cronJobsStorage
storage["cronjobs/status"] = cronJobsStatusStorage
return storage, err
}

View File

@ -46,6 +46,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Job{},
&JobList{},
&CronJob{},
&CronJobList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil

View File

@ -244,3 +244,125 @@ type JobCondition struct {
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,6,opt,name=message"`
}
// JobTemplateSpec describes the data a Job should have when created from a template
type JobTemplateSpec struct {
// Standard object's metadata of the jobs created from this template.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Specification of the desired behavior of the job.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
Spec JobSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
}
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// CronJob represents the configuration of a single cron job.
type CronJob struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Specification of the desired behavior of a cron job, including the schedule.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
Spec CronJobSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
// Current status of a cron job.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
Status CronJobStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// CronJobList is a collection of cron jobs.
type CronJobList struct {
metav1.TypeMeta `json:",inline"`
// Standard list metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// items is the list of CronJobs.
Items []CronJob `json:"items" protobuf:"bytes,2,rep,name=items"`
}
// CronJobSpec describes how the job execution will look like and when it will actually run.
type CronJobSpec struct {
// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
Schedule string `json:"schedule" protobuf:"bytes,1,opt,name=schedule"`
// Optional deadline in seconds for starting the job if it misses scheduled
// time for any reason. Missed jobs executions will be counted as failed ones.
// +optional
StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,2,opt,name=startingDeadlineSeconds"`
// Specifies how to treat concurrent executions of a Job.
// Valid values are:
// - "Allow" (default): allows CronJobs to run concurrently;
// - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet;
// - "Replace": cancels currently running job and replaces it with a new one
// +optional
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`
// This flag tells the controller to suspend subsequent executions, it does
// not apply to already started executions. Defaults to false.
// +optional
Suspend *bool `json:"suspend,omitempty" protobuf:"varint,4,opt,name=suspend"`
// Specifies the job that will be created when executing a CronJob.
JobTemplate JobTemplateSpec `json:"jobTemplate" protobuf:"bytes,5,opt,name=jobTemplate"`
// The number of successful finished jobs to retain. Value must be non-negative integer.
// Defaults to 3.
// +optional
SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
// The number of failed finished jobs to retain. Value must be non-negative integer.
// Defaults to 1.
// +optional
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
}
// ConcurrencyPolicy describes how the job will be handled.
// Only one of the following concurrent policies may be specified.
// If none of the following policies is specified, the default one
// is AllowConcurrent.
type ConcurrencyPolicy string
const (
// AllowConcurrent allows CronJobs to run concurrently.
AllowConcurrent ConcurrencyPolicy = "Allow"
// ForbidConcurrent forbids concurrent runs, skipping next run if previous
// hasn't finished yet.
ForbidConcurrent ConcurrencyPolicy = "Forbid"
// ReplaceConcurrent cancels currently running job and replaces it with a new one.
ReplaceConcurrent ConcurrencyPolicy = "Replace"
)
// CronJobStatus represents the current state of a cron job.
type CronJobStatus struct {
// A list of pointers to currently running jobs.
// +optional
// +listType=atomic
Active []v1.ObjectReference `json:"active,omitempty" protobuf:"bytes,1,rep,name=active"`
// Information when was the last time the job was successfully scheduled.
// +optional
LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty" protobuf:"bytes,4,opt,name=lastScheduleTime"`
// Information when was the last time the job successfully completed.
// +optional
LastSuccessfulTime *metav1.Time `json:"lastSuccessfulTime,omitempty" protobuf:"bytes,5,opt,name=lastSuccessfulTime"`
}

View File

@ -51,6 +51,7 @@ var statusData = map[schema.GroupVersionResource]string{
gvr("networking.k8s.io", "v1beta1", "ingresses"): `{"status": {"loadBalancer": {"ingress": [{"ip": "127.0.0.1"}]}}}`,
gvr("networking.k8s.io", "v1", "ingresses"): `{"status": {"loadBalancer": {"ingress": [{"ip": "127.0.0.1"}]}}}`,
gvr("autoscaling", "v1", "horizontalpodautoscalers"): `{"status": {"currentReplicas": 5}}`,
gvr("batch", "v1", "cronjobs"): `{"status": {"lastScheduleTime": null}}`,
gvr("batch", "v1beta1", "cronjobs"): `{"status": {"lastScheduleTime": null}}`,
gvr("storage.k8s.io", "v1", "volumeattachments"): `{"status": {"attached": true}}`,
gvr("policy", "v1beta1", "poddisruptionbudgets"): `{"status": {"currentHealthy": 5}}`,

View File

@ -152,6 +152,13 @@ func GetEtcdStorageDataForNamespace(namespace string) map[schema.GroupVersionRes
Stub: `{"metadata": {"name": "job1"}, "spec": {"manualSelector": true, "selector": {"matchLabels": {"controller-uid": "uid1"}}, "template": {"metadata": {"labels": {"controller-uid": "uid1"}}, "spec": {"containers": [{"image": "` + image + `", "name": "container1"}], "dnsPolicy": "ClusterFirst", "restartPolicy": "Never"}}}}`,
ExpectedEtcdPath: "/registry/jobs/" + namespace + "/job1",
},
gvr("batch", "v1", "cronjobs"): {
Stub: `{"metadata": {"name": "cjv1"}, "spec": {"jobTemplate": {"spec": {"template": {"metadata": {"labels": {"controller-uid": "uid0"}}, "spec": {"containers": [{"image": "` + image + `", "name": "container0"}], "dnsPolicy": "ClusterFirst", "restartPolicy": "Never"}}}}, "schedule": "* * * * *"}}`,
ExpectedEtcdPath: "/registry/cronjobs/" + namespace + "/cjv1",
// TODO (soltysh): in 1.22 this should be switched to v1. See https://github.com/kubernetes/kubernetes/pull/98965
// this has to stay at v1beta1 for a release, otherwise a 1.20 API server won't be able to read the data persisted in etcd and will break during a multi-server upgrade
ExpectedGVK: gvkP("batch", "v1beta1", "CronJob"),
},
// --
// k8s.io/kubernetes/pkg/apis/batch/v1beta1