Make Daemonset use GeneralPredicates

fixes #21454, fixes #22205
Łukasz Oleś 2016-07-01 19:02:51 +02:00
parent 89be039352
commit 528bf7af3a
5 changed files with 116 additions and 63 deletions
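
The core of the change, visible in the controller diff below: instead of hand-rolled resource and host-port checks, nodeShouldRunDaemonPod now builds a schedulercache.NodeInfo from the pods already assigned to the node and asks the scheduler's GeneralPredicates whether the daemon pod fits. A minimal sketch of that pattern, assuming the in-tree packages at this revision (the helper name wouldDaemonPodFit is made up for illustration, and error handling is trimmed relative to the real method):

package daemon

import (
	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// wouldDaemonPodFit is a hypothetical helper mirroring the pattern the diff adds:
// build a NodeInfo from the pods already running on the node, attach the node,
// and let GeneralPredicates decide whether the daemon pod can be placed there.
func wouldDaemonPodFit(newPod *api.Pod, node *api.Node, podsOnNode []*api.Pod) bool {
	nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
	nodeInfo.SetNode(node)

	fit, err := predicates.GeneralPredicates(newPod, nil, nodeInfo)
	if err != nil {
		// At this revision GeneralPredicates reports failures via the error value.
		glog.V(2).Infof("GeneralPredicates failed for pod %s: %v", newPod.Name, err)
	}
	return fit
}

In the real method below, the error is additionally type-asserted against predicates.PredicateFailureError and predicates.InsufficientResourceError so the specific failure reason can be logged.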

View File

@@ -27,7 +27,6 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
-	"k8s.io/kubernetes/pkg/api/validation"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -41,12 +40,11 @@ import (
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
-	"k8s.io/kubernetes/pkg/util/sets"
-	"k8s.io/kubernetes/pkg/util/validation/field"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 const (
@@ -670,25 +668,22 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 }

 func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
-	// Check if the node satisfies the daemon set's node selector.
-	nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector()
-	if !nodeSelector.Matches(labels.Set(node.Labels)) {
-		return false
-	}
 	// If the daemon set specifies a node name, check that it matches with node.Name.
 	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
 		return false
 	}

+	// TODO: Move it to the predicates
 	for _, c := range node.Status.Conditions {
 		if c.Type == api.NodeOutOfDisk && c.Status == api.ConditionTrue {
 			return false
 		}
 	}

-	newPod := &api.Pod{Spec: ds.Spec.Template.Spec}
+	newPod := &api.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
 	newPod.Spec.NodeName = node.Name
-	pods := []*api.Pod{newPod}
+	pods := []*api.Pod{}

 	for _, m := range dsc.podStore.Indexer.List() {
 		pod := m.(*api.Pod)
@@ -705,19 +700,23 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *exte
 		}
 		pods = append(pods, pod)
 	}
-	_, notFittingCPU, notFittingMemory, notFittingNvidiaGPU := predicates.CheckPodsExceedingFreeResources(pods, node.Status.Allocatable)
-	if len(notFittingCPU)+len(notFittingMemory)+len(notFittingNvidiaGPU) != 0 {
-		dsc.eventRecorder.Eventf(ds, api.EventTypeNormal, "FailedPlacement", "failed to place pod on %q: insufficent free resources", node.ObjectMeta.Name)
-		return false
-	}
-	ports := sets.String{}
-	for _, pod := range pods {
-		if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 {
-			dsc.eventRecorder.Eventf(ds, api.EventTypeNormal, "FailedPlacement", "failed to place pod on %q: host port conflict", node.ObjectMeta.Name)
-			return false
-		}
-	}
-	return true
+	nodeInfo := schedulercache.NewNodeInfo(pods...)
+	nodeInfo.SetNode(node)
+	fit, err := predicates.GeneralPredicates(newPod, nil, nodeInfo)
+	if err != nil {
+		if re, ok := err.(*predicates.PredicateFailureError); ok {
+			message := re.Error()
+			glog.V(2).Infof("Predicate failed on Pod: %s, for reason: %v", newPod.Name, message)
+		}
+		if re, ok := err.(*predicates.InsufficientResourceError); ok {
+			message := re.Error()
+			glog.V(2).Infof("Predicate failed on Pod: %s, for reason: %v", newPod.Name, message)
+		}
+		message := fmt.Sprintf("GeneralPredicates failed due to %v.", err)
+		glog.Warningf("Predicate failed on Pod %s - %s", newPod.Name, message)
+	}
+	return fit
 }

 // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.

View File

@@ -90,6 +90,9 @@ func newNode(name string, label map[string]string) *api.Node {
 			Conditions: []api.NodeCondition{
 				{Type: api.NodeReady, Status: api.ConditionTrue},
 			},
+			Allocatable: api.ResourceList{
+				api.ResourcePods: resource.MustParse("100"),
+			},
 		},
 	}
 }
@@ -201,10 +204,8 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
 func TestNotReadNodeDaemonDoesNotLaunchPod(t *testing.T) {
 	manager, podControl := newTestController()
 	node := newNode("not-ready", nil)
-	node.Status = api.NodeStatus{
-		Conditions: []api.NodeCondition{
-			{Type: api.NodeReady, Status: api.ConditionFalse},
-		},
-	}
+	node.Status.Conditions = []api.NodeCondition{
+		{Type: api.NodeReady, Status: api.ConditionFalse},
+	}
 	manager.nodeStore.Add(node)
 	ds := newDaemonSet("foo")
@@ -238,6 +239,7 @@ func allocatableResources(memory, cpu string) api.ResourceList {
 	return api.ResourceList{
 		api.ResourceMemory: resource.MustParse(memory),
 		api.ResourceCPU:    resource.MustParse(cpu),
+		api.ResourcePods:   resource.MustParse("100"),
 	}
 }
@@ -558,3 +560,26 @@ func TestDSManagerNotReady(t *testing.T) {
 	manager.podStoreSynced = alwaysReady
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
 }
+
+// Daemon with node affinity should launch pods on nodes matching affinity.
+func TestNodeAffinityDaemonLaunchesPods(t *testing.T) {
+	manager, podControl := newTestController()
+	addNodes(manager.nodeStore.Store, 0, 4, nil)
+	addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
+	daemon := newDaemonSet("foo")
+	affinity := map[string]string{
+		api.AffinityAnnotationKey: fmt.Sprintf(`
+			{"nodeAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": {
+				"nodeSelectorTerms": [{
+					"matchExpressions": [{
+						"key": "color",
+						"operator": "In",
+						"values": ["%s"]
+					}]
+				}]
+			}}}`, simpleNodeLabel["color"]),
+	}
+	daemon.Spec.Template.ObjectMeta.Annotations = affinity
+	manager.dsStore.Add(daemon)
+	syncAndValidateDaemonSets(t, manager, daemon, podControl, 3, 0)
+}

View File

@@ -429,42 +429,6 @@ func getResourceRequest(pod *api.Pod) *resourceRequest {
 	return &result
 }

-func CheckPodsExceedingFreeResources(pods []*api.Pod, allocatable api.ResourceList) (fitting []*api.Pod, notFittingCPU, notFittingMemory, notFittingNvidiaGPU []*api.Pod) {
-	totalMilliCPU := allocatable.Cpu().MilliValue()
-	totalMemory := allocatable.Memory().Value()
-	totalNvidiaGPU := allocatable.NvidiaGPU().Value()
-	milliCPURequested := int64(0)
-	memoryRequested := int64(0)
-	nvidiaGPURequested := int64(0)
-	for _, pod := range pods {
-		podRequest := getResourceRequest(pod)
-		fitsCPU := (totalMilliCPU - milliCPURequested) >= podRequest.milliCPU
-		fitsMemory := (totalMemory - memoryRequested) >= podRequest.memory
-		fitsNVidiaGPU := (totalNvidiaGPU - nvidiaGPURequested) >= podRequest.nvidiaGPU
-		if !fitsCPU {
-			// the pod doesn't fit due to CPU request
-			notFittingCPU = append(notFittingCPU, pod)
-			continue
-		}
-		if !fitsMemory {
-			// the pod doesn't fit due to Memory request
-			notFittingMemory = append(notFittingMemory, pod)
-			continue
-		}
-		if !fitsNVidiaGPU {
-			// the pod doesn't fit due to NvidiaGPU request
-			notFittingNvidiaGPU = append(notFittingNvidiaGPU, pod)
-			continue
-		}
-		// the pod fits
-		milliCPURequested += podRequest.milliCPU
-		memoryRequested += podRequest.memory
-		nvidiaGPURequested += podRequest.nvidiaGPU
-		fitting = append(fitting, pod)
-	}
-	return
-}
-
 func podName(pod *api.Pod) string {
 	return pod.Namespace + "/" + pod.Name
 }

View File

@@ -1840,8 +1840,7 @@ func TestInterPodAffinity(t *testing.T) {
 			ObjectMeta: api.ObjectMeta{
 				Labels: podLabel2,
 				Annotations: map[string]string{
-					api.AffinityAnnotationKey: `
-					{"podAffinity": {
+					api.AffinityAnnotationKey: `{"podAffinity": {
 					"requiredDuringSchedulingIgnoredDuringExecution": [{
 						"labelSelector": {
 							"matchExpressions": [{

View File

@ -197,6 +197,72 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
Expect(c.DaemonSets(ns).Delete(dsName)).NotTo(HaveOccurred()) Expect(c.DaemonSets(ns).Delete(dsName)).NotTo(HaveOccurred())
}) })
It("should run and stop complex daemon with node affinity", func() {
complexLabel := map[string]string{daemonsetNameLabel: dsName}
nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
framework.Logf("Creating daemon with a node affinity %s", dsName)
affinity := map[string]string{
api.AffinityAnnotationKey: fmt.Sprintf(`
{"nodeAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [{
"matchExpressions": [{
"key": "%s",
"operator": "In",
"values": ["%s"]
}]
}]
}}}`, daemonsetColorLabel, nodeSelector[daemonsetColorLabel]),
}
_, err := c.DaemonSets(ns).Create(&extensions.DaemonSet{
ObjectMeta: api.ObjectMeta{
Name: dsName,
},
Spec: extensions.DaemonSetSpec{
Selector: &unversioned.LabelSelector{MatchLabels: complexLabel},
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: complexLabel,
Annotations: affinity,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: dsName,
Image: image,
Ports: []api.ContainerPort{{ContainerPort: 9376}},
},
},
},
},
},
})
Expect(err).NotTo(HaveOccurred())
By("Initially, daemon pods should not be running on any nodes.")
err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, complexLabel))
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pods to be running on no nodes")
By("Change label of node, check that daemon pod is launched.")
nodeList := framework.GetReadySchedulableNodesOrDie(f.Client)
Expect(len(nodeList.Items)).To(BeNumerically(">", 0))
newNode, err := setDaemonSetNodeLabels(c, nodeList.Items[0].Name, nodeSelector)
Expect(err).NotTo(HaveOccurred(), "error setting labels on node")
daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
Expect(len(daemonSetLabels)).To(Equal(1))
err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, complexLabel, []string{newNode.Name}))
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pods to be running on new nodes")
By("remove the node selector and wait for daemons to be unscheduled")
_, err = setDaemonSetNodeLabels(c, nodeList.Items[0].Name, map[string]string{})
Expect(err).NotTo(HaveOccurred(), "error removing labels on node")
Expect(wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, complexLabel))).
NotTo(HaveOccurred(), "error waiting for daemon pod to not be running on nodes")
By("We should now be able to delete the daemon set.")
Expect(c.DaemonSets(ns).Delete(dsName)).NotTo(HaveOccurred())
})
}) })
func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) { func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {