mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-04 23:17:50 +00:00
Merge pull request #108032 from deejross/kep3140-cronjob-timezone
KEP 3140: TimeZone support for CronJob
This commit is contained in:
@@ -35,6 +35,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
batchv1informers "k8s.io/client-go/informers/batch/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
@@ -48,6 +50,7 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/cronjob/metrics"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -371,6 +374,7 @@ func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration)
|
||||
// updateCronJob re-queues the CronJob for next scheduled time if there is a
|
||||
// change in spec.schedule otherwise it re-queues it now
|
||||
func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
|
||||
timeZoneEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CronJobTimeZone)
|
||||
oldCJ, okOld := old.(*batchv1.CronJob)
|
||||
newCJ, okNew := curr.(*batchv1.CronJob)
|
||||
|
||||
@@ -381,9 +385,9 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
|
||||
// if the change in schedule results in next requeue having to be sooner than it already was,
|
||||
// it will be handled here by the queue. If the next requeue is further than previous schedule,
|
||||
// the sync loop will essentially be a no-op for the already queued key with old schedule.
|
||||
if oldCJ.Spec.Schedule != newCJ.Spec.Schedule {
|
||||
// schedule changed, change the requeue time
|
||||
sched, err := cron.ParseStandard(newCJ.Spec.Schedule)
|
||||
if oldCJ.Spec.Schedule != newCJ.Spec.Schedule || (timeZoneEnabled && !pointer.StringEqual(oldCJ.Spec.TimeZone, newCJ.Spec.TimeZone)) {
|
||||
// schedule changed, change the requeue time, pass nil recorder so that syncCronJob will output any warnings
|
||||
sched, err := cron.ParseStandard(formatSchedule(timeZoneEnabled, newCJ, nil))
|
||||
if err != nil {
|
||||
// this is likely a user error in defining the spec value
|
||||
// we should log the error and not reconcile this cronjob until an update to spec
|
||||
@@ -420,6 +424,7 @@ func (jm *ControllerV2) syncCronJob(
|
||||
cronJob = cronJob.DeepCopy()
|
||||
now := jm.now()
|
||||
updateStatus := false
|
||||
timeZoneEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CronJobTimeZone)
|
||||
|
||||
childrenJobs := make(map[types.UID]bool)
|
||||
for _, j := range jobs {
|
||||
@@ -487,12 +492,21 @@ func (jm *ControllerV2) syncCronJob(
|
||||
return cronJob, nil, updateStatus, nil
|
||||
}
|
||||
|
||||
if timeZoneEnabled && cronJob.Spec.TimeZone != nil {
|
||||
if _, err := time.LoadLocation(*cronJob.Spec.TimeZone); err != nil {
|
||||
timeZone := pointer.StringDeref(cronJob.Spec.TimeZone, "")
|
||||
klog.V(4).InfoS("Not starting job because timeZone is invalid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "timeZone", timeZone, "err", err)
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
|
||||
return cronJob, nil, updateStatus, nil
|
||||
}
|
||||
}
|
||||
|
||||
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
|
||||
klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
|
||||
return cronJob, nil, updateStatus, nil
|
||||
}
|
||||
|
||||
sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
|
||||
sched, err := cron.ParseStandard(formatSchedule(timeZoneEnabled, cronJob, jm.recorder))
|
||||
if err != nil {
|
||||
// this is likely a user error in defining the spec value
|
||||
// we should log the error and not reconcile this cronjob until an update to spec
|
||||
@@ -501,10 +515,6 @@ func (jm *ControllerV2) syncCronJob(
|
||||
return cronJob, nil, updateStatus, nil
|
||||
}
|
||||
|
||||
if strings.Contains(cronJob.Spec.Schedule, "TZ") {
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", cronJob.Spec.Schedule)
|
||||
}
|
||||
|
||||
scheduledTime, err := getNextScheduleTime(*cronJob, now, sched, jm.recorder)
|
||||
if err != nil {
|
||||
// this is likely a user error in defining the spec value
|
||||
@@ -739,3 +749,23 @@ func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, re
|
||||
func getRef(object runtime.Object) (*corev1.ObjectReference, error) {
|
||||
return ref.GetReference(scheme.Scheme, object)
|
||||
}
|
||||
|
||||
func formatSchedule(timeZoneEnabled bool, cj *batchv1.CronJob, recorder record.EventRecorder) string {
|
||||
if strings.Contains(cj.Spec.Schedule, "TZ") {
|
||||
if recorder != nil {
|
||||
recorder.Eventf(cj, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", cj.Spec.Schedule)
|
||||
}
|
||||
|
||||
return cj.Spec.Schedule
|
||||
}
|
||||
|
||||
if timeZoneEnabled && cj.Spec.TimeZone != nil {
|
||||
if _, err := time.LoadLocation(*cj.Spec.TimeZone); err != nil {
|
||||
return cj.Spec.Schedule
|
||||
}
|
||||
|
||||
return fmt.Sprintf("TZ=%s %s", *cj.Spec.TimeZone, cj.Spec.Schedule)
|
||||
}
|
||||
|
||||
return cj.Spec.Schedule
|
||||
}
|
||||
|
||||
@@ -32,10 +32,13 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
_ "k8s.io/kubernetes/pkg/apis/batch/install"
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
@@ -50,6 +53,9 @@ var (
|
||||
errorSchedule = "obvious error schedule"
|
||||
// schedule is hourly on the hour
|
||||
onTheHour = "0 * * * ?"
|
||||
|
||||
errorTimeZone = "bad timezone"
|
||||
newYork = "America/New_York"
|
||||
)
|
||||
|
||||
// returns a cronJob with some fields filled in.
|
||||
@@ -127,6 +133,19 @@ func justAfterTheHour() *time.Time {
|
||||
return &T1
|
||||
}
|
||||
|
||||
func justAfterTheHourInZone(tz string) time.Time {
|
||||
location, err := time.LoadLocation(tz)
|
||||
if err != nil {
|
||||
panic("tz error: " + err.Error())
|
||||
}
|
||||
|
||||
T1, err := time.ParseInLocation(time.RFC3339, "2016-05-19T10:01:00Z", location)
|
||||
if err != nil {
|
||||
panic("test setup error: " + err.Error())
|
||||
}
|
||||
return T1
|
||||
}
|
||||
|
||||
func justBeforeTheHour() time.Time {
|
||||
T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:00Z")
|
||||
if err != nil {
|
||||
@@ -162,6 +181,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
|
||||
concurrencyPolicy batchv1.ConcurrencyPolicy
|
||||
suspend bool
|
||||
schedule string
|
||||
timeZone *string
|
||||
deadline int64
|
||||
|
||||
// cj status
|
||||
@@ -173,6 +193,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
|
||||
now time.Time
|
||||
jobCreateError error
|
||||
jobGetErr error
|
||||
enableTimeZone bool
|
||||
|
||||
// expectations
|
||||
expectCreate bool
|
||||
@@ -212,6 +233,17 @@ func TestControllerV2SyncCronJob(t *testing.T) {
|
||||
expectedWarnings: 1,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"never ran, not valid time zone": {
|
||||
concurrencyPolicy: "Allow",
|
||||
schedule: onTheHour,
|
||||
timeZone: &errorTimeZone,
|
||||
deadline: noDead,
|
||||
jobCreationTime: justAfterThePriorHour(),
|
||||
now: justBeforeTheHour(),
|
||||
enableTimeZone: true,
|
||||
expectedWarnings: 1,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"never ran, not time, A": {
|
||||
concurrencyPolicy: "Allow",
|
||||
schedule: onTheHour,
|
||||
@@ -238,6 +270,17 @@ func TestControllerV2SyncCronJob(t *testing.T) {
|
||||
expectRequeueAfter: true,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"never ran, not time in zone": {
|
||||
concurrencyPolicy: "Allow",
|
||||
schedule: onTheHour,
|
||||
timeZone: &newYork,
|
||||
deadline: noDead,
|
||||
jobCreationTime: justAfterThePriorHour(),
|
||||
now: justBeforeTheHour(),
|
||||
enableTimeZone: true,
|
||||
expectRequeueAfter: true,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"never ran, is time, A": {
|
||||
concurrencyPolicy: "Allow",
|
||||
schedule: onTheHour,
|
||||
@@ -274,6 +317,48 @@ func TestControllerV2SyncCronJob(t *testing.T) {
|
||||
expectUpdateStatus: true,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"never ran, is time in zone, but time zone disabled": {
|
||||
concurrencyPolicy: "Allow",
|
||||
schedule: onTheHour,
|
||||
timeZone: &newYork,
|
||||
deadline: noDead,
|
||||
jobCreationTime: justAfterThePriorHour(),
|
||||
now: justAfterTheHourInZone(newYork),
|
||||
enableTimeZone: false,
|
||||
expectCreate: true,
|
||||
expectActive: 1,
|
||||
expectRequeueAfter: true,
|
||||
expectUpdateStatus: true,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"never ran, is time in zone": {
|
||||
concurrencyPolicy: "Allow",
|
||||
schedule: onTheHour,
|
||||
timeZone: &newYork,
|
||||
deadline: noDead,
|
||||
jobCreationTime: justAfterThePriorHour(),
|
||||
now: justAfterTheHourInZone(newYork),
|
||||
enableTimeZone: true,
|
||||
expectCreate: true,
|
||||
expectActive: 1,
|
||||
expectRequeueAfter: true,
|
||||
expectUpdateStatus: true,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"never ran, is time in zone, but TZ is also set in schedule": {
|
||||
concurrencyPolicy: "Allow",
|
||||
schedule: "TZ=UTC " + onTheHour,
|
||||
timeZone: &newYork,
|
||||
deadline: noDead,
|
||||
jobCreationTime: justAfterThePriorHour(),
|
||||
now: justAfterTheHourInZone(newYork),
|
||||
enableTimeZone: true,
|
||||
expectCreate: true,
|
||||
expectedWarnings: 1,
|
||||
expectRequeueAfter: true,
|
||||
expectUpdateStatus: true,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"never ran, is time, suspended": {
|
||||
concurrencyPolicy: "Allow",
|
||||
suspend: true,
|
||||
@@ -815,11 +900,15 @@ func TestControllerV2SyncCronJob(t *testing.T) {
|
||||
for name, tc := range testCases {
|
||||
name := name
|
||||
tc := tc
|
||||
|
||||
t.Run(name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.CronJobTimeZone, tc.enableTimeZone)()
|
||||
|
||||
cj := cronJob()
|
||||
cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
|
||||
cj.Spec.Suspend = &tc.suspend
|
||||
cj.Spec.Schedule = tc.schedule
|
||||
cj.Spec.TimeZone = tc.timeZone
|
||||
if tc.deadline != noDead {
|
||||
cj.Spec.StartingDeadlineSeconds = &tc.deadline
|
||||
}
|
||||
@@ -1058,6 +1147,63 @@ func TestControllerV2UpdateCronJob(t *testing.T) {
|
||||
},
|
||||
expectedDelay: 1*time.Second + nextScheduleDelta,
|
||||
},
|
||||
{
|
||||
name: "spec.timeZone not changed",
|
||||
oldCronJob: &batchv1.CronJob{
|
||||
Spec: batchv1.CronJobSpec{
|
||||
TimeZone: &newYork,
|
||||
JobTemplate: batchv1.JobTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{"a": "b"},
|
||||
Annotations: map[string]string{"x": "y"},
|
||||
},
|
||||
Spec: jobSpec(),
|
||||
},
|
||||
},
|
||||
},
|
||||
newCronJob: &batchv1.CronJob{
|
||||
Spec: batchv1.CronJobSpec{
|
||||
TimeZone: &newYork,
|
||||
JobTemplate: batchv1.JobTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{"a": "foo"},
|
||||
Annotations: map[string]string{"x": "y"},
|
||||
},
|
||||
Spec: jobSpec(),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedDelay: 0 * time.Second,
|
||||
},
|
||||
{
|
||||
name: "spec.timeZone changed",
|
||||
oldCronJob: &batchv1.CronJob{
|
||||
Spec: batchv1.CronJobSpec{
|
||||
TimeZone: &newYork,
|
||||
JobTemplate: batchv1.JobTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{"a": "b"},
|
||||
Annotations: map[string]string{"x": "y"},
|
||||
},
|
||||
Spec: jobSpec(),
|
||||
},
|
||||
},
|
||||
},
|
||||
newCronJob: &batchv1.CronJob{
|
||||
Spec: batchv1.CronJobSpec{
|
||||
TimeZone: nil,
|
||||
JobTemplate: batchv1.JobTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{"a": "foo"},
|
||||
Annotations: map[string]string{"x": "y"},
|
||||
},
|
||||
Spec: jobSpec(),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedDelay: 0 * time.Second,
|
||||
},
|
||||
|
||||
// TODO: Add more test cases for updating scheduling.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
||||
Reference in New Issue
Block a user