Fix kubectl drain for StatefulSet and use eviction for drain if possible

ymqytw
2016-10-20 09:43:48 -07:00
parent 0c7421fb51
commit b73fae6c55
18 changed files with 548 additions and 103 deletions
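For orientation, here is a condensed, self-contained sketch of the behavior this commit introduces: prefer the pods/eviction subresource when the API server advertises it, and fall back to a plain pod delete otherwise. The Pod struct, the drainer type, and the hard-coded policyGroupVersion below are hypothetical stand-ins for the real kubectl and client machinery, not the actual code in the diff.

package main

import "fmt"

// Pod is a minimal stand-in for api.Pod; only the fields this sketch needs.
type Pod struct {
	Namespace, Name string
}

// drainer is a hypothetical stand-in for DrainOptions. policyGroupVersion is
// empty when the server does not expose the pods/eviction subresource,
// mirroring what SupportEviction returns in the diff below.
type drainer struct {
	policyGroupVersion string
}

func (d *drainer) evictPod(p Pod) error  { fmt.Printf("evicting %s/%s\n", p.Namespace, p.Name); return nil }
func (d *drainer) deletePod(p Pod) error { fmt.Printf("deleting %s/%s\n", p.Namespace, p.Name); return nil }

// deleteOrEvictPods mirrors the decision in the diff: use the eviction
// subresource when the server advertises it, otherwise fall back to a
// plain pod delete.
func (d *drainer) deleteOrEvictPods(pods []Pod) error {
	for _, pod := range pods {
		var err error
		if len(d.policyGroupVersion) > 0 {
			err = d.evictPod(pod)
		} else {
			err = d.deletePod(pod)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	d := &drainer{policyGroupVersion: "policy/v1beta1"} // assumed value, for illustration only
	_ = d.deleteOrEvictPods([]Pod{{Namespace: "default", Name: "web-0"}})
}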


@@ -20,15 +20,19 @@ import (
"errors"
"fmt"
"io"
"math"
"reflect"
"strings"
"time"
"github.com/jonboulle/clockwork"
"github.com/spf13/cobra"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/fields"
@@ -49,10 +53,12 @@ type DrainOptions struct {
GracePeriodSeconds int
IgnoreDaemonsets bool
Timeout time.Duration
backOff clockwork.Clock
DeleteLocalData bool
mapper meta.RESTMapper
nodeInfo *resource.Info
out io.Writer
errOut io.Writer
typer runtime.ObjectTyper
}
@@ -67,6 +73,9 @@ type fatal struct {
}
const (
EvictionKind = "Eviction"
EvictionSubresource = "pods/eviction"
kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
kDaemonsetWarning = "Ignoring DaemonSet-managed pods"
kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)"
@@ -152,8 +161,8 @@ var (
$ kubectl drain foo --grace-period=900`)
)
func NewCmdDrain(f cmdutil.Factory, out io.Writer) *cobra.Command {
options := &DrainOptions{factory: f, out: out}
func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
options := &DrainOptions{factory: f, out: out, errOut: errOut, backOff: clockwork.NewRealClock()}
cmd := &cobra.Command{
Use: "drain NODE",
@@ -221,16 +230,43 @@ func (o *DrainOptions) RunDrain() error {
return err
}
err := o.deleteOrEvictPodsSimple()
// TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field
for i := 1; i <= maxPatchRetry && apierrors.IsTooManyRequests(err); i++ {
if i > triesBeforeBackOff {
currBackOffPeriod := time.Duration(math.Exp2(float64(i-triesBeforeBackOff))) * backOffPeriod
fmt.Fprintf(o.errOut, "Retry in %v\n", currBackOffPeriod)
o.backOff.Sleep(currBackOffPeriod)
}
fmt.Fprintf(o.errOut, "Retrying\n")
err = o.deleteOrEvictPodsSimple()
}
if err == nil {
cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained")
}
return err
}
func (o *DrainOptions) deleteOrEvictPodsSimple() error {
pods, err := o.getPodsForDeletion()
if err != nil {
return err
}
if err = o.deletePods(pods); err != nil {
return err
if o.Timeout == 0 {
o.Timeout = kubectl.Timeout + time.Duration(10*len(pods))*time.Second
}
cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained")
return nil
err = o.deleteOrEvictPods(pods)
if err != nil {
pendingPods, newErr := o.getPodsForDeletion()
if newErr != nil {
return newErr
}
fmt.Fprintf(o.errOut, "There are pending pods when an error occurred: %v\n", err)
for _, pendingPod := range pendingPods {
fmt.Fprintf(o.errOut, "%s/%s\n", "pod", pendingPod.Name)
}
}
return err
}
func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
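The retry loop in RunDrain above backs off exponentially once a TooManyRequests (429) error has been retried more than triesBeforeBackOff times. A minimal sketch of that schedule follows; the values chosen for maxPatchRetry, triesBeforeBackOff, and backOffPeriod are assumptions for illustration, since the real constants are defined elsewhere in the file.

package main

import (
	"fmt"
	"math"
	"time"
)

// Assumed values for illustration only; the real constants live elsewhere
// in the kubectl source and may differ.
const (
	maxPatchRetry      = 5
	triesBeforeBackOff = 1
	backOffPeriod      = 1 * time.Second
)

func main() {
	// Mirrors the computation in RunDrain: after triesBeforeBackOff immediate
	// retries, retry i first sleeps 2^(i-triesBeforeBackOff) * backOffPeriod.
	for i := 1; i <= maxPatchRetry; i++ {
		if i > triesBeforeBackOff {
			d := time.Duration(math.Exp2(float64(i-triesBeforeBackOff))) * backOffPeriod
			fmt.Printf("retry %d: sleep %v, then retry\n", i, d)
		} else {
			fmt.Printf("retry %d: retry immediately\n", i)
		}
	}
}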
@@ -243,6 +279,8 @@ func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{},
return o.client.Batch().Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
case "ReplicaSet":
return o.client.Extensions().ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
case "StatefulSet":
return o.client.Apps().StatefulSets(sr.Reference.Namespace).Get(sr.Reference.Name)
}
return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind)
}
@@ -252,7 +290,6 @@ func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, err
if !found {
return nil, nil
}
// Now verify that the specified creator actually exists.
sr := &api.SerializedReference{}
if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil {
@@ -380,21 +417,58 @@ func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
return []api.Pod{}, errors.New(fs.Message())
}
if len(ws) > 0 {
fmt.Fprintf(o.out, "WARNING: %s\n", ws.Message())
fmt.Fprintf(o.errOut, "WARNING: %s\n", ws.Message())
}
return pods, nil
}
// deletePods deletes the pods on the api server
func (o *DrainOptions) deletePods(pods []api.Pod) error {
deleteOptions := api.DeleteOptions{}
func (o *DrainOptions) deletePod(pod api.Pod) error {
deleteOptions := &api.DeleteOptions{}
if o.GracePeriodSeconds >= 0 {
gracePeriodSeconds := int64(o.GracePeriodSeconds)
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
}
return o.client.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
}
func (o *DrainOptions) evictPod(pod api.Pod, policyGroupVersion string) error {
deleteOptions := &api.DeleteOptions{}
if o.GracePeriodSeconds >= 0 {
gracePeriodSeconds := int64(o.GracePeriodSeconds)
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
}
eviction := &policy.Eviction{
TypeMeta: unversioned.TypeMeta{
APIVersion: policyGroupVersion,
Kind: EvictionKind,
},
ObjectMeta: api.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
DeleteOptions: deleteOptions,
}
// Remember to change the URL manipulation func when Eviction's version changes
return o.client.Policy().Evictions(eviction.Namespace).Evict(eviction)
}
// deleteOrEvictPods deletes or evicts the pods on the api server
func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error {
if len(pods) == 0 {
return nil
}
policyGroupVersion, err := SupportEviction(o.client)
if err != nil {
return err
}
for _, pod := range pods {
err := o.client.Core().Pods(pod.Namespace).Delete(pod.Name, &deleteOptions)
if len(policyGroupVersion) > 0 {
err = o.evictPod(pod, policyGroupVersion)
} else {
err = o.deletePod(pod)
}
if err != nil {
return err
}
@@ -403,17 +477,11 @@ func (o *DrainOptions) deletePods(pods []api.Pod) error {
getPodFn := func(namespace, name string) (*api.Pod, error) {
return o.client.Core().Pods(namespace).Get(name)
}
pendingPods, err := o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
if err != nil {
fmt.Fprintf(o.out, "There are pending pods when an error occured:\n")
for _, pendindPod := range pendingPods {
cmdutil.PrintSuccess(o.mapper, true, o.out, "pod", pendindPod.Name, false, "")
}
}
_, err = o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
return err
}
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(namespace, name string) (*api.Pod, error)) ([]api.Pod, error) {
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pendingPods := []api.Pod{}
for i, pod := range pods {
@@ -436,6 +504,38 @@ func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Dura
return pods, err
}
// SupportEviction uses the Discovery API to find out if the server supports the eviction subresource.
// If supported, it will return its groupVersion; otherwise, it will return ""
func SupportEviction(clientset *internalclientset.Clientset) (string, error) {
discoveryClient := clientset.Discovery()
groupList, err := discoveryClient.ServerGroups()
if err != nil {
return "", err
}
foundPolicyGroup := false
var policyGroupVersion string
for _, group := range groupList.Groups {
if group.Name == "policy" {
foundPolicyGroup = true
policyGroupVersion = group.PreferredVersion.GroupVersion
break
}
}
if !foundPolicyGroup {
return "", nil
}
resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
if err != nil {
return "", err
}
for _, resource := range resourceList.APIResources {
if resource.Name == EvictionSubresource && resource.Kind == EvictionKind {
return policyGroupVersion, nil
}
}
return "", nil
}
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
// "Unschedulable" is passed as the first arg.
func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {
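
For reference, waitForDelete in the diff above polls until every pod it was given is gone or the timeout expires. Below is a self-contained sketch of that pattern; pollImmediate is a simplified stand-in for the wait.PollImmediate helper the real code uses, and the pending slice only simulates pods that are still terminating.

package main

import (
	"errors"
	"fmt"
	"time"
)

// pollImmediate is a simplified stand-in for wait.PollImmediate: run the
// condition immediately, then every interval, until it returns true, it
// returns an error, or the timeout elapses.
func pollImmediate(interval, timeout time.Duration, condition func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := condition()
		if err != nil || done {
			return err
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for the condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	// pending simulates pods that are still terminating; in the real code,
	// getPodFn re-fetches each pod and a NotFound error means it is gone.
	pending := []string{"web-0", "web-1"}
	err := pollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
		if len(pending) > 0 {
			pending = pending[1:] // pretend one pod finished terminating
			return false, nil
		}
		return true, nil
	})
	fmt.Println("all pods deleted:", err == nil)
}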