diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go
index 0c4907ecc34..cebdb14f534 100644
--- a/pkg/controller/daemon/controller.go
+++ b/pkg/controller/daemon/controller.go
@@ -19,10 +19,12 @@ package daemon
 import (
 	"reflect"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/validation"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/record"
@@ -32,9 +34,11 @@ import (
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/util/validation/field"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
-	"sync"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 )
 
 const (
@@ -325,7 +329,7 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) {
 	node := obj.(*api.Node)
 	for i := range dsList.Items {
 		ds := &dsList.Items[i]
-		shouldEnqueue := nodeShouldRunDaemonPod(node, ds)
+		shouldEnqueue := dsc.nodeShouldRunDaemonPod(node, ds)
 		if shouldEnqueue {
 			dsc.enqueueDaemonSet(ds)
 		}
@@ -346,7 +350,7 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
 	}
 	for i := range dsList.Items {
 		ds := &dsList.Items[i]
-		shouldEnqueue := (nodeShouldRunDaemonPod(oldNode, ds) != nodeShouldRunDaemonPod(curNode, ds))
+		shouldEnqueue := (dsc.nodeShouldRunDaemonPod(oldNode, ds) != dsc.nodeShouldRunDaemonPod(curNode, ds))
 		if shouldEnqueue {
 			dsc.enqueueDaemonSet(ds)
 		}
@@ -387,7 +391,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 	}
 	var nodesNeedingDaemonPods, podsToDelete []string
 	for _, node := range nodeList.Items {
-		shouldRun := nodeShouldRunDaemonPod(&node, ds)
+		shouldRun := dsc.nodeShouldRunDaemonPod(&node, ds)
 
 		daemonPods, isRunning := nodeToDaemonPods[node.Name]
 
@@ -498,7 +502,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
 	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
 
 	for _, node := range nodeList.Items {
-		shouldRun := nodeShouldRunDaemonPod(&node, ds)
+		shouldRun := dsc.nodeShouldRunDaemonPod(&node, ds)
 
 		numDaemonPods := len(nodeToDaemonPods[node.Name])
 
@@ -563,21 +567,50 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 	return nil
 }
 
-func nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
+func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
 	// Check if the node satisfies the daemon set's node selector.
 	nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector()
-	shouldRun := nodeSelector.Matches(labels.Set(node.Labels))
+	if !nodeSelector.Matches(labels.Set(node.Labels)) {
+		return false
+	}
 	// If the daemon set specifies a node name, check that it matches with node.Name.
-	shouldRun = shouldRun && (ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name)
+	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
+		return false
+	}
 	// If the node is not ready, don't run on it.
 	// TODO(mikedanese): remove this once daemonpods forgive nodes
-	shouldRun = shouldRun && api.IsNodeReady(node)
+	if !api.IsNodeReady(node) {
+		return false
+	}
 
-	// If the node is unschedulable, don't run it
-	// TODO(mikedanese): remove this once we have the right node admitance levels.
-	// See https://github.com/kubernetes/kubernetes/issues/17297#issuecomment-156857375.
-	shouldRun = shouldRun && !node.Spec.Unschedulable
-	return shouldRun
+	for _, c := range node.Status.Conditions {
+		if c.Type == api.NodeOutOfDisk && c.Status == api.ConditionTrue {
+			return false
+		}
+	}
+
+	newPod := &api.Pod{Spec: ds.Spec.Template.Spec}
+	newPod.Spec.NodeName = node.Name
+	pods := []*api.Pod{newPod}
+
+	for _, m := range dsc.podStore.Store.List() {
+		pod := m.(*api.Pod)
+		if pod.Spec.NodeName != node.Name {
+			continue
+		}
+		pods = append(pods, pod)
+	}
+	_, notFittingCPU, notFittingMemory := predicates.CheckPodsExceedingFreeResources(pods, node.Status.Allocatable)
+	if len(notFittingCPU)+len(notFittingMemory) != 0 {
+		return false
+	}
+	ports := sets.String{}
+	for _, pod := range pods {
+		if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 {
+			return false
+		}
+	}
+	return true
 }
 
 // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go
index 5845b7c5f66..507b3de3c0b 100644
--- a/pkg/controller/daemon/controller_test.go
+++ b/pkg/controller/daemon/controller_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/api/testapi"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/extensions"
@@ -200,17 +201,127 @@ func TestNotReadNodeDaemonDoesNotLaunchPod(t *testing.T) {
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
 }
 
-// DaemonSets should not place onto Unschedulable nodes
-func TestUnschedulableNodeDaemonDoesNotLaunchPod(t *testing.T) {
+// DaemonSets should not place onto OutOfDisk nodes
+func TestOutOfDiskNodeDaemonDoesNotLaunchPod(t *testing.T) {
 	manager, podControl := newTestController()
-	node := newNode("not-ready", nil)
-	node.Spec.Unschedulable = true
+	node := newNode("not-enough-disk", nil)
+	node.Status.Conditions = []api.NodeCondition{{Type: api.NodeOutOfDisk, Status: api.ConditionTrue}}
 	manager.nodeStore.Add(node)
 	ds := newDaemonSet("foo")
 	manager.dsStore.Add(ds)
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
 }
 
+// DaemonSets should not place onto nodes with insufficient free resource
+func TestInsufficentCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
+	podSpec := api.PodSpec{
+		NodeName: "too-much-mem",
+		Containers: []api.Container{{
+			Resources: api.ResourceRequirements{
+				Requests: api.ResourceList{
+					api.ResourceMemory: resource.MustParse("75M"),
+					api.ResourceCPU:    resource.MustParse("75m"),
+				},
+			},
+		}},
+	}
+	manager, podControl := newTestController()
+	node := newNode("too-much-mem", nil)
+	node.Status.Allocatable = api.ResourceList{
+		api.ResourceMemory: resource.MustParse("100M"),
+		api.ResourceCPU:    resource.MustParse("200m"),
+	}
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&api.Pod{
+		Spec: podSpec,
+	})
+	ds := newDaemonSet("foo")
+	ds.Spec.Template.Spec = podSpec
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
+}
+
+// DaemonSets should place onto nodes with sufficient free resource
+func TestSufficentCapacityNodeDaemonLaunchesPod(t *testing.T) {
+	podSpec := api.PodSpec{
+		NodeName: "not-too-much-mem",
+		Containers: []api.Container{{
+			Resources: api.ResourceRequirements{
+				Requests: api.ResourceList{
+					api.ResourceMemory: resource.MustParse("75M"),
+					api.ResourceCPU:    resource.MustParse("75m"),
+				},
+			},
+		}},
+	}
+	manager, podControl := newTestController()
+	node := newNode("not-too-much-mem", nil)
+	node.Status.Allocatable = api.ResourceList{
+		api.ResourceMemory: resource.MustParse("200M"),
+		api.ResourceCPU:    resource.MustParse("200m"),
+	}
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&api.Pod{
+		Spec: podSpec,
+	})
+	ds := newDaemonSet("foo")
+	ds.Spec.Template.Spec = podSpec
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
+}
+
+// DaemonSets should not place onto nodes that would cause port conflicts
+func TestPortConflictNodeDaemonDoesNotLaunchPod(t *testing.T) {
+	podSpec := api.PodSpec{
+		NodeName: "port-conflict",
+		Containers: []api.Container{{
+			Ports: []api.ContainerPort{{
+				HostPort: 666,
+			}},
+		}},
+	}
+	manager, podControl := newTestController()
+	node := newNode("port-conflict", nil)
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&api.Pod{
+		Spec: podSpec,
+	})
+	ds := newDaemonSet("foo")
+	ds.Spec.Template.Spec = podSpec
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
+}
+
+// DaemonSets should place onto nodes that would not cause port conflicts
+func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
+	podSpec1 := api.PodSpec{
+		NodeName: "no-port-conflict",
+		Containers: []api.Container{{
+			Ports: []api.ContainerPort{{
+				HostPort: 6661,
+			}},
+		}},
+	}
+	podSpec2 := api.PodSpec{
+		NodeName: "no-port-conflict",
+		Containers: []api.Container{{
+			Ports: []api.ContainerPort{{
+				HostPort: 6662,
+			}},
+		}},
+	}
+	manager, podControl := newTestController()
+	node := newNode("no-port-conflict", nil)
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&api.Pod{
+		Spec: podSpec1,
+	})
+	ds := newDaemonSet("foo")
+	ds.Spec.Template.Spec = podSpec2
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
+}
+
 // Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
 func TestDealsWithExistingPods(t *testing.T) {
 	manager, podControl := newTestController()
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 597489deb95..24744f5734e 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -2223,6 +2223,10 @@ func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
 // new pod. The function returns a boolean value indicating whether the pod
 // can be admitted, a brief single-word reason and a message explaining why
 // the pod cannot be admitted.
+//
+// This needs to be kept in sync with the scheduler's and daemonset's fit predicates,
+// otherwise there will inevitably be pod delete create loops. This will be fixed
+// once we can extract these predicates into a common library. (#12744)
 func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
 	if hasHostPortConflicts(pods) {
 		return false, "HostPortConflict", "cannot start the pod due to host port conflict."