Merge pull request #23590 from kargakis/timeout-logs-attach

Automatic merge from submit-queue

kubectl: more sophisticated pod selection for logs and attach

Trying to get the logs or attach to an object other than a pod
will poll forever if that object has no replicas. This commit adds
a 20s timeout for polling.

@kubernetes/kubectl @deads2k @fabianofranz
This commit is contained in:
k8s-merge-robot 2016-05-08 10:51:00 -07:00
commit 601e09852c
3 changed files with 258 additions and 20 deletions

View File

@ -27,6 +27,7 @@ import (
"os/user"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
@ -50,6 +51,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
clientset "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/labels"
@ -57,6 +59,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/json"
utilflag "k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/watch"
)
const (
@ -443,7 +446,8 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory {
return nil, errors.New("provided options object is not a PodLogOptions")
}
selector := labels.SelectorFromSet(t.Spec.Selector)
pod, numPods, err := GetFirstPod(c, t.Namespace, selector)
sortBy := func(pods []*api.Pod) sort.Interface { return controller.ActivePods(pods) }
pod, numPods, err := GetFirstPod(c, t.Namespace, selector, 20*time.Second, sortBy)
if err != nil {
return nil, err
}
@ -462,7 +466,8 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory {
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
pod, numPods, err := GetFirstPod(c, t.Namespace, selector)
sortBy := func(pods []*api.Pod) sort.Interface { return controller.ActivePods(pods) }
pod, numPods, err := GetFirstPod(c, t.Namespace, selector, 20*time.Second, sortBy)
if err != nil {
return nil, err
}
@ -619,21 +624,24 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory {
switch t := object.(type) {
case *api.ReplicationController:
selector := labels.SelectorFromSet(t.Spec.Selector)
pod, _, err := GetFirstPod(client, t.Namespace, selector)
sortBy := func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) }
pod, _, err := GetFirstPod(client, t.Namespace, selector, 1*time.Minute, sortBy)
return pod, err
case *extensions.Deployment:
selector, err := unversioned.LabelSelectorAsSelector(t.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
pod, _, err := GetFirstPod(client, t.Namespace, selector)
sortBy := func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) }
pod, _, err := GetFirstPod(client, t.Namespace, selector, 1*time.Minute, sortBy)
return pod, err
case *batch.Job:
selector, err := unversioned.LabelSelectorAsSelector(t.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
pod, _, err := GetFirstPod(client, t.Namespace, selector)
sortBy := func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) }
pod, _, err := GetFirstPod(client, t.Namespace, selector, 1*time.Minute, sortBy)
return pod, err
case *api.Pod:
return t, nil
@ -651,21 +659,45 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory {
}
}
// GetFirstPod returns the first pod of an object from its namespace and selector and the number of matching pods
func GetFirstPod(client *client.Client, namespace string, selector labels.Selector) (*api.Pod, int, error) {
var pods *api.PodList
for pods == nil || len(pods.Items) == 0 {
var err error
options := api.ListOptions{LabelSelector: selector}
if pods, err = client.Pods(namespace).List(options); err != nil {
return nil, 0, err
}
if len(pods.Items) == 0 {
time.Sleep(2 * time.Second)
}
// GetFirstPod returns a pod matching the namespace and label selector
// and the number of all pods that match the label selector.
func GetFirstPod(client client.Interface, namespace string, selector labels.Selector, timeout time.Duration, sortBy func([]*api.Pod) sort.Interface) (*api.Pod, int, error) {
options := api.ListOptions{LabelSelector: selector}
podList, err := client.Pods(namespace).List(options)
if err != nil {
return nil, 0, err
}
pod := &pods.Items[0]
return pod, len(pods.Items), nil
pods := []*api.Pod{}
for i := range podList.Items {
pod := podList.Items[i]
pods = append(pods, &pod)
}
if len(pods) > 0 {
sort.Sort(sortBy(pods))
return pods[0], len(podList.Items), nil
}
// Watch until we observe a pod
options.ResourceVersion = podList.ResourceVersion
w, err := client.Pods(namespace).Watch(options)
if err != nil {
return nil, 0, err
}
defer w.Stop()
condition := func(event watch.Event) (bool, error) {
return event.Type == watch.Added || event.Type == watch.Modified, nil
}
event, err := watch.Until(timeout, w, condition)
if err != nil {
return nil, 0, err
}
pod, ok := event.Object.(*api.Pod)
if !ok {
return nil, 0, fmt.Errorf("%#v is not a pod event", event)
}
return pod, 1, nil
}
// Command will stringify and return all environment arguments ie. a command run by a client

View File

@ -19,14 +19,17 @@ package util
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/user"
"path"
"reflect"
"sort"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
@ -36,9 +39,13 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/watch"
)
func TestNewFactoryDefaultFlagBindings(t *testing.T) {
@ -372,3 +379,199 @@ func TestSubstitueUser(t *testing.T) {
}
}
}
func newPodList(count, isUnready, isUnhealthy int, labels map[string]string) *api.PodList {
pods := []api.Pod{}
for i := 0; i < count; i++ {
newPod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i+1),
Namespace: api.NamespaceDefault,
CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, i, 0, time.UTC),
Labels: labels,
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Status: api.ConditionTrue,
Type: api.PodReady,
},
},
},
}
pods = append(pods, newPod)
}
if isUnready > -1 && isUnready < count {
pods[isUnready].Status.Conditions[0].Status = api.ConditionFalse
}
if isUnhealthy > -1 && isUnhealthy < count {
pods[isUnhealthy].Status.ContainerStatuses = []api.ContainerStatus{{RestartCount: 5}}
}
return &api.PodList{
Items: pods,
}
}
func TestGetFirstPod(t *testing.T) {
labelSet := map[string]string{"test": "selector"}
tests := []struct {
name string
podList *api.PodList
watching []watch.Event
sortBy func([]*api.Pod) sort.Interface
expected *api.Pod
expectedNum int
expectedErr bool
}{
{
name: "kubectl logs - two ready pods",
podList: newPodList(2, -1, -1, labelSet),
sortBy: func(pods []*api.Pod) sort.Interface { return controller.ActivePods(pods) },
expected: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod-2",
Namespace: api.NamespaceDefault,
CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 1, 0, time.UTC),
Labels: map[string]string{"test": "selector"},
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Status: api.ConditionTrue,
Type: api.PodReady,
},
},
},
},
expectedNum: 2,
},
{
name: "kubectl logs - one unhealthy, one healthy",
podList: newPodList(2, -1, 1, labelSet),
sortBy: func(pods []*api.Pod) sort.Interface { return controller.ActivePods(pods) },
expected: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod-2",
Namespace: api.NamespaceDefault,
CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 1, 0, time.UTC),
Labels: map[string]string{"test": "selector"},
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Status: api.ConditionTrue,
Type: api.PodReady,
},
},
ContainerStatuses: []api.ContainerStatus{{RestartCount: 5}},
},
},
expectedNum: 2,
},
{
name: "kubectl attach - two ready pods",
podList: newPodList(2, -1, -1, labelSet),
sortBy: func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) },
expected: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod-1",
Namespace: api.NamespaceDefault,
CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC),
Labels: map[string]string{"test": "selector"},
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Status: api.ConditionTrue,
Type: api.PodReady,
},
},
},
},
expectedNum: 2,
},
{
name: "kubectl attach - wait for ready pod",
podList: newPodList(1, 1, -1, labelSet),
watching: []watch.Event{
{
Type: watch.Modified,
Object: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod-1",
Namespace: api.NamespaceDefault,
CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC),
Labels: map[string]string{"test": "selector"},
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Status: api.ConditionTrue,
Type: api.PodReady,
},
},
},
},
},
},
sortBy: func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) },
expected: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod-1",
Namespace: api.NamespaceDefault,
CreationTimestamp: unversioned.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC),
Labels: map[string]string{"test": "selector"},
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
{
Status: api.ConditionTrue,
Type: api.PodReady,
},
},
},
},
expectedNum: 1,
},
}
for i := range tests {
test := tests[i]
client := &testclient.Fake{}
client.PrependReactor("list", "pods", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
return true, test.podList, nil
})
if len(test.watching) > 0 {
watcher := watch.NewFake()
for _, event := range test.watching {
switch event.Type {
case watch.Added:
go watcher.Add(event.Object)
case watch.Modified:
go watcher.Modify(event.Object)
}
}
client.PrependWatchReactor("pods", testclient.DefaultWatchReactor(watcher, nil))
}
selector := labels.Set(labelSet).AsSelector()
pod, numPods, err := GetFirstPod(client, api.NamespaceDefault, selector, 1*time.Minute, test.sortBy)
if !test.expectedErr && err != nil {
t.Errorf("%s: unexpected error: %v", test.name, err)
continue
}
if test.expectedErr && err == nil {
t.Errorf("%s: expected an error", test.name)
continue
}
if test.expectedNum != numPods {
t.Errorf("%s: expected %d pods, got %d", test.name, test.expectedNum, numPods)
continue
}
if !reflect.DeepEqual(test.expected, pod) {
t.Errorf("%s:\nexpected pod:\n%#v\ngot:\n%#v\n\n", test.name, test.expected, pod)
}
}
}

View File

@ -32,6 +32,7 @@ import (
"path"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
@ -42,6 +43,7 @@ import (
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/labels"
@ -504,7 +506,8 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
WithStdinData("abcd1234\n").
ExecOrDie()
Expect(runOutput).ToNot(ContainSubstring("stdin closed"))
runTestPod, _, err := util.GetFirstPod(c, ns, labels.SelectorFromSet(map[string]string{"run": "run-test-3"}))
f := func(pods []*api.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) }
runTestPod, _, err := util.GetFirstPod(c, ns, labels.SelectorFromSet(map[string]string{"run": "run-test-3"}), 1*time.Minute, f)
if err != nil {
os.Exit(1)
}