mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-25 10:00:53 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			337 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			337 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2014 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package kubelet
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| 	"k8s.io/api/core/v1"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	"k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/events"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 | |
| 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/util/format"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 | |
| )
 | |
| 
 | |
// OnCompleteFunc is a function that is invoked when an operation completes.
// If err is non-nil, the operation did not complete successfully.
// The pod worker invokes it synchronously from the per-pod goroutine, so an
// implementation that blocks also blocks further processing for that pod.
type OnCompleteFunc func(err error)
 | |
| 
 | |
// PodStatusFunc is a function that is invoked to generate a pod status.
// It receives the API pod and the runtime-reported status and returns the
// v1.PodStatus to use for the pod.
type PodStatusFunc func(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus
 | |
| 
 | |
// KillPodOptions are options when performing a pod update whose update type is kill.
type KillPodOptions struct {
	// PodStatusFunc is the function to invoke to set pod status in response to a kill request.
	PodStatusFunc PodStatusFunc
	// PodTerminationGracePeriodSecondsOverride is an optional override for the
	// termination grace period (in seconds) to use when the pod is killed as
	// part of a kill operation. A nil value means no override.
	PodTerminationGracePeriodSecondsOverride *int64
}
 | |
| 
 | |
// UpdatePodOptions is an options struct to pass to an UpdatePod operation.
type UpdatePodOptions struct {
	// Pod is the pod to update.
	Pod *v1.Pod
	// MirrorPod is the mirror pod for the pod to update, if it is a static pod.
	MirrorPod *v1.Pod
	// UpdateType is the type of update (create, update, sync, kill).
	UpdateType kubetypes.SyncPodType
	// OnCompleteFunc is an optional callback invoked when the operation
	// completes. The callback is not guaranteed to be invoked, since a pod
	// worker may drop update requests that arrive while it is fulfilling a
	// previous request. It is only guaranteed to be invoked in response to a
	// kill pod request, which is always delivered.
	OnCompleteFunc OnCompleteFunc
	// KillPodOptions holds the options used to kill the pod when the update
	// type is kill.
	KillPodOptions *KillPodOptions
}
 | |
| 
 | |
// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
	// UpdatePod hands the given update to the worker responsible for
	// options.Pod.
	UpdatePod(options *UpdatePodOptions)
	// ForgetNonExistingPodWorkers removes every worker whose pod UID is not a
	// key of desiredPods.
	ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
	// ForgetWorker removes the worker for the pod with the given UID, if any.
	ForgetWorker(uid types.UID)
}
 | |
| 
 | |
// syncPodOptions provides the arguments to a SyncPod operation.
type syncPodOptions struct {
	// mirrorPod is the mirror pod for the pod to sync, if it is a static pod.
	mirrorPod *v1.Pod
	// pod is the pod to sync.
	pod *v1.Pod
	// updateType is the type of update, copied from UpdatePodOptions.UpdateType.
	updateType kubetypes.SyncPodType
	// podStatus is the current status of the pod as read from the pod cache.
	podStatus *kubecontainer.PodStatus
	// killPodOptions holds the options used to kill the pod when the update
	// type is kill.
	killPodOptions *KillPodOptions
}
 | |
| 
 | |
// syncPodFnType is the signature of the function invoked to perform a sync.
// A non-nil error makes the pod worker requeue the pod using the back-off
// period instead of the regular resync interval.
type syncPodFnType func(options syncPodOptions) error
 | |
| 
 | |
// Jitter factors passed to wait.Jitter when a pod is requeued after a sync.
const (
	// workerResyncIntervalJitterFactor is the jitter factor for resyncInterval.
	workerResyncIntervalJitterFactor = 0.5

	// workerBackOffPeriodJitterFactor is the jitter factor for backOffPeriod.
	workerBackOffPeriodJitterFactor = 0.5
)
 | |
| 
 | |
// podWorkers runs one goroutine per pod and feeds each goroutine the updates
// for its pod through a dedicated channel. Its methods match the PodWorkers
// interface.
type podWorkers struct {
	// Protects all per worker fields.
	podLock sync.Mutex

	// Tracks all running per-pod goroutines - per-pod goroutine will be
	// processing updates received through its corresponding channel.
	podUpdates map[types.UID]chan UpdatePodOptions
	// Tracks whether each per-pod goroutine is currently processing an update.
	// Updates that arrive for a pod while its worker is busy are not delivered
	// immediately; at most one of them is retained in
	// lastUndeliveredWorkUpdate.
	isWorking map[types.UID]bool
	// Tracks the last undelivered work item for this pod - a work item is
	// undelivered if it comes in while the worker is working.
	lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions

	// workQueue receives the pod UID, with a delay, after every sync so that
	// the pod is picked up again later.
	workQueue queue.WorkQueue

	// This function is run to sync the desired state of pod.
	// NOTE: This function has to be thread-safe - it can be called for
	// different pods at the same time.
	syncPodFn syncPodFnType

	// The EventRecorder to use
	recorder record.EventRecorder

	// backOffPeriod is the duration to back off when there is a sync error.
	backOffPeriod time.Duration

	// resyncInterval is the duration to wait until the next sync.
	resyncInterval time.Duration

	// podCache stores kubecontainer.PodStatus for all pods.
	podCache kubecontainer.Cache
}
 | |
| 
 | |
| func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
 | |
| 	resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
 | |
| 	return &podWorkers{
 | |
| 		podUpdates:                map[types.UID]chan UpdatePodOptions{},
 | |
| 		isWorking:                 map[types.UID]bool{},
 | |
| 		lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
 | |
| 		syncPodFn:                 syncPodFn,
 | |
| 		recorder:                  recorder,
 | |
| 		workQueue:                 workQueue,
 | |
| 		resyncInterval:            resyncInterval,
 | |
| 		backOffPeriod:             backOffPeriod,
 | |
| 		podCache:                  podCache,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
 | |
| 	var lastSyncTime time.Time
 | |
| 	for update := range podUpdates {
 | |
| 		err := func() error {
 | |
| 			podUID := update.Pod.UID
 | |
| 			// This is a blocking call that would return only if the cache
 | |
| 			// has an entry for the pod that is newer than minRuntimeCache
 | |
| 			// Time. This ensures the worker doesn't start syncing until
 | |
| 			// after the cache is at least newer than the finished time of
 | |
| 			// the previous sync.
 | |
| 			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			err = p.syncPodFn(syncPodOptions{
 | |
| 				mirrorPod:      update.MirrorPod,
 | |
| 				pod:            update.Pod,
 | |
| 				podStatus:      status,
 | |
| 				killPodOptions: update.KillPodOptions,
 | |
| 				updateType:     update.UpdateType,
 | |
| 			})
 | |
| 			lastSyncTime = time.Now()
 | |
| 			return err
 | |
| 		}()
 | |
| 		// notify the call-back function if the operation succeeded or not
 | |
| 		if update.OnCompleteFunc != nil {
 | |
| 			update.OnCompleteFunc(err)
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
 | |
| 			// if we failed sync, we throw more specific events for why it happened.
 | |
| 			// as a result, i question the value of this event.
 | |
| 			// TODO: determine if we can remove this in a future release.
 | |
| 			// do not include descriptive text that can vary on why it failed so in a pathological
 | |
| 			// scenario, kubelet does not create enough discrete events that miss default aggregation
 | |
| 			// window.
 | |
| 			p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "Error syncing pod")
 | |
| 		}
 | |
| 		p.wrapUp(update.Pod.UID, err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Apply the new setting to the specified pod.
 | |
| // If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
 | |
| // Update requests are ignored if a kill pod request is pending.
 | |
| func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
 | |
| 	pod := options.Pod
 | |
| 	uid := pod.UID
 | |
| 	var podUpdates chan UpdatePodOptions
 | |
| 	var exists bool
 | |
| 
 | |
| 	p.podLock.Lock()
 | |
| 	defer p.podLock.Unlock()
 | |
| 	if podUpdates, exists = p.podUpdates[uid]; !exists {
 | |
| 		// We need to have a buffer here, because checkForUpdates() method that
 | |
| 		// puts an update into channel is called from the same goroutine where
 | |
| 		// the channel is consumed. However, it is guaranteed that in such case
 | |
| 		// the channel is empty, so buffer of size 1 is enough.
 | |
| 		podUpdates = make(chan UpdatePodOptions, 1)
 | |
| 		p.podUpdates[uid] = podUpdates
 | |
| 
 | |
| 		// Creating a new pod worker either means this is a new pod, or that the
 | |
| 		// kubelet just restarted. In either case the kubelet is willing to believe
 | |
| 		// the status of the pod for the first pod worker sync. See corresponding
 | |
| 		// comment in syncPod.
 | |
| 		go func() {
 | |
| 			defer runtime.HandleCrash()
 | |
| 			p.managePodLoop(podUpdates)
 | |
| 		}()
 | |
| 	}
 | |
| 	if !p.isWorking[pod.UID] {
 | |
| 		p.isWorking[pod.UID] = true
 | |
| 		podUpdates <- *options
 | |
| 	} else {
 | |
| 		// if a request to kill a pod is pending, we do not let anything overwrite that request.
 | |
| 		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
 | |
| 		if !found || update.UpdateType != kubetypes.SyncPodKill {
 | |
| 			p.lastUndeliveredWorkUpdate[pod.UID] = *options
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *podWorkers) removeWorker(uid types.UID) {
 | |
| 	if ch, ok := p.podUpdates[uid]; ok {
 | |
| 		close(ch)
 | |
| 		delete(p.podUpdates, uid)
 | |
| 		// If there is an undelivered work update for this pod we need to remove it
 | |
| 		// since per-pod goroutine won't be able to put it to the already closed
 | |
| 		// channel when it finish processing the current work update.
 | |
| 		if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
 | |
| 			delete(p.lastUndeliveredWorkUpdate, uid)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
// ForgetWorker removes the worker for the pod with the given UID, if one
// exists. It takes podLock for the duration of the removal.
func (p *podWorkers) ForgetWorker(uid types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	p.removeWorker(uid)
}
 | |
| 
 | |
| func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
 | |
| 	p.podLock.Lock()
 | |
| 	defer p.podLock.Unlock()
 | |
| 	for key := range p.podUpdates {
 | |
| 		if _, exists := desiredPods[key]; !exists {
 | |
| 			p.removeWorker(key)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
 | |
| 	// Requeue the last update if the last sync returned error.
 | |
| 	switch {
 | |
| 	case syncErr == nil:
 | |
| 		// No error; requeue at the regular resync interval.
 | |
| 		p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
 | |
| 	default:
 | |
| 		// Error occurred during the sync; back off and then retry.
 | |
| 		p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
 | |
| 	}
 | |
| 	p.checkForUpdates(uid)
 | |
| }
 | |
| 
 | |
| func (p *podWorkers) checkForUpdates(uid types.UID) {
 | |
| 	p.podLock.Lock()
 | |
| 	defer p.podLock.Unlock()
 | |
| 	if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
 | |
| 		p.podUpdates[uid] <- workUpdate
 | |
| 		delete(p.lastUndeliveredWorkUpdate, uid)
 | |
| 	} else {
 | |
| 		p.isWorking[uid] = false
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // killPodNow returns a KillPodFunc that can be used to kill a pod.
 | |
| // It is intended to be injected into other modules that need to kill a pod.
 | |
| func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
 | |
| 	return func(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
 | |
| 		// determine the grace period to use when killing the pod
 | |
| 		gracePeriod := int64(0)
 | |
| 		if gracePeriodOverride != nil {
 | |
| 			gracePeriod = *gracePeriodOverride
 | |
| 		} else if pod.Spec.TerminationGracePeriodSeconds != nil {
 | |
| 			gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
 | |
| 		}
 | |
| 
 | |
| 		// we timeout and return an error if we don't get a callback within a reasonable time.
 | |
| 		// the default timeout is relative to the grace period (we settle on 10s to wait for kubelet->runtime traffic to complete in sigkill)
 | |
| 		timeout := int64(gracePeriod + (gracePeriod / 2))
 | |
| 		minTimeout := int64(10)
 | |
| 		if timeout < minTimeout {
 | |
| 			timeout = minTimeout
 | |
| 		}
 | |
| 		timeoutDuration := time.Duration(timeout) * time.Second
 | |
| 
 | |
| 		// open a channel we block against until we get a result
 | |
| 		type response struct {
 | |
| 			err error
 | |
| 		}
 | |
| 		ch := make(chan response)
 | |
| 		podWorkers.UpdatePod(&UpdatePodOptions{
 | |
| 			Pod:        pod,
 | |
| 			UpdateType: kubetypes.SyncPodKill,
 | |
| 			OnCompleteFunc: func(err error) {
 | |
| 				ch <- response{err: err}
 | |
| 			},
 | |
| 			KillPodOptions: &KillPodOptions{
 | |
| 				PodStatusFunc: func(p *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
 | |
| 					return status
 | |
| 				},
 | |
| 				PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
 | |
| 			},
 | |
| 		})
 | |
| 
 | |
| 		// wait for either a response, or a timeout
 | |
| 		select {
 | |
| 		case r := <-ch:
 | |
| 			return r.err
 | |
| 		case <-time.After(timeoutDuration):
 | |
| 			recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.")
 | |
| 			return fmt.Errorf("timeout waiting to kill pod")
 | |
| 		}
 | |
| 	}
 | |
| }
 |