Merge pull request #32670 from soltysh/cron_update

Automatic merge from submit-queue

Remove hacks from ScheduledJobs cron spec parsing 

Previusly `github.com/robfig/cron` library did not allow passing cron spec without seconds. First commit updates the library, which has additional method ParseStandard which follows the standard cron spec, iow. minute, hour, day of month, month, day of week.

@janetkuo @erictune as promised in #30227 I've updated the library and now I'm updating it in k8s
This commit is contained in:
Kubernetes Submit Queue 2016-09-23 13:27:16 -07:00 committed by GitHub
commit 8152cfb9c3
6 changed files with 165 additions and 86 deletions

4
Godeps/Godeps.json generated
View File

@ -2083,8 +2083,8 @@
}, },
{ {
"ImportPath": "github.com/robfig/cron", "ImportPath": "github.com/robfig/cron",
"Comment": "v1-16-g0f39cf7", "Comment": "v1-40-g783cfcb",
"Rev": "0f39cf7ebc65a602f45692f9894bd6a193faf8fa" "Rev": "783cfcb01fb00c48f740c9de79122bd410294149"
}, },
{ {
"ImportPath": "github.com/russross/blackfriday", "ImportPath": "github.com/russross/blackfriday",

View File

@ -199,12 +199,7 @@ func validateConcurrencyPolicy(concurrencyPolicy *batch.ConcurrencyPolicy, fldPa
func validateScheduleFormat(schedule string, fldPath *field.Path) field.ErrorList { func validateScheduleFormat(schedule string, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{} allErrs := field.ErrorList{}
// TODO soltysh: this should be removed when https://github.com/robfig/cron/issues/58 is fixed if _, err := cron.ParseStandard(schedule); err != nil {
tmpSchedule := schedule
if len(schedule) > 0 && schedule[0] != '@' {
tmpSchedule = "0 " + schedule
}
if _, err := cron.Parse(tmpSchedule); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, schedule, err.Error())) allErrs = append(allErrs, field.Invalid(fldPath, schedule, err.Error()))
} }

View File

@ -109,8 +109,7 @@ func getNextStartTimeAfter(schedule string, now time.Time) (time.Time, error) {
// How to handle concurrency control. // How to handle concurrency control.
// How to detect changes to schedules or deleted schedules and then // How to detect changes to schedules or deleted schedules and then
// update the jobs? // update the jobs?
tmpSched := addSeconds(schedule) sched, err := cron.Parse(schedule)
sched, err := cron.Parse(tmpSched)
if err != nil { if err != nil {
return time.Unix(0, 0), fmt.Errorf("Unparseable schedule: %s : %s", schedule, err) return time.Unix(0, 0), fmt.Errorf("Unparseable schedule: %s : %s", schedule, err)
} }
@ -123,8 +122,7 @@ func getNextStartTimeAfter(schedule string, now time.Time) (time.Time, error) {
// If there were missed times prior to the last known start time, then those are not returned. // If there were missed times prior to the last known start time, then those are not returned.
func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.Time, error) { func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.Time, error) {
starts := []time.Time{} starts := []time.Time{}
tmpSched := addSeconds(sj.Spec.Schedule) sched, err := cron.ParseStandard(sj.Spec.Schedule)
sched, err := cron.Parse(tmpSched)
if err != nil { if err != nil {
return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err) return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)
} }
@ -172,15 +170,6 @@ func getRecentUnmetScheduleTimes(sj batch.ScheduledJob, now time.Time) ([]time.T
return starts, nil return starts, nil
} }
// TODO soltysh: this should be removed when https://github.com/robfig/cron/issues/58 is fixed
func addSeconds(schedule string) string {
tmpSched := schedule
if len(schedule) > 0 && schedule[0] != '@' {
tmpSched = "0 " + schedule
}
return tmpSched
}
// XXX unit test this // XXX unit test this
// getJobFromTemplate makes a Job from a ScheduledJob // getJobFromTemplate makes a Job from a ScheduledJob

View File

@ -19,6 +19,7 @@ type Cron struct {
snapshot chan []*Entry snapshot chan []*Entry
running bool running bool
ErrorLog *log.Logger ErrorLog *log.Logger
location *time.Location
} }
// Job is an interface for submitted cron jobs. // Job is an interface for submitted cron jobs.
@ -69,8 +70,13 @@ func (s byTime) Less(i, j int) bool {
return s[i].Next.Before(s[j].Next) return s[i].Next.Before(s[j].Next)
} }
// New returns a new Cron job runner. // New returns a new Cron job runner, in the Local time zone.
func New() *Cron { func New() *Cron {
return NewWithLocation(time.Now().Location())
}
// NewWithLocation returns a new Cron job runner.
func NewWithLocation(location *time.Location) *Cron {
return &Cron{ return &Cron{
entries: nil, entries: nil,
add: make(chan *Entry), add: make(chan *Entry),
@ -78,6 +84,7 @@ func New() *Cron {
snapshot: make(chan []*Entry), snapshot: make(chan []*Entry),
running: false, running: false,
ErrorLog: nil, ErrorLog: nil,
location: location,
} }
} }
@ -125,8 +132,16 @@ func (c *Cron) Entries() []*Entry {
return c.entrySnapshot() return c.entrySnapshot()
} }
// Start the cron scheduler in its own go-routine. // Location gets the time zone location
func (c *Cron) Location() *time.Location {
return c.location
}
// Start the cron scheduler in its own go-routine, or no-op if already started.
func (c *Cron) Start() { func (c *Cron) Start() {
if c.running {
return
}
c.running = true c.running = true
go c.run() go c.run()
} }
@ -147,7 +162,7 @@ func (c *Cron) runWithRecovery(j Job) {
// access to the 'running' state variable. // access to the 'running' state variable.
func (c *Cron) run() { func (c *Cron) run() {
// Figure out the next activation times for each entry. // Figure out the next activation times for each entry.
now := time.Now().Local() now := time.Now().In(c.location)
for _, entry := range c.entries { for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now) entry.Next = entry.Schedule.Next(now)
} }
@ -165,8 +180,9 @@ func (c *Cron) run() {
effective = c.entries[0].Next effective = c.entries[0].Next
} }
timer := time.NewTimer(effective.Sub(now))
select { select {
case now = <-time.After(effective.Sub(now)): case now = <-timer.C:
// Run every entry whose next time was this effective time. // Run every entry whose next time was this effective time.
for _, e := range c.entries { for _, e := range c.entries {
if e.Next != effective { if e.Next != effective {
@ -174,23 +190,25 @@ func (c *Cron) run() {
} }
go c.runWithRecovery(e.Job) go c.runWithRecovery(e.Job)
e.Prev = e.Next e.Prev = e.Next
e.Next = e.Schedule.Next(effective) e.Next = e.Schedule.Next(now)
} }
continue continue
case newEntry := <-c.add: case newEntry := <-c.add:
c.entries = append(c.entries, newEntry) c.entries = append(c.entries, newEntry)
newEntry.Next = newEntry.Schedule.Next(time.Now().Local()) newEntry.Next = newEntry.Schedule.Next(time.Now().In(c.location))
case <-c.snapshot: case <-c.snapshot:
c.snapshot <- c.entrySnapshot() c.snapshot <- c.entrySnapshot()
case <-c.stop: case <-c.stop:
timer.Stop()
return return
} }
// 'now' should be updated after newEntry and snapshot cases. // 'now' should be updated after newEntry and snapshot cases.
now = time.Now().Local() now = time.Now().In(c.location)
timer.Stop()
} }
} }

View File

@ -2,36 +2,78 @@ package cron
import ( import (
"fmt" "fmt"
"log"
"math" "math"
"strconv" "strconv"
"strings" "strings"
"time" "time"
) )
// ParseStandard returns a new crontab schedule representing the given standardSpec
// (https://en.wikipedia.org/wiki/Cron). It differs from Parse requiring to always
// pass 5 entries representing: minute, hour, day of month, month and day of week,
// in that order. It returns a descriptive error if the spec is not valid.
//
// It accepts
// - Standard crontab specs, e.g. "* * * * ?"
// - Descriptors, e.g. "@midnight", "@every 1h30m"
func ParseStandard(standardSpec string) (Schedule, error) {
if standardSpec[0] == '@' {
return parseDescriptor(standardSpec)
}
// Split on whitespace. We require exactly 5 fields.
// (minute) (hour) (day of month) (month) (day of week)
fields := strings.Fields(standardSpec)
if len(fields) != 5 {
return nil, fmt.Errorf("Expected exactly 5 fields, found %d: %s", len(fields), standardSpec)
}
var err error
field := func(field string, r bounds) uint64 {
if err != nil {
return uint64(0)
}
var bits uint64
bits, err = getField(field, r)
return bits
}
var (
minute = field(fields[0], minutes)
hour = field(fields[1], hours)
dayofmonth = field(fields[2], dom)
month = field(fields[3], months)
dayofweek = field(fields[4], dow)
)
if err != nil {
return nil, err
}
return &SpecSchedule{
Second: 1 << seconds.min,
Minute: minute,
Hour: hour,
Dom: dayofmonth,
Month: month,
Dow: dayofweek,
}, nil
}
// Parse returns a new crontab schedule representing the given spec. // Parse returns a new crontab schedule representing the given spec.
// It returns a descriptive error if the spec is not valid. // It returns a descriptive error if the spec is not valid.
// //
// It accepts // It accepts
// - Full crontab specs, e.g. "* * * * * ?" // - Full crontab specs, e.g. "* * * * * ?"
// - Descriptors, e.g. "@midnight", "@every 1h30m" // - Descriptors, e.g. "@midnight", "@every 1h30m"
func Parse(spec string) (_ Schedule, err error) { func Parse(spec string) (Schedule, error) {
// Convert panics into errors
defer func() {
if recovered := recover(); recovered != nil {
err = fmt.Errorf("%v", recovered)
}
}()
if spec[0] == '@' { if spec[0] == '@' {
return parseDescriptor(spec), nil return parseDescriptor(spec)
} }
// Split on whitespace. We require 5 or 6 fields. // Split on whitespace. We require 5 or 6 fields.
// (second) (minute) (hour) (day of month) (month) (day of week, optional) // (second) (minute) (hour) (day of month) (month) (day of week, optional)
fields := strings.Fields(spec) fields := strings.Fields(spec)
if len(fields) != 5 && len(fields) != 6 { if len(fields) != 5 && len(fields) != 6 {
log.Panicf("Expected 5 or 6 fields, found %d: %s", len(fields), spec) return nil, fmt.Errorf("Expected 5 or 6 fields, found %d: %s", len(fields), spec)
} }
// If a sixth field is not provided (DayOfWeek), then it is equivalent to star. // If a sixth field is not provided (DayOfWeek), then it is equivalent to star.
@ -39,39 +81,64 @@ func Parse(spec string) (_ Schedule, err error) {
fields = append(fields, "*") fields = append(fields, "*")
} }
schedule := &SpecSchedule{ var err error
Second: getField(fields[0], seconds), field := func(field string, r bounds) uint64 {
Minute: getField(fields[1], minutes), if err != nil {
Hour: getField(fields[2], hours), return uint64(0)
Dom: getField(fields[3], dom), }
Month: getField(fields[4], months), var bits uint64
Dow: getField(fields[5], dow), bits, err = getField(field, r)
return bits
}
var (
second = field(fields[0], seconds)
minute = field(fields[1], minutes)
hour = field(fields[2], hours)
dayofmonth = field(fields[3], dom)
month = field(fields[4], months)
dayofweek = field(fields[5], dow)
)
if err != nil {
return nil, err
} }
return schedule, nil return &SpecSchedule{
Second: second,
Minute: minute,
Hour: hour,
Dom: dayofmonth,
Month: month,
Dow: dayofweek,
}, nil
} }
// getField returns an Int with the bits set representing all of the times that // getField returns an Int with the bits set representing all of the times that
// the field represents. A "field" is a comma-separated list of "ranges". // the field represents or error parsing field value. A "field" is a comma-separated
func getField(field string, r bounds) uint64 { // list of "ranges".
// list = range {"," range} func getField(field string, r bounds) (uint64, error) {
var bits uint64 var bits uint64
ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' }) ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' })
for _, expr := range ranges { for _, expr := range ranges {
bits |= getRange(expr, r) bit, err := getRange(expr, r)
if err != nil {
return bits, err
}
bits |= bit
} }
return bits return bits, nil
} }
// getRange returns the bits indicated by the given expression: // getRange returns the bits indicated by the given expression:
// number | number "-" number [ "/" number ] // number | number "-" number [ "/" number ]
func getRange(expr string, r bounds) uint64 { // or error parsing range.
func getRange(expr string, r bounds) (uint64, error) {
var ( var (
start, end, step uint start, end, step uint
rangeAndStep = strings.Split(expr, "/") rangeAndStep = strings.Split(expr, "/")
lowAndHigh = strings.Split(rangeAndStep[0], "-") lowAndHigh = strings.Split(rangeAndStep[0], "-")
singleDigit = len(lowAndHigh) == 1 singleDigit = len(lowAndHigh) == 1
err error
zero = uint64(0)
) )
var extra_star uint64 var extra_star uint64
@ -80,14 +147,20 @@ func getRange(expr string, r bounds) uint64 {
end = r.max end = r.max
extra_star = starBit extra_star = starBit
} else { } else {
start = parseIntOrName(lowAndHigh[0], r.names) start, err = parseIntOrName(lowAndHigh[0], r.names)
if err != nil {
return zero, err
}
switch len(lowAndHigh) { switch len(lowAndHigh) {
case 1: case 1:
end = start end = start
case 2: case 2:
end = parseIntOrName(lowAndHigh[1], r.names) end, err = parseIntOrName(lowAndHigh[1], r.names)
if err != nil {
return zero, err
}
default: default:
log.Panicf("Too many hyphens: %s", expr) return zero, fmt.Errorf("Too many hyphens: %s", expr)
} }
} }
@ -95,50 +168,56 @@ func getRange(expr string, r bounds) uint64 {
case 1: case 1:
step = 1 step = 1
case 2: case 2:
step = mustParseInt(rangeAndStep[1]) step, err = mustParseInt(rangeAndStep[1])
if err != nil {
return zero, err
}
// Special handling: "N/step" means "N-max/step". // Special handling: "N/step" means "N-max/step".
if singleDigit { if singleDigit {
end = r.max end = r.max
} }
default: default:
log.Panicf("Too many slashes: %s", expr) return zero, fmt.Errorf("Too many slashes: %s", expr)
} }
if start < r.min { if start < r.min {
log.Panicf("Beginning of range (%d) below minimum (%d): %s", start, r.min, expr) return zero, fmt.Errorf("Beginning of range (%d) below minimum (%d): %s", start, r.min, expr)
} }
if end > r.max { if end > r.max {
log.Panicf("End of range (%d) above maximum (%d): %s", end, r.max, expr) return zero, fmt.Errorf("End of range (%d) above maximum (%d): %s", end, r.max, expr)
} }
if start > end { if start > end {
log.Panicf("Beginning of range (%d) beyond end of range (%d): %s", start, end, expr) return zero, fmt.Errorf("Beginning of range (%d) beyond end of range (%d): %s", start, end, expr)
}
if step == 0 {
return zero, fmt.Errorf("Step of range should be a positive number: %s", expr)
} }
return getBits(start, end, step) | extra_star return getBits(start, end, step) | extra_star, nil
} }
// parseIntOrName returns the (possibly-named) integer contained in expr. // parseIntOrName returns the (possibly-named) integer contained in expr.
func parseIntOrName(expr string, names map[string]uint) uint { func parseIntOrName(expr string, names map[string]uint) (uint, error) {
if names != nil { if names != nil {
if namedInt, ok := names[strings.ToLower(expr)]; ok { if namedInt, ok := names[strings.ToLower(expr)]; ok {
return namedInt return namedInt, nil
} }
} }
return mustParseInt(expr) return mustParseInt(expr)
} }
// mustParseInt parses the given expression as an int or panics. // mustParseInt parses the given expression as an int or returns an error.
func mustParseInt(expr string) uint { func mustParseInt(expr string) (uint, error) {
num, err := strconv.Atoi(expr) num, err := strconv.Atoi(expr)
if err != nil { if err != nil {
log.Panicf("Failed to parse int from %s: %s", expr, err) return 0, fmt.Errorf("Failed to parse int from %s: %s", expr, err)
} }
if num < 0 { if num < 0 {
log.Panicf("Negative number (%d) not allowed: %s", num, expr) return 0, fmt.Errorf("Negative number (%d) not allowed: %s", num, expr)
} }
return uint(num) return uint(num), nil
} }
// getBits sets all bits in the range [min, max], modulo the given step size. // getBits sets all bits in the range [min, max], modulo the given step size.
@ -162,10 +241,9 @@ func all(r bounds) uint64 {
return getBits(r.min, r.max, 1) | starBit return getBits(r.min, r.max, 1) | starBit
} }
// parseDescriptor returns a pre-defined schedule for the expression, or panics // parseDescriptor returns a predefined schedule for the expression, or error if none matches.
// if none matches. func parseDescriptor(descriptor string) (Schedule, error) {
func parseDescriptor(spec string) Schedule { switch descriptor {
switch spec {
case "@yearly", "@annually": case "@yearly", "@annually":
return &SpecSchedule{ return &SpecSchedule{
Second: 1 << seconds.min, Second: 1 << seconds.min,
@ -174,7 +252,7 @@ func parseDescriptor(spec string) Schedule {
Dom: 1 << dom.min, Dom: 1 << dom.min,
Month: 1 << months.min, Month: 1 << months.min,
Dow: all(dow), Dow: all(dow),
} }, nil
case "@monthly": case "@monthly":
return &SpecSchedule{ return &SpecSchedule{
@ -184,7 +262,7 @@ func parseDescriptor(spec string) Schedule {
Dom: 1 << dom.min, Dom: 1 << dom.min,
Month: all(months), Month: all(months),
Dow: all(dow), Dow: all(dow),
} }, nil
case "@weekly": case "@weekly":
return &SpecSchedule{ return &SpecSchedule{
@ -194,7 +272,7 @@ func parseDescriptor(spec string) Schedule {
Dom: all(dom), Dom: all(dom),
Month: all(months), Month: all(months),
Dow: 1 << dow.min, Dow: 1 << dow.min,
} }, nil
case "@daily", "@midnight": case "@daily", "@midnight":
return &SpecSchedule{ return &SpecSchedule{
@ -204,7 +282,7 @@ func parseDescriptor(spec string) Schedule {
Dom: all(dom), Dom: all(dom),
Month: all(months), Month: all(months),
Dow: all(dow), Dow: all(dow),
} }, nil
case "@hourly": case "@hourly":
return &SpecSchedule{ return &SpecSchedule{
@ -214,18 +292,17 @@ func parseDescriptor(spec string) Schedule {
Dom: all(dom), Dom: all(dom),
Month: all(months), Month: all(months),
Dow: all(dow), Dow: all(dow),
} }, nil
} }
const every = "@every " const every = "@every "
if strings.HasPrefix(spec, every) { if strings.HasPrefix(descriptor, every) {
duration, err := time.ParseDuration(spec[len(every):]) duration, err := time.ParseDuration(descriptor[len(every):])
if err != nil { if err != nil {
log.Panicf("Failed to parse duration %s: %s", spec, err) return nil, fmt.Errorf("Failed to parse duration %s: %s", descriptor, err)
} }
return Every(duration) return Every(duration), nil
} }
log.Panicf("Unrecognized descriptor: %s", spec) return nil, fmt.Errorf("Unrecognized descriptor: %s", descriptor)
return nil
} }

View File

@ -108,7 +108,7 @@ WRAP:
for 1<<uint(t.Hour())&s.Hour == 0 { for 1<<uint(t.Hour())&s.Hour == 0 {
if !added { if !added {
added = true added = true
t = t.Truncate(time.Hour) t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
} }
t = t.Add(1 * time.Hour) t = t.Add(1 * time.Hour)