diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 165d6b33139..869d6b03875 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -64,7 +64,7 @@ type CMServer struct { CloudConfigFile string ConcurrentEndpointSyncs int ConcurrentRCSyncs int - ConcurrentDCSyncs int + ConcurrentDSCSyncs int ServiceSyncPeriod time.Duration NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -100,7 +100,7 @@ func NewCMServer() *CMServer { Address: net.ParseIP("127.0.0.1"), ConcurrentEndpointSyncs: 5, ConcurrentRCSyncs: 5, - ConcurrentDCSyncs: 2, + ConcurrentDSCSyncs: 2, ServiceSyncPeriod: 5 * time.Minute, NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, @@ -216,8 +216,8 @@ func (s *CMServer) Run(_ []string) error { controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas) go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) - daemonManager := daemon.NewDaemonManager(kubeClient) - go daemonManager.Run(s.ConcurrentDCSyncs, util.NeverStop) + go daemon.NewDaemonSetsController(kubeClient). + Run(s.ConcurrentDSCSyncs, util.NeverStop) cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index ad44a1c0e2b..70d653ba088 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -47,6 +47,7 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" + "k8s.io/kubernetes/pkg/controller/daemon" ) // CMServer is the main context object for the controller manager. @@ -113,6 +114,9 @@ func (s *CMServer) Run(_ []string) error { controllerManager := replicationcontroller.NewReplicationManager(kubeClient, replicationcontroller.BurstReplicas) go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) + go daemon.NewDaemonSetsController(kubeClient). + Run(s.ConcurrentDSCSyncs, util.NeverStop) + //TODO(jdef) should eventually support more cloud providers here if s.CloudProvider != mesos.ProviderName { glog.Fatalf("Only provider %v is supported, you specified %v", mesos.ProviderName, s.CloudProvider) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index dfebd1197bb..d49d720981b 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -26,11 +26,11 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/validation" + "k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/expapi" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" ) @@ -215,8 +215,8 @@ func NewControllerExpectations() *ControllerExpectations { type PodControlInterface interface { // CreateReplica creates new replicated pods according to the spec. CreateReplica(namespace string, controller *api.ReplicationController) error - // CreateReplicaOnNodes creates a new pod according to the spec, on a specified list of nodes. - CreateReplicaOnNode(namespace string, controller *expapi.DaemonSet, nodeNames string) error + // CreateReplicaOnNode creates a new pod according to the spec on the specified node. + CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error // DeletePod deletes the pod identified by podID. DeletePod(namespace string, podID string) error } @@ -294,13 +294,13 @@ func (r RealPodControl) CreateReplica(namespace string, controller *api.Replicat return nil } -func (r RealPodControl) CreateReplicaOnNode(namespace string, controller *expapi.DaemonSet, nodeName string) error { - desiredLabels := getReplicaLabelSet(controller.Spec.Template) - desiredAnnotations, err := getReplicaAnnotationSet(controller.Spec.Template, controller) +func (r RealPodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error { + desiredLabels := getReplicaLabelSet(ds.Spec.Template) + desiredAnnotations, err := getReplicaAnnotationSet(ds.Spec.Template, ds) if err != nil { return err } - prefix := getReplicaPrefix(controller.Name) + prefix := getReplicaPrefix(ds.Name) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -309,19 +309,20 @@ func (r RealPodControl) CreateReplicaOnNode(namespace string, controller *expapi GenerateName: prefix, }, } - if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.Spec); err != nil { + if err := api.Scheme.Convert(&ds.Spec.Template.Spec, &pod.Spec); err != nil { return fmt.Errorf("unable to convert pod template: %v", err) } + // if a pod does not have labels then it cannot be controlled by any controller if labels.Set(pod.Labels).AsSelector().Empty() { return fmt.Errorf("unable to create pod replica, no labels") } pod.Spec.NodeName = nodeName if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil { - r.Recorder.Eventf(controller, "failedCreate", "Error creating: %v", err) + r.Recorder.Eventf(ds, "failedCreate", "Error creating: %v", err) return fmt.Errorf("unable to create pod replica: %v", err) } else { - glog.V(4).Infof("Controller %v created pod %v", controller.Name, newPod.Name) - r.Recorder.Eventf(controller, "successfulCreate", "Created pod: %v", newPod.Name) + glog.V(4).Infof("Controller %v created pod %v", ds.Name, newPod.Name) + r.Recorder.Eventf(ds, "successfulCreate", "Created pod: %v", newPod.Name) } return nil diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go new file mode 100644 index 00000000000..e6db35b14d1 --- /dev/null +++ b/pkg/controller/daemon/controller.go @@ -0,0 +1,497 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package daemon + +import ( + "reflect" + "sort" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +const ( + // Daemon sets will periodically check that their daemon pods are running as expected. + FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable. + // Nodes don't need relisting. + FullNodeResyncPeriod = 0 + // Daemon pods don't need relisting. + FullDaemonPodResyncPeriod = 0 + // If sending a status upate to API server fails, we retry a finite number of times. + StatusUpdateRetries = 1 +) + +// DaemonSetsController is responsible for synchronizing DaemonSet objects stored +// in the system with actual running pods. +type DaemonSetsController struct { + kubeClient client.Interface + podControl controller.PodControlInterface + + // To allow injection of syncDaemonSet for testing. + syncHandler func(dsKey string) error + // A TTLCache of pod creates/deletes each ds expects to see + expectations controller.ControllerExpectationsInterface + // A store of daemon sets + dsStore cache.StoreToDaemonSetLister + // A store of pods + podStore cache.StoreToPodLister + // A store of nodes + nodeStore cache.StoreToNodeLister + // Watches changes to all daemon sets. + dsController *framework.Controller + // Watches changes to all pods + podController *framework.Controller + // Watches changes to all nodes. + nodeController *framework.Controller + // Daemon sets that need to be synced. + queue *workqueue.Type +} + +func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + + dsc := &DaemonSetsController{ + kubeClient: kubeClient, + podControl: controller.RealPodControl{ + KubeClient: kubeClient, + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon-set"}), + }, + expectations: controller.NewControllerExpectations(), + queue: workqueue.New(), + } + // Manage addition/update of daemon sets. + dsc.dsStore.Store, dsc.dsController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return dsc.kubeClient.Experimental().DaemonSets(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return dsc.kubeClient.Experimental().DaemonSets(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &experimental.DaemonSet{}, + FullDaemonSetResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ds := obj.(*experimental.DaemonSet) + glog.V(4).Infof("Adding daemon set %s", ds.Name) + dsc.enqueueDaemonSet(obj) + }, + UpdateFunc: func(old, cur interface{}) { + oldDS := old.(*experimental.DaemonSet) + glog.V(4).Infof("Updating daemon set %s", oldDS.Name) + dsc.enqueueDaemonSet(cur) + }, + DeleteFunc: func(obj interface{}) { + ds := obj.(*experimental.DaemonSet) + glog.V(4).Infof("Deleting daemon set %s", ds.Name) + dsc.enqueueDaemonSet(obj) + }, + }, + ) + // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete + // more pods until all the effects (expectations) of a daemon set's create/delete have been observed. + dsc.podStore.Store, dsc.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return dsc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return dsc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Pod{}, + FullDaemonPodResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: dsc.addPod, + UpdateFunc: dsc.updatePod, + DeleteFunc: dsc.deletePod, + }, + ) + // Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change, + dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return dsc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return dsc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Node{}, + FullNodeResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: dsc.addNode, + UpdateFunc: dsc.updateNode, + }, + ) + dsc.syncHandler = dsc.syncDaemonSet + return dsc +} + +// Run begins watching and syncing daemon sets. +func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { + go dsc.dsController.Run(stopCh) + go dsc.podController.Run(stopCh) + go dsc.nodeController.Run(stopCh) + for i := 0; i < workers; i++ { + go util.Until(dsc.worker, time.Second, stopCh) + } + <-stopCh + glog.Infof("Shutting down Daemon Set Controller") + dsc.queue.ShutDown() +} + +func (dsc *DaemonSetsController) worker() { + for { + func() { + dsKey, quit := dsc.queue.Get() + if quit { + return + } + defer dsc.queue.Done(dsKey) + err := dsc.syncHandler(dsKey.(string)) + if err != nil { + glog.Errorf("Error syncing daemon set with key %s: %v", dsKey.(string), err) + } + }() + } +} + +func (dsc *DaemonSetsController) enqueueAllDaemonSets() { + glog.V(4).Infof("Enqueueing all daemon sets") + ds, err := dsc.dsStore.List() + if err != nil { + glog.Errorf("Error enqueueing daemon sets: %v", err) + return + } + for i := range ds { + dsc.enqueueDaemonSet(&ds[i]) + } +} + +func (dsc *DaemonSetsController) enqueueDaemonSet(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + dsc.queue.Add(key) +} + +func (dsc *DaemonSetsController) getPodDaemonSet(pod *api.Pod) *experimental.DaemonSet { + sets, err := dsc.dsStore.GetPodDaemonSets(pod) + if err != nil { + glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name) + return nil + } + // More than two items in this list indicates user error. If two daemon + // sets overlap, sort by creation timestamp, subsort by name, then pick + // the first. + glog.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels) + sort.Sort(byCreationTimestamp(sets)) + return &sets[0] +} + +func (dsc *DaemonSetsController) addPod(obj interface{}) { + pod := obj.(*api.Pod) + glog.V(4).Infof("Pod %s added.", pod.Name) + if ds := dsc.getPodDaemonSet(pod); ds != nil { + dsKey, err := controller.KeyFunc(ds) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", ds, err) + return + } + dsc.expectations.CreationObserved(dsKey) + dsc.enqueueDaemonSet(ds) + } +} + +// When a pod is updated, figure out what sets manage it and wake them +// up. If the labels of the pod have changed we need to awaken both the old +// and new set. old and cur must be *api.Pod types. +func (dsc *DaemonSetsController) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + // A periodic relist will send update events for all known pods. + return + } + curPod := cur.(*api.Pod) + glog.V(4).Infof("Pod %s updated.", curPod.Name) + if curDS := dsc.getPodDaemonSet(curPod); curDS != nil { + dsc.enqueueDaemonSet(curDS) + } + oldPod := old.(*api.Pod) + // If the labels have not changed, then the daemon set responsible for + // the pod is the same as it was before. In that case we have enqueued the daemon + // set above, and do not have to enqueue the set again. + if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { + // It's ok if both oldDS and curDS are the same, because curDS will set + // the expectations on its run so oldDS will have no effect. + if oldDS := dsc.getPodDaemonSet(oldPod); oldDS != nil { + dsc.enqueueDaemonSet(oldDS) + } + } +} + +func (dsc *DaemonSetsController) deletePod(obj interface{}) { + pod, ok := obj.(*api.Pod) + glog.V(4).Infof("Pod %s deleted.", pod.Name) + // When a delete is dropped, the relist will notice a pod in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the pod + // changed labels the new rc will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %+v", obj) + return + } + pod, ok = tombstone.Obj.(*api.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a pod %+v", obj) + return + } + } + if ds := dsc.getPodDaemonSet(pod); ds != nil { + dsKey, err := controller.KeyFunc(ds) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", ds, err) + return + } + dsc.expectations.DeletionObserved(dsKey) + dsc.enqueueDaemonSet(ds) + } +} + +func (dsc *DaemonSetsController) addNode(obj interface{}) { + // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too). + dsc.enqueueAllDaemonSets() +} + +func (dsc *DaemonSetsController) updateNode(old, cur interface{}) { + oldNode := old.(*api.Node) + curNode := cur.(*api.Node) + if api.Semantic.DeepEqual(oldNode.Name, curNode.Name) && api.Semantic.DeepEqual(oldNode.Namespace, curNode.Namespace) && api.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) { + // A periodic relist will send update events for all known pods. + return + } + // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too). + dsc.enqueueAllDaemonSets() +} + +// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *experimental.DaemonSet) (map[string][]*api.Pod, error) { + nodeToDaemonPods := make(map[string][]*api.Pod) + daemonPods, err := dsc.podStore.Pods(ds.Namespace).List(labels.Set(ds.Spec.Selector).AsSelector()) + if err != nil { + return nodeToDaemonPods, err + } + for i := range daemonPods.Items { + nodeName := daemonPods.Items[i].Spec.NodeName + nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i]) + } + return nodeToDaemonPods, nil +} + +func (dsc *DaemonSetsController) manage(ds *experimental.DaemonSet) { + // Find out which nodes are running the daemon pods selected by ds. + nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + if err != nil { + glog.Errorf("Error getting node to daemon pod mapping for daemon set %+v: %v", ds, err) + } + + // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon + // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node. + nodeList, err := dsc.nodeStore.List() + if err != nil { + glog.Errorf("Couldn't get list of nodes when syncing daemon set %+v: %v", ds, err) + } + var nodesNeedingDaemonPods, podsToDelete []string + for i := range nodeList.Items { + // Check if the node satisfies the daemon set's node selector. + nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector() + shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels)) + // If the daemon set specifies a node name, check that it matches with nodeName. + nodeName := nodeList.Items[i].Name + shouldRun = shouldRun && (ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == nodeName) + daemonPods, isRunning := nodeToDaemonPods[nodeName] + if shouldRun && !isRunning { + // If daemon pod is supposed to be running on node, but isn't, create daemon pod. + nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodeName) + } else if shouldRun && len(daemonPods) > 1 { + // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods. + // TODO: sort the daemon pods by creation time, so the the oldest is preserved. + for i := 1; i < len(daemonPods); i++ { + podsToDelete = append(podsToDelete, daemonPods[i].Name) + } + } else if !shouldRun && isRunning { + // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node. + for i := range daemonPods { + podsToDelete = append(podsToDelete, daemonPods[i].Name) + } + } + } + + // We need to set expectations before creating/deleting pods to avoid race conditions. + dsKey, err := controller.KeyFunc(ds) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", ds, err) + return + } + dsc.expectations.SetExpectations(dsKey, len(nodesNeedingDaemonPods), len(podsToDelete)) + + glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v", ds.Name, nodesNeedingDaemonPods) + for i := range nodesNeedingDaemonPods { + if err := dsc.podControl.CreateReplicaOnNode(ds.Namespace, ds, nodesNeedingDaemonPods[i]); err != nil { + glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) + dsc.expectations.CreationObserved(dsKey) + util.HandleError(err) + } + } + + glog.V(4).Infof("Pods to delete for daemon set %s: %+v", ds.Name, podsToDelete) + for i := range podsToDelete { + if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[i]); err != nil { + glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) + dsc.expectations.DeletionObserved(dsKey) + util.HandleError(err) + } + } +} + +func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *experimental.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error { + if ds.Status.DesiredNumberScheduled == desiredNumberScheduled && ds.Status.CurrentNumberScheduled == currentNumberScheduled && ds.Status.NumberMisscheduled == numberMisscheduled { + return nil + } + + var updateErr, getErr error + for i := 0; i <= StatusUpdateRetries; i++ { + ds.Status.DesiredNumberScheduled = desiredNumberScheduled + ds.Status.CurrentNumberScheduled = currentNumberScheduled + ds.Status.NumberMisscheduled = numberMisscheduled + _, updateErr = dsClient.Update(ds) + if updateErr == nil { + // successful update + return nil + } + // Update the set with the latest resource version for the next poll + if ds, getErr = dsClient.Get(ds.Name); getErr != nil { + // If the GET fails we can't trust status.Replicas anymore. This error + // is bound to be more interesting than the update failure. + return getErr + } + } + return updateErr +} + +func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *experimental.DaemonSet) { + glog.Infof("Updating daemon set status") + nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + if err != nil { + glog.Errorf("Error getting node to daemon pod mapping for daemon set %+v: %v", ds, err) + } + + nodeList, err := dsc.nodeStore.List() + if err != nil { + glog.Errorf("Couldn't get list of nodes when updating daemon set %+v: %v", ds, err) + } + + var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int + for _, node := range nodeList.Items { + nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector() + shouldRun := nodeSelector.Matches(labels.Set(node.Labels)) + numDaemonPods := len(nodeToDaemonPods[node.Name]) + + if numDaemonPods > 0 { + currentNumberScheduled++ + } + + if shouldRun { + desiredNumberScheduled++ + } else if numDaemonPods >= 0 { + numberMisscheduled++ + } + } + + err = storeDaemonSetStatus(dsc.kubeClient.Experimental().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled) + if err != nil { + glog.Errorf("Error storing status for daemon set %+v: %v", ds, err) + } +} + +func (dsc *DaemonSetsController) syncDaemonSet(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Now().Sub(startTime)) + }() + obj, exists, err := dsc.dsStore.Store.GetByKey(key) + if err != nil { + glog.Infof("Unable to retrieve ds %v from store: %v", key, err) + dsc.queue.Add(key) + return err + } + if !exists { + glog.V(3).Infof("daemon set has been deleted %v", key) + dsc.expectations.DeleteExpectations(key) + return nil + } + ds := obj.(*experimental.DaemonSet) + + // Don't process a daemon set until all its creations and deletions have been processed. + // For example if daemon set foo asked for 3 new daemon pods in the previous call to manage, + // then we do not want to call manage on foo until the daemon pods have been created. + dsKey, err := controller.KeyFunc(ds) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", ds, err) + return err + } + dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey) + if dsNeedsSync { + dsc.manage(ds) + } + + dsc.updateDaemonSetStatus(ds) + return nil +} + +// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. +type byCreationTimestamp []experimental.DaemonSet + +func (o byCreationTimestamp) Len() int { return len(o) } +func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o byCreationTimestamp) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) +} diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go new file mode 100644 index 00000000000..1ab55b2b72c --- /dev/null +++ b/pkg/controller/daemon/controller_test.go @@ -0,0 +1,321 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package daemon + +import ( + "fmt" + "sync" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/apis/experimental" + "k8s.io/kubernetes/pkg/client/cache" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/securitycontext" +) + +var ( + simpleDaemonSetLabel = map[string]string{"name": "simple-daemon", "type": "production"} + simpleDaemonSetLabel2 = map[string]string{"name": "simple-daemon", "type": "test"} + simpleNodeLabel = map[string]string{"color": "blue", "speed": "fast"} + simpleNodeLabel2 = map[string]string{"color": "red", "speed": "fast"} +) + +type FakePodControl struct { + daemonSet []experimental.DaemonSet + deletePodName []string + lock sync.Mutex + err error +} + +func init() { + api.ForTesting_ReferencesAllowBlankSelfLinks = true +} + +func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error { + return nil +} + +func (f *FakePodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error { + f.lock.Lock() + defer f.lock.Unlock() + if f.err != nil { + return f.err + } + f.daemonSet = append(f.daemonSet, *ds) + return nil +} + +func (f *FakePodControl) DeletePod(namespace string, podName string) error { + f.lock.Lock() + defer f.lock.Unlock() + if f.err != nil { + return f.err + } + f.deletePodName = append(f.deletePodName, podName) + return nil +} +func (f *FakePodControl) clear() { + f.lock.Lock() + defer f.lock.Unlock() + f.deletePodName = []string{} + f.daemonSet = []experimental.DaemonSet{} +} + +func newDaemonSet(name string) *experimental.DaemonSet { + return &experimental.DaemonSet{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Experimental.Version()}, + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: api.NamespaceDefault, + }, + Spec: experimental.DaemonSetSpec{ + Selector: simpleDaemonSetLabel, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: simpleDaemonSetLabel, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "foo/bar", + TerminationMessagePath: api.TerminationMessagePathDefault, + ImagePullPolicy: api.PullIfNotPresent, + SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), + }, + }, + DNSPolicy: api.DNSDefault, + }, + }, + }, + } +} + +func newNode(name string, label map[string]string) *api.Node { + return &api.Node{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Default.Version()}, + ObjectMeta: api.ObjectMeta{ + Name: name, + Labels: label, + Namespace: api.NamespaceDefault, + }, + } +} + +func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string]string) { + for i := startIndex; i < startIndex+numNodes; i++ { + nodeStore.Add(newNode(fmt.Sprintf("node-%d", i), label)) + } +} + +func newPod(podName string, nodeName string, label map[string]string) *api.Pod { + pod := &api.Pod{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Default.Version()}, + ObjectMeta: api.ObjectMeta{ + GenerateName: podName, + Labels: label, + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + NodeName: nodeName, + Containers: []api.Container{ + { + Image: "foo/bar", + TerminationMessagePath: api.TerminationMessagePathDefault, + ImagePullPolicy: api.PullIfNotPresent, + SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), + }, + }, + DNSPolicy: api.DNSDefault, + }, + } + api.GenerateName(api.SimpleNameGenerator, &pod.ObjectMeta) + return pod +} + +func addPods(podStore cache.Store, nodeName string, label map[string]string, number int) { + for i := 0; i < number; i++ { + podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label)) + } +} + +func newTestController() (*DaemonSetsController, *FakePodControl) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()}) + manager := NewDaemonSetsController(client) + podControl := &FakePodControl{} + manager.podControl = podControl + return manager, podControl +} + +func validateSyncDaemonSets(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) { + if len(fakePodControl.daemonSet) != expectedCreates { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.daemonSet)) + } + if len(fakePodControl.deletePodName) != expectedDeletes { + t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName)) + } +} + +func syncAndValidateDaemonSets(t *testing.T, manager *DaemonSetsController, ds *experimental.DaemonSet, podControl *FakePodControl, expectedCreates, expectedDeletes int) { + key, err := controller.KeyFunc(ds) + if err != nil { + t.Errorf("Could not get key for daemon.") + } + manager.syncHandler(key) + validateSyncDaemonSets(t, podControl, expectedCreates, expectedDeletes) +} + +// DaemonSets without node selectors should launch pods on every node. +func TestSimpleDaemonSetLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0) +} + +// DaemonSets without node selectors should launch pods on every node. +func TestNoNodesDoesNothing(t *testing.T) { + manager, podControl := newTestController() + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} + +// DaemonSets without node selectors should launch pods on every node. +func TestOneNodeDaemonLaunchesPod(t *testing.T) { + manager, podControl := newTestController() + manager.nodeStore.Add(newNode("only-node", nil)) + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) +} + +// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods. +func TestDealsWithExistingPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 2) + addPods(manager.podStore.Store, "node-3", simpleDaemonSetLabel, 5) + addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel2, 2) + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 2, 5) +} + +// Daemon with node selector should launch pods on nodes matching selector. +func TestSelectorDaemonLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + daemon := newDaemonSet("foo") + daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager.dsStore.Add(daemon) + syncAndValidateDaemonSets(t, manager, daemon, podControl, 3, 0) +} + +// Daemon with node selector should delete pods from nodes that do not satisfy selector. +func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel) + addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel2, 2) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 1) + addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel, 1) + daemon := newDaemonSet("foo") + daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager.dsStore.Add(daemon) + syncAndValidateDaemonSets(t, manager, daemon, podControl, 5, 4) +} + +// DaemonSet with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes. +func TestSelectorDaemonDealsWithExistingPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel) + addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 2) + addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 4) + addPods(manager.podStore.Store, "node-6", simpleDaemonSetLabel, 13) + addPods(manager.podStore.Store, "node-7", simpleDaemonSetLabel2, 4) + addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel2, 1) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 3, 20) +} + +// DaemonSet with node selector which does not match any node labels should not launch pods. +func TestBadSelectorDaemonDoesNothing(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel2 + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} + +// DaemonSet with node name should launch pod on node with corresponding name. +func TestNameDaemonSetLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeName = "node-0" + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) +} + +// DaemonSet with node name that does not exist should not launch pods. +func TestBadNameDaemonSetDoesNothing(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeName = "node-10" + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} + +// DaemonSet with node selector, and node name, matching a node, should launch a pod on the node. +func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + ds.Spec.Template.Spec.NodeName = "node-6" + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) +} + +// DaemonSet with node selector that matches some nodes, and node name that matches a different node, should do nothing. +func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + ds.Spec.Template.Spec.NodeName = "node-0" + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} diff --git a/pkg/controller/daemon/manager.go b/pkg/controller/daemon/manager.go deleted file mode 100644 index 78ee868a44f..00000000000 --- a/pkg/controller/daemon/manager.go +++ /dev/null @@ -1,475 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package daemon - -import ( - "reflect" - "time" - - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api" - client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/client/unversioned/cache" - "k8s.io/kubernetes/pkg/client/unversioned/record" - "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/expapi" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/util/workqueue" - "k8s.io/kubernetes/pkg/watch" -) - -const ( - // Daemons will periodically check that their daemon pods are running as expected. - FullDaemonResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable. - // Nodes don't need relisting. - FullNodeResyncPeriod = 0 - // Daemon pods don't need relisting. - FullDaemonPodResyncPeriod = 0 - // If sending a status upate to API server fails, we retry a finite number of times. - StatusUpdateRetries = 1 -) - -type DaemonManager struct { - kubeClient client.Interface - podControl controller.PodControlInterface - - // To allow injection of syncDaemon for testing. - syncHandler func(dcKey string) error - // A TTLCache of pod creates/deletes each dc expects to see - expectations controller.ControllerExpectationsInterface - // A store of daemons, populated by the podController. - dcStore cache.StoreToDaemonSetLister - // A store of pods, populated by the podController - podStore cache.StoreToPodLister - // A store of pods, populated by the podController - nodeStore cache.StoreToNodeLister - // Watches changes to all pods. - dcController *framework.Controller - // Watches changes to all pods - podController *framework.Controller - // Watches changes to all nodes. - nodeController *framework.Controller - // Controllers that need to be updated. - queue *workqueue.Type -} - -func NewDaemonManager(kubeClient client.Interface) *DaemonManager { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) - - dm := &DaemonManager{ - kubeClient: kubeClient, - podControl: controller.RealPodControl{ - KubeClient: kubeClient, - Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon"}), - }, - expectations: controller.NewControllerExpectations(), - queue: workqueue.New(), - } - // Manage addition/update of daemon controllers. - dm.dcStore.Store, dm.dcController = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return dm.kubeClient.Experimental().Daemons(api.NamespaceAll).List(labels.Everything()) - }, - WatchFunc: func(rv string) (watch.Interface, error) { - return dm.kubeClient.Experimental().Daemons(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) - }, - }, - &expapi.DaemonSet{}, - FullDaemonResyncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - daemon := obj.(*expapi.DaemonSet) - glog.V(4).Infof("Adding daemon %s", daemon.Name) - dm.enqueueController(obj) - }, - UpdateFunc: func(old, cur interface{}) { - oldDaemon := old.(*expapi.DaemonSet) - glog.V(4).Infof("Updating daemon %s", oldDaemon.Name) - dm.enqueueController(cur) - }, - DeleteFunc: func(obj interface{}) { - daemon := obj.(*expapi.DaemonSet) - glog.V(4).Infof("Deleting daemon %s", daemon.Name) - dm.enqueueController(obj) - }, - }, - ) - // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon controller to create/delete - // more pods until all the effects (expectations) of a daemon controller's create/delete have been observed. - dm.podStore.Store, dm.podController = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return dm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) - }, - WatchFunc: func(rv string) (watch.Interface, error) { - return dm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) - }, - }, - &api.Pod{}, - FullDaemonPodResyncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: dm.addPod, - UpdateFunc: dm.updatePod, - DeleteFunc: dm.deletePod, - }, - ) - // Watch for new nodes or updates to nodes - daemons are launched on new nodes, and possibly when labels on nodes change, - dm.nodeStore.Store, dm.nodeController = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return dm.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) - }, - WatchFunc: func(rv string) (watch.Interface, error) { - return dm.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv) - }, - }, - &api.Node{}, - FullNodeResyncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: dm.addNode, - UpdateFunc: dm.updateNode, - DeleteFunc: func(node interface{}) {}, - }, - ) - dm.syncHandler = dm.syncDaemon - return dm -} - -// Run begins watching and syncing daemons. -func (dm *DaemonManager) Run(workers int, stopCh <-chan struct{}) { - go dm.dcController.Run(stopCh) - go dm.podController.Run(stopCh) - go dm.nodeController.Run(stopCh) - for i := 0; i < workers; i++ { - go util.Until(dm.worker, time.Second, stopCh) - } - <-stopCh - glog.Infof("Shutting down Daemon Controller Manager") - dm.queue.ShutDown() -} - -func (dm *DaemonManager) worker() { - for { - func() { - key, quit := dm.queue.Get() - if quit { - return - } - defer dm.queue.Done(key) - err := dm.syncHandler(key.(string)) - if err != nil { - glog.Errorf("Error syncing daemon controller with key %s: %v", key.(string), err) - } - }() - } -} - -func (dm *DaemonManager) enqueueAllDaemons() { - glog.V(4).Infof("Enqueueing all daemons") - daemons, err := dm.dcStore.List() - if err != nil { - glog.Errorf("Error enqueueing daemon controllers: %v", err) - return - } - for i := range daemons { - dm.enqueueController(&daemons[i]) - } -} - -func (dm *DaemonManager) enqueueController(obj interface{}) { - key, err := controller.KeyFunc(obj) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) - return - } - dm.queue.Add(key) -} - -func (dm *DaemonManager) getPodDaemon(pod *api.Pod) *expapi.DaemonSet { - controllers, err := dm.dcStore.GetPodDaemonSets(pod) - if err != nil { - glog.V(4).Infof("No controllers found for pod %v, daemon manager will avoid syncing", pod.Name) - return nil - } - return &controllers[0] -} - -func (dm *DaemonManager) addPod(obj interface{}) { - pod := obj.(*api.Pod) - glog.V(4).Infof("Pod %s added.", pod.Name) - if dc := dm.getPodDaemon(pod); dc != nil { - dcKey, err := controller.KeyFunc(dc) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", dc, err) - return - } - dm.expectations.CreationObserved(dcKey) - dm.enqueueController(dc) - } -} - -// When a pod is updated, figure out what controller/s manage it and wake them -// up. If the labels of the pod have changed we need to awaken both the old -// and new controller. old and cur must be *api.Pod types. -func (dm *DaemonManager) updatePod(old, cur interface{}) { - if api.Semantic.DeepEqual(old, cur) { - // A periodic relist will send update events for all known pods. - return - } - curPod := cur.(*api.Pod) - glog.V(4).Infof("Pod %s updated.", curPod.Name) - if dc := dm.getPodDaemon(curPod); dc != nil { - dm.enqueueController(dc) - } - oldPod := old.(*api.Pod) - // If the labels have not changed, then the daemon controller responsible for - // the pod is the same as it was before. In that case we have enqueued the daemon - // controller above, and do not have to enqueue the controller again. - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - // If the old and new dc are the same, the first one that syncs - // will set expectations preventing any damage from the second. - if oldRC := dm.getPodDaemon(oldPod); oldRC != nil { - dm.enqueueController(oldRC) - } - } -} - -func (dm *DaemonManager) deletePod(obj interface{}) { - pod, ok := obj.(*api.Pod) - glog.V(4).Infof("Pod %s deleted.", pod.Name) - // When a delete is dropped, the relist will notice a pod in the store not - // in the list, leading to the insertion of a tombstone object which contains - // the deleted key/value. Note that this value might be stale. If the pod - // changed labels the new rc will not be woken up till the periodic resync. - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("Couldn't get object from tombstone %+v", obj) - return - } - pod, ok = tombstone.Obj.(*api.Pod) - if !ok { - glog.Errorf("Tombstone contained object that is not a pod %+v", obj) - return - } - } - if dc := dm.getPodDaemon(pod); dc != nil { - dcKey, err := controller.KeyFunc(dc) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", dc, err) - return - } - dm.expectations.DeletionObserved(dcKey) - dm.enqueueController(dc) - } -} - -func (dm *DaemonManager) addNode(obj interface{}) { - // TODO: it'd be nice to pass a hint with these enqueues, so that each dc would only examine the added node (unless it has other work to do, too). - dm.enqueueAllDaemons() -} - -func (dm *DaemonManager) updateNode(old, cur interface{}) { - oldNode := old.(*api.Node) - curNode := cur.(*api.Node) - if api.Semantic.DeepEqual(oldNode.Name, curNode.Name) && api.Semantic.DeepEqual(oldNode.Namespace, curNode.Namespace) && api.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) { - // A periodic relist will send update events for all known pods. - return - } - // TODO: it'd be nice to pass a hint with these enqueues, so that each dc would only examine the added node (unless it has other work to do, too). - dm.enqueueAllDaemons() -} - -// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to dc) running on the nodes. -func (dm *DaemonManager) getNodesToDaemonPods(dc *expapi.DaemonSet) (map[string][]*api.Pod, error) { - nodeToDaemonPods := make(map[string][]*api.Pod) - daemonPods, err := dm.podStore.Pods(dc.Namespace).List(labels.Set(dc.Spec.Selector).AsSelector()) - if err != nil { - return nodeToDaemonPods, err - } - for i := range daemonPods.Items { - nodeName := daemonPods.Items[i].Spec.NodeName - nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i]) - } - return nodeToDaemonPods, nil -} - -func (dm *DaemonManager) manageDaemons(dc *expapi.DaemonSet) { - // Find out which nodes are running the daemon pods selected by dc. - nodeToDaemonPods, err := dm.getNodesToDaemonPods(dc) - if err != nil { - glog.Errorf("Error getting node to daemon pod mapping for daemon controller %+v: %v", dc, err) - } - - // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon - // pod. If the node is supposed to run the daemon, but isn't, create the daemon on the node. - nodeList, err := dm.nodeStore.List() - if err != nil { - glog.Errorf("Couldn't get list of nodes when adding daemon controller %+v: %v", dc, err) - } - var nodesNeedingDaemons, podsToDelete []string - for i := range nodeList.Items { - // Check if the node satisfies the daemon's node selector. - nodeSelector := labels.Set(dc.Spec.Template.Spec.NodeSelector).AsSelector() - shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels)) - // If the daemon specifies a node name, check that it matches with nodeName. - nodeName := nodeList.Items[i].Name - shouldRun = shouldRun && (dc.Spec.Template.Spec.NodeName == "" || dc.Spec.Template.Spec.NodeName == nodeName) - daemonPods, isRunning := nodeToDaemonPods[nodeName] - if shouldRun && !isRunning { - // If daemon pod is supposed to be running on node, but isn't, create daemon pod. - nodesNeedingDaemons = append(nodesNeedingDaemons, nodeName) - } else if shouldRun && len(daemonPods) > 1 { - // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods. - // TODO: sort the daemon pods by creation time, so the the oldest is preserved. - for i := 1; i < len(daemonPods); i++ { - podsToDelete = append(podsToDelete, daemonPods[i].Name) - } - } else if !shouldRun && isRunning { - // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node. - for i := range daemonPods { - podsToDelete = append(podsToDelete, daemonPods[i].Name) - } - } - } - - // We need to set expectations before creating/deleting pods to avoid race conditions. - dcKey, err := controller.KeyFunc(dc) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", dc, err) - return - } - dm.expectations.SetExpectations(dcKey, len(nodesNeedingDaemons), len(podsToDelete)) - - glog.V(4).Infof("Nodes needing daemons for daemon %s: %+v", dc.Name, nodesNeedingDaemons) - for i := range nodesNeedingDaemons { - if err := dm.podControl.CreateReplicaOnNode(dc.Namespace, dc, nodesNeedingDaemons[i]); err != nil { - glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", dc.Namespace, dc.Name) - dm.expectations.CreationObserved(dcKey) - util.HandleError(err) - } - } - - glog.V(4).Infof("Pods to delete for daemon %s: %+v", dc.Name, podsToDelete) - for i := range podsToDelete { - if err := dm.podControl.DeletePod(dc.Namespace, podsToDelete[i]); err != nil { - glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", dc.Namespace, dc.Name) - dm.expectations.DeletionObserved(dcKey) - util.HandleError(err) - } - } -} - -func storeDaemonStatus(dcClient client.DaemonSetInterface, dc *expapi.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error { - if dc.Status.DesiredNumberScheduled == desiredNumberScheduled && dc.Status.CurrentNumberScheduled == currentNumberScheduled && dc.Status.NumberMisscheduled == numberMisscheduled { - return nil - } - - var updateErr, getErr error - for i := 0; i <= StatusUpdateRetries; i++ { - dc.Status.DesiredNumberScheduled = desiredNumberScheduled - dc.Status.CurrentNumberScheduled = currentNumberScheduled - dc.Status.NumberMisscheduled = numberMisscheduled - _, updateErr := dcClient.Update(dc) - if updateErr == nil { - return updateErr - } - // Update the controller with the latest resource version for the next poll - if dc, getErr = dcClient.Get(dc.Name); getErr != nil { - // If the GET fails we can't trust status.Replicas anymore. This error - // is bound to be more interesting than the update failure. - return getErr - } - } - // Failed 2 updates one of which was with the latest controller, return the update error - return updateErr -} - -func (dm *DaemonManager) updateDaemonStatus(dc *expapi.DaemonSet) { - glog.Infof("Updating daemon status") - nodeToDaemonPods, err := dm.getNodesToDaemonPods(dc) - if err != nil { - glog.Errorf("Error getting node to daemon pod mapping for daemon %+v: %v", dc, err) - } - - nodeList, err := dm.nodeStore.List() - if err != nil { - glog.Errorf("Couldn't get list of nodes when adding daemon %+v: %v", dc, err) - } - - var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int - for i := range nodeList.Items { - nodeSelector := labels.Set(dc.Spec.Template.Spec.NodeSelector).AsSelector() - shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels)) - numDaemonPods := len(nodeToDaemonPods[nodeList.Items[i].Name]) - if shouldRun { - desiredNumberScheduled++ - if numDaemonPods == 1 { - currentNumberScheduled++ - } - } else if numDaemonPods >= 1 { - numberMisscheduled++ - } - } - - err = storeDaemonStatus(dm.kubeClient.Experimental().Daemons(dc.Namespace), dc, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled) - if err != nil { - glog.Errorf("Error storing status for daemon %+v: %v", dc, err) - } -} - -func (dm *DaemonManager) syncDaemon(key string) error { - startTime := time.Now() - defer func() { - glog.V(4).Infof("Finished syncing daemon %q (%v)", key, time.Now().Sub(startTime)) - }() - obj, exists, err := dm.dcStore.Store.GetByKey(key) - if err != nil { - glog.Infof("Unable to retrieve dc %v from store: %v", key, err) - dm.queue.Add(key) - return err - } - if !exists { - glog.V(3).Infof("Daemon Controller has been deleted %v", key) - dm.expectations.DeleteExpectations(key) - return nil - } - dc := obj.(*expapi.DaemonSet) - - // Don't process a daemon until all its creations and deletions have been processed. - // For example if daemon foo asked for 3 new daemon pods in the previous call to manageDaemons, - // then we do not want to call manageDaemons on foo until the daemon pods have been created. - dcKey, err := controller.KeyFunc(dc) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", dc, err) - return err - } - dcNeedsSync := dm.expectations.SatisfiedExpectations(dcKey) - if dcNeedsSync { - dm.manageDaemons(dc) - } - - dm.updateDaemonStatus(dc) - return nil -} diff --git a/pkg/controller/daemon/manager_test.go b/pkg/controller/daemon/manager_test.go deleted file mode 100644 index 7e222c990f3..00000000000 --- a/pkg/controller/daemon/manager_test.go +++ /dev/null @@ -1,321 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package daemon - -import ( - "fmt" - "sync" - "testing" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/testapi" - client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/client/unversioned/cache" - "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/expapi" - "k8s.io/kubernetes/pkg/securitycontext" -) - -var ( - simpleDaemonLabel = map[string]string{"name": "simple-daemon", "type": "production"} - simpleDaemonLabel2 = map[string]string{"name": "simple-daemon", "type": "test"} - simpleNodeLabel = map[string]string{"color": "blue", "speed": "fast"} - simpleNodeLabel2 = map[string]string{"color": "red", "speed": "fast"} -) - -type FakePodControl struct { - daemonSpec []expapi.DaemonSet - deletePodName []string - lock sync.Mutex - err error -} - -func init() { - api.ForTesting_ReferencesAllowBlankSelfLinks = true -} - -func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error { - return nil -} - -func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *expapi.DaemonSet, nodeName string) error { - f.lock.Lock() - defer f.lock.Unlock() - if f.err != nil { - return f.err - } - f.daemonSpec = append(f.daemonSpec, *daemon) - return nil -} - -func (f *FakePodControl) DeletePod(namespace string, podName string) error { - f.lock.Lock() - defer f.lock.Unlock() - if f.err != nil { - return f.err - } - f.deletePodName = append(f.deletePodName, podName) - return nil -} -func (f *FakePodControl) clear() { - f.lock.Lock() - defer f.lock.Unlock() - f.deletePodName = []string{} - f.daemonSpec = []expapi.DaemonSet{} -} - -func newDaemon(name string) *expapi.DaemonSet { - return &expapi.DaemonSet{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, - ObjectMeta: api.ObjectMeta{ - Name: name, - Namespace: api.NamespaceDefault, - }, - Spec: expapi.DaemonSetSpec{ - Selector: simpleDaemonLabel, - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: simpleDaemonLabel, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "foo/bar", - TerminationMessagePath: api.TerminationMessagePathDefault, - ImagePullPolicy: api.PullIfNotPresent, - SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), - }, - }, - DNSPolicy: api.DNSDefault, - }, - }, - }, - } -} - -func newNode(name string, label map[string]string) *api.Node { - return &api.Node{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, - ObjectMeta: api.ObjectMeta{ - Name: name, - Labels: label, - Namespace: api.NamespaceDefault, - }, - } -} - -func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string]string) { - for i := startIndex; i < startIndex+numNodes; i++ { - nodeStore.Add(newNode(fmt.Sprintf("node-%d", i), label)) - } -} - -func newPod(podName string, nodeName string, label map[string]string) *api.Pod { - pod := &api.Pod{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, - ObjectMeta: api.ObjectMeta{ - GenerateName: podName, - Labels: label, - Namespace: api.NamespaceDefault, - }, - Spec: api.PodSpec{ - NodeName: nodeName, - Containers: []api.Container{ - { - Image: "foo/bar", - TerminationMessagePath: api.TerminationMessagePathDefault, - ImagePullPolicy: api.PullIfNotPresent, - SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), - }, - }, - DNSPolicy: api.DNSDefault, - }, - } - api.GenerateName(api.SimpleNameGenerator, &pod.ObjectMeta) - return pod -} - -func addPods(podStore cache.Store, nodeName string, label map[string]string, number int) { - for i := 0; i < number; i++ { - podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label)) - } -} - -func makeTestManager() (*DaemonManager, *FakePodControl) { - client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) - manager := NewDaemonManager(client) - podControl := &FakePodControl{} - manager.podControl = podControl - return manager, podControl -} - -func validateSyncDaemons(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) { - if len(fakePodControl.daemonSpec) != expectedCreates { - t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.daemonSpec)) - } - if len(fakePodControl.deletePodName) != expectedDeletes { - t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName)) - } -} - -func syncAndValidateDaemons(t *testing.T, manager *DaemonManager, daemon *expapi.DaemonSet, podControl *FakePodControl, expectedCreates, expectedDeletes int) { - key, err := controller.KeyFunc(daemon) - if err != nil { - t.Errorf("Could not get key for daemon.") - } - manager.syncHandler(key) - validateSyncDaemons(t, podControl, expectedCreates, expectedDeletes) -} - -// Daemon without node selectors should launch pods on every node. -func TestSimpleDaemonLaunchesPods(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 5, nil) - daemon := newDaemon("foo") - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 5, 0) -} - -// Daemon without node selectors should launch pods on every node. -func TestNoNodesDoesNothing(t *testing.T) { - manager, podControl := makeTestManager() - daemon := newDaemon("foo") - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0) -} - -// Daemon without node selectors should launch pods on every node. -func TestOneNodeDaemonLaunchesPod(t *testing.T) { - manager, podControl := makeTestManager() - manager.nodeStore.Add(newNode("only-node", nil)) - daemon := newDaemon("foo") - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0) -} - -// Manager should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods. -func TestDealsWithExistingPods(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 5, nil) - addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 1) - addPods(manager.podStore.Store, "node-2", simpleDaemonLabel, 2) - addPods(manager.podStore.Store, "node-3", simpleDaemonLabel, 5) - addPods(manager.podStore.Store, "node-4", simpleDaemonLabel2, 2) - daemon := newDaemon("foo") - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 2, 5) -} - -// Daemon with node selector should launch pods on nodes matching selector. -func TestSelectorDaemonLaunchesPods(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 4, nil) - addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) - daemon := newDaemon("foo") - daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 3, 0) -} - -// Daemon with node selector should delete pods from nodes that do not satisfy selector. -func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 5, nil) - addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel) - addPods(manager.podStore.Store, "node-0", simpleDaemonLabel2, 2) - addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 3) - addPods(manager.podStore.Store, "node-1", simpleDaemonLabel2, 1) - addPods(manager.podStore.Store, "node-4", simpleDaemonLabel, 1) - daemon := newDaemon("foo") - daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 5, 4) -} - -// Daemon with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes. -func TestSelectorDaemonDealsWithExistingPods(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 5, nil) - addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel) - addPods(manager.podStore.Store, "node-0", simpleDaemonLabel, 1) - addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 3) - addPods(manager.podStore.Store, "node-1", simpleDaemonLabel2, 2) - addPods(manager.podStore.Store, "node-2", simpleDaemonLabel, 4) - addPods(manager.podStore.Store, "node-6", simpleDaemonLabel, 13) - addPods(manager.podStore.Store, "node-7", simpleDaemonLabel2, 4) - addPods(manager.podStore.Store, "node-9", simpleDaemonLabel, 1) - addPods(manager.podStore.Store, "node-9", simpleDaemonLabel2, 1) - daemon := newDaemon("foo") - daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 3, 20) -} - -// Daemon with node selector which does not match any node labels should not launch pods. -func TestBadSelectorDaemonDoesNothing(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 4, nil) - addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) - daemon := newDaemon("foo") - daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel2 - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0) -} - -// Daemon with node name should launch pod on node with corresponding name. -func TestNameDaemonLaunchesPods(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 5, nil) - daemon := newDaemon("foo") - daemon.Spec.Template.Spec.NodeName = "node-0" - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0) -} - -// Daemon with node name that does not exist should not launch pods. -func TestBadNameDaemonDoesNothing(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 5, nil) - daemon := newDaemon("foo") - daemon.Spec.Template.Spec.NodeName = "node-10" - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0) -} - -// Daemon with node selector, and node name, matching a node, should launch a pod on the node. -func TestNameAndSelectorDaemonLaunchesPods(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 4, nil) - addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) - daemon := newDaemon("foo") - daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel - daemon.Spec.Template.Spec.NodeName = "node-6" - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0) -} - -// Daemon with node selector that matches some nodes, and node name that matches a different node, should do nothing. -func TestInconsistentNameSelectorDaemonDoesNothing(t *testing.T) { - manager, podControl := makeTestManager() - addNodes(manager.nodeStore.Store, 0, 4, nil) - addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) - daemon := newDaemon("foo") - daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel - daemon.Spec.Template.Spec.NodeName = "node-0" - manager.dcStore.Add(daemon) - syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0) -} diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 7cab28df825..404099a0271 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -27,11 +27,11 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/expapi" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" @@ -70,7 +70,7 @@ func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationCo return nil } -func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *expapi.DaemonSet, nodeName string) error { +func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *experimental.DaemonSet, nodeName string) error { return nil } diff --git a/test/e2e/daemon.go b/test/e2e/daemon_set.go similarity index 83% rename from test/e2e/daemon.go rename to test/e2e/daemon_set.go index cf027eb2d48..2eb2d41434b 100644 --- a/test/e2e/daemon.go +++ b/test/e2e/daemon_set.go @@ -21,8 +21,8 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/expapi" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/wait" @@ -31,8 +31,8 @@ import ( . "github.com/onsi/gomega" ) -var _ = Describe("Daemon", func() { - f := &Framework{BaseName: "daemons"} +var _ = Describe("Daemon set", func() { + f := &Framework{BaseName: "daemonsets"} BeforeEach(func() { f.beforeEach() @@ -47,7 +47,7 @@ var _ = Describe("Daemon", func() { }) It("should launch a daemon pod on every node of the cluster", func() { - testDaemons(f) + testDaemonSets(f) }) }) @@ -72,17 +72,13 @@ func clearNodeLabels(c *client.Client) error { } func checkDaemonPodOnNodes(f *Framework, selector map[string]string, nodeNames []string) func() (bool, error) { - // Don't return an error, because returning an error will abort wait.Poll, but - // if there's an error, we want to try getting the daemon again. return func() (bool, error) { - // Get list of pods satisfying selector. podList, err := f.Client.Pods(f.Namespace.Name).List(labels.Set(selector).AsSelector(), fields.Everything()) if err != nil { return false, nil } pods := podList.Items - // Get a map of node names to number of daemon pods running on the node. nodesToPodCount := make(map[string]int) for _, pod := range pods { nodesToPodCount[pod.Spec.NodeName] += 1 @@ -103,8 +99,6 @@ func checkDaemonPodOnNodes(f *Framework, selector map[string]string, nodeNames [ } func checkRunningOnAllNodes(f *Framework, selector map[string]string) func() (bool, error) { - // Don't return an error, because returning an error will abort wait.Poll, but - // if there's an error, we want to try getting the daemon again. return func() (bool, error) { nodeList, err := f.Client.Nodes().List(labels.Everything(), fields.Everything()) if err != nil { @@ -122,21 +116,21 @@ func checkRunningOnNoNodes(f *Framework, selector map[string]string) func() (boo return checkDaemonPodOnNodes(f, selector, make([]string, 0)) } -func testDaemons(f *Framework) { +func testDaemonSets(f *Framework) { ns := f.Namespace.Name c := f.Client - simpleDaemonName := "simple-daemon" + simpleDSName := "simple-daemon-set" image := "gcr.io/google_containers/serve_hostname:1.1" - label := map[string]string{"name": simpleDaemonName} + label := map[string]string{"name": simpleDSName} retryTimeout := 1 * time.Minute retryInterval := 5 * time.Second - By(fmt.Sprintf("Creating simple daemon %s", simpleDaemonName)) - _, err := c.Daemons(ns).Create(&expapi.DaemonSet{ + By(fmt.Sprintf("Creating simple daemon set %s", simpleDSName)) + _, err := c.DaemonSets(ns).Create(&experimental.DaemonSet{ ObjectMeta: api.ObjectMeta{ - Name: simpleDaemonName, + Name: simpleDSName, }, - Spec: expapi.DaemonSetSpec{ + Spec: experimental.DaemonSetSpec{ Template: &api.PodTemplateSpec{ ObjectMeta: api.ObjectMeta{ Labels: label, @@ -144,7 +138,7 @@ func testDaemons(f *Framework) { Spec: api.PodSpec{ Containers: []api.Container{ { - Name: simpleDaemonName, + Name: simpleDSName, Image: image, Ports: []api.ContainerPort{{ContainerPort: 9376}}, }, @@ -172,15 +166,15 @@ func testDaemons(f *Framework) { err = wait.Poll(retryInterval, retryTimeout, checkRunningOnAllNodes(f, label)) Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive") - complexDaemonName := "complex-daemon" - complexLabel := map[string]string{"name": complexDaemonName} + complexDSName := "complex-daemon-set" + complexLabel := map[string]string{"name": complexDSName} nodeSelector := map[string]string{"color": "blue"} - By(fmt.Sprintf("Creating daemon with a node selector %s", complexDaemonName)) - _, err = c.Daemons(ns).Create(&expapi.DaemonSet{ + By(fmt.Sprintf("Creating daemon with a node selector %s", complexDSName)) + _, err = c.DaemonSets(ns).Create(&experimental.DaemonSet{ ObjectMeta: api.ObjectMeta{ - Name: complexDaemonName, + Name: complexDSName, }, - Spec: expapi.DaemonSetSpec{ + Spec: experimental.DaemonSetSpec{ Selector: complexLabel, Template: &api.PodTemplateSpec{ ObjectMeta: api.ObjectMeta{ @@ -190,7 +184,7 @@ func testDaemons(f *Framework) { NodeSelector: nodeSelector, Containers: []api.Container{ { - Name: complexDaemonName, + Name: complexDSName, Image: image, Ports: []api.ContainerPort{{ContainerPort: 9376}}, },