From a25088fe15eefb4a794ea483d924fe1ec1f08e8f Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Tue, 29 Mar 2016 19:01:54 +0200 Subject: [PATCH] kubectl: more sophisticated pod selection for logs and attach --- pkg/kubectl/cmd/util/factory.go | 70 ++++++--- pkg/kubectl/cmd/util/factory_test.go | 203 +++++++++++++++++++++++++++ test/e2e/kubectl.go | 5 +- 3 files changed, 258 insertions(+), 20 deletions(-) diff --git a/pkg/kubectl/cmd/util/factory.go b/pkg/kubectl/cmd/util/factory.go index cadf469297c..cb3a29da34f 100644 --- a/pkg/kubectl/cmd/util/factory.go +++ b/pkg/kubectl/cmd/util/factory.go @@ -27,6 +27,7 @@ import ( "os/user" "path" "path/filepath" + "sort" "strconv" "strings" "time" @@ -50,6 +51,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" clientset "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/labels" @@ -57,6 +59,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/serializer/json" utilflag "k8s.io/kubernetes/pkg/util/flag" + "k8s.io/kubernetes/pkg/watch" ) const ( @@ -480,7 +483,8 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { return nil, errors.New("provided options object is not a PodLogOptions") } selector := labels.SelectorFromSet(t.Spec.Selector) - pod, numPods, err := GetFirstPod(c, t.Namespace, selector) + sortBy := func(pods []*api.Pod) sort.Interface { return controller.ActivePods(pods) } + pod, numPods, err := GetFirstPod(c, t.Namespace, selector, 20*time.Second, sortBy) if err != nil { return nil, err } @@ -499,7 +503,8 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { if err != nil { return nil, fmt.Errorf("invalid label selector: %v", err) } - pod, numPods, err := GetFirstPod(c, t.Namespace, selector) + sortBy := func(pods []*api.Pod) sort.Interface { return controller.ActivePods(pods) } + pod, numPods, err := GetFirstPod(c, t.Namespace, selector, 20*time.Second, sortBy) if err != nil { return nil, err } @@ -656,21 +661,24 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { switch t := object.(type) { case *api.ReplicationController: selector := labels.SelectorFromSet(t.Spec.Selector) - pod, _, err := GetFirstPod(client, t.Namespace, selector) + sortBy := func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) } + pod, _, err := GetFirstPod(client, t.Namespace, selector, 1*time.Minute, sortBy) return pod, err case *extensions.Deployment: selector, err := unversioned.LabelSelectorAsSelector(t.Spec.Selector) if err != nil { return nil, fmt.Errorf("invalid label selector: %v", err) } - pod, _, err := GetFirstPod(client, t.Namespace, selector) + sortBy := func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) } + pod, _, err := GetFirstPod(client, t.Namespace, selector, 1*time.Minute, sortBy) return pod, err case *batch.Job: selector, err := unversioned.LabelSelectorAsSelector(t.Spec.Selector) if err != nil { return nil, fmt.Errorf("invalid label selector: %v", err) } - pod, _, err := GetFirstPod(client, t.Namespace, selector) + sortBy := func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) } + pod, _, err := GetFirstPod(client, t.Namespace, selector, 1*time.Minute, sortBy) return pod, err case *api.Pod: return t, nil @@ -688,21 +696,45 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { } } -// GetFirstPod returns the first pod of an object from its namespace and selector and the number of matching pods -func GetFirstPod(client *client.Client, namespace string, selector labels.Selector) (*api.Pod, int, error) { - var pods *api.PodList - for pods == nil || len(pods.Items) == 0 { - var err error - options := api.ListOptions{LabelSelector: selector} - if pods, err = client.Pods(namespace).List(options); err != nil { - return nil, 0, err - } - if len(pods.Items) == 0 { - time.Sleep(2 * time.Second) - } +// GetFirstPod returns a pod matching the namespace and label selector +// and the number of all pods that match the label selector. +func GetFirstPod(client client.Interface, namespace string, selector labels.Selector, timeout time.Duration, sortBy func([]*api.Pod) sort.Interface) (*api.Pod, int, error) { + options := api.ListOptions{LabelSelector: selector} + + podList, err := client.Pods(namespace).List(options) + if err != nil { + return nil, 0, err } - pod := &pods.Items[0] - return pod, len(pods.Items), nil + pods := []*api.Pod{} + for i := range podList.Items { + pod := podList.Items[i] + pods = append(pods, &pod) + } + if len(pods) > 0 { + sort.Sort(sortBy(pods)) + return pods[0], len(podList.Items), nil + } + + // Watch until we observe a pod + options.ResourceVersion = podList.ResourceVersion + w, err := client.Pods(namespace).Watch(options) + if err != nil { + return nil, 0, err + } + defer w.Stop() + + condition := func(event watch.Event) (bool, error) { + return event.Type == watch.Added || event.Type == watch.Modified, nil + } + event, err := watch.Until(timeout, w, condition) + if err != nil { + return nil, 0, err + } + pod, ok := event.Object.(*api.Pod) + if !ok { + return nil, 0, fmt.Errorf("%#v is not a pod event", event) + } + return pod, 1, nil } // Command will stringify and return all environment arguments ie. a command run by a client diff --git a/pkg/kubectl/cmd/util/factory_test.go b/pkg/kubectl/cmd/util/factory_test.go index 459b34d6cd8..ff3484f3494 100644 --- a/pkg/kubectl/cmd/util/factory_test.go +++ b/pkg/kubectl/cmd/util/factory_test.go @@ -19,14 +19,17 @@ package util import ( "bytes" "encoding/json" + "fmt" "io/ioutil" "net/http" "os" "os/user" "path" + "reflect" "sort" "strings" "testing" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" @@ -36,9 +39,13 @@ import ( "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" "k8s.io/kubernetes/pkg/client/unversioned/fake" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/kubectl" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/flag" + "k8s.io/kubernetes/pkg/watch" ) func TestNewFactoryDefaultFlagBindings(t *testing.T) { @@ -394,3 +401,199 @@ func TestSubstitueUser(t *testing.T) { } } } + +func newPodList(count, isUnready, isUnhealthy int, labels map[string]string) *api.PodList { + pods := []api.Pod{} + for i := 0; i < count; i++ { + newPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", i+1), + Namespace: api.NamespaceDefault, + CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, i, 0, time.UTC), + Labels: labels, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Status: api.ConditionTrue, + Type: api.PodReady, + }, + }, + }, + } + pods = append(pods, newPod) + } + if isUnready > -1 && isUnready < count { + pods[isUnready].Status.Conditions[0].Status = api.ConditionFalse + } + if isUnhealthy > -1 && isUnhealthy < count { + pods[isUnhealthy].Status.ContainerStatuses = []api.ContainerStatus{{RestartCount: 5}} + } + return &api.PodList{ + Items: pods, + } +} + +func TestGetFirstPod(t *testing.T) { + labelSet := map[string]string{"test": "selector"} + tests := []struct { + name string + + podList *api.PodList + watching []watch.Event + sortBy func([]*api.Pod) sort.Interface + + expected *api.Pod + expectedNum int + expectedErr bool + }{ + { + name: "kubectl logs - two ready pods", + podList: newPodList(2, -1, -1, labelSet), + sortBy: func(pods []*api.Pod) sort.Interface { return controller.ActivePods(pods) }, + expected: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod-2", + Namespace: api.NamespaceDefault, + CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 1, 0, time.UTC), + Labels: map[string]string{"test": "selector"}, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Status: api.ConditionTrue, + Type: api.PodReady, + }, + }, + }, + }, + expectedNum: 2, + }, + { + name: "kubectl logs - one unhealthy, one healthy", + podList: newPodList(2, -1, 1, labelSet), + sortBy: func(pods []*api.Pod) sort.Interface { return controller.ActivePods(pods) }, + expected: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod-2", + Namespace: api.NamespaceDefault, + CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 1, 0, time.UTC), + Labels: map[string]string{"test": "selector"}, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Status: api.ConditionTrue, + Type: api.PodReady, + }, + }, + ContainerStatuses: []api.ContainerStatus{{RestartCount: 5}}, + }, + }, + expectedNum: 2, + }, + { + name: "kubectl attach - two ready pods", + podList: newPodList(2, -1, -1, labelSet), + sortBy: func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) }, + expected: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod-1", + Namespace: api.NamespaceDefault, + CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC), + Labels: map[string]string{"test": "selector"}, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Status: api.ConditionTrue, + Type: api.PodReady, + }, + }, + }, + }, + expectedNum: 2, + }, + { + name: "kubectl attach - wait for ready pod", + podList: newPodList(1, 1, -1, labelSet), + watching: []watch.Event{ + { + Type: watch.Modified, + Object: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod-1", + Namespace: api.NamespaceDefault, + CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC), + Labels: map[string]string{"test": "selector"}, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Status: api.ConditionTrue, + Type: api.PodReady, + }, + }, + }, + }, + }, + }, + sortBy: func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) }, + expected: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod-1", + Namespace: api.NamespaceDefault, + CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC), + Labels: map[string]string{"test": "selector"}, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Status: api.ConditionTrue, + Type: api.PodReady, + }, + }, + }, + }, + expectedNum: 1, + }, + } + + for i := range tests { + test := tests[i] + client := &testclient.Fake{} + client.PrependReactor("list", "pods", func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + return true, test.podList, nil + }) + if len(test.watching) > 0 { + watcher := watch.NewFake() + for _, event := range test.watching { + switch event.Type { + case watch.Added: + go watcher.Add(event.Object) + case watch.Modified: + go watcher.Modify(event.Object) + } + } + client.PrependWatchReactor("pods", testclient.DefaultWatchReactor(watcher, nil)) + } + selector := labels.Set(labelSet).AsSelector() + + pod, numPods, err := GetFirstPod(client, api.NamespaceDefault, selector, 1*time.Minute, test.sortBy) + if !test.expectedErr && err != nil { + t.Errorf("%s: unexpected error: %v", test.name, err) + continue + } + if test.expectedErr && err == nil { + t.Errorf("%s: expected an error", test.name) + continue + } + if test.expectedNum != numPods { + t.Errorf("%s: expected %d pods, got %d", test.name, test.expectedNum, numPods) + continue + } + if !reflect.DeepEqual(test.expected, pod) { + t.Errorf("%s:\nexpected pod:\n%#v\ngot:\n%#v\n\n", test.name, test.expected, pod) + } + } +} diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index 0940e2e1f02..5b940b17330 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -32,6 +32,7 @@ import ( "path" "path/filepath" "regexp" + "sort" "strconv" "strings" "time" @@ -42,6 +43,7 @@ import ( apierrs "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/labels" @@ -503,7 +505,8 @@ var _ = framework.KubeDescribe("Kubectl client", func() { WithStdinData("abcd1234\n"). ExecOrDie() Expect(runOutput).ToNot(ContainSubstring("stdin closed")) - runTestPod, _, err := util.GetFirstPod(c, ns, labels.SelectorFromSet(map[string]string{"run": "run-test-3"})) + f := func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) } + runTestPod, _, err := util.GetFirstPod(c, ns, labels.SelectorFromSet(map[string]string{"run": "run-test-3"}), 1*time.Minute, f) if err != nil { os.Exit(1) }