mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-25 18:09:10 +00:00 
			
		
		
		
	Fixed minor spelling mistakes caught while reading `cronjob_controller.go` source code reference at [operating-kubernetes](https://stripe.com/blog/operating-kubernetes)
		
			
				
	
	
		
			409 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			409 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2016 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package cronjob
 | |
| 
 | |
| /*
 | |
| I did not use watch or expectations.  Those add a lot of corner cases, and we aren't
 | |
| expecting a large volume of jobs or scheduledJobs.  (We are favoring correctness
 | |
| over scalability.  If we find a single controller thread is too slow because
 | |
| there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
 | |
| If we find the load on the API server is too high, we can use a watch and
 | |
| UndeltaStore.)
 | |
| 
 | |
| Just periodically list jobs and SJs, and then reconcile them.
 | |
| 
 | |
| */
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"sort"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| 
 | |
| 	batchv1 "k8s.io/api/batch/v1"
 | |
| 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 | |
| 	"k8s.io/api/core/v1"
 | |
| 	"k8s.io/apimachinery/pkg/api/errors"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	clientset "k8s.io/client-go/kubernetes"
 | |
| 	"k8s.io/client-go/kubernetes/scheme"
 | |
| 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	ref "k8s.io/client-go/tools/reference"
 | |
| 	"k8s.io/kubernetes/pkg/util/metrics"
 | |
| )
 | |
| 
 | |
| // Utilities for dealing with Jobs and CronJobs and time.
 | |
| 
 | |
| // controllerKind contains the schema.GroupVersionKind for this controller type.
 | |
| var controllerKind = batchv1beta1.SchemeGroupVersion.WithKind("CronJob")
 | |
| 
 | |
| type CronJobController struct {
 | |
| 	kubeClient clientset.Interface
 | |
| 	jobControl jobControlInterface
 | |
| 	sjControl  sjControlInterface
 | |
| 	podControl podControlInterface
 | |
| 	recorder   record.EventRecorder
 | |
| }
 | |
| 
 | |
| func NewCronJobController(kubeClient clientset.Interface) (*CronJobController, error) {
 | |
| 	eventBroadcaster := record.NewBroadcaster()
 | |
| 	eventBroadcaster.StartLogging(glog.Infof)
 | |
| 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 | |
| 
 | |
| 	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
 | |
| 		if err := metrics.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	jm := &CronJobController{
 | |
| 		kubeClient: kubeClient,
 | |
| 		jobControl: realJobControl{KubeClient: kubeClient},
 | |
| 		sjControl:  &realSJControl{KubeClient: kubeClient},
 | |
| 		podControl: &realPodControl{KubeClient: kubeClient},
 | |
| 		recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
 | |
| 	}
 | |
| 
 | |
| 	return jm, nil
 | |
| }
 | |
| 
 | |
| // Run the main goroutine responsible for watching and syncing jobs.
 | |
| func (jm *CronJobController) Run(stopCh <-chan struct{}) {
 | |
| 	defer utilruntime.HandleCrash()
 | |
| 	glog.Infof("Starting CronJob Manager")
 | |
| 	// Check things every 10 second.
 | |
| 	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
 | |
| 	<-stopCh
 | |
| 	glog.Infof("Shutting down CronJob Manager")
 | |
| }
 | |
| 
 | |
| // syncAll lists all the CronJobs and Jobs and reconciles them.
 | |
| func (jm *CronJobController) syncAll() {
 | |
| 	// List children (Jobs) before parents (CronJob).
 | |
| 	// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
 | |
| 	// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
 | |
| 	// Note that this only works because we are NOT using any caches here.
 | |
| 	jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
 | |
| 		return
 | |
| 	}
 | |
| 	js := jl.Items
 | |
| 	glog.V(4).Infof("Found %d jobs", len(js))
 | |
| 
 | |
| 	sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
 | |
| 		return
 | |
| 	}
 | |
| 	sjs := sjl.Items
 | |
| 	glog.V(4).Infof("Found %d cronjobs", len(sjs))
 | |
| 
 | |
| 	jobsBySj := groupJobsByParent(js)
 | |
| 	glog.V(4).Infof("Found %d groups", len(jobsBySj))
 | |
| 
 | |
| 	for _, sj := range sjs {
 | |
| 		syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
 | |
| 		cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // cleanupFinishedJobs cleanups finished jobs created by a CronJob
 | |
| func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
 | |
| 	sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
 | |
| 	// If neither limits are active, there is no need to do anything.
 | |
| 	if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	failedJobs := []batchv1.Job{}
 | |
| 	succesfulJobs := []batchv1.Job{}
 | |
| 
 | |
| 	for _, job := range js {
 | |
| 		isFinished, finishedStatus := getFinishedStatus(&job)
 | |
| 		if isFinished && finishedStatus == batchv1.JobComplete {
 | |
| 			succesfulJobs = append(succesfulJobs, job)
 | |
| 		} else if isFinished && finishedStatus == batchv1.JobFailed {
 | |
| 			failedJobs = append(failedJobs, job)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if sj.Spec.SuccessfulJobsHistoryLimit != nil {
 | |
| 		removeOldestJobs(sj,
 | |
| 			succesfulJobs,
 | |
| 			jc,
 | |
| 			pc,
 | |
| 			*sj.Spec.SuccessfulJobsHistoryLimit,
 | |
| 			recorder)
 | |
| 	}
 | |
| 
 | |
| 	if sj.Spec.FailedJobsHistoryLimit != nil {
 | |
| 		removeOldestJobs(sj,
 | |
| 			failedJobs,
 | |
| 			jc,
 | |
| 			pc,
 | |
| 			*sj.Spec.FailedJobsHistoryLimit,
 | |
| 			recorder)
 | |
| 	}
 | |
| 
 | |
| 	// Update the CronJob, in case jobs were removed from the list.
 | |
| 	if _, err := sjc.UpdateStatus(sj); err != nil {
 | |
| 		nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
 | |
| 		glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // removeOldestJobs removes the oldest jobs from a list of jobs
 | |
| func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
 | |
| 	pc podControlInterface, maxJobs int32, recorder record.EventRecorder) {
 | |
| 	numToDelete := len(js) - int(maxJobs)
 | |
| 	if numToDelete <= 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
 | |
| 	glog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
 | |
| 
 | |
| 	sort.Sort(byJobStartTime(js))
 | |
| 	for i := 0; i < numToDelete; i++ {
 | |
| 		glog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
 | |
| 		deleteJob(sj, &js[i], jc, pc, recorder, "history limit reached")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // syncOne reconciles a CronJob with a list of any Jobs that it created.
 | |
| // All known jobs created by "sj" should be included in "js".
 | |
| // The current time is passed in to facilitate testing.
 | |
| // It has no receiver, to facilitate testing.
 | |
| func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
 | |
| 	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
 | |
| 
 | |
| 	childrenJobs := make(map[types.UID]bool)
 | |
| 	for _, j := range js {
 | |
| 		childrenJobs[j.ObjectMeta.UID] = true
 | |
| 		found := inActiveList(*sj, j.ObjectMeta.UID)
 | |
| 		if !found && !IsJobFinished(&j) {
 | |
| 			recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
 | |
| 			// We found an unfinished job that has us as the parent, but it is not in our Active list.
 | |
| 			// This could happen if we crashed right after creating the Job and before updating the status,
 | |
| 			// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
 | |
| 			// a job that they wanted us to adopt.
 | |
| 
 | |
| 			// TODO: maybe handle the adoption case?  Concurrency/suspend rules will not apply in that case, obviously, since we can't
 | |
| 			// stop users from creating jobs if they have permission.  It is assumed that if a
 | |
| 			// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
 | |
| 			// in the same namespace "adopt" that job.  ReplicaSets and their Pods work the same way.
 | |
| 			// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
 | |
| 		} else if found && IsJobFinished(&j) {
 | |
| 			deleteFromActiveList(sj, j.ObjectMeta.UID)
 | |
| 			// TODO: event to call out failure vs success.
 | |
| 			recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Remove any job reference from the active list if the corresponding job does not exist any more.
 | |
| 	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
 | |
| 	// job running.
 | |
| 	for _, j := range sj.Status.Active {
 | |
| 		if found := childrenJobs[j.UID]; !found {
 | |
| 			recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
 | |
| 			deleteFromActiveList(sj, j.UID)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	updatedSJ, err := sjc.UpdateStatus(sj)
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
 | |
| 		return
 | |
| 	}
 | |
| 	*sj = *updatedSJ
 | |
| 
 | |
| 	if sj.DeletionTimestamp != nil {
 | |
| 		// The CronJob is being deleted.
 | |
| 		// Don't do anything other than updating status.
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
 | |
| 		glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	times, err := getRecentUnmetScheduleTimes(*sj, now)
 | |
| 	if err != nil {
 | |
| 		recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
 | |
| 		glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
 | |
| 		return
 | |
| 	}
 | |
| 	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
 | |
| 	if len(times) == 0 {
 | |
| 		glog.V(4).Infof("No unmet start times for %s", nameForLog)
 | |
| 		return
 | |
| 	}
 | |
| 	if len(times) > 1 {
 | |
| 		glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
 | |
| 	}
 | |
| 
 | |
| 	scheduledTime := times[len(times)-1]
 | |
| 	tooLate := false
 | |
| 	if sj.Spec.StartingDeadlineSeconds != nil {
 | |
| 		tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
 | |
| 	}
 | |
| 	if tooLate {
 | |
| 		glog.V(4).Infof("Missed starting window for %s", nameForLog)
 | |
| 		recorder.Eventf(sj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
 | |
| 		// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
 | |
| 		// the miss every cycle.  In order to avoid sending multiple events, and to avoid processing
 | |
| 		// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
 | |
| 		// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
 | |
| 		// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
 | |
| 		// and event the next time we process it, and also so the user looking at the status
 | |
| 		// can see easily that there was a missed execution.
 | |
| 		return
 | |
| 	}
 | |
| 	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
 | |
| 		// Regardless which source of information we use for the set of active jobs,
 | |
| 		// there is some risk that we won't see an active job when there is one.
 | |
| 		// (because we haven't seen the status update to the SJ or the created pod).
 | |
| 		// So it is theoretically possible to have concurrency with Forbid.
 | |
| 		// As long the as the invocations are "far enough apart in time", this usually won't happen.
 | |
| 		//
 | |
| 		// TODO: for Forbid, we could use the same name for every execution, as a lock.
 | |
| 		// With replace, we could use a name that is deterministic per execution time.
 | |
| 		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
 | |
| 		glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
 | |
| 		return
 | |
| 	}
 | |
| 	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
 | |
| 		for _, j := range sj.Status.Active {
 | |
| 			// TODO: this should be replaced with server side job deletion
 | |
| 			// currently this mimics JobReaper from pkg/kubectl/stop.go
 | |
| 			glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
 | |
| 
 | |
| 			job, err := jc.GetJob(j.Namespace, j.Name)
 | |
| 			if err != nil {
 | |
| 				recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
 | |
| 				return
 | |
| 			}
 | |
| 			if !deleteJob(sj, job, jc, pc, recorder, "") {
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	jobReq, err := getJobFromTemplate(sj, scheduledTime)
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
 | |
| 		return
 | |
| 	}
 | |
| 	jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
 | |
| 	if err != nil {
 | |
| 		recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
 | |
| 	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
 | |
| 
 | |
| 	// ------------------------------------------------------------------ //
 | |
| 
 | |
| 	// If this process restarts at this point (after posting a job, but
 | |
| 	// before updating the status), then we might try to start the job on
 | |
| 	// the next time.  Actually, if we re-list the SJs and Jobs on the next
 | |
| 	// iteration of syncAll, we might not see our own status update, and
 | |
| 	// then post one again.  So, we need to use the job name as a lock to
 | |
| 	// prevent us from making the job twice (name the job with hash of its
 | |
| 	// scheduled time).
 | |
| 
 | |
| 	// Add the just-started job to the status list.
 | |
| 	ref, err := getRef(jobResp)
 | |
| 	if err != nil {
 | |
| 		glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
 | |
| 	} else {
 | |
| 		sj.Status.Active = append(sj.Status.Active, *ref)
 | |
| 	}
 | |
| 	sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
 | |
| 	if _, err := sjc.UpdateStatus(sj); err != nil {
 | |
| 		glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // deleteJob reaps a job, deleting the job, the pods and the reference in the active list
 | |
| func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface,
 | |
| 	pc podControlInterface, recorder record.EventRecorder, reason string) bool {
 | |
| 	// TODO: this should be replaced with server side job deletion
 | |
| 	// currently this mimics JobReaper from pkg/kubectl/stop.go
 | |
| 	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
 | |
| 
 | |
| 	// scale job down to 0
 | |
| 	if *job.Spec.Parallelism != 0 {
 | |
| 		zero := int32(0)
 | |
| 		var err error
 | |
| 		job.Spec.Parallelism = &zero
 | |
| 		job, err = jc.UpdateJob(job.Namespace, job)
 | |
| 		if err != nil {
 | |
| 			recorder.Eventf(sj, v1.EventTypeWarning, "FailedUpdate", "Update job: %v", err)
 | |
| 			return false
 | |
| 		}
 | |
| 	}
 | |
| 	// remove all pods...
 | |
| 	selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
 | |
| 	options := metav1.ListOptions{LabelSelector: selector.String()}
 | |
| 	podList, err := pc.ListPods(job.Namespace, options)
 | |
| 	if err != nil {
 | |
| 		recorder.Eventf(sj, v1.EventTypeWarning, "FailedList", "List job-pods: %v", err)
 | |
| 		return false
 | |
| 	}
 | |
| 	errList := []error{}
 | |
| 	for _, pod := range podList.Items {
 | |
| 		glog.V(2).Infof("CronJob controller is deleting Pod %v/%v", pod.Namespace, pod.Name)
 | |
| 		if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
 | |
| 			// ignores the error when the pod isn't found
 | |
| 			if !errors.IsNotFound(err) {
 | |
| 				errList = append(errList, err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if len(errList) != 0 {
 | |
| 		recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
 | |
| 		return false
 | |
| 	}
 | |
| 	// ... the job itself...
 | |
| 	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
 | |
| 		recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
 | |
| 		glog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
 | |
| 		return false
 | |
| 	}
 | |
| 	// ... and its reference from active list
 | |
| 	deleteFromActiveList(sj, job.ObjectMeta.UID)
 | |
| 	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func getRef(object runtime.Object) (*v1.ObjectReference, error) {
 | |
| 	return ref.GetReference(scheme.Scheme, object)
 | |
| }
 |