mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Added test to density that will run maximum capacity pods on all nodes
This commit is contained in:
parent
20444ac84d
commit
fb72b50135
@ -34,6 +34,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
utiluuid "k8s.io/kubernetes/pkg/util/uuid"
|
utiluuid "k8s.io/kubernetes/pkg/util/uuid"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
@ -50,6 +51,15 @@ const (
|
|||||||
// Maximum container failures this test tolerates before failing.
|
// Maximum container failures this test tolerates before failing.
|
||||||
var MaxContainerFailures = 0
|
var MaxContainerFailures = 0
|
||||||
|
|
||||||
|
type DensityTestConfig struct {
|
||||||
|
Configs []framework.RCConfig
|
||||||
|
Client *client.Client
|
||||||
|
Namespace string
|
||||||
|
PollInterval time.Duration
|
||||||
|
PodCount int
|
||||||
|
Timeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint {
|
func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint {
|
||||||
var apiserverMem uint64
|
var apiserverMem uint64
|
||||||
var controllerMem uint64
|
var controllerMem uint64
|
||||||
@ -167,6 +177,155 @@ func logPodStartupStatus(c *client.Client, expectedPods int, ns string, observed
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// runDensityTest will perform a density test and return the time it took for
|
||||||
|
// all pods to start
|
||||||
|
func runDensityTest(dtc DensityTestConfig) time.Duration {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
// Create a listener for events.
|
||||||
|
// eLock is a lock protects the events
|
||||||
|
var eLock sync.Mutex
|
||||||
|
events := make([](*api.Event), 0)
|
||||||
|
_, controller := controllerframework.NewInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
return dtc.Client.Events(dtc.Namespace).List(options)
|
||||||
|
},
|
||||||
|
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
return dtc.Client.Events(dtc.Namespace).Watch(options)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&api.Event{},
|
||||||
|
0,
|
||||||
|
controllerframework.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: func(obj interface{}) {
|
||||||
|
eLock.Lock()
|
||||||
|
defer eLock.Unlock()
|
||||||
|
events = append(events, obj.(*api.Event))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
stop := make(chan struct{})
|
||||||
|
go controller.Run(stop)
|
||||||
|
|
||||||
|
// Create a listener for api updates
|
||||||
|
// uLock is a lock protects the updateCount
|
||||||
|
var uLock sync.Mutex
|
||||||
|
updateCount := 0
|
||||||
|
label := labels.SelectorFromSet(labels.Set(map[string]string{"type": "densityPod"}))
|
||||||
|
_, updateController := controllerframework.NewInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
options.LabelSelector = label
|
||||||
|
return dtc.Client.Pods(dtc.Namespace).List(options)
|
||||||
|
},
|
||||||
|
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
options.LabelSelector = label
|
||||||
|
return dtc.Client.Pods(dtc.Namespace).Watch(options)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&api.Pod{},
|
||||||
|
0,
|
||||||
|
controllerframework.ResourceEventHandlerFuncs{
|
||||||
|
UpdateFunc: func(_, _ interface{}) {
|
||||||
|
uLock.Lock()
|
||||||
|
defer uLock.Unlock()
|
||||||
|
updateCount++
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
go updateController.Run(stop)
|
||||||
|
|
||||||
|
// Start all replication controllers.
|
||||||
|
startTime := time.Now()
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(len(dtc.Configs))
|
||||||
|
for i := range dtc.Configs {
|
||||||
|
rcConfig := dtc.Configs[i]
|
||||||
|
go func() {
|
||||||
|
framework.ExpectNoError(framework.RunRC(rcConfig))
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
logStopCh := make(chan struct{})
|
||||||
|
go logPodStartupStatus(dtc.Client, dtc.PodCount, dtc.Namespace, map[string]string{"type": "densityPod"}, dtc.PollInterval, logStopCh)
|
||||||
|
wg.Wait()
|
||||||
|
startupTime := time.Now().Sub(startTime)
|
||||||
|
close(logStopCh)
|
||||||
|
framework.Logf("E2E startup time for %d pods: %v", dtc.PodCount, startupTime)
|
||||||
|
framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(dtc.PodCount)/float32(startupTime/time.Second))
|
||||||
|
|
||||||
|
By("Waiting for all events to be recorded")
|
||||||
|
last := -1
|
||||||
|
current := len(events)
|
||||||
|
lastCount := -1
|
||||||
|
currentCount := updateCount
|
||||||
|
for start := time.Now(); (last < current || lastCount < currentCount) && time.Since(start) < dtc.Timeout; time.Sleep(10 * time.Second) {
|
||||||
|
func() {
|
||||||
|
eLock.Lock()
|
||||||
|
defer eLock.Unlock()
|
||||||
|
last = current
|
||||||
|
current = len(events)
|
||||||
|
}()
|
||||||
|
func() {
|
||||||
|
uLock.Lock()
|
||||||
|
defer uLock.Unlock()
|
||||||
|
lastCount = currentCount
|
||||||
|
currentCount = updateCount
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
close(stop)
|
||||||
|
|
||||||
|
if current != last {
|
||||||
|
framework.Logf("Warning: Not all events were recorded after waiting %.2f minutes", dtc.Timeout.Minutes())
|
||||||
|
}
|
||||||
|
framework.Logf("Found %d events", current)
|
||||||
|
if currentCount != lastCount {
|
||||||
|
framework.Logf("Warning: Not all updates were recorded after waiting %.2f minutes", dtc.Timeout.Minutes())
|
||||||
|
}
|
||||||
|
framework.Logf("Found %d updates", currentCount)
|
||||||
|
|
||||||
|
// Tune the threshold for allowed failures.
|
||||||
|
badEvents := framework.BadEvents(events)
|
||||||
|
Expect(badEvents).NotTo(BeNumerically(">", int(math.Floor(0.01*float64(dtc.PodCount)))))
|
||||||
|
// Print some data about Pod to Node allocation
|
||||||
|
By("Printing Pod to Node allocation data")
|
||||||
|
podList, err := dtc.Client.Pods(api.NamespaceAll).List(api.ListOptions{})
|
||||||
|
framework.ExpectNoError(err)
|
||||||
|
pausePodAllocation := make(map[string]int)
|
||||||
|
systemPodAllocation := make(map[string][]string)
|
||||||
|
for _, pod := range podList.Items {
|
||||||
|
if pod.Namespace == api.NamespaceSystem {
|
||||||
|
systemPodAllocation[pod.Spec.NodeName] = append(systemPodAllocation[pod.Spec.NodeName], pod.Name)
|
||||||
|
} else {
|
||||||
|
pausePodAllocation[pod.Spec.NodeName]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodeNames := make([]string, 0)
|
||||||
|
for k := range pausePodAllocation {
|
||||||
|
nodeNames = append(nodeNames, k)
|
||||||
|
}
|
||||||
|
sort.Strings(nodeNames)
|
||||||
|
for _, node := range nodeNames {
|
||||||
|
framework.Logf("%v: %v pause pods, system pods: %v", node, pausePodAllocation[node], systemPodAllocation[node])
|
||||||
|
}
|
||||||
|
return startupTime
|
||||||
|
}
|
||||||
|
|
||||||
|
func cleanupDensityTest(dtc DensityTestConfig) {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
By("Deleting ReplicationController")
|
||||||
|
// We explicitly delete all pods to have API calls necessary for deletion accounted in metrics.
|
||||||
|
for i := range dtc.Configs {
|
||||||
|
rcName := dtc.Configs[i].Name
|
||||||
|
rc, err := dtc.Client.ReplicationControllers(dtc.Namespace).Get(rcName)
|
||||||
|
if err == nil && rc.Spec.Replicas != 0 {
|
||||||
|
By("Cleaning up the replication controller")
|
||||||
|
err := framework.DeleteRC(dtc.Client, dtc.Namespace, rcName)
|
||||||
|
framework.ExpectNoError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// This test suite can take a long time to run, and can affect or be affected by other tests.
|
// This test suite can take a long time to run, and can affect or be affected by other tests.
|
||||||
// So by default it is added to the ginkgo.skip list (see driver.go).
|
// So by default it is added to the ginkgo.skip list (see driver.go).
|
||||||
// To run this suite you must explicitly ask for it by setting the
|
// To run this suite you must explicitly ask for it by setting the
|
||||||
@ -185,6 +344,8 @@ var _ = framework.KubeDescribe("Density", func() {
|
|||||||
var totalPods int
|
var totalPods int
|
||||||
var nodeCpuCapacity int64
|
var nodeCpuCapacity int64
|
||||||
var nodeMemCapacity int64
|
var nodeMemCapacity int64
|
||||||
|
var nodes *api.NodeList
|
||||||
|
var masters sets.String
|
||||||
|
|
||||||
// Gathers data prior to framework namespace teardown
|
// Gathers data prior to framework namespace teardown
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
@ -225,8 +386,7 @@ var _ = framework.KubeDescribe("Density", func() {
|
|||||||
// of nodes without Routes created. Since this would make a node
|
// of nodes without Routes created. Since this would make a node
|
||||||
// unschedulable, we need to wait until all of them are schedulable.
|
// unschedulable, we need to wait until all of them are schedulable.
|
||||||
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(c))
|
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(c))
|
||||||
|
masters, nodes = framework.GetMasterAndWorkerNodesOrDie(c)
|
||||||
nodes := framework.GetReadySchedulableNodesOrDie(c)
|
|
||||||
nodeCount = len(nodes.Items)
|
nodeCount = len(nodes.Items)
|
||||||
Expect(nodeCount).NotTo(BeZero())
|
Expect(nodeCount).NotTo(BeZero())
|
||||||
if nodeCount == 30 {
|
if nodeCount == 30 {
|
||||||
@ -291,16 +451,18 @@ var _ = framework.KubeDescribe("Density", func() {
|
|||||||
}
|
}
|
||||||
itArg := testArg
|
itArg := testArg
|
||||||
It(name, func() {
|
It(name, func() {
|
||||||
|
podsPerNode := itArg.podsPerNode
|
||||||
|
totalPods = podsPerNode * nodeCount
|
||||||
fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
|
fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
|
||||||
framework.ExpectNoError(err)
|
framework.ExpectNoError(err)
|
||||||
defer fileHndl.Close()
|
defer fileHndl.Close()
|
||||||
podsPerNode := itArg.podsPerNode
|
timeout := 10 * time.Minute
|
||||||
totalPods = podsPerNode * nodeCount
|
|
||||||
// TODO: loop to podsPerNode instead of 1 when we're ready.
|
// TODO: loop to podsPerNode instead of 1 when we're ready.
|
||||||
numberOrRCs := 1
|
numberOrRCs := 1
|
||||||
RCConfigs := make([]framework.RCConfig, numberOrRCs)
|
RCConfigs := make([]framework.RCConfig, numberOrRCs)
|
||||||
for i := 0; i < numberOrRCs; i++ {
|
for i := 0; i < numberOrRCs; i++ {
|
||||||
RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
|
RCName := "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
|
||||||
RCConfigs[i] = framework.RCConfig{Client: c,
|
RCConfigs[i] = framework.RCConfig{Client: c,
|
||||||
Image: framework.GetPauseImageName(f.Client),
|
Image: framework.GetPauseImageName(f.Client),
|
||||||
Name: RCName,
|
Name: RCName,
|
||||||
@ -316,135 +478,14 @@ var _ = framework.KubeDescribe("Density", func() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a listener for events.
|
dConfig := DensityTestConfig{Client: c,
|
||||||
// eLock is a lock protects the events
|
Configs: RCConfigs,
|
||||||
var eLock sync.Mutex
|
PodCount: totalPods,
|
||||||
events := make([](*api.Event), 0)
|
Namespace: ns,
|
||||||
_, controller := controllerframework.NewInformer(
|
PollInterval: itArg.interval,
|
||||||
&cache.ListWatch{
|
Timeout: timeout,
|
||||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
|
||||||
return c.Events(ns).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
|
||||||
return c.Events(ns).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&api.Event{},
|
|
||||||
0,
|
|
||||||
controllerframework.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: func(obj interface{}) {
|
|
||||||
eLock.Lock()
|
|
||||||
defer eLock.Unlock()
|
|
||||||
events = append(events, obj.(*api.Event))
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
stop := make(chan struct{})
|
|
||||||
go controller.Run(stop)
|
|
||||||
|
|
||||||
// Create a listener for api updates
|
|
||||||
// uLock is a lock protects the updateCount
|
|
||||||
var uLock sync.Mutex
|
|
||||||
updateCount := 0
|
|
||||||
label := labels.SelectorFromSet(labels.Set(map[string]string{"type": "densityPod"}))
|
|
||||||
_, updateController := controllerframework.NewInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
|
||||||
options.LabelSelector = label
|
|
||||||
return c.Pods(ns).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
|
||||||
options.LabelSelector = label
|
|
||||||
return c.Pods(ns).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&api.Pod{},
|
|
||||||
0,
|
|
||||||
controllerframework.ResourceEventHandlerFuncs{
|
|
||||||
UpdateFunc: func(_, _ interface{}) {
|
|
||||||
uLock.Lock()
|
|
||||||
defer uLock.Unlock()
|
|
||||||
updateCount++
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
go updateController.Run(stop)
|
|
||||||
|
|
||||||
// Start all replication controllers.
|
|
||||||
startTime := time.Now()
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
wg.Add(len(RCConfigs))
|
|
||||||
for i := range RCConfigs {
|
|
||||||
rcConfig := RCConfigs[i]
|
|
||||||
go func() {
|
|
||||||
framework.ExpectNoError(framework.RunRC(rcConfig))
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
logStopCh := make(chan struct{})
|
e2eStartupTime = runDensityTest(dConfig)
|
||||||
go logPodStartupStatus(c, totalPods, ns, map[string]string{"type": "densityPod"}, itArg.interval, logStopCh)
|
|
||||||
wg.Wait()
|
|
||||||
e2eStartupTime = time.Now().Sub(startTime)
|
|
||||||
close(logStopCh)
|
|
||||||
framework.Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime)
|
|
||||||
framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(totalPods)/float32(e2eStartupTime/time.Second))
|
|
||||||
|
|
||||||
By("Waiting for all events to be recorded")
|
|
||||||
last := -1
|
|
||||||
current := len(events)
|
|
||||||
lastCount := -1
|
|
||||||
currentCount := updateCount
|
|
||||||
timeout := 10 * time.Minute
|
|
||||||
for start := time.Now(); (last < current || lastCount < currentCount) && time.Since(start) < timeout; time.Sleep(10 * time.Second) {
|
|
||||||
func() {
|
|
||||||
eLock.Lock()
|
|
||||||
defer eLock.Unlock()
|
|
||||||
last = current
|
|
||||||
current = len(events)
|
|
||||||
}()
|
|
||||||
func() {
|
|
||||||
uLock.Lock()
|
|
||||||
defer uLock.Unlock()
|
|
||||||
lastCount = currentCount
|
|
||||||
currentCount = updateCount
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
close(stop)
|
|
||||||
|
|
||||||
if current != last {
|
|
||||||
framework.Logf("Warning: Not all events were recorded after waiting %.2f minutes", timeout.Minutes())
|
|
||||||
}
|
|
||||||
framework.Logf("Found %d events", current)
|
|
||||||
if currentCount != lastCount {
|
|
||||||
framework.Logf("Warning: Not all updates were recorded after waiting %.2f minutes", timeout.Minutes())
|
|
||||||
}
|
|
||||||
framework.Logf("Found %d updates", currentCount)
|
|
||||||
|
|
||||||
// Tune the threshold for allowed failures.
|
|
||||||
badEvents := framework.BadEvents(events)
|
|
||||||
Expect(badEvents).NotTo(BeNumerically(">", int(math.Floor(0.01*float64(totalPods)))))
|
|
||||||
// Print some data about Pod to Node allocation
|
|
||||||
By("Printing Pod to Node allocation data")
|
|
||||||
podList, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
pausePodAllocation := make(map[string]int)
|
|
||||||
systemPodAllocation := make(map[string][]string)
|
|
||||||
for _, pod := range podList.Items {
|
|
||||||
if pod.Namespace == api.NamespaceSystem {
|
|
||||||
systemPodAllocation[pod.Spec.NodeName] = append(systemPodAllocation[pod.Spec.NodeName], pod.Name)
|
|
||||||
} else {
|
|
||||||
pausePodAllocation[pod.Spec.NodeName]++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nodeNames := make([]string, 0)
|
|
||||||
for k := range pausePodAllocation {
|
|
||||||
nodeNames = append(nodeNames, k)
|
|
||||||
}
|
|
||||||
sort.Strings(nodeNames)
|
|
||||||
for _, node := range nodeNames {
|
|
||||||
framework.Logf("%v: %v pause pods, system pods: %v", node, pausePodAllocation[node], systemPodAllocation[node])
|
|
||||||
}
|
|
||||||
|
|
||||||
if itArg.runLatencyTest {
|
if itArg.runLatencyTest {
|
||||||
By("Scheduling additional Pods to measure startup latencies")
|
By("Scheduling additional Pods to measure startup latencies")
|
||||||
|
|
||||||
@ -613,17 +654,7 @@ var _ = framework.KubeDescribe("Density", func() {
|
|||||||
framework.LogSuspiciousLatency(startupLag, e2eLag, nodeCount, c)
|
framework.LogSuspiciousLatency(startupLag, e2eLag, nodeCount, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
By("Deleting ReplicationController")
|
cleanupDensityTest(dConfig)
|
||||||
// We explicitly delete all pods to have API calls necessary for deletion accounted in metrics.
|
|
||||||
for i := range RCConfigs {
|
|
||||||
rcName := RCConfigs[i].Name
|
|
||||||
rc, err := c.ReplicationControllers(ns).Get(rcName)
|
|
||||||
if err == nil && rc.Spec.Replicas != 0 {
|
|
||||||
By("Cleaning up the replication controller")
|
|
||||||
err := framework.DeleteRC(c, ns, rcName)
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
By("Removing additional replication controllers if any")
|
By("Removing additional replication controllers if any")
|
||||||
for i := 1; i <= nodeCount; i++ {
|
for i := 1; i <= nodeCount; i++ {
|
||||||
@ -632,6 +663,48 @@ var _ = framework.KubeDescribe("Density", func() {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Calculate total number of pods from each node's max-pod
|
||||||
|
It("[Feature:ManualPerformance] should allow running maximum capacity pods on nodes", func() {
|
||||||
|
totalPods = 0
|
||||||
|
for _, n := range nodes.Items {
|
||||||
|
totalPods += int(n.Status.Capacity.Pods().Value())
|
||||||
|
}
|
||||||
|
totalPods -= framework.WaitForStableCluster(c, masters)
|
||||||
|
|
||||||
|
fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
|
||||||
|
framework.ExpectNoError(err)
|
||||||
|
defer fileHndl.Close()
|
||||||
|
rcCnt := 1
|
||||||
|
RCConfigs := make([]framework.RCConfig, rcCnt)
|
||||||
|
podsPerRC := int(totalPods / rcCnt)
|
||||||
|
for i := 0; i < rcCnt; i++ {
|
||||||
|
if i == rcCnt-1 {
|
||||||
|
podsPerRC += int(math.Mod(float64(totalPods), float64(rcCnt)))
|
||||||
|
}
|
||||||
|
RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
|
||||||
|
RCConfigs[i] = framework.RCConfig{Client: c,
|
||||||
|
Image: "gcr.io/google_containers/pause-amd64:3.0",
|
||||||
|
Name: RCName,
|
||||||
|
Namespace: ns,
|
||||||
|
Labels: map[string]string{"type": "densityPod"},
|
||||||
|
PollInterval: 10 * time.Second,
|
||||||
|
PodStatusFile: fileHndl,
|
||||||
|
Replicas: podsPerRC,
|
||||||
|
MaxContainerFailures: &MaxContainerFailures,
|
||||||
|
Silent: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dConfig := DensityTestConfig{Client: c,
|
||||||
|
Configs: RCConfigs,
|
||||||
|
PodCount: totalPods,
|
||||||
|
Namespace: ns,
|
||||||
|
PollInterval: 10 * time.Second,
|
||||||
|
Timeout: 10 * time.Minute,
|
||||||
|
}
|
||||||
|
e2eStartupTime = runDensityTest(dConfig)
|
||||||
|
cleanupDensityTest(dConfig)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
func createRunningPodFromRC(wg *sync.WaitGroup, c *client.Client, name, ns, image, podType string, cpuRequest, memRequest resource.Quantity) {
|
func createRunningPodFromRC(wg *sync.WaitGroup, c *client.Client, name, ns, image, podType string, cpuRequest, memRequest resource.Quantity) {
|
||||||
|
@ -65,6 +65,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
|
"k8s.io/kubernetes/pkg/util/system"
|
||||||
"k8s.io/kubernetes/pkg/util/uuid"
|
"k8s.io/kubernetes/pkg/util/uuid"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
|
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
|
||||||
@ -4665,3 +4666,73 @@ func retryCmd(command string, args ...string) (string, string, error) {
|
|||||||
})
|
})
|
||||||
return stdout, stderr, err
|
return stdout, stderr, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPodsScheduled returns a number of currently scheduled and not scheduled Pods.
|
||||||
|
func GetPodsScheduled(masterNodes sets.String, pods *api.PodList) (scheduledPods, notScheduledPods []api.Pod) {
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
if !masterNodes.Has(pod.Spec.NodeName) {
|
||||||
|
if pod.Spec.NodeName != "" {
|
||||||
|
_, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled)
|
||||||
|
Expect(scheduledCondition != nil).To(Equal(true))
|
||||||
|
Expect(scheduledCondition.Status).To(Equal(api.ConditionTrue))
|
||||||
|
scheduledPods = append(scheduledPods, pod)
|
||||||
|
} else {
|
||||||
|
_, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled)
|
||||||
|
Expect(scheduledCondition != nil).To(Equal(true))
|
||||||
|
Expect(scheduledCondition.Status).To(Equal(api.ConditionFalse))
|
||||||
|
if scheduledCondition.Reason == "Unschedulable" {
|
||||||
|
|
||||||
|
notScheduledPods = append(notScheduledPods, pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForStableCluster waits until all existing pods are scheduled and returns their amount.
|
||||||
|
func WaitForStableCluster(c *client.Client, masterNodes sets.String) int {
|
||||||
|
timeout := 10 * time.Minute
|
||||||
|
startTime := time.Now()
|
||||||
|
|
||||||
|
allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
|
||||||
|
ExpectNoError(err)
|
||||||
|
// API server returns also Pods that succeeded. We need to filter them out.
|
||||||
|
currentPods := make([]api.Pod, 0, len(allPods.Items))
|
||||||
|
for _, pod := range allPods.Items {
|
||||||
|
if pod.Status.Phase != api.PodSucceeded && pod.Status.Phase != api.PodFailed {
|
||||||
|
currentPods = append(currentPods, pod)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
allPods.Items = currentPods
|
||||||
|
scheduledPods, currentlyNotScheduledPods := GetPodsScheduled(masterNodes, allPods)
|
||||||
|
for len(currentlyNotScheduledPods) != 0 {
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
|
||||||
|
ExpectNoError(err)
|
||||||
|
scheduledPods, currentlyNotScheduledPods = GetPodsScheduled(masterNodes, allPods)
|
||||||
|
|
||||||
|
if startTime.Add(timeout).Before(time.Now()) {
|
||||||
|
Failf("Timed out after %v waiting for stable cluster.", timeout)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return len(scheduledPods)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMasterAndWorkerNodesOrDie will return a list masters and schedulable worker nodes
|
||||||
|
func GetMasterAndWorkerNodesOrDie(c *client.Client) (sets.String, *api.NodeList) {
|
||||||
|
nodes := &api.NodeList{}
|
||||||
|
masters := sets.NewString()
|
||||||
|
all, _ := c.Nodes().List(api.ListOptions{})
|
||||||
|
for _, n := range all.Items {
|
||||||
|
if system.IsMasterNode(&n) {
|
||||||
|
masters.Insert(n.Name)
|
||||||
|
} else if isNodeSchedulable(&n) {
|
||||||
|
nodes.Items = append(nodes.Items, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return masters, nodes
|
||||||
|
}
|
||||||
|
@ -148,7 +148,7 @@ func getRequestedCPU(pod api.Pod) int64 {
|
|||||||
func verifyResult(c *client.Client, podName string, expectedScheduled int, expectedNotScheduled int, ns string) {
|
func verifyResult(c *client.Client, podName string, expectedScheduled int, expectedNotScheduled int, ns string) {
|
||||||
allPods, err := c.Pods(ns).List(api.ListOptions{})
|
allPods, err := c.Pods(ns).List(api.ListOptions{})
|
||||||
framework.ExpectNoError(err)
|
framework.ExpectNoError(err)
|
||||||
scheduledPods, notScheduledPods := getPodsScheduled(allPods)
|
scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods)
|
||||||
|
|
||||||
printed := false
|
printed := false
|
||||||
printOnce := func(msg string) string {
|
printOnce := func(msg string) string {
|
||||||
@ -174,37 +174,6 @@ func cleanupPods(c *client.Client, ns string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Waits until all existing pods are scheduled and returns their amount.
|
|
||||||
func waitForStableCluster(c *client.Client) int {
|
|
||||||
timeout := 10 * time.Minute
|
|
||||||
startTime := time.Now()
|
|
||||||
|
|
||||||
allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
// API server returns also Pods that succeeded. We need to filter them out.
|
|
||||||
currentPods := make([]api.Pod, 0, len(allPods.Items))
|
|
||||||
for _, pod := range allPods.Items {
|
|
||||||
if pod.Status.Phase != api.PodSucceeded && pod.Status.Phase != api.PodFailed {
|
|
||||||
currentPods = append(currentPods, pod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
allPods.Items = currentPods
|
|
||||||
scheduledPods, currentlyNotScheduledPods := getPodsScheduled(allPods)
|
|
||||||
for len(currentlyNotScheduledPods) != 0 {
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
|
|
||||||
allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
scheduledPods, currentlyNotScheduledPods = getPodsScheduled(allPods)
|
|
||||||
|
|
||||||
if startTime.Add(timeout).Before(time.Now()) {
|
|
||||||
framework.Failf("Timed out after %v waiting for stable cluster.", timeout)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return len(scheduledPods)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
||||||
var c *client.Client
|
var c *client.Client
|
||||||
var nodeList *api.NodeList
|
var nodeList *api.NodeList
|
||||||
@ -279,7 +248,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||||||
totalPodCapacity += podCapacity.Value()
|
totalPodCapacity += podCapacity.Value()
|
||||||
}
|
}
|
||||||
|
|
||||||
currentlyScheduledPods := waitForStableCluster(c)
|
currentlyScheduledPods := framework.WaitForStableCluster(c, masterNodes)
|
||||||
podsNeededForSaturation := int(totalPodCapacity) - currentlyScheduledPods
|
podsNeededForSaturation := int(totalPodCapacity) - currentlyScheduledPods
|
||||||
|
|
||||||
By(fmt.Sprintf("Starting additional %v Pods to fully saturate the cluster max pods and trying to start another one", podsNeededForSaturation))
|
By(fmt.Sprintf("Starting additional %v Pods to fully saturate the cluster max pods and trying to start another one", podsNeededForSaturation))
|
||||||
@ -349,7 +318,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||||||
nodeMaxCapacity = capacity.MilliValue()
|
nodeMaxCapacity = capacity.MilliValue()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
waitForStableCluster(c)
|
framework.WaitForStableCluster(c, masterNodes)
|
||||||
|
|
||||||
pods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
|
pods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
|
||||||
framework.ExpectNoError(err)
|
framework.ExpectNoError(err)
|
||||||
@ -444,7 +413,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||||||
By("Trying to schedule Pod with nonempty NodeSelector.")
|
By("Trying to schedule Pod with nonempty NodeSelector.")
|
||||||
podName := "restricted-pod"
|
podName := "restricted-pod"
|
||||||
|
|
||||||
waitForStableCluster(c)
|
framework.WaitForStableCluster(c, masterNodes)
|
||||||
|
|
||||||
_, err := c.Pods(ns).Create(&api.Pod{
|
_, err := c.Pods(ns).Create(&api.Pod{
|
||||||
TypeMeta: unversioned.TypeMeta{
|
TypeMeta: unversioned.TypeMeta{
|
||||||
@ -597,7 +566,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||||||
By("Trying to schedule Pod with nonempty NodeSelector.")
|
By("Trying to schedule Pod with nonempty NodeSelector.")
|
||||||
podName := "restricted-pod"
|
podName := "restricted-pod"
|
||||||
|
|
||||||
waitForStableCluster(c)
|
framework.WaitForStableCluster(c, masterNodes)
|
||||||
|
|
||||||
_, err := c.Pods(ns).Create(&api.Pod{
|
_, err := c.Pods(ns).Create(&api.Pod{
|
||||||
TypeMeta: unversioned.TypeMeta{
|
TypeMeta: unversioned.TypeMeta{
|
||||||
@ -853,7 +822,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
|
|||||||
By("Trying to schedule Pod with nonempty Pod Affinity.")
|
By("Trying to schedule Pod with nonempty Pod Affinity.")
|
||||||
podName := "without-label-" + string(uuid.NewUUID())
|
podName := "without-label-" + string(uuid.NewUUID())
|
||||||
|
|
||||||
waitForStableCluster(c)
|
framework.WaitForStableCluster(c, masterNodes)
|
||||||
|
|
||||||
_, err := c.Pods(ns).Create(&api.Pod{
|
_, err := c.Pods(ns).Create(&api.Pod{
|
||||||
TypeMeta: unversioned.TypeMeta{
|
TypeMeta: unversioned.TypeMeta{
|
||||||
|
Loading…
Reference in New Issue
Block a user