add --selector flag support

This commit is contained in:
juanvallejo 2017-09-22 13:55:54 -04:00
parent 086bda60e5
commit 3775a50cb8
No known key found for this signature in database
GPG Key ID: 7D2C958002D6448D
2 changed files with 152 additions and 52 deletions

View File

@ -4177,6 +4177,38 @@ run_certificates_tests() {
set +o errexit
}
run_cluster_management_tests() {
set -o nounset
set -o errexit
kube::log::status "Testing cluster-management commands"
kube::test::get_object_assert nodes "{{range.items}}{{$id_field}}:{{end}}" '127.0.0.1:'
### kubectl drain command fails when both --selector and a node argument are given
# Pre-condition: node exists and contains label test=label
kubectl label node "127.0.0.1" "test=label"
kube::test::get_object_assert "nodes 127.0.0.1" '{{.metadata.labels.test}}' 'label'
response=$(! kubectl drain "127.0.0.1" --selector test=label 2>&1)
kube::test::if_has_string "${response}" 'cannot specify both a node name'
### kubectl cordon command fails when no arguments are passed
# Pre-condition: node exists
response=$(! kubectl cordon 2>&1)
kube::test::if_has_string "${response}" 'error\: USAGE\: cordon NODE'
### kubectl cordon selects all nodes with an empty --selector=
# Pre-condition: node "127.0.0.1" is uncordoned
kubectl uncordon "127.0.0.1"
response=$(kubectl cordon --selector=)
kube::test::if_has_string "${response}" 'node "127.0.0.1" cordoned'
# Post-condition: node "127.0.0.1" is cordoned
kube::test::get_object_assert "nodes 127.0.0.1" "{{.spec.unschedulable}}" 'true'
set +o nounset
set +o errexit
}
run_plugins_tests() {
set -o nounset
set -o errexit
@ -4766,6 +4798,13 @@ runTests() {
record_command run_certificates_tests
fi
######################
# Cluster Management #
######################
if kube::test::if_supports_resource "${nodes}" ; then
record_command run_cluster_management_tests
fi
###########
# Plugins #
###########

View File

@ -24,8 +24,6 @@ import (
"strings"
"time"
"k8s.io/apimachinery/pkg/util/json"
"github.com/jonboulle/clockwork"
"github.com/spf13/cobra"
@ -36,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
restclient "k8s.io/client-go/rest"
@ -60,8 +60,9 @@ type DrainOptions struct {
Timeout time.Duration
backOff clockwork.Clock
DeleteLocalData bool
Selector string
mapper meta.RESTMapper
nodeInfo *resource.Info
nodeInfos []*resource.Info
Out io.Writer
ErrOut io.Writer
typer runtime.ObjectTyper
@ -111,6 +112,7 @@ func NewCmdCordon(f cmdutil.Factory, out io.Writer) *cobra.Command {
cmdutil.CheckErr(options.RunCordonOrUncordon(true))
},
}
cmd.Flags().StringVarP(&options.Selector, "selector", "l", options.Selector, "Selector (label query) to filter on")
return cmd
}
@ -136,6 +138,7 @@ func NewCmdUncordon(f cmdutil.Factory, out io.Writer) *cobra.Command {
cmdutil.CheckErr(options.RunCordonOrUncordon(false))
},
}
cmd.Flags().StringVarP(&options.Selector, "selector", "l", options.Selector, "Selector (label query) to filter on")
return cmd
}
@ -191,6 +194,7 @@ func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up, zero means infinite")
cmd.Flags().StringVarP(&options.Selector, "selector", "l", options.Selector, "Selector (label query) to filter on")
return cmd
}
@ -198,8 +202,16 @@ func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
// arguments and looks up the node using Builder
func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
var err error
if len(args) != 1 {
return cmdutil.UsageErrorf(cmd, "USAGE: %s [flags]", cmd.Use)
o.Selector = cmdutil.GetFlagString(cmd, "selector")
if len(args) == 0 && !cmd.Flags().Changed("selector") {
return cmdutil.UsageErrorf(cmd, fmt.Sprintf("USAGE: %s [flags]", cmd.Use))
}
if len(args) > 0 && len(o.Selector) > 0 {
return cmdutil.UsageErrorf(cmd, "error: cannot specify both a node name and a --selector option")
}
if len(args) > 0 && len(args) != 1 {
return cmdutil.UsageErrorf(cmd, fmt.Sprintf("USAGE: %s [flags]", cmd.Use))
}
if o.client, err = o.Factory.ClientSet(); err != nil {
@ -211,6 +223,7 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
return err
}
o.nodeInfos = []*resource.Info{}
o.mapper, o.typer = o.Factory.Object()
cmdNamespace, _, err := o.Factory.DefaultNamespace()
@ -218,9 +231,19 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
return err
}
nameArgs := []string{"nodes"}
if len(args) > 0 {
nameArgs = append(nameArgs, args[0])
if strings.Contains(args[0], "/") {
nameArgs = []string{args[0]}
}
}
r := o.Factory.NewBuilder().
NamespaceParam(cmdNamespace).DefaultNamespace().
ResourceNames("node", args[0]).
SelectorParam(o.Selector).
ResourceTypeOrNameArgs(true, nameArgs...).
Flatten().
Do()
if err = r.Err(); err != nil {
@ -231,7 +254,7 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
o.nodeInfo = info
o.nodeInfos = append(o.nodeInfos, info)
return nil
})
}
@ -242,26 +265,51 @@ func (o *DrainOptions) RunDrain() error {
return err
}
err := o.deleteOrEvictPodsSimple()
if err == nil {
cmdutil.PrintSuccess(o.mapper, false, o.Out, "node", o.nodeInfo.Name, false, "drained")
drainedNodes := sets.NewString()
var fatal error
for _, info := range o.nodeInfos {
err := o.deleteOrEvictPodsSimple(info)
if err == nil {
drainedNodes.Insert(info.Name)
cmdutil.PrintSuccess(o.mapper, false, o.Out, "node", info.Name, false, "drained")
} else {
fmt.Fprintf(o.ErrOut, "error: unable to drain node %q, aborting command...\n\n", info.Name)
remainingNodes := []string{}
fatal = err
for _, remainingInfo := range o.nodeInfos {
if drainedNodes.Has(remainingInfo.Name) {
continue
}
remainingNodes = append(remainingNodes, remainingInfo.Name)
}
if len(remainingNodes) > 0 {
fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n")
for _, nodeName := range remainingNodes {
fmt.Fprintf(o.ErrOut, " %s\n", nodeName)
}
}
break
}
}
return err
return fatal
}
func (o *DrainOptions) deleteOrEvictPodsSimple() error {
pods, err := o.getPodsForDeletion()
func (o *DrainOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error {
pods, err := o.getPodsForDeletion(nodeInfo)
if err != nil {
return err
}
err = o.deleteOrEvictPods(pods)
if err != nil {
pendingPods, newErr := o.getPodsForDeletion()
pendingPods, newErr := o.getPodsForDeletion(nodeInfo)
if newErr != nil {
return newErr
}
fmt.Fprintf(o.ErrOut, "There are pending pods when an error occurred: %v\n", err)
fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err)
for _, pendingPod := range pendingPods {
fmt.Fprintf(o.ErrOut, "%s/%s\n", "pod", pendingPod.Name)
}
@ -393,11 +441,11 @@ func (ps podStatuses) Message() string {
return strings.Join(msgs, "; ")
}
// getPodsForDeletion returns all the pods we're going to delete. If there are
// any pods preventing us from deleting, we return that list in an error.
func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
// getPodsForDeletion receives resource info for a node, and returns all the pods from the given node that we
// are planning on deleting. If there are any pods preventing us from deleting, we return that list in an error.
func (o *DrainOptions) getPodsForDeletion(nodeInfo *resource.Info) (pods []api.Pod, err error) {
podList, err := o.client.Core().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name}).String()})
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeInfo.Name}).String()})
if err != nil {
return pods, err
}
@ -625,41 +673,54 @@ func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {
return err
}
if o.nodeInfo.Mapping.GroupVersionKind.Kind == "Node" {
obj, err := o.nodeInfo.Mapping.ConvertToVersion(o.nodeInfo.Object, o.nodeInfo.Mapping.GroupVersionKind.GroupVersion())
if err != nil {
return err
}
oldData, err := json.Marshal(obj)
if err != nil {
return err
}
node, ok := obj.(*corev1.Node)
if !ok {
return fmt.Errorf("unexpected Type%T, expected Node", obj)
}
unsched := node.Spec.Unschedulable
if unsched == desired {
cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, already(desired))
cordonOrUncordon := "cordon"
if !desired {
cordonOrUncordon = "un" + cordonOrUncordon
}
for _, nodeInfo := range o.nodeInfos {
if nodeInfo.Mapping.GroupVersionKind.Kind == "Node" {
obj, err := nodeInfo.Mapping.ConvertToVersion(nodeInfo.Object, nodeInfo.Mapping.GroupVersionKind.GroupVersion())
if err != nil {
fmt.Printf("error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
oldData, err := json.Marshal(obj)
if err != nil {
fmt.Printf("error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
node, ok := obj.(*corev1.Node)
if !ok {
fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: unexpected Type%T, expected Node", cordonOrUncordon, nodeInfo.Name, obj)
continue
}
unsched := node.Spec.Unschedulable
if unsched == desired {
cmdutil.PrintSuccess(o.mapper, false, o.Out, nodeInfo.Mapping.Resource, nodeInfo.Name, false, already(desired))
} else {
helper := resource.NewHelper(o.restClient, nodeInfo.Mapping)
node.Spec.Unschedulable = desired
newData, err := json.Marshal(obj)
if err != nil {
fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, obj)
if err != nil {
fmt.Printf("error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
_, err = helper.Patch(cmdNamespace, nodeInfo.Name, types.StrategicMergePatchType, patchBytes)
if err != nil {
fmt.Printf("error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
cmdutil.PrintSuccess(o.mapper, false, o.Out, nodeInfo.Mapping.Resource, nodeInfo.Name, false, changed(desired))
}
} else {
helper := resource.NewHelper(o.restClient, o.nodeInfo.Mapping)
node.Spec.Unschedulable = desired
newData, err := json.Marshal(obj)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, obj)
if err != nil {
return err
}
_, err = helper.Patch(cmdNamespace, o.nodeInfo.Name, types.StrategicMergePatchType, patchBytes)
if err != nil {
return err
}
cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, changed(desired))
cmdutil.PrintSuccess(o.mapper, false, o.Out, nodeInfo.Mapping.Resource, nodeInfo.Name, false, "skipped")
}
} else {
cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, "skipped")
}
return nil