storage capacity: check pod events to finish negative tests faster

By creating CSIStorageCapacity objects in advance, we get the
FailedScheduling pod event if (and only if!) the test is expected to
fail because of insufficient or missing capacity. We can use that as
an indicator that waiting for pod start can be stopped early. However,
because the event might not show up when the cluster is under load,
we still need the timeout.
Patrick Ohly, 2020-07-02 16:47:20 +02:00
commit 30f93802a7 (parent bd0579103a)
2 changed files with 124 additions and 19 deletions
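The idea behind the change, as a condensed sketch: combine "pod is running" and "pod has a FailedScheduling event about insufficient storage" into a single wait condition with an overall timeout. The sketch below is illustrative only; it reuses names from the patch (anyOf, podRunning, podHasStorage, errNotEnoughSpace) and the scheduler's "node(s) did not have enough free storage" message, but the two condition closures are simplified stand-ins rather than the real implementations, and the 30s/2s durations are arbitrary. The actual helpers appear in the diff further down.

// Illustrative sketch, not part of the commit: wait for either outcome.
// Only k8s.io/apimachinery is assumed; the real podRunning/podHasStorage
// helpers added by this commit query the API server instead.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// Error used to abort the wait early once a FailedScheduling event
// reports insufficient capacity (message text from the scheduler).
var errNotEnoughSpace = errors.New("node(s) did not have enough free storage")

// anyOf succeeds as soon as one condition is done; any error aborts the wait.
func anyOf(conditions ...wait.ConditionFunc) wait.ConditionFunc {
	return func() (bool, error) {
		for _, c := range conditions {
			done, err := c()
			if err != nil {
				return false, err
			}
			if done {
				return true, nil
			}
		}
		return false, nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Stand-ins: the real helpers poll the pod phase and scan the pod's
	// FailedScheduling events for the "not enough free storage" message.
	podRunning := wait.ConditionFunc(func() (bool, error) { return false, nil })
	podHasStorage := wait.ConditionFunc(func() (bool, error) { return false, errNotEnoughSpace })

	err := wait.PollImmediateUntil(2*time.Second, anyOf(podRunning, podHasStorage), ctx.Done())
	switch {
	case errors.Is(err, errNotEnoughSpace):
		fmt.Println("expected scheduling failure detected early")
	case errors.Is(err, wait.ErrWaitTimeout), errors.Is(err, context.DeadlineExceeded):
		fmt.Println("timed out without seeing the event")
	case err == nil:
		fmt.Println("pod started")
	default:
		fmt.Println("unexpected error:", err)
	}
}

In the commit itself, podHasStorage lists the pod's events via the API and matches scheduling.ErrReasonNotEnoughSpace, while the timeout remains the safety net for events that are never observed.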

@@ -36,6 +36,7 @@ go_library(
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/apis/storage/v1/util:go_default_library",
         "//pkg/client/conditions:go_default_library",
+        "//pkg/controller/volume/scheduling:go_default_library",
         "//pkg/kubelet/events:go_default_library",
         "//pkg/kubelet/metrics:go_default_library",
         "//pkg/util/slice:go_default_library",

@@ -20,6 +20,7 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strconv"
 	"strings"
@@ -32,11 +33,13 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
@@ -91,9 +94,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		enableTopology      bool
 		podInfo             *bool
 		storageCapacity     *bool
-		scName              string
+		scName              string // pre-selected storage class name; must be unique in the cluster
 		enableResizing      bool // enable resizing for both CSI mock driver and storageClass.
 		enableNodeExpansion bool // enable node expansion for CSI mock driver
 		// just disable resizing on driver it overrides enableResizing flag for CSI mock driver
 		disableResizingOnDriver bool
 		javascriptHooks         map[string]string
@@ -179,7 +182,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 				m.pods = append(m.pods, pod)
 			}
 		} else {
-			class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
+			class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
 			if class != nil {
 				m.sc[class.Name] = class
 			}
@@ -952,7 +955,6 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		)
 		// Tests that expect a failure are slow because we have to wait for a while
 		// to be sure that the volume isn't getting created.
-		// TODO: stop waiting as soon as we see the "node(s) did not have enough free storage" pod event?
 		tests := []struct {
 			name            string
 			storageCapacity *bool
@@ -967,10 +969,16 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 				storageCapacity: &no,
 			},
 			{
-				name:            "CSIStorageCapacity used, no capacity [Slow]",
+				name:            "CSIStorageCapacity used, no capacity",
 				storageCapacity: &yes,
 				expectFailure:   true,
 			},
+			{
+				name:            "CSIStorageCapacity used, insufficient capacity",
+				storageCapacity: &yes,
+				expectFailure:   true,
+				capacities:      []string{"1Mi"},
+			},
 			{
 				name:            "CSIStorageCapacity used, have capacity",
 				storageCapacity: &yes,
@@ -983,22 +991,17 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		for _, t := range tests {
 			test := t
 			ginkgo.It(t.name, func() {
+				scName := "mock-csi-storage-capacity-" + f.UniqueName
 				init(testParameters{
 					registerDriver:  true,
-					scName:          "csi-mock-sc-" + f.UniqueName,
+					scName:          scName,
 					storageCapacity: test.storageCapacity,
 					lateBinding:     true,
 				})
 				defer cleanup()
 
-				// kube-scheduler may need some time before it gets the CSIDriver object.
-				// Without it, scheduling will happen without considering capacity, which
-				// is not what we want to test.
-				time.Sleep(5 * time.Second)
-
-				sc, _, pod := createPod(false /* persistent volume, late binding as specified above */)
-
+				// The storage class uses a random name, therefore we have to create it first
+				// before adding CSIStorageCapacity objects for it.
 				for _, capacityStr := range test.capacities {
 					capacityQuantity := resource.MustParse(capacityStr)
 					capacity := &storagev1alpha1.CSIStorageCapacity{
@@ -1006,7 +1009,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 							GenerateName: "fake-capacity-",
 						},
 						// Empty topology, usable by any node.
-						StorageClassName: sc.Name,
+						StorageClassName: scName,
 						NodeTopology:     &metav1.LabelSelector{},
 						Capacity:         &capacityQuantity,
 					}
@@ -1017,9 +1020,35 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 					})
 				}
 
-				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
+				// kube-scheduler may need some time before it gets the CSIDriver and CSIStorageCapacity objects.
+				// Without them, scheduling doesn't run as expected by the test.
+				syncDelay := 5 * time.Second
+				time.Sleep(syncDelay)
+
+				sc, _, pod := createPod(false /* persistent volume, late binding as specified above */)
+				framework.ExpectEqual(sc.Name, scName, "pre-selected storage class name not used")
+
+				waitCtx, cancel := context.WithTimeout(context.Background(), podStartTimeout)
+				defer cancel()
+				condition := anyOf(
+					podRunning(waitCtx, f.ClientSet, pod.Name, pod.Namespace),
+					// We only just created the CSIStorageCapacity objects, therefore
+					// we have to ignore all older events, plus the syncDelay as our
+					// safety margin.
+					podHasStorage(waitCtx, f.ClientSet, pod.Name, pod.Namespace, time.Now().Add(syncDelay)),
+				)
+				err = wait.PollImmediateUntil(poll, condition, waitCtx.Done())
 				if test.expectFailure {
-					framework.ExpectError(err, "pod unexpectedly started to run")
+					switch {
+					case errors.Is(err, context.DeadlineExceeded),
+						errors.Is(err, wait.ErrWaitTimeout),
+						errors.Is(err, errNotEnoughSpace):
+						// Okay, we expected that.
+					case err == nil:
+						framework.Fail("pod unexpectedly started to run")
+					default:
+						framework.Failf("unexpected error while waiting for pod: %v", err)
+					}
 				} else {
 					framework.ExpectNoError(err, "failed to start pod")
 				}
@@ -1032,6 +1061,78 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 	})
 })
+
+// A lot of this code was copied from e2e/framework. It would be nicer
+// if it could be reused - see https://github.com/kubernetes/kubernetes/issues/92754
+func podRunning(ctx context.Context, c clientset.Interface, podName, namespace string) wait.ConditionFunc {
+	return func() (bool, error) {
+		pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		switch pod.Status.Phase {
+		case v1.PodRunning:
+			return true, nil
+		case v1.PodFailed, v1.PodSucceeded:
+			return false, errPodCompleted
+		}
+		return false, nil
+	}
+}
+
+const (
+	podStartTimeout = 5 * time.Minute
+	poll            = 2 * time.Second
+)
+
+var (
+	errPodCompleted   = fmt.Errorf("pod ran to completion")
+	errNotEnoughSpace = errors.New(scheduling.ErrReasonNotEnoughSpace)
+)
+
+func podHasStorage(ctx context.Context, c clientset.Interface, podName, namespace string, when time.Time) wait.ConditionFunc {
+	// Check for events of this pod. Copied from test/e2e/common/container_probe.go.
+	expectedEvent := fields.Set{
+		"involvedObject.kind":      "Pod",
+		"involvedObject.name":      podName,
+		"involvedObject.namespace": namespace,
+		"reason":                   "FailedScheduling",
+	}.AsSelector().String()
+	options := metav1.ListOptions{
+		FieldSelector: expectedEvent,
+	}
+	// copied from test/e2e/framework/events/events.go
+	return func() (bool, error) {
+		// We cannot be sure here whether it has enough storage, only when
+		// it hasn't. In that case we abort waiting with a special error.
+		events, err := c.CoreV1().Events(namespace).List(ctx, options)
+		if err != nil {
+			return false, fmt.Errorf("got error while getting events: %w", err)
+		}
+		for _, event := range events.Items {
+			if /* event.CreationTimestamp.After(when) &&
+			*/ strings.Contains(event.Message, scheduling.ErrReasonNotEnoughSpace) {
+				return false, errNotEnoughSpace
+			}
+		}
+		return false, nil
+	}
+}
+
+func anyOf(conditions ...wait.ConditionFunc) wait.ConditionFunc {
+	return func() (bool, error) {
+		for _, condition := range conditions {
+			done, err := condition()
+			if err != nil {
+				return false, err
+			}
+			if done {
+				return true, nil
+			}
+		}
+		return false, nil
+	}
+}
 
 func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
 	waitErr := wait.PollImmediate(10*time.Second, csiPodUnschedulableTimeout, func() (bool, error) {
 		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
@@ -1073,8 +1174,11 @@ func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Interface
 	return attachLimit, nil
 }
 
-func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
+func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
 	class := newStorageClass(t, ns, "")
+	if scName != "" {
+		class.Name = scName
+	}
 	var err error
 	_, err = cs.StorageV1().StorageClasses().Get(context.TODO(), class.Name, metav1.GetOptions{})
 	if err != nil {