mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-26 02:55:32 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			442 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			442 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2017 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package daemon
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"fmt"
 | |
| 	"sort"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| 
 | |
| 	apps "k8s.io/api/apps/v1"
 | |
| 	"k8s.io/api/core/v1"
 | |
| 	"k8s.io/apimachinery/pkg/api/errors"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/labels"
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	intstrutil "k8s.io/apimachinery/pkg/util/intstr"
 | |
| 	"k8s.io/apimachinery/pkg/util/json"
 | |
| 	"k8s.io/apimachinery/pkg/util/rand"
 | |
| 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 | |
| 	"k8s.io/kubernetes/pkg/controller"
 | |
| 	"k8s.io/kubernetes/pkg/controller/daemon/util"
 | |
| 	labelsutil "k8s.io/kubernetes/pkg/util/labels"
 | |
| )
 | |
| 
 | |
| // rollingUpdate deletes old daemon set pods making sure that no more than
 | |
| // ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
 | |
| func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, hash string) error {
 | |
| 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
 | |
| 	}
 | |
| 
 | |
| 	_, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
 | |
| 	maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("Couldn't get unavailable numbers: %v", err)
 | |
| 	}
 | |
| 	oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)
 | |
| 
 | |
| 	// for oldPods delete all not running pods
 | |
| 	var oldPodsToDelete []string
 | |
| 	glog.V(4).Infof("Marking all unavailable old pods for deletion")
 | |
| 	for _, pod := range oldUnavailablePods {
 | |
| 		// Skip terminating pods. We won't delete them again
 | |
| 		if pod.DeletionTimestamp != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		glog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
 | |
| 		oldPodsToDelete = append(oldPodsToDelete, pod.Name)
 | |
| 	}
 | |
| 
 | |
| 	glog.V(4).Infof("Marking old pods for deletion")
 | |
| 	for _, pod := range oldAvailablePods {
 | |
| 		if numUnavailable >= maxUnavailable {
 | |
| 			glog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
 | |
| 			break
 | |
| 		}
 | |
| 		glog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
 | |
| 		oldPodsToDelete = append(oldPodsToDelete, pod.Name)
 | |
| 		numUnavailable++
 | |
| 	}
 | |
| 	return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
 | |
| }
 | |
| 
 | |
| // constructHistory finds all histories controlled by the given DaemonSet, and
 | |
| // update current history revision number, or create current history if need to.
 | |
| // It also deduplicates current history, and adds missing unique labels to existing histories.
 | |
| func (dsc *DaemonSetsController) constructHistory(ds *apps.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) {
 | |
| 	var histories []*apps.ControllerRevision
 | |
| 	var currentHistories []*apps.ControllerRevision
 | |
| 	histories, err = dsc.controlledHistories(ds)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 	for _, history := range histories {
 | |
| 		// Add the unique label if it's not already added to the history
 | |
| 		// We use history name instead of computing hash, so that we don't need to worry about hash collision
 | |
| 		if _, ok := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; !ok {
 | |
| 			toUpdate := history.DeepCopy()
 | |
| 			toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = toUpdate.Name
 | |
| 			history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(toUpdate)
 | |
| 			if err != nil {
 | |
| 				return nil, nil, err
 | |
| 			}
 | |
| 		}
 | |
| 		// Compare histories with ds to separate cur and old history
 | |
| 		found := false
 | |
| 		found, err = Match(ds, history)
 | |
| 		if err != nil {
 | |
| 			return nil, nil, err
 | |
| 		}
 | |
| 		if found {
 | |
| 			currentHistories = append(currentHistories, history)
 | |
| 		} else {
 | |
| 			old = append(old, history)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	currRevision := maxRevision(old) + 1
 | |
| 	switch len(currentHistories) {
 | |
| 	case 0:
 | |
| 		// Create a new history if the current one isn't found
 | |
| 		cur, err = dsc.snapshot(ds, currRevision)
 | |
| 		if err != nil {
 | |
| 			return nil, nil, err
 | |
| 		}
 | |
| 	default:
 | |
| 		cur, err = dsc.dedupCurHistories(ds, currentHistories)
 | |
| 		if err != nil {
 | |
| 			return nil, nil, err
 | |
| 		}
 | |
| 		// Update revision number if necessary
 | |
| 		if cur.Revision < currRevision {
 | |
| 			toUpdate := cur.DeepCopy()
 | |
| 			toUpdate.Revision = currRevision
 | |
| 			_, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(toUpdate)
 | |
| 			if err != nil {
 | |
| 				return nil, nil, err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return cur, old, err
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) cleanupHistory(ds *apps.DaemonSet, old []*apps.ControllerRevision) error {
 | |
| 	nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
 | |
| 	}
 | |
| 
 | |
| 	toKeep := int(*ds.Spec.RevisionHistoryLimit)
 | |
| 	toKill := len(old) - toKeep
 | |
| 	if toKill <= 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Find all hashes of live pods
 | |
| 	liveHashes := make(map[string]bool)
 | |
| 	for _, pods := range nodesToDaemonPods {
 | |
| 		for _, pod := range pods {
 | |
| 			if hash := pod.Labels[apps.DefaultDaemonSetUniqueLabelKey]; len(hash) > 0 {
 | |
| 				liveHashes[hash] = true
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Find all live history with the above hashes
 | |
| 	liveHistory := make(map[string]bool)
 | |
| 	for _, history := range old {
 | |
| 		if hash := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; liveHashes[hash] {
 | |
| 			liveHistory[history.Name] = true
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Clean up old history from smallest to highest revision (from oldest to newest)
 | |
| 	sort.Sort(historiesByRevision(old))
 | |
| 	for _, history := range old {
 | |
| 		if toKill <= 0 {
 | |
| 			break
 | |
| 		}
 | |
| 		if liveHistory[history.Name] {
 | |
| 			continue
 | |
| 		}
 | |
| 		// Clean up
 | |
| 		err := dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Delete(history.Name, nil)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		toKill--
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // maxRevision returns the max revision number of the given list of histories
 | |
| func maxRevision(histories []*apps.ControllerRevision) int64 {
 | |
| 	max := int64(0)
 | |
| 	for _, history := range histories {
 | |
| 		if history.Revision > max {
 | |
| 			max = history.Revision
 | |
| 		}
 | |
| 	}
 | |
| 	return max
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) dedupCurHistories(ds *apps.DaemonSet, curHistories []*apps.ControllerRevision) (*apps.ControllerRevision, error) {
 | |
| 	if len(curHistories) == 1 {
 | |
| 		return curHistories[0], nil
 | |
| 	}
 | |
| 	var maxRevision int64
 | |
| 	var keepCur *apps.ControllerRevision
 | |
| 	for _, cur := range curHistories {
 | |
| 		if cur.Revision >= maxRevision {
 | |
| 			keepCur = cur
 | |
| 			maxRevision = cur.Revision
 | |
| 		}
 | |
| 	}
 | |
| 	// Clean up duplicates and relabel pods
 | |
| 	for _, cur := range curHistories {
 | |
| 		if cur.Name == keepCur.Name {
 | |
| 			continue
 | |
| 		}
 | |
| 		// Relabel pods before dedup
 | |
| 		pods, err := dsc.getDaemonPods(ds)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		for _, pod := range pods {
 | |
| 			if pod.Labels[apps.DefaultDaemonSetUniqueLabelKey] != keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey] {
 | |
| 				toUpdate := pod.DeepCopy()
 | |
| 				if toUpdate.Labels == nil {
 | |
| 					toUpdate.Labels = make(map[string]string)
 | |
| 				}
 | |
| 				toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
 | |
| 				_, err = dsc.kubeClient.CoreV1().Pods(ds.Namespace).Update(toUpdate)
 | |
| 				if err != nil {
 | |
| 					return nil, err
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 		// Remove duplicates
 | |
| 		err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Delete(cur.Name, nil)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	return keepCur, nil
 | |
| }
 | |
| 
 | |
| // controlledHistories returns all ControllerRevisions controlled by the given DaemonSet.
 | |
| // This also reconciles ControllerRef by adopting/orphaning.
 | |
| // Note that returned histories are pointers to objects in the cache.
 | |
| // If you want to modify one, you need to deep-copy it first.
 | |
| func (dsc *DaemonSetsController) controlledHistories(ds *apps.DaemonSet) ([]*apps.ControllerRevision, error) {
 | |
| 	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// List all histories to include those that don't match the selector anymore
 | |
| 	// but have a ControllerRef pointing to the controller.
 | |
| 	histories, err := dsc.historyLister.List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	// If any adoptions are attempted, we should first recheck for deletion with
 | |
| 	// an uncached quorum read sometime after listing Pods (see #42639).
 | |
| 	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
 | |
| 		fresh, err := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if fresh.UID != ds.UID {
 | |
| 			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
 | |
| 		}
 | |
| 		return fresh, nil
 | |
| 	})
 | |
| 	// Use ControllerRefManager to adopt/orphan as needed.
 | |
| 	cm := controller.NewControllerRevisionControllerRefManager(dsc.crControl, ds, selector, controllerKind, canAdoptFunc)
 | |
| 	return cm.ClaimControllerRevisions(histories)
 | |
| }
 | |
| 
 | |
| // Match check if the given DaemonSet's template matches the template stored in the given history.
 | |
| func Match(ds *apps.DaemonSet, history *apps.ControllerRevision) (bool, error) {
 | |
| 	patch, err := getPatch(ds)
 | |
| 	if err != nil {
 | |
| 		return false, err
 | |
| 	}
 | |
| 	return bytes.Equal(patch, history.Data.Raw), nil
 | |
| }
 | |
| 
 | |
| // getPatch returns a strategic merge patch that can be applied to restore a Daemonset to a
 | |
| // previous version. If the returned error is nil the patch is valid. The current state that we save is just the
 | |
| // PodSpecTemplate. We can modify this later to encompass more state (or less) and remain compatible with previously
 | |
| // recorded patches.
 | |
| func getPatch(ds *apps.DaemonSet) ([]byte, error) {
 | |
| 	dsBytes, err := json.Marshal(ds)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var raw map[string]interface{}
 | |
| 	err = json.Unmarshal(dsBytes, &raw)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	objCopy := make(map[string]interface{})
 | |
| 	specCopy := make(map[string]interface{})
 | |
| 
 | |
| 	// Create a patch of the DaemonSet that replaces spec.template
 | |
| 	spec := raw["spec"].(map[string]interface{})
 | |
| 	template := spec["template"].(map[string]interface{})
 | |
| 	specCopy["template"] = template
 | |
| 	template["$patch"] = "replace"
 | |
| 	objCopy["spec"] = specCopy
 | |
| 	patch, err := json.Marshal(objCopy)
 | |
| 	return patch, err
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*apps.ControllerRevision, error) {
 | |
| 	patch, err := getPatch(ds)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
 | |
| 	name := ds.Name + "-" + rand.SafeEncodeString(hash)
 | |
| 	history := &apps.ControllerRevision{
 | |
| 		ObjectMeta: metav1.ObjectMeta{
 | |
| 			Name:            name,
 | |
| 			Namespace:       ds.Namespace,
 | |
| 			Labels:          labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, hash),
 | |
| 			Annotations:     ds.Annotations,
 | |
| 			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, controllerKind)},
 | |
| 		},
 | |
| 		Data:     runtime.RawExtension{Raw: patch},
 | |
| 		Revision: revision,
 | |
| 	}
 | |
| 
 | |
| 	history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(history)
 | |
| 	if errors.IsAlreadyExists(err) {
 | |
| 		// TODO: Is it okay to get from historyLister?
 | |
| 		existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
 | |
| 		if getErr != nil {
 | |
| 			return nil, getErr
 | |
| 		}
 | |
| 		// Check if we already created it
 | |
| 		done, err := Match(ds, existedHistory)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if done {
 | |
| 			return existedHistory, nil
 | |
| 		}
 | |
| 
 | |
| 		// Handle name collisions between different history
 | |
| 		// TODO: Is it okay to get from dsLister?
 | |
| 		currDS, getErr := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
 | |
| 		if getErr != nil {
 | |
| 			return nil, getErr
 | |
| 		}
 | |
| 		if currDS.Status.CollisionCount == nil {
 | |
| 			currDS.Status.CollisionCount = new(int32)
 | |
| 		}
 | |
| 		*currDS.Status.CollisionCount++
 | |
| 		_, updateErr := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).UpdateStatus(currDS)
 | |
| 		if updateErr != nil {
 | |
| 			return nil, updateErr
 | |
| 		}
 | |
| 		glog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return history, err
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *apps.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod, hash string) ([]*v1.Pod, []*v1.Pod) {
 | |
| 	var newPods []*v1.Pod
 | |
| 	var oldPods []*v1.Pod
 | |
| 
 | |
| 	for _, pods := range nodeToDaemonPods {
 | |
| 		for _, pod := range pods {
 | |
| 			// If the returned error is not nil we have a parse error.
 | |
| 			// The controller handles this via the hash.
 | |
| 			generation, err := util.GetTemplateGeneration(ds)
 | |
| 			if err != nil {
 | |
| 				generation = nil
 | |
| 			}
 | |
| 			if util.IsPodUpdated(pod, hash, generation) {
 | |
| 				newPods = append(newPods, pod)
 | |
| 			} else {
 | |
| 				oldPods = append(oldPods, pod)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return newPods, oldPods
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
 | |
| 	glog.V(4).Infof("Getting unavailable numbers")
 | |
| 	// TODO: get nodeList once in syncDaemonSet and pass it to other functions
 | |
| 	nodeList, err := dsc.nodeLister.List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		return -1, -1, fmt.Errorf("couldn't get list of nodes during rolling update of daemon set %#v: %v", ds, err)
 | |
| 	}
 | |
| 
 | |
| 	var numUnavailable, desiredNumberScheduled int
 | |
| 	for i := range nodeList {
 | |
| 		node := nodeList[i]
 | |
| 		wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
 | |
| 		if err != nil {
 | |
| 			return -1, -1, err
 | |
| 		}
 | |
| 		if !wantToRun {
 | |
| 			continue
 | |
| 		}
 | |
| 		desiredNumberScheduled++
 | |
| 		daemonPods, exists := nodeToDaemonPods[node.Name]
 | |
| 		if !exists {
 | |
| 			numUnavailable++
 | |
| 			continue
 | |
| 		}
 | |
| 		available := false
 | |
| 		for _, pod := range daemonPods {
 | |
| 			//for the purposes of update we ensure that the Pod is both available and not terminating
 | |
| 			if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) && pod.DeletionTimestamp == nil {
 | |
| 				available = true
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 		if !available {
 | |
| 			numUnavailable++
 | |
| 		}
 | |
| 	}
 | |
| 	maxUnavailable, err := intstrutil.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, desiredNumberScheduled, true)
 | |
| 	if err != nil {
 | |
| 		return -1, -1, fmt.Errorf("Invalid value for MaxUnavailable: %v", err)
 | |
| 	}
 | |
| 	glog.V(4).Infof(" DaemonSet %s/%s, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxUnavailable, numUnavailable)
 | |
| 	return maxUnavailable, numUnavailable, nil
 | |
| }
 | |
| 
 | |
| type historiesByRevision []*apps.ControllerRevision
 | |
| 
 | |
| func (h historiesByRevision) Len() int      { return len(h) }
 | |
| func (h historiesByRevision) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
 | |
| func (h historiesByRevision) Less(i, j int) bool {
 | |
| 	return h[i].Revision < h[j].Revision
 | |
| }
 |