diff --git a/cmd/kube-controller-manager/app/extensions.go b/cmd/kube-controller-manager/app/extensions.go index a3218496fa7..f1b65440799 100644 --- a/cmd/kube-controller-manager/app/extensions.go +++ b/cmd/kube-controller-manager/app/extensions.go @@ -33,6 +33,7 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) { } go daemon.NewDaemonSetsController( ctx.InformerFactory.Extensions().V1beta1().DaemonSets(), + ctx.InformerFactory.Apps().V1beta1().ControllerRevisions(), ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Nodes(), ctx.ClientBuilder.ClientOrDie("daemon-set-controller"), diff --git a/pkg/client/listers/extensions/v1beta1/BUILD b/pkg/client/listers/extensions/v1beta1/BUILD index 2292129b3b8..973ba0bea39 100644 --- a/pkg/client/listers/extensions/v1beta1/BUILD +++ b/pkg/client/listers/extensions/v1beta1/BUILD @@ -26,6 +26,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go b/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go index 6e913e4770e..09c0ca738c1 100644 --- a/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go +++ b/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/api/v1" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" ) @@ -29,6 +30,7 @@ import ( // DaemonSetLister. type DaemonSetListerExpansion interface { GetPodDaemonSets(pod *v1.Pod) ([]*v1beta1.DaemonSet, error) + GetHistoryDaemonSets(history *apps.ControllerRevision) ([]*v1beta1.DaemonSet, error) } // DaemonSetNamespaceListerExpansion allows custom methods to be added to @@ -76,3 +78,37 @@ func (s *daemonSetLister) GetPodDaemonSets(pod *v1.Pod) ([]*v1beta1.DaemonSet, e return daemonSets, nil } + +// GetHistoryDaemonSets returns a list of DaemonSets that potentially +// match a ControllerRevision. Only the one specified in the ControllerRevision's ControllerRef +// will actually manage it. +// Returns an error only if no matching DaemonSets are found. +func (s *daemonSetLister) GetHistoryDaemonSets(history *apps.ControllerRevision) ([]*v1beta1.DaemonSet, error) { + if len(history.Labels) == 0 { + return nil, fmt.Errorf("no DaemonSet found for ControllerRevision %s because it has no labels", history.Name) + } + + list, err := s.DaemonSets(history.Namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + + var daemonSets []*v1beta1.DaemonSet + for _, ds := range list { + selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + // If a DaemonSet with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(history.Labels)) { + continue + } + daemonSets = append(daemonSets, ds) + } + + if len(daemonSets) == 0 { + return nil, fmt.Errorf("could not find DaemonSets for ControllerRevision %s in namespace %s with labels: %v", history.Name, history.Namespace, history.Labels) + } + + return daemonSets, nil +} diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index f7e29fc171f..f65e697d615 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -17,8 +17,10 @@ limitations under the License. package controller import ( + "encoding/binary" "encoding/json" "fmt" + "hash/fnv" "sync" "sync/atomic" "time" @@ -29,16 +31,14 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" - - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/integer" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" v1helper "k8s.io/kubernetes/pkg/api/v1/helper" @@ -48,6 +48,7 @@ import ( extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" clientretry "k8s.io/kubernetes/pkg/client/retry" + hashutil "k8s.io/kubernetes/pkg/util/hash" "github.com/golang/glog" ) @@ -980,3 +981,18 @@ func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs glog.Infof("Caches are synced for %s controller", controllerName) return true } + +// ComputeHash returns a hash value calculated from pod template and a collisionCount to avoid hash collision +func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int64) uint32 { + podTemplateSpecHasher := fnv.New32a() + hashutil.DeepHashObject(podTemplateSpecHasher, *template) + + // Add collisionCount in the hash if it exists. + if collisionCount != nil { + collisionCountBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(collisionCountBytes, uint64(*collisionCount)) + podTemplateSpecHasher.Write(collisionCountBytes) + } + + return podTemplateSpecHasher.Sum32() +} diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index 8bfe3438782..e4dbbd93e3d 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -19,6 +19,7 @@ package controller import ( "encoding/json" "fmt" + "math" "math/rand" "net/http/httptest" "reflect" @@ -443,3 +444,38 @@ func TestActiveReplicaSetsFiltering(t *testing.T) { t.Errorf("expected %v, got %v", expectedNames.List(), gotNames.List()) } } + +func int64P(num int64) *int64 { + return &num +} + +func TestComputeHash(t *testing.T) { + tests := []struct { + name string + template *v1.PodTemplateSpec + collisionCount *int64 + otherCollisionCount *int64 + }{ + { + name: "simple", + template: &v1.PodTemplateSpec{}, + collisionCount: int64P(1), + otherCollisionCount: int64P(2), + }, + { + name: "using math.MaxInt64", + template: &v1.PodTemplateSpec{}, + collisionCount: nil, + otherCollisionCount: int64P(int64(math.MaxInt64)), + }, + } + + for _, test := range tests { + hash := ComputeHash(test.template, test.collisionCount) + otherHash := ComputeHash(test.template, test.otherCollisionCount) + + if hash == otherHash { + t.Errorf("expected different hashes but got the same: %d", hash) + } + } +} diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 74c714d0c7e..9305e1ac274 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -21,11 +21,14 @@ go_library( "//pkg/api/v1:go_default_library", "//pkg/api/v1/helper:go_default_library", "//pkg/api/v1/pod:go_default_library", + "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset/typed/extensions/v1beta1:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library", "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", "//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library", + "//pkg/client/listers/apps/v1beta1:go_default_library", "//pkg/client/listers/core/v1:go_default_library", "//pkg/client/listers/extensions/v1beta1:go_default_library", "//pkg/controller:go_default_library", @@ -38,9 +41,11 @@ go_library( "//plugin/pkg/scheduler/algorithm/predicates:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 2a7e83cfc8b..71e35e31bb6 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -39,11 +39,14 @@ import ( "k8s.io/kubernetes/pkg/api/v1" v1helper "k8s.io/kubernetes/pkg/api/v1/helper" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1" + appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1" coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1" + appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1" "k8s.io/kubernetes/pkg/controller" @@ -99,6 +102,11 @@ type DaemonSetsController struct { // dsStoreSynced returns true if the daemonset store has been synced at least once. // Added as a member to the struct to allow injection for testing. dsStoreSynced cache.InformerSynced + // historyLister get list/get history from the shared informers's store + historyLister appslisters.ControllerRevisionLister + // historyStoreSynced returns true if the history store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + historyStoreSynced cache.InformerSynced // podLister get list/get pods from the shared informers's store podLister corelisters.PodLister // podStoreSynced returns true if the pod store has been synced at least once. @@ -114,7 +122,7 @@ type DaemonSetsController struct { queue workqueue.RateLimitingInterface } -func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController { +func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -152,6 +160,14 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo dsc.dsLister = daemonSetInformer.Lister() dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced + historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dsc.addHistory, + UpdateFunc: dsc.updateHistory, + DeleteFunc: dsc.deleteHistory, + }) + dsc.historyLister = historyInformer.Lister() + dsc.historyStoreSynced = historyInformer.Informer().HasSynced + // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete // more pods until all the effects (expectations) of a daemon set's create/delete have been observed. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -273,6 +289,138 @@ func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.Dae return sets } +// getDaemonSetsForHistory returns a list of DaemonSets that potentially +// match a ControllerRevision. +func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*extensions.DaemonSet { + daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history) + if err != nil || len(daemonSets) == 0 { + return nil + } + if len(daemonSets) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. + glog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v", + history.Namespace, history.Name, history.Labels) + } + return daemonSets +} + +// addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created +// or when the controller manager is restarted. +func (dsc *DaemonSetsController) addHistory(obj interface{}) { + history := obj.(*apps.ControllerRevision) + if history.DeletionTimestamp != nil { + // On a restart of the controller manager, it's possible for an object to + // show up in a state that is already pending deletion. + dsc.deleteHistory(history) + return + } + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(history); controllerRef != nil { + ds := dsc.resolveControllerRef(history.Namespace, controllerRef) + if ds == nil { + return + } + glog.V(4).Infof("ControllerRevision %s added.", history.Name) + return + } + + // Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync + // them to see if anyone wants to adopt it. + daemonSets := dsc.getDaemonSetsForHistory(history) + if len(daemonSets) == 0 { + return + } + glog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name) + for _, ds := range daemonSets { + dsc.enqueueDaemonSet(ds) + } +} + +// updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision +// is updated and wake them up. If the anything of the ControllerRevision have changed, we need to +// awaken both the old and new DaemonSets. +func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) { + curHistory := cur.(*apps.ControllerRevision) + oldHistory := old.(*apps.ControllerRevision) + if curHistory.ResourceVersion == oldHistory.ResourceVersion { + // Periodic resync will send update events for all known ControllerRevisions. + return + } + + curControllerRef := controller.GetControllerOf(curHistory) + oldControllerRef := controller.GetControllerOf(oldHistory) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && oldControllerRef != nil { + // The ControllerRef was changed. Sync the old controller, if any. + if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil { + dsc.enqueueDaemonSet(ds) + } + } + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef) + if ds == nil { + return + } + glog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name) + dsc.enqueueDaemonSet(ds) + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels) + if labelChanged || controllerRefChanged { + daemonSets := dsc.getDaemonSetsForHistory(curHistory) + if len(daemonSets) == 0 { + return + } + glog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name) + for _, ds := range daemonSets { + dsc.enqueueDaemonSet(ds) + } + } +} + +// deleteHistory enqueues the DaemonSet that manages a ControllerRevision when +// the ControllerRevision is deleted. obj could be an *app.ControllerRevision, or +// a DeletionFinalStateUnknown marker item. +func (dsc *DaemonSetsController) deleteHistory(obj interface{}) { + history, ok := obj.(*apps.ControllerRevision) + + // When a delete is dropped, the relist will notice a ControllerRevision in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the ControllerRevision + // changed labels the new DaemonSet will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + history, ok = tombstone.Obj.(*apps.ControllerRevision) + if !ok { + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj)) + return + } + } + + controllerRef := controller.GetControllerOf(history) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return + } + ds := dsc.resolveControllerRef(history.Namespace, controllerRef) + if ds == nil { + return + } + glog.V(4).Infof("ControllerRevision %s deleted.", history.Name) + dsc.enqueueDaemonSet(ds) +} + func (dsc *DaemonSetsController) addPod(obj interface{}) { pod := obj.(*v1.Pod) @@ -486,11 +634,11 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) { } } -// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +// getDaemonPods returns daemon pods owned by the given ds. // This also reconciles ControllerRef by adopting/orphaning. // Note that returned Pods are pointers to objects in the cache. // If you want to modify one, you need to deep-copy it first. -func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) { +func (dsc *DaemonSetsController) getDaemonPods(ds *extensions.DaemonSet) ([]*v1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) if err != nil { return nil, err @@ -516,7 +664,15 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) }) // Use ControllerRefManager to adopt/orphan as needed. cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, canAdoptFunc) - claimedPods, err := cm.ClaimPods(pods) + return cm.ClaimPods(pods) +} + +// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +// This also reconciles ControllerRef by adopting/orphaning. +// Note that returned Pods are pointers to objects in the cache. +// If you want to modify one, you need to deep-copy it first. +func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) { + claimedPods, err := dsc.getDaemonPods(ds) if err != nil { return nil, err } @@ -554,18 +710,18 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll return ds } -func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error { +func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) (string, error) { // Find out which nodes are running the daemon pods controlled by ds. nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { - return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) + return "", fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node. nodeList, err := dsc.nodeLister.List(labels.Everything()) if err != nil { - return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err) + return "", fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err) } var nodesNeedingDaemonPods, podsToDelete []string var failedPodsObserved int @@ -612,23 +768,33 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error { } } } - errors := dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods) + + // Find current history of the DaemonSet, and label new pods using the hash label value of the current history when creating them + cur, _, err := dsc.constructHistory(ds) + if err != nil { + return "", fmt.Errorf("failed to construct revisions of DaemonSet: %v", err) + } + + hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] + if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil { + return "", err + } // Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop if failedPodsObserved > 0 { - errors = append(errors, fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)) + return "", fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name) } - return utilerrors.NewAggregate(errors) + return hash, nil } // syncNodes deletes given pods and creates new daemon set pods on the given nodes // returns slice with erros if any -func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string) []error { +func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error { // We need to set expectations before creating/deleting pods to avoid race conditions. dsKey, err := controller.KeyFunc(ds) if err != nil { - return []error{fmt.Errorf("couldn't get key for object %#v: %v", ds, err)} + return fmt.Errorf("couldn't get key for object %#v: %v", ds, err) } createDiff := len(nodesNeedingDaemonPods) @@ -649,7 +815,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff) createWait := sync.WaitGroup{} createWait.Add(createDiff) - template := util.GetPodTemplateWithGeneration(ds.Spec.Template, ds.Spec.TemplateGeneration) + template := util.CreatePodTemplate(ds.Spec.Template, ds.Spec.TemplateGeneration, hash) for i := 0; i < createDiff; i++ { go func(ix int) { defer createWait.Done() @@ -685,7 +851,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet for err := range errCh { errors = append(errors, err) } - return errors + return utilerrors.NewAggregate(errors) } func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int) error { @@ -767,7 +933,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) numberAvailable++ } } - if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) { + if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey]) { updatedNumberScheduled++ } } @@ -825,21 +991,28 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { return dsc.updateDaemonSetStatus(ds) } - if err := dsc.manage(ds); err != nil { + hash, err := dsc.manage(ds) + if err != nil { return err } // Process rolling updates if we're ready. if dsc.expectations.SatisfiedExpectations(dsKey) { switch ds.Spec.UpdateStrategy.Type { + case extensions.OnDeleteDaemonSetStrategyType: case extensions.RollingUpdateDaemonSetStrategyType: - err = dsc.rollingUpdate(ds) + err = dsc.rollingUpdate(ds, hash) } if err != nil { return err } } + err = dsc.cleanupHistory(ds) + if err != nil { + return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err) + } + return dsc.updateDaemonSetStatus(ds) } diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index cf30584e25e..51cdab11944 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/controller" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/securitycontext" + labelsutil "k8s.io/kubernetes/pkg/util/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" ) @@ -84,6 +85,7 @@ func getKey(ds *extensions.DaemonSet, t *testing.T) string { } func newDaemonSet(name string) *extensions.DaemonSet { + two := int32(2) return &extensions.DaemonSet{ TypeMeta: metav1.TypeMeta{APIVersion: testapi.Extensions.GroupVersion().String()}, ObjectMeta: metav1.ObjectMeta{ @@ -92,6 +94,10 @@ func newDaemonSet(name string) *extensions.DaemonSet { Namespace: metav1.NamespaceDefault, }, Spec: extensions.DaemonSetSpec{ + RevisionHistoryLimit: &two, + UpdateStrategy: extensions.DaemonSetUpdateStrategy{ + Type: extensions.OnDeleteDaemonSetStrategyType, + }, Selector: &metav1.LabelSelector{MatchLabels: simpleDaemonSetLabel}, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -139,11 +145,18 @@ func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string] } func newPod(podName string, nodeName string, label map[string]string, ds *extensions.DaemonSet) *v1.Pod { + // Add hash unique label to the pod + newLabels := label + if ds != nil { + hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)) + newLabels = labelsutil.CloneAndAddLabel(label, extensions.DefaultDaemonSetUniqueLabelKey, hash) + } + pod := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ GenerateName: podName, - Labels: label, + Labels: newLabels, Namespace: metav1.NamespaceDefault, }, Spec: v1.PodSpec{ @@ -168,7 +181,8 @@ func newPod(podName string, nodeName string, label map[string]string, ds *extens func addPods(podStore cache.Store, nodeName string, label map[string]string, ds *extensions.DaemonSet, number int) { for i := 0; i < number; i++ { - podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label, ds)) + pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, label, ds) + podStore.Add(pod) } } @@ -251,6 +265,7 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, manager := NewDaemonSetsController( informerFactory.Extensions().V1beta1().DaemonSets(), + informerFactory.Apps().V1beta1().ControllerRevisions(), informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), clientset, diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index d474fb2d31e..9be6960e430 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -17,28 +17,37 @@ limitations under the License. package daemon import ( + "bytes" + "encoding/json" "fmt" + "sort" "github.com/golang/glog" + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/runtime" intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/daemon/util" + labelsutil "k8s.io/kubernetes/pkg/util/labels" ) // rollingUpdate deletes old daemon set pods making sure that no more than // ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable -func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { +func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet, hash string) error { nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } - _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods) + _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash) maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods) if err != nil { return fmt.Errorf("Couldn't get unavailable numbers: %v", err) @@ -46,7 +55,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods) // for oldPods delete all not running pods - var podsToDelete []string + var oldPodsToDelete []string glog.V(4).Infof("Marking all unavailable old pods for deletion") for _, pod := range oldUnavailablePods { // Skip terminating pods. We won't delete them again @@ -54,7 +63,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { continue } glog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - podsToDelete = append(podsToDelete, pod.Name) + oldPodsToDelete = append(oldPodsToDelete, pod.Name) } glog.V(4).Infof("Marking old pods for deletion") @@ -64,20 +73,311 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { break } glog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - podsToDelete = append(podsToDelete, pod.Name) + oldPodsToDelete = append(oldPodsToDelete, pod.Name) numUnavailable++ } - errors := dsc.syncNodes(ds, podsToDelete, []string{}) - return utilerrors.NewAggregate(errors) + return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash) } -func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) ([]*v1.Pod, []*v1.Pod) { +func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) { + var histories []*apps.ControllerRevision + var currentHistories []*apps.ControllerRevision + histories, err = dsc.controlledHistories(ds) + if err != nil { + return nil, nil, err + } + for _, history := range histories { + // Add the unique label if it's not already added to the history + // We use history name instead of computing hash, so that we don't need to worry about hash collision + if _, ok := history.Labels[extensions.DefaultDaemonSetUniqueLabelKey]; !ok { + var clone interface{} + clone, err = api.Scheme.DeepCopy(history) + if err != nil { + return nil, nil, err + } + toUpdate := clone.(*apps.ControllerRevision) + toUpdate.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = toUpdate.Name + history, err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Update(toUpdate) + if err != nil { + return nil, nil, err + } + } + // Compare histories with ds to separate cur and old history + found := false + found, err = match(ds, history) + if err != nil { + return nil, nil, err + } + if found { + currentHistories = append(currentHistories, history) + } else { + old = append(old, history) + } + } + + currRevision := maxRevision(old) + 1 + switch len(currentHistories) { + case 0: + // Create a new history if the current one isn't found + cur, err = dsc.snapshot(ds, currRevision) + if err != nil { + return nil, nil, err + } + default: + cur, err = dsc.dedupCurHistories(ds, currentHistories) + if err != nil { + return nil, nil, err + } + // Update revision number if necessary + if cur.Revision < currRevision { + var clone interface{} + clone, err = api.Scheme.DeepCopy(cur) + if err != nil { + return nil, nil, err + } + toUpdate := clone.(*apps.ControllerRevision) + toUpdate.Revision = currRevision + _, err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Update(toUpdate) + if err != nil { + return nil, nil, err + } + } + } + // Label ds with current history's unique label as well + if ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey] != cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] { + var clone interface{} + clone, err = api.Scheme.DeepCopy(ds) + if err != nil { + return nil, nil, err + } + toUpdate := clone.(*extensions.DaemonSet) + if toUpdate.Labels == nil { + toUpdate.Labels = make(map[string]string) + } + toUpdate.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] + _, err = dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).UpdateStatus(toUpdate) + } + return cur, old, err +} + +func (dsc *DaemonSetsController) cleanupHistory(ds *extensions.DaemonSet) error { + nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + if err != nil { + return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) + } + _, old, err := dsc.constructHistory(ds) + if err != nil { + return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err) + } + + toKeep := int(*ds.Spec.RevisionHistoryLimit) + toKill := len(old) - toKeep + if toKill <= 0 { + return nil + } + + // Find all hashes of live pods + liveHashes := make(map[string]bool) + for _, pods := range nodesToDaemonPods { + for _, pod := range pods { + if hash := pod.Labels[extensions.DefaultDaemonSetUniqueLabelKey]; len(hash) > 0 { + liveHashes[hash] = true + } + } + } + + // Find all live history with the above hashes + liveHistory := make(map[string]bool) + for _, history := range old { + if hash := history.Labels[extensions.DefaultDaemonSetUniqueLabelKey]; liveHashes[hash] { + liveHistory[history.Name] = true + } + } + + // Clean up old history from smallest to highest revision (from oldest to newest) + sort.Sort(historiesByRevision(old)) + for _, history := range old { + if toKill <= 0 { + break + } + if liveHistory[history.Name] { + continue + } + // Clean up + err := dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Delete(history.Name, nil) + if err != nil { + return err + } + toKill-- + } + return nil +} + +// maxRevision returns the max revision number of the given list of histories +func maxRevision(histories []*apps.ControllerRevision) int64 { + max := int64(0) + for _, history := range histories { + if history.Revision > max { + max = history.Revision + } + } + return max +} + +func (dsc *DaemonSetsController) dedupCurHistories(ds *extensions.DaemonSet, curHistories []*apps.ControllerRevision) (*apps.ControllerRevision, error) { + if len(curHistories) == 1 { + return curHistories[0], nil + } + var maxRevision int64 + var keepCur *apps.ControllerRevision + for _, cur := range curHistories { + if cur.Revision >= maxRevision { + keepCur = cur + maxRevision = cur.Revision + } + } + // Clean up duplicates and relabel pods + for _, cur := range curHistories { + if cur.Name == keepCur.Name { + continue + } + // Relabel pods before dedup + pods, err := dsc.getDaemonPods(ds) + if err != nil { + return nil, err + } + for _, pod := range pods { + if pod.Labels[extensions.DefaultDaemonSetUniqueLabelKey] != keepCur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] { + clone, err := api.Scheme.DeepCopy(pod) + if err != nil { + return nil, err + } + toUpdate := clone.(*v1.Pod) + if toUpdate.Labels == nil { + toUpdate.Labels = make(map[string]string) + } + toUpdate.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = keepCur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] + _, err = dsc.kubeClient.Core().Pods(ds.Namespace).Update(toUpdate) + if err != nil { + return nil, err + } + } + } + // Remove duplicates + err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Delete(cur.Name, nil) + if err != nil { + return nil, err + } + } + return keepCur, nil +} + +// controlledHistories returns all ControllerRevisions controlled by the given DaemonSet +// Note that returned histories are pointers to objects in the cache. +// If you want to modify one, you need to deep-copy it first. +func (dsc *DaemonSetsController) controlledHistories(ds *extensions.DaemonSet) ([]*apps.ControllerRevision, error) { + var result []*apps.ControllerRevision + selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) + if err != nil { + return nil, err + } + histories, err := dsc.historyLister.List(selector) + if err != nil { + return nil, err + } + for _, history := range histories { + // Skip history that doesn't belong to the DaemonSet + if controllerRef := controller.GetControllerOf(history); controllerRef == nil || controllerRef.UID != ds.UID { + continue + } + result = append(result, history) + } + return result, nil +} + +// match check if ds template is semantically equal to the template stored in history +func match(ds *extensions.DaemonSet, history *apps.ControllerRevision) (bool, error) { + template, err := decodeHistory(history) + return apiequality.Semantic.DeepEqual(&ds.Spec.Template, template), err +} + +func decodeHistory(history *apps.ControllerRevision) (*v1.PodTemplateSpec, error) { + raw := history.Data.Raw + decoder := json.NewDecoder(bytes.NewBuffer(raw)) + template := v1.PodTemplateSpec{} + err := decoder.Decode(&template) + return &template, err +} + +func encodeTemplate(template *v1.PodTemplateSpec) ([]byte, error) { + buffer := new(bytes.Buffer) + encoder := json.NewEncoder(buffer) + err := encoder.Encode(template) + return buffer.Bytes(), err +} + +func (dsc *DaemonSetsController) snapshot(ds *extensions.DaemonSet, revision int64) (*apps.ControllerRevision, error) { + encodedTemplate, err := encodeTemplate(&ds.Spec.Template) + if err != nil { + return nil, err + } + hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)) + name := ds.Name + "-" + hash + history := &apps.ControllerRevision{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ds.Namespace, + Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, extensions.DefaultDaemonSetUniqueLabelKey, hash), + Annotations: ds.Annotations, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(ds)}, + }, + Data: runtime.RawExtension{Raw: encodedTemplate}, + Revision: revision, + } + + history, err = dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Create(history) + if errors.IsAlreadyExists(err) { + // TODO: Is it okay to get from historyLister? + existedHistory, getErr := dsc.kubeClient.AppsV1beta1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{}) + if getErr != nil { + return nil, getErr + } + // Check if we already created it + done, err := match(ds, existedHistory) + if err != nil { + return nil, err + } + if done { + return existedHistory, nil + } + + // Handle name collisions between different history + // TODO: Is it okay to get from dsLister? + currDS, getErr := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{}) + if getErr != nil { + return nil, getErr + } + if currDS.Status.CollisionCount == nil { + currDS.Status.CollisionCount = new(int64) + } + *currDS.Status.CollisionCount++ + _, updateErr := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).UpdateStatus(currDS) + if updateErr != nil { + return nil, updateErr + } + glog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount) + return nil, err + } + return history, err +} + +func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod, hash string) ([]*v1.Pod, []*v1.Pod) { var newPods []*v1.Pod var oldPods []*v1.Pod for _, pods := range nodeToDaemonPods { for _, pod := range pods { - if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) { + if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, hash) { newPods = append(newPods, pod) } else { oldPods = append(oldPods, pod) @@ -129,3 +429,11 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *extensions.DaemonSet, glog.V(4).Infof(" DaemonSet %s/%s, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxUnavailable, numUnavailable) return maxUnavailable, numUnavailable, nil } + +type historiesByRevision []*apps.ControllerRevision + +func (h historiesByRevision) Len() int { return len(h) } +func (h historiesByRevision) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h historiesByRevision) Less(i, j int) bool { + return h[i].Revision < h[j].Revision +} diff --git a/pkg/controller/daemon/util/daemonset_util.go b/pkg/controller/daemon/util/daemonset_util.go index 0b101e769c6..a6acf970864 100644 --- a/pkg/controller/daemon/util/daemonset_util.go +++ b/pkg/controller/daemon/util/daemonset_util.go @@ -29,9 +29,10 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" ) -// GetPodTemplateWithHash returns copy of provided template with additional -// label which contains hash of provided template and sets default daemon tolerations. -func GetPodTemplateWithGeneration(template v1.PodTemplateSpec, generation int64) v1.PodTemplateSpec { +// CreatePodTemplate returns copy of provided template with additional +// label which contains templateGeneration (for backward compatibility), +// hash of provided template and sets default daemon tolerations. +func CreatePodTemplate(template v1.PodTemplateSpec, generation int64, hash string) v1.PodTemplateSpec { obj, _ := api.Scheme.DeepCopy(template) newTemplate := obj.(v1.PodTemplateSpec) // DaemonSet pods shouldn't be deleted by NodeController in case of node problems. @@ -60,14 +61,19 @@ func GetPodTemplateWithGeneration(template v1.PodTemplateSpec, generation int64) extensions.DaemonSetTemplateGenerationKey, templateGenerationStr, ) + // TODO: do we need to validate if the DaemonSet is RollingUpdate or not? + if len(hash) > 0 { + newTemplate.ObjectMeta.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = hash + } return newTemplate } -// IsPodUpdate checks if pod contains label with provided hash -func IsPodUpdated(dsTemplateGeneration int64, pod *v1.Pod) bool { - podTemplateGeneration, generationExists := pod.ObjectMeta.Labels[extensions.DaemonSetTemplateGenerationKey] - dsTemplateGenerationStr := fmt.Sprint(dsTemplateGeneration) - return generationExists && podTemplateGeneration == dsTemplateGenerationStr +// IsPodUpdate checks if pod contains label value that either matches templateGeneration or hash +func IsPodUpdated(dsTemplateGeneration int64, pod *v1.Pod, hash string) bool { + // Compare with hash to see if the pod is updated, need to maintain backward compatibility of templateGeneration + templateMatches := pod.Labels[extensions.DaemonSetTemplateGenerationKey] == fmt.Sprint(dsTemplateGeneration) + hashMatches := len(hash) > 0 && pod.Labels[extensions.DefaultDaemonSetUniqueLabelKey] == hash + return hashMatches || templateMatches } // SplitByAvailablePods splits provided daemon set pods by availabilty diff --git a/pkg/controller/daemon/util/daemonset_util_test.go b/pkg/controller/daemon/util/daemonset_util_test.go index 630b60ebdd9..e4fa22c4949 100644 --- a/pkg/controller/daemon/util/daemonset_util_test.go +++ b/pkg/controller/daemon/util/daemonset_util_test.go @@ -47,46 +47,118 @@ func newPod(podName string, nodeName string, label map[string]string) *v1.Pod { } func TestIsPodUpdated(t *testing.T) { + templateGeneration := int64(12345) + hash := "55555" + labels := map[string]string{extensions.DaemonSetTemplateGenerationKey: fmt.Sprint(templateGeneration), extensions.DefaultDaemonSetUniqueLabelKey: hash} + labelsNoHash := map[string]string{extensions.DaemonSetTemplateGenerationKey: fmt.Sprint(templateGeneration)} tests := []struct { + test string templateGeneration int64 pod *v1.Pod + hash string isUpdated bool }{ { - int64(12345), - newPod("pod1", "node1", map[string]string{extensions.DaemonSetTemplateGenerationKey: "12345"}), + "templateGeneration and hash both match", + templateGeneration, + newPod("pod1", "node1", labels), + hash, true, }, { - int64(12355), - newPod("pod1", "node1", map[string]string{extensions.DaemonSetTemplateGenerationKey: "12345"}), + "templateGeneration matches, hash doesn't", + templateGeneration, + newPod("pod1", "node1", labels), + hash + "123", + true, + }, + { + "templateGeneration matches, no hash label, has hash", + templateGeneration, + newPod("pod1", "node1", labelsNoHash), + hash, + true, + }, + { + "templateGeneration matches, no hash label, no hash", + templateGeneration, + newPod("pod1", "node1", labelsNoHash), + "", + true, + }, + { + "templateGeneration matches, has hash label, no hash", + templateGeneration, + newPod("pod1", "node1", labels), + "", + true, + }, + { + "templateGeneration doesn't match, hash does", + templateGeneration + 1, + newPod("pod1", "node1", labels), + hash, + true, + }, + { + "templateGeneration and hash don't match", + templateGeneration + 1, + newPod("pod1", "node1", labels), + hash + "123", false, }, { - int64(12355), + "empty labels, no hash", + templateGeneration, newPod("pod1", "node1", map[string]string{}), + "", false, }, { - int64(12355), + "empty labels", + templateGeneration, + newPod("pod1", "node1", map[string]string{}), + hash, + false, + }, + { + "no labels", + templateGeneration, newPod("pod1", "node1", nil), + hash, false, }, } for _, test := range tests { - updated := IsPodUpdated(test.templateGeneration, test.pod) + updated := IsPodUpdated(test.templateGeneration, test.pod, test.hash) if updated != test.isUpdated { - t.Errorf("IsPodUpdated returned wrong value. Expected %t, got %t. TemplateGeneration: %d", test.isUpdated, updated, test.templateGeneration) + t.Errorf("%s: IsPodUpdated returned wrong value. Expected %t, got %t", test.test, test.isUpdated, updated) } } } -func TestGetPodTemplateWithGeneration(t *testing.T) { - generation := int64(1) - podTemplateSpec := v1.PodTemplateSpec{} - newPodTemplate := GetPodTemplateWithGeneration(podTemplateSpec, generation) - label, exists := newPodTemplate.ObjectMeta.Labels[extensions.DaemonSetTemplateGenerationKey] - if !exists || label != fmt.Sprint(generation) { - t.Errorf("Error in getting podTemplateSpec with label generation. Exists: %t, label: %s", exists, label) +func TestCreatePodTemplate(t *testing.T) { + tests := []struct { + templateGeneration int64 + hash string + expectUniqueLabel bool + }{ + {int64(1), "", false}, + {int64(2), "3242341807", true}, + } + for _, test := range tests { + podTemplateSpec := v1.PodTemplateSpec{} + newPodTemplate := CreatePodTemplate(podTemplateSpec, test.templateGeneration, test.hash) + val, exists := newPodTemplate.ObjectMeta.Labels[extensions.DaemonSetTemplateGenerationKey] + if !exists || val != fmt.Sprint(test.templateGeneration) { + t.Errorf("Expected podTemplateSpec to have generation label value: %d, got: %s", test.templateGeneration, val) + } + val, exists = newPodTemplate.ObjectMeta.Labels[extensions.DefaultDaemonSetUniqueLabelKey] + if test.expectUniqueLabel && (!exists || val != test.hash) { + t.Errorf("Expected podTemplateSpec to have hash label value: %s, got: %s", test.hash, val) + } + if !test.expectUniqueLabel && exists { + t.Errorf("Expected podTemplateSpec to have no hash label, got: %s", val) + } } } diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 0268572fcd8..7e43d636ca1 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -288,7 +288,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *extensions.Deployment, rsLis return nil, err } newRSTemplate := templateCopy.(v1.PodTemplateSpec) - podTemplateSpecHash := fmt.Sprintf("%d", deploymentutil.GetPodTemplateSpecHash(&newRSTemplate, d.Status.CollisionCount)) + podTemplateSpecHash := fmt.Sprintf("%d", controller.ComputeHash(&newRSTemplate, d.Status.CollisionCount)) newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash) // Add podTemplateHash label to selector. newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash) diff --git a/pkg/controller/deployment/util/hash_test.go b/pkg/controller/deployment/util/hash_test.go index d1d944df6d1..9020f92218b 100644 --- a/pkg/controller/deployment/util/hash_test.go +++ b/pkg/controller/deployment/util/hash_test.go @@ -24,6 +24,7 @@ import ( "testing" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/controller" hashutil "k8s.io/kubernetes/pkg/util/hash" ) @@ -110,7 +111,7 @@ func TestPodTemplateSpecHash(t *testing.T) { specJson := strings.Replace(podSpec, "@@VERSION@@", strconv.Itoa(i), 1) spec := v1.PodTemplateSpec{} json.Unmarshal([]byte(specJson), &spec) - hash := GetPodTemplateSpecHash(&spec, nil) + hash := controller.ComputeHash(&spec, nil) if v, ok := seenHashes[hash]; ok { t.Errorf("Hash collision, old: %d new: %d", v, i) break @@ -139,6 +140,6 @@ func BenchmarkFnv(b *testing.B) { json.Unmarshal([]byte(podSpec), &spec) for i := 0; i < b.N; i++ { - GetPodTemplateSpecHash(&spec, nil) + controller.ComputeHash(&spec, nil) } } diff --git a/pkg/controller/deployment/util/pod_util.go b/pkg/controller/deployment/util/pod_util.go index 1954ef7076d..9b54953d12e 100644 --- a/pkg/controller/deployment/util/pod_util.go +++ b/pkg/controller/deployment/util/pod_util.go @@ -17,9 +17,6 @@ limitations under the License. package util import ( - "encoding/binary" - "hash/fnv" - "github.com/golang/glog" errorsutil "k8s.io/apimachinery/pkg/util/errors" @@ -28,23 +25,8 @@ import ( v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/client/retry" - hashutil "k8s.io/kubernetes/pkg/util/hash" ) -func GetPodTemplateSpecHash(template *v1.PodTemplateSpec, uniquifier *int64) uint32 { - podTemplateSpecHasher := fnv.New32a() - hashutil.DeepHashObject(podTemplateSpecHasher, *template) - - // Add uniquifier in the hash if it exists. - if uniquifier != nil { - uniquifierBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(uniquifierBytes, uint64(*uniquifier)) - podTemplateSpecHasher.Write(uniquifierBytes) - } - - return podTemplateSpecHasher.Sum32() -} - // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 type updatePodFunc func(pod *v1.Pod) error diff --git a/pkg/controller/deployment/util/pod_util_test.go b/pkg/controller/deployment/util/pod_util_test.go deleted file mode 100644 index c312a372fb1..00000000000 --- a/pkg/controller/deployment/util/pod_util_test.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "math" - "testing" - - "k8s.io/kubernetes/pkg/api/v1" -) - -func int64P(num int64) *int64 { - return &num -} - -func TestGetPodTemplateSpecHash(t *testing.T) { - tests := []struct { - name string - template *v1.PodTemplateSpec - collisionCount *int64 - otherCollisionCount *int64 - }{ - { - name: "simple", - template: &v1.PodTemplateSpec{}, - collisionCount: int64P(1), - otherCollisionCount: int64P(2), - }, - { - name: "using math.MaxInt64", - template: &v1.PodTemplateSpec{}, - collisionCount: nil, - otherCollisionCount: int64P(int64(math.MaxInt64)), - }, - } - - for _, test := range tests { - hash := GetPodTemplateSpecHash(test.template, test.collisionCount) - otherHash := GetPodTemplateSpecHash(test.template, test.otherCollisionCount) - - if hash == otherHash { - t.Errorf("expected different hashes but got the same: %d", hash) - } - } -} diff --git a/pkg/controller/deployment/util/replicaset_util.go b/pkg/controller/deployment/util/replicaset_util.go index a816e2522c3..5a58cd2bc99 100644 --- a/pkg/controller/deployment/util/replicaset_util.go +++ b/pkg/controller/deployment/util/replicaset_util.go @@ -28,6 +28,7 @@ import ( unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1" extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/retry" + "k8s.io/kubernetes/pkg/controller" labelsutil "k8s.io/kubernetes/pkg/util/labels" ) @@ -76,5 +77,5 @@ func GetReplicaSetHash(rs *extensions.ReplicaSet, uniquifier *int64) (string, er } rsTemplate := template.(v1.PodTemplateSpec) rsTemplate.Labels = labelsutil.CloneAndRemoveLabel(rsTemplate.Labels, extensions.DefaultDeploymentUniqueLabelKey) - return fmt.Sprintf("%d", GetPodTemplateSpecHash(&rsTemplate, uniquifier)), nil + return fmt.Sprintf("%d", controller.ComputeHash(&rsTemplate, uniquifier)), nil } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index ddf83429177..d14e8a46f33 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -86,6 +86,7 @@ func init() { rbac.NewRule("list", "watch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), rbac.NewRule("list", "watch", "create", "delete", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(), rbac.NewRule("create").Groups(legacyGroup).Resources("pods/binding").RuleOrDie(), + rbac.NewRule("list", "watch", "create", "delete", "update", "patch").Groups(appsGroup).Resources("controllerrevisions").RuleOrDie(), eventsRule(), }, }) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index 9a9e031bdd0..894734ffd3a 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -180,6 +180,17 @@ items: - pods/binding verbs: - create + - apiGroups: + - apps + resources: + - controllerrevisions + verbs: + - create + - delete + - list + - patch + - update + - watch - apiGroups: - "" resources: