DaemonSet controller actively kills failed pods (to recreate them)

This commit is contained in:
parent 90b5d4cbd8
commit a2e1341e01
@@ -467,20 +467,34 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
 			continue
 		}
 
-		daemonPods, isRunning := nodeToDaemonPods[node.Name]
+		daemonPods, exists := nodeToDaemonPods[node.Name]
 
 		switch {
-		case shouldSchedule && !isRunning:
+		case shouldSchedule && !exists:
 			// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
 			nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
-		case shouldContinueRunning && len(daemonPods) > 1:
-			// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
-			// Sort the daemon pods by creation time, so the the oldest is preserved.
-			sort.Sort(podByCreationTimestamp(daemonPods))
-			for i := 1; i < len(daemonPods); i++ {
-				podsToDelete = append(podsToDelete, daemonPods[i].Name)
+		case shouldContinueRunning:
+			// If a daemon pod failed, delete it
+			// TODO: handle the case when the daemon pods fail consistently and causes kill-recreate hot loop
+			var daemonPodsRunning []*v1.Pod
+			for i := range daemonPods {
+				daemon := daemonPods[i]
+				if daemon.Status.Phase == v1.PodFailed {
+					glog.V(2).Infof("Found failed daemon pod %s/%s, will try to kill it", daemon.Namespace, daemon.Name)
+					podsToDelete = append(podsToDelete, daemon.Name)
+				} else {
+					daemonPodsRunning = append(daemonPodsRunning, daemon)
+				}
 			}
-		case !shouldContinueRunning && isRunning:
+			// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
+			// Sort the daemon pods by creation time, so the oldest is preserved.
+			if len(daemonPodsRunning) > 1 {
+				sort.Sort(podByCreationTimestamp(daemonPodsRunning))
+				for i := 1; i < len(daemonPodsRunning); i++ {
+					podsToDelete = append(podsToDelete, daemonPods[i].Name)
+				}
+			}
+		case !shouldContinueRunning && exists:
 			// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
 			for i := range daemonPods {
 				podsToDelete = append(podsToDelete, daemonPods[i].Name)
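
A minimal, self-contained sketch of the decision logic in the shouldContinueRunning branch above: failed daemon pods are marked for deletion (so the controller recreates them), and among the remaining running pods only the oldest survives. The pod struct, its fields, and classifyDaemonPods are stand-ins invented for illustration; the real code uses v1.Pod and podByCreationTimestamp as shown in the hunk.

// Stand-alone sketch; not the controller's actual types or code.
package main

import (
	"fmt"
	"sort"
	"time"
)

// pod is a minimal stand-in for v1.Pod: a name, a failed flag
// (standing in for Status.Phase == v1.PodFailed), and a creation time.
type pod struct {
	name     string
	failed   bool
	creation time.Time
}

// classifyDaemonPods mirrors the branch above: every failed pod is queued
// for deletion, and of the remaining running pods all but the oldest are
// queued for deletion as well.
func classifyDaemonPods(daemonPods []pod) (podsToDelete []string) {
	var running []pod
	for _, p := range daemonPods {
		if p.failed {
			podsToDelete = append(podsToDelete, p.name)
		} else {
			running = append(running, p)
		}
	}
	if len(running) > 1 {
		// Sort by creation time so the oldest pod is the one preserved.
		sort.Slice(running, func(i, j int) bool { return running[i].creation.Before(running[j].creation) })
		for _, p := range running[1:] {
			podsToDelete = append(podsToDelete, p.name)
		}
	}
	return podsToDelete
}

func main() {
	now := time.Now()
	pods := []pod{
		{name: "ds-old", creation: now.Add(-time.Hour)},
		{name: "ds-failed", failed: true, creation: now.Add(-30 * time.Minute)},
		{name: "ds-new", creation: now},
	}
	// Prints [ds-failed ds-new]; "ds-old" is the oldest running pod and is kept.
	fmt.Println(classifyDaemonPods(pods))
}

The remaining hunks below update the DaemonSet e2e tests to exercise this behavior.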
@@ -32,6 +32,7 @@ import (
 	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
 	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/kubectl"
 	"k8s.io/kubernetes/test/e2e/framework"
 
@@ -59,6 +60,20 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
 	var f *framework.Framework
 
 	AfterEach(func() {
+		// Clean up
+		daemonsets, err := f.ClientSet.Extensions().DaemonSets(f.Namespace.Name).List(metav1.ListOptions{})
+		Expect(err).NotTo(HaveOccurred(), "unable to dump DaemonSets")
+		if daemonsets != nil && len(daemonsets.Items) > 0 {
+			for _, ds := range daemonsets.Items {
+				By(fmt.Sprintf("Deleting DaemonSet %q with reaper", ds.Name))
+				dsReaper, err := kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), f.InternalClientset)
+				Expect(err).NotTo(HaveOccurred())
+				err = dsReaper.Stop(f.Namespace.Name, ds.Name, 0, nil)
+				Expect(err).NotTo(HaveOccurred())
+				err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds.Spec.Template.Labels))
+				Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to be reaped")
+			}
+		}
 		if daemonsets, err := f.ClientSet.Extensions().DaemonSets(f.Namespace.Name).List(metav1.ListOptions{}); err == nil {
 			framework.Logf("daemonset: %s", runtime.EncodeOrDie(api.Codecs.LegacyCodec(api.Registry.EnabledVersions()...), daemonsets))
 		} else {
@@ -69,7 +84,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
 		} else {
 			framework.Logf("unable to dump pods: %v", err)
 		}
-		err := clearDaemonSetNodeLabels(f.ClientSet)
+		err = clearDaemonSetNodeLabels(f.ClientSet)
 		Expect(err).NotTo(HaveOccurred())
 	})
 
@@ -92,38 +107,9 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
 	It("should run and stop simple daemon", func() {
 		label := map[string]string{daemonsetNameLabel: dsName}
 
-		framework.Logf("Creating simple daemon set %s", dsName)
-		_, err := c.Extensions().DaemonSets(ns).Create(&extensions.DaemonSet{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: dsName,
-			},
-			Spec: extensions.DaemonSetSpec{
-				Template: v1.PodTemplateSpec{
-					ObjectMeta: metav1.ObjectMeta{
-						Labels: label,
-					},
-					Spec: v1.PodSpec{
-						Containers: []v1.Container{
-							{
-								Name: dsName,
-								Image: image,
-								Ports: []v1.ContainerPort{{ContainerPort: 9376}},
-							},
-						},
-					},
-				},
-			},
-		})
+		By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
+		_, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label))
 		Expect(err).NotTo(HaveOccurred())
-		defer func() {
-			framework.Logf("Check that reaper kills all daemon pods for %s", dsName)
-			dsReaper, err := kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), f.InternalClientset)
-			Expect(err).NotTo(HaveOccurred())
-			err = dsReaper.Stop(ns, dsName, 0, nil)
-			Expect(err).NotTo(HaveOccurred())
-			err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, label))
-			Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to be reaped")
-		}()
 
 		By("Check that daemon pods launch on every node of the cluster.")
 		Expect(err).NotTo(HaveOccurred())
@@ -133,48 +119,21 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Stop a daemon pod, check that the daemon pod is revived.")
-		podClient := c.Core().Pods(ns)
-
-		selector := labels.Set(label).AsSelector()
-		options := metav1.ListOptions{LabelSelector: selector.String()}
-		podList, err := podClient.List(options)
-		Expect(err).NotTo(HaveOccurred())
-		Expect(len(podList.Items)).To(BeNumerically(">", 0))
+		podList := listDaemonPods(c, ns, label)
 		pod := podList.Items[0]
-		err = podClient.Delete(pod.Name, nil)
+		err = c.Core().Pods(ns).Delete(pod.Name, nil)
 		Expect(err).NotTo(HaveOccurred())
 		err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
 		Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive")
-
 	})
 
 	It("should run and stop complex daemon", func() {
 		complexLabel := map[string]string{daemonsetNameLabel: dsName}
 		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
-		framework.Logf("Creating daemon with a node selector %s", dsName)
-		_, err := c.Extensions().DaemonSets(ns).Create(&extensions.DaemonSet{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: dsName,
-			},
-			Spec: extensions.DaemonSetSpec{
-				Selector: &metav1.LabelSelector{MatchLabels: complexLabel},
-				Template: v1.PodTemplateSpec{
-					ObjectMeta: metav1.ObjectMeta{
-						Labels: complexLabel,
-					},
-					Spec: v1.PodSpec{
-						NodeSelector: nodeSelector,
-						Containers: []v1.Container{
-							{
-								Name: dsName,
-								Image: image,
-								Ports: []v1.ContainerPort{{ContainerPort: 9376}},
-							},
-						},
-					},
-				},
-			},
-		})
+		framework.Logf("Creating daemon %q with a node selector", dsName)
+		ds := newDaemonSet(dsName, image, complexLabel)
+		ds.Spec.Template.Spec.NodeSelector = nodeSelector
+		_, err := c.Extensions().DaemonSets(ns).Create(ds)
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Initially, daemon pods should not be running on any nodes.")
@@ -198,17 +157,14 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
 		Expect(err).NotTo(HaveOccurred(), "error removing labels on node")
 		Expect(wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, complexLabel))).
 			NotTo(HaveOccurred(), "error waiting for daemon pod to not be running on nodes")
-
-		By("We should now be able to delete the daemon set.")
-		Expect(c.Extensions().DaemonSets(ns).Delete(dsName, nil)).NotTo(HaveOccurred())
-
 	})
 
 	It("should run and stop complex daemon with node affinity", func() {
 		complexLabel := map[string]string{daemonsetNameLabel: dsName}
 		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
-		framework.Logf("Creating daemon with a node affinity %s", dsName)
-		affinity := &v1.Affinity{
+		framework.Logf("Creating daemon %q with a node affinity", dsName)
+		ds := newDaemonSet(dsName, image, complexLabel)
+		ds.Spec.Template.Spec.Affinity = &v1.Affinity{
 			NodeAffinity: &v1.NodeAffinity{
 				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
 					NodeSelectorTerms: []v1.NodeSelectorTerm{
@@ -225,29 +181,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
 				},
 			},
 		}
-		_, err := c.Extensions().DaemonSets(ns).Create(&extensions.DaemonSet{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: dsName,
-			},
-			Spec: extensions.DaemonSetSpec{
-				Selector: &metav1.LabelSelector{MatchLabels: complexLabel},
-				Template: v1.PodTemplateSpec{
-					ObjectMeta: metav1.ObjectMeta{
-						Labels: complexLabel,
-					},
-					Spec: v1.PodSpec{
-						Affinity: affinity,
-						Containers: []v1.Container{
-							{
-								Name: dsName,
-								Image: image,
-								Ports: []v1.ContainerPort{{ContainerPort: 9376}},
-							},
-						},
-					},
-				},
-			},
-		})
+		_, err := c.Extensions().DaemonSets(ns).Create(ds)
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Initially, daemon pods should not be running on any nodes.")
@@ -271,13 +205,67 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
 		Expect(err).NotTo(HaveOccurred(), "error removing labels on node")
 		Expect(wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, complexLabel))).
 			NotTo(HaveOccurred(), "error waiting for daemon pod to not be running on nodes")
-
-		By("We should now be able to delete the daemon set.")
-		Expect(c.Extensions().DaemonSets(ns).Delete(dsName, nil)).NotTo(HaveOccurred())
-
+	})
+
+	It("should retry creating failed daemon pods", func() {
+		label := map[string]string{daemonsetNameLabel: dsName}
+
+		By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
+		_, err := c.Extensions().DaemonSets(ns).Create(newDaemonSet(dsName, image, label))
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Check that daemon pods launch on every node of the cluster.")
+		Expect(err).NotTo(HaveOccurred())
+		err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
+		Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start")
+		err = checkDaemonStatus(f, dsName)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
+		podList := listDaemonPods(c, ns, label)
+		pod := podList.Items[0]
+		pod.ResourceVersion = ""
+		pod.Status.Phase = v1.PodFailed
+		_, err = c.Core().Pods(ns).UpdateStatus(&pod)
+		Expect(err).NotTo(HaveOccurred(), "error failing a daemon pod")
+		err = wait.Poll(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, label))
+		Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive")
 	})
 })
 
+func newDaemonSet(dsName, image string, label map[string]string) *extensions.DaemonSet {
+	return &extensions.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: dsName,
+		},
+		Spec: extensions.DaemonSetSpec{
+			Template: v1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: label,
+				},
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Name: dsName,
+							Image: image,
+							Ports: []v1.ContainerPort{{ContainerPort: 9376}},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func listDaemonPods(c clientset.Interface, ns string, label map[string]string) *v1.PodList {
+	selector := labels.Set(label).AsSelector()
+	options := metav1.ListOptions{LabelSelector: selector.String()}
+	podList, err := c.Core().Pods(ns).List(options)
+	Expect(err).NotTo(HaveOccurred())
+	Expect(len(podList.Items)).To(BeNumerically(">", 0))
+	return podList
+}
+
 func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
 	daemonSetLabels := map[string]string{}
 	otherLabels := map[string]string{}
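
The new "should retry creating failed daemon pods" test above flips a daemon pod's status phase to Failed via UpdateStatus and then polls until the controller has recreated it. Below is a simplified, self-contained stand-in for that polling pattern: poll mimics the (done, error) contract of wait.Poll from k8s.io/apimachinery, and since the real condition helpers (checkRunningOnAllNodes, checkDaemonStatus) are not shown in this diff, the condition here is a dummy counter.

// Stand-alone sketch of the wait.Poll-style retry loop; not the framework's code.
package main

import (
	"errors"
	"fmt"
	"time"
)

// conditionFunc matches the shape of wait.ConditionFunc: (done, error).
type conditionFunc func() (bool, error)

// poll keeps calling cond until it reports done, returns an error, or the
// timeout expires, sleeping for interval between attempts.
func poll(interval, timeout time.Duration, cond conditionFunc) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := cond()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for the condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	// Pretend the replacement daemon pod shows up on the third check.
	attempts := 0
	err := poll(10*time.Millisecond, time.Second, func() (bool, error) {
		attempts++
		return attempts >= 3, nil
	})
	fmt.Println("revived:", err == nil, "after", attempts, "checks")
}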
@@ -354,7 +342,9 @@ func checkDaemonPodOnNodes(f *framework.Framework, selector map[string]string, n
 
 	nodesToPodCount := make(map[string]int)
 	for _, pod := range pods {
-		nodesToPodCount[pod.Spec.NodeName] += 1
+		if controller.IsPodActive(&pod) {
+			nodesToPodCount[pod.Spec.NodeName] += 1
+		}
 	}
 	framework.Logf("nodesToPodCount: %#v", nodesToPodCount)
 
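
With this last hunk, checkDaemonPodOnNodes only counts pods accepted by controller.IsPodActive, so a Failed daemon pod no longer counts as running on its node. The helper's implementation isn't shown in this diff; the sketch below assumes the usual meaning of "active" (phase is neither Succeeded nor Failed, and the pod isn't already marked for deletion) and uses stand-in types rather than v1.Pod.

// Stand-alone sketch of an IsPodActive-style check; an assumption, not the real helper.
package main

import "fmt"

type podStatus struct {
	phase           string // "Pending", "Running", "Succeeded", "Failed"
	deletionPending bool   // stands in for a non-nil DeletionTimestamp
}

// isPodActive reports whether a pod should still count toward a node's
// daemon pod total: not terminated and not being deleted.
func isPodActive(s podStatus) bool {
	return s.phase != "Succeeded" && s.phase != "Failed" && !s.deletionPending
}

func main() {
	fmt.Println(isPodActive(podStatus{phase: "Running"}))                        // true: counted
	fmt.Println(isPodActive(podStatus{phase: "Failed"}))                         // false: excluded from the per-node count
	fmt.Println(isPodActive(podStatus{phase: "Running", deletionPending: true})) // false: already being deleted
}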