Merge pull request #35235 from foxish/node-controller-no-force-deletion

Automatic merge from submit-queue

Node controller to not force delete pods

Fixes https://github.com/kubernetes/kubernetes/issues/35145

- [x] e2e tests covering PetSet, RC, and Job.
- [x] Remove, and add coverage for, the other locations where we force-delete pods within the NodeController.

**Release note**:

<!--  Steps to write your release note:
1. Use the release-note-* labels to set the release note state (if you have access) 
2. Enter your extended release note in the below block; leaving it blank means using the PR title as the release note. If no release note is required, just write `NONE`. 
-->

``` release-note
Node controller no longer force-deletes pods from the api-server.

* For StatefulSet (previously PetSet), this change means creation of replacement pods is blocked until old pods are definitely not running (indicated either by the kubelet returning from partitioned state, or deletion of the Node object, or deletion of the instance in the cloud provider, or force deletion of the pod from the api-server). This has the desirable outcome of "fencing" to prevent "split brain" scenarios.
* For all other existing controllers except StatefulSet, this has no effect on the ability of the controller to replace pods because the controllers do not reuse pod names (they use generate-name).
* User-written controllers that reuse names of pod objects should evaluate this change.
```
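The escape hatch the release note mentions ("force deletion of the pod from the api-server") is a delete with grace period zero. Below is a minimal sketch of that call in the same internal-clientset style as the code in this PR; the helper name and parameters are illustrative, not part of the change:

```go
package example

import (
	"k8s.io/kubernetes/pkg/api"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// forceDeletePod performs the "force deletion from the api-server" described
// above: a grace period of 0 removes the pod object immediately, without
// waiting for the kubelet to confirm the containers have stopped, so the
// caller takes responsibility for ensuring the pod is no longer running.
func forceDeletePod(c clientset.Interface, namespace, name string) error {
	return c.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0))
}
```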
This commit is contained in:
Kubernetes Submit Queue 2016-11-01 20:08:57 -07:00 committed by GitHub
commit 49e7d640d9
10 changed files with 621 additions and 511 deletions

View File

@ -19,7 +19,6 @@ package node
import (
"fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
@ -90,23 +89,9 @@ func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
return err
}
// forcefullyDeleteNode immediately deletes all pods on the node, and then
// deletes the node itself.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, forcefulDeletePodFunc func(*api.Pod) error) error {
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
}
for _, pod := range pods.Items {
if pod.Spec.NodeName != nodeName {
continue
}
if err := forcefulDeletePodFunc(&pod); err != nil {
return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
}
}
// forcefullyDeleteNode immediately deletes the node. The pods on the node are cleaned
// up by the podGC.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
@ -265,59 +250,3 @@ func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_s
// and event is recorded or neither should happen, see issue #6055.
recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
}
// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns whether the node has no pods left in terminating state, a duration indicating
// how long before we should check again (the next deadline for a pod to complete), and an error.
func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, nodeUID string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
// the time before we should try again
nextAttempt := time.Duration(0)
// have we deleted all pods
complete := true
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return false, nextAttempt, err
}
now := time.Now()
elapsed := now.Sub(since)
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// only clean terminated pods
if pod.DeletionGracePeriodSeconds == nil {
continue
}
// the user's requested grace period
grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if grace > maxGracePeriod {
grace = maxGracePeriod
}
// the time remaining before the pod should have been deleted
remaining := grace - elapsed
if remaining < 0 {
remaining = 0
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
recordNodeEvent(recorder, nodeName, nodeUID, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
complete = false
}
} else {
glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
complete = false
}
if nextAttempt < remaining {
nextAttempt = remaining
}
}
return complete, nextAttempt, nil
}
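For context, the force-deletion deadline in the removed terminatePods above reduces to clamping the pod's requested grace period to maxGracePeriod and subtracting the time elapsed since eviction began. A self-contained sketch of just that arithmetic, assuming the 5-minute maximumGracePeriod that NewNodeController sets below:

```go
package main

import (
	"fmt"
	"time"
)

// remainingGrace mirrors the deadline computation in the removed
// terminatePods: clamp the requested grace period, then subtract the time
// elapsed since eviction began; a result of 0 means the pod is eligible
// for force deletion.
func remainingGrace(requested, maxGracePeriod, elapsed time.Duration) time.Duration {
	grace := requested
	if grace > maxGracePeriod {
		grace = maxGracePeriod
	}
	remaining := grace - elapsed
	if remaining < 0 {
		remaining = 0
	}
	return remaining
}

func main() {
	// A pod that requested 10 minutes of grace is clamped to the controller's
	// 5-minute maximum; 2 minutes into the eviction, 3 minutes remain.
	fmt.Println(remainingGrace(10*time.Minute, 5*time.Minute, 2*time.Minute)) // 3m0s
}
```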

View File

@ -125,9 +125,8 @@ type NodeController struct {
// Lock to access evictor workers
evictorLock sync.Mutex
// workers that evict pods from unresponsive nodes.
zonePodEvictor map[string]*RateLimitedTimedQueue
zoneTerminationEvictor map[string]*RateLimitedTimedQueue
podEvictionTimeout time.Duration
zonePodEvictor map[string]*RateLimitedTimedQueue
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
@ -215,7 +214,6 @@ func NewNodeController(
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
@ -370,17 +368,8 @@ func (nc *NodeController) Run() {
}, nc.nodeMonitorPeriod, wait.NeverStop)
// Managing eviction of nodes:
// 1. when we delete pods off a node, if the node was not empty at the time we then
// queue a termination watcher
// a. If we hit an error, retry deletion
// 2. The terminator loop ensures that pods are eventually cleaned and we never
// terminate a pod in a time period less than nc.maximumGracePeriod. AddedAt
// is the time from which we measure "has this pod been terminating too long",
// after which we will delete the pod with grace period 0 (force delete).
// a. If we hit errors, retry instantly
// b. If there are no pods left terminating, exit
// c. If there are pods still terminating, wait for their estimated completion
// before retrying
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
@ -405,42 +394,12 @@ func (nc *NodeController) Run() {
}
if remaining {
nc.zoneTerminationEvictor[k].Add(value.Value, value.UID)
glog.Infof("Pods awaiting deletion due to NodeController eviction")
}
return true, 0
})
}
}, nodeEvictionPeriod, wait.NeverStop)
// TODO: replace with a controller that ensures pods that are terminating complete
// in a particular time period
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneTerminationEvictor {
nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, value.AddedAt, nc.maximumGracePeriod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
}
if completed {
glog.V(2).Infof("All pods terminated on %s", value.Value)
recordNodeEvent(nc.recorder, value.Value, nodeUid, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}
glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
// clamp very short intervals
if remaining < nodeEvictionPeriod {
remaining = nodeEvictionPeriod
}
return false, remaining
})
}
}, nodeEvictionPeriod, wait.NeverStop)
}()
}
@ -470,10 +429,6 @@ func (nc *NodeController) monitorNodeStatus() error {
glog.Infof("Initializing eviction metric for zone: %v", zone)
EvictionsNumber.WithLabelValues(zone).Add(0)
}
if _, found := nc.zoneTerminationEvictor[zone]; !found {
nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
nc.cancelPodEviction(added[i])
}
@ -557,7 +512,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace
// periods.
if err := forcefullyDeleteNode(nc.kubeClient, nodeName, nc.forcefullyDeletePod); err != nil {
if err := forcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
}
}(node.Name)
@ -618,7 +573,6 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*ap
// We stop all evictions.
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].SwapLimiter(0)
nc.zoneTerminationEvictor[k].SwapLimiter(0)
}
for k := range nc.zoneStates {
nc.zoneStates[k] = stateFullDisruption
@ -662,17 +616,12 @@ func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zone
switch state {
case stateNormal:
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
nc.zoneTerminationEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
case statePartialDisruption:
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
nc.zoneTerminationEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
case stateFullDisruption:
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
nc.zoneTerminationEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
}
}
@ -871,8 +820,7 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name)
if wasDeleting || wasTerminating {
if wasDeleting {
glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
return true
}
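The eviction loop above drives everything through zonePodEvictor's Try method, whose callback returns (done, retryAfter). Below is a self-contained toy of that control flow; this is not the real RateLimitedTimedQueue (rate limiting and retry delays are omitted), only an illustration of the contract that the callbacks in Run and in the tests rely on:

```go
package main

import (
	"fmt"
	"time"
)

// timedValue mirrors the shape of the TimedValue passed to evictor callbacks.
type timedValue struct {
	Value string      // node name
	UID   interface{} // node UID
}

// toyQueue demonstrates the Try contract only: values whose callback returns
// done=false stay queued and are retried on a later pass.
type toyQueue struct{ items []timedValue }

func (q *toyQueue) Add(value string, uid interface{}) {
	q.items = append(q.items, timedValue{Value: value, UID: uid})
}

func (q *toyQueue) Try(fn func(timedValue) (bool, time.Duration)) {
	var kept []timedValue
	for _, v := range q.items {
		if done, _ := fn(v); !done {
			kept = append(kept, v) // retry later
		}
	}
	q.items = kept
}

func main() {
	q := &toyQueue{}
	q.Add("node-1", "uid-1")
	q.Try(func(v timedValue) (bool, time.Duration) {
		fmt.Printf("evicting pods from %s\n", v.Value)
		return true, 0 // success: drop the value from the queue
	})
}
```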

View File

@ -519,15 +519,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
if remaining {
nodeController.zoneTerminationEvictor[zone].Add(value.Value, nodeUid)
}
return true, 0
})
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, value.AddedAt, nodeController.maximumGracePeriod)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
return true, 0
})
}
@ -1054,15 +1046,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
remaining, _ := deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore)
if remaining {
nodeController.zoneTerminationEvictor[zone].Add(value.Value, value.UID)
}
return true, 0
})
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
terminatePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, value.AddedAt, nodeController.maximumGracePeriod)
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore)
return true, 0
})
}

View File

@ -68,6 +68,7 @@ go_library(
"mesos.go",
"monitoring.go",
"namespace.go",
"network_partition.go",
"networking.go",
"networking_perf.go",
"node_problem_detector.go",

View File

@ -167,6 +167,9 @@ const (
// Number of objects that gc can delete in a second.
// GC issues 2 requests for a single delete.
gcThroughput = 10
// TODO(justinsb): Avoid hardcoding this.
awsMasterIP = "172.20.0.9"
)
var (
@ -185,6 +188,12 @@ var (
}
)
type Address struct {
internalIP string
externalIP string
hostname string
}
// GetServerArchitecture fetches the architecture of the cluster's apiserver.
func GetServerArchitecture(c clientset.Interface) string {
arch := ""
@ -1395,6 +1404,27 @@ func WaitForRCToStabilize(c clientset.Interface, ns, name string, timeout time.D
return err
}
// WaitForPodAddition waits until a pod is added in the given namespace, or the timeout expires.
func WaitForPodAddition(c clientset.Interface, ns string, timeout time.Duration) error {
options := api.ListOptions{FieldSelector: fields.Set{
"metadata.namespace": ns,
}.AsSelector()}
w, err := c.Core().Pods(ns).Watch(options)
if err != nil {
return err
}
_, err = watch.Until(timeout, w, func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Added:
return true, nil
}
Logf("Waiting for pod(s) to be added in namespace %v", ns)
return false, nil
})
return err
}
func WaitForPodToDisappear(c clientset.Interface, ns, podName string, label labels.Selector, interval, timeout time.Duration) error {
return wait.PollImmediate(interval, timeout, func() (bool, error) {
Logf("Waiting for pod %s to disappear", podName)
@ -1423,9 +1453,9 @@ func WaitForPodToDisappear(c clientset.Interface, ns, podName string, label labe
// In case of failure or too long waiting time, an error is returned.
func WaitForRCPodToDisappear(c clientset.Interface, ns, rcName, podName string) error {
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": rcName}))
// NodeController evicts pod after 5 minutes, so we need timeout greater than that.
// Additionally, there can be non-zero grace period, so we are setting 10 minutes
// to be on the safe side.
// NodeController evicts pod after 5 minutes, so we need timeout greater than that to observe effects.
// The grace period must be set to 0 on the pod for it to be deleted during the partition.
// Otherwise, it goes to the 'Terminating' state till the kubelet confirms deletion.
return WaitForPodToDisappear(c, ns, podName, label, 20*time.Second, 10*time.Minute)
}
@ -1656,11 +1686,16 @@ func PodsResponding(c clientset.Interface, ns, name string, wantName bool, pods
}
func PodsCreated(c clientset.Interface, ns, name string, replicas int32) (*api.PodList, error) {
timeout := 2 * time.Minute
// List the pods, making sure we observe all the replicas.
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
return PodsCreatedByLabel(c, ns, name, replicas, label)
}
func PodsCreatedByLabel(c clientset.Interface, ns, name string, replicas int32, label labels.Selector) (*api.PodList, error) {
timeout := 2 * time.Minute
for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
options := api.ListOptions{LabelSelector: label}
// List the pods, making sure we observe all the replicas.
pods, err := c.Core().Pods(ns).List(options)
if err != nil {
return nil, err
@ -4587,3 +4622,65 @@ func CleanupGCEResources(loadBalancerName string) (err error) {
gceCloud.DeleteTargetPool(loadBalancerName, hc)
return nil
}
// getMaster populates the externalIP, internalIP and hostname fields of the master.
// If any of these is unavailable, it is set to "".
func getMaster(c clientset.Interface) Address {
master := Address{}
// Populate the internal IP.
eps, err := c.Core().Endpoints(api.NamespaceDefault).Get("kubernetes")
if err != nil {
Failf("Failed to get kubernetes endpoints: %v", err)
}
if len(eps.Subsets) != 1 || len(eps.Subsets[0].Addresses) != 1 {
Failf("There are more than 1 endpoints for kubernetes service: %+v", eps)
}
master.internalIP = eps.Subsets[0].Addresses[0].IP
// Populate the external IP/hostname.
url, err := url.Parse(TestContext.Host)
if err != nil {
Failf("Failed to parse hostname: %v", err)
}
if net.ParseIP(url.Host) != nil {
// TODO: Check that it is external IP (not having a reserved IP address as per RFC1918).
master.externalIP = url.Host
} else {
master.hostname = url.Host
}
return master
}
// GetMasterAddress returns the hostname/external IP/internal IP as appropriate for e2e tests on a particular provider,
// i.e. the address of the interface used for communication with the kubelet.
func GetMasterAddress(c clientset.Interface) string {
master := getMaster(c)
switch TestContext.Provider {
case "gce", "gke":
return master.externalIP
case "aws":
return awsMasterIP
default:
Failf("This test is not supported for provider %s and should be disabled", TestContext.Provider)
}
return ""
}
// GetNodeExternalIP returns node external IP concatenated with port 22 for ssh
// e.g. 1.2.3.4:22
func GetNodeExternalIP(node *api.Node) string {
Logf("Getting external IP address for %s", node.Name)
host := ""
for _, a := range node.Status.Addresses {
if a.Type == api.NodeExternalIP {
host = a.Address + ":22"
break
}
}
if host == "" {
Failf("Couldn't get the external IP of host %s with addresses %v", node.Name, node.Status.Addresses)
}
return host
}

View File

@ -55,7 +55,7 @@ func verifyRCs(c clientset.Interface, ns string, names []string) {
}
func createNewRC(c clientset.Interface, ns string, name string) {
_, err := newRCByName(c, ns, name, 1)
_, err := newRCByName(c, ns, name, 1, nil)
framework.ExpectNoError(err)
}

View File

@ -0,0 +1,478 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/e2e/framework"
testutils "k8s.io/kubernetes/test/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// Blocks outgoing network traffic on 'node', then runs testFunc.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func testUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *api.Node, testFunc func()) {
host := framework.GetNodeExternalIP(node)
master := framework.GetMasterAddress(c)
By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
defer func() {
// This code will execute even if setting the iptables rule failed.
// It is on purpose because we may have an error even if the new rule
// had been inserted. (yes, we could look at the error code and ssh error
// separately, but I prefer to stay on the safe side).
By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
framework.UnblockNetwork(host, master)
}()
framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBe(c, node.Name, api.NodeReady, true, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
framework.BlockNetwork(host, master)
framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !framework.WaitForNodeToBe(c, node.Name, api.NodeReady, false, resizeNodeNotReadyTimeout) {
framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
}
testFunc()
// network traffic is unblocked in a deferred function
}
func expectNodeReadiness(isReady bool, newNode chan *api.Node) {
timeout := false
expected := false
timer := time.After(nodeReadinessTimeout)
for !expected && !timeout {
select {
case n := <-newNode:
if framework.IsNodeConditionSetAsExpected(n, api.NodeReady, isReady) {
expected = true
} else {
framework.Logf("Observed node ready status is NOT %v as expected", isReady)
}
case <-timer:
timeout = true
}
}
if !expected {
framework.Failf("Failed to observe node ready status change to %v", isReady)
}
}
func podOnNode(podName, nodeName string, image string) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Labels: map[string]string{
"name": podName,
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: podName,
Image: image,
Ports: []api.ContainerPort{{ContainerPort: 9376}},
},
},
NodeName: nodeName,
RestartPolicy: api.RestartPolicyNever,
},
}
}
func newPodOnNode(c clientset.Interface, namespace, podName, nodeName string) error {
pod, err := c.Core().Pods(namespace).Create(podOnNode(podName, nodeName, serveHostnameImage))
if err == nil {
framework.Logf("Created pod %s on node %s", pod.ObjectMeta.Name, nodeName)
} else {
framework.Logf("Failed to create pod %s on node %s: %v", podName, nodeName, err)
}
return err
}
var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
f := framework.NewDefaultFramework("network-partition")
var systemPodsNo int32
var c clientset.Interface
var ns string
ignoreLabels := framework.ImagePullerLabels
var group string
BeforeEach(func() {
c = f.ClientSet
ns = f.Namespace.Name
systemPods, err := framework.GetPodsInNamespace(c, ns, ignoreLabels)
Expect(err).NotTo(HaveOccurred())
systemPodsNo = int32(len(systemPods))
if strings.Index(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") >= 0 {
framework.Failf("Test dose not support cluster setup with more than one MIG: %s", framework.TestContext.CloudConfig.NodeInstanceGroup)
} else {
group = framework.TestContext.CloudConfig.NodeInstanceGroup
}
})
framework.KubeDescribe("Pods", func() {
Context("should return to running and ready state after network partition is healed", func() {
BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke", "aws")
framework.SkipUnlessNodeCountIsAtLeast(2)
})
// What happens in this test:
// Network traffic from a node to master is cut off to simulate network partition
// Expect to observe:
// 1. Node is marked NotReady after timeout by nodecontroller (40 seconds)
// 2. All pods on node are marked NotReady shortly after #1
// 3. Node and pods return to Ready after connectivity recovers
It("All pods on the unreachable node should be marked as NotReady upon the node turning NotReady "+
"AND all pods should be marked back to Ready when the node gets back to Ready before pod eviction timeout", func() {
By("choose a node - we will block all network traffic on this node")
var podOpts api.ListOptions
nodeOpts := api.ListOptions{}
nodes, err := c.Core().Nodes().List(nodeOpts)
Expect(err).NotTo(HaveOccurred())
framework.FilterNodes(nodes, func(node api.Node) bool {
if !framework.IsNodeConditionSetAsExpected(&node, api.NodeReady, true) {
return false
}
podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name)}
pods, err := c.Core().Pods(api.NamespaceAll).List(podOpts)
if err != nil || len(pods.Items) <= 0 {
return false
}
return true
})
if len(nodes.Items) <= 0 {
framework.Failf("No eligible node were found: %d", len(nodes.Items))
}
node := nodes.Items[0]
podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name)}
if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReady); err != nil {
framework.Failf("Pods on node %s are not ready and running within %v: %v", node.Name, podReadyTimeout, err)
}
By("Set up watch on node status")
nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
stopCh := make(chan struct{})
newNode := make(chan *api.Node)
var controller *cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = nodeSelector
obj, err := f.ClientSet.Core().Nodes().List(options)
return runtime.Object(obj), err
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = nodeSelector
return f.ClientSet.Core().Nodes().Watch(options)
},
},
&api.Node{},
0,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
n, ok := newObj.(*api.Node)
Expect(ok).To(Equal(true))
newNode <- n
},
},
)
defer func() {
// Will not explicitly close newNode channel here due to
// race condition where stopCh and newNode are closed but informer onUpdate still executes.
close(stopCh)
}()
go controller.Run(stopCh)
By(fmt.Sprintf("Block traffic from node %s to the master", node.Name))
host := framework.GetNodeExternalIP(&node)
master := framework.GetMasterAddress(c)
defer func() {
By(fmt.Sprintf("Unblock traffic from node %s to the master", node.Name))
framework.UnblockNetwork(host, master)
if CurrentGinkgoTestDescription().Failed {
return
}
By("Expect to observe node and pod status change from NotReady to Ready after network connectivity recovers")
expectNodeReadiness(true, newNode)
if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReady); err != nil {
framework.Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err)
}
}()
framework.BlockNetwork(host, master)
By("Expect to observe node and pod status change from Ready to NotReady after network partition")
expectNodeReadiness(false, newNode)
if err = framework.WaitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, testutils.PodNotReady); err != nil {
framework.Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err)
}
})
})
})
framework.KubeDescribe("[ReplicationController]", func() {
It("should recreate pods scheduled on the unreachable node "+
"AND allow scheduling of pods on a node after it rejoins the cluster", func() {
// Create a replication controller for a service that serves its hostname.
// The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
name := "my-hostname-net"
newSVCByName(c, ns, name)
replicas := int32(framework.TestContext.CloudConfig.NumNodes)
newRCByName(c, ns, name, replicas, nil)
err := framework.VerifyPods(c, ns, name, true, replicas)
Expect(err).NotTo(HaveOccurred(), "Each pod should start running and responding")
By("choose a node with at least one pod - we will block some network traffic on this node")
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
options := api.ListOptions{LabelSelector: label}
pods, err := c.Core().Pods(ns).List(options) // list pods after all have been scheduled
Expect(err).NotTo(HaveOccurred())
nodeName := pods.Items[0].Spec.NodeName
node, err := c.Core().Nodes().Get(nodeName)
Expect(err).NotTo(HaveOccurred())
// This creates a temporary network partition, verifies that 'podNameToDisappear',
// that belongs to replication controller 'rcName', really disappeared (because its
// grace period is set to 0).
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal to 'replicas'.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
Expect(err).NotTo(HaveOccurred())
By("verifying whether the pod from the unreachable node is recreated")
err = framework.VerifyPods(c, ns, name, true, replicas)
Expect(err).NotTo(HaveOccurred())
})
framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
// sleep a bit, to allow Watch in NodeController to catch up.
time.Sleep(5 * time.Second)
By("verify whether new pods can be created on the re-attached node")
// increasing the RC size is not a valid way to test this
// since we have no guarantees the pod will be scheduled on our node.
additionalPod := "additionalpod"
err = newPodOnNode(c, ns, additionalPod, node.Name)
Expect(err).NotTo(HaveOccurred())
err = framework.VerifyPods(c, ns, additionalPod, true, 1)
Expect(err).NotTo(HaveOccurred())
// verify that it is really on the requested node
{
pod, err := c.Core().Pods(ns).Get(additionalPod)
Expect(err).NotTo(HaveOccurred())
if pod.Spec.NodeName != node.Name {
framework.Logf("Pod %s found on invalid node: %s instead of %s", pod.Name, pod.Spec.NodeName, node.Name)
}
}
})
It("should eagerly create replacement pod during network partition when termination grace is non-zero", func() {
// Create a replication controller for a service that serves its hostname.
// The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
name := "my-hostname-net"
gracePeriod := int64(30)
newSVCByName(c, ns, name)
replicas := int32(framework.TestContext.CloudConfig.NumNodes)
newRCByName(c, ns, name, replicas, &gracePeriod)
err := framework.VerifyPods(c, ns, name, true, replicas)
Expect(err).NotTo(HaveOccurred(), "Each pod should start running and responding")
By("choose a node with at least one pod - we will block some network traffic on this node")
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
options := api.ListOptions{LabelSelector: label}
pods, err := c.Core().Pods(ns).List(options) // list pods after all have been scheduled
Expect(err).NotTo(HaveOccurred())
nodeName := pods.Items[0].Spec.NodeName
node, err := c.Core().Nodes().Get(nodeName)
Expect(err).NotTo(HaveOccurred())
// This creates a temporary network partition, verifies that 'podNameToDisappear',
// that belongs to replication controller 'rcName', did not disappear (because its
// grace period is set to 30).
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal to 'replicas + 1'.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
By(fmt.Sprintf("verifying that there are %v running pods during partition", replicas))
_, err = framework.PodsCreated(c, ns, name, replicas)
Expect(err).NotTo(HaveOccurred())
})
framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
})
})
framework.KubeDescribe("[StatefulSet]", func() {
psName := "pet"
labels := map[string]string{
"foo": "bar",
}
headlessSvcName := "test"
BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke")
By("creating service " + headlessSvcName + " in namespace " + f.Namespace.Name)
headlessService := createServiceSpec(headlessSvcName, "", true, labels)
_, err := f.ClientSet.Core().Services(f.Namespace.Name).Create(headlessService)
framework.ExpectNoError(err)
c = f.ClientSet
ns = f.Namespace.Name
})
AfterEach(func() {
if CurrentGinkgoTestDescription().Failed {
dumpDebugInfo(c, ns)
}
framework.Logf("Deleting all petset in ns %v", ns)
deleteAllStatefulSets(c, ns)
})
It("should come back up if node goes down [Slow] [Disruptive] [Feature:PetSet]", func() {
petMounts := []api.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
podMounts := []api.VolumeMount{{Name: "home", MountPath: "/home"}}
ps := newStatefulSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels)
_, err := c.Apps().StatefulSets(ns).Create(ps)
Expect(err).NotTo(HaveOccurred())
pst := statefulSetTester{c: c}
pst.saturate(ps)
nn := framework.TestContext.CloudConfig.NumNodes
nodeNames, err := framework.CheckNodesReady(f.ClientSet, framework.NodeReadyInitialTimeout, nn)
framework.ExpectNoError(err)
restartNodes(f, nodeNames)
By("waiting for pods to be running again")
pst.waitForRunning(ps.Spec.Replicas, ps)
})
It("should not reschedule pets if there is a network partition [Slow] [Disruptive] [Feature:PetSet]", func() {
ps := newStatefulSet(psName, ns, headlessSvcName, 3, []api.VolumeMount{}, []api.VolumeMount{}, labels)
_, err := c.Apps().StatefulSets(ns).Create(ps)
Expect(err).NotTo(HaveOccurred())
pst := statefulSetTester{c: c}
pst.saturate(ps)
pod := pst.getPodList(ps).Items[0]
node, err := c.Core().Nodes().Get(pod.Spec.NodeName)
framework.ExpectNoError(err)
// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
// that belongs to StatefulSet 'petSetName', **does not** disappear due to forced deletion from the apiserver.
// The grace period on the petset pods is set to a value > 0.
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Checking that the NodeController does not force delete pet %v", pod.Name)
err := framework.WaitTimeoutForPodNoLongerRunningInNamespace(c, pod.Name, ns, pod.ResourceVersion, 10*time.Minute)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
})
framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
By("waiting for pods to be running again")
pst.waitForRunning(ps.Spec.Replicas, ps)
})
})
framework.KubeDescribe("[Job]", func() {
It("should create new pods when node is partitioned", func() {
parallelism := int32(2)
completions := int32(4)
job := newTestJob("notTerminate", "network-partition", api.RestartPolicyNever, parallelism, completions)
job, err := createJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
label := labels.SelectorFromSet(labels.Set(map[string]string{jobSelectorKey: job.Name}))
By(fmt.Sprintf("verifying that there are now %v running pods", parallelism))
_, err = framework.PodsCreatedByLabel(c, ns, job.Name, parallelism, label)
Expect(err).NotTo(HaveOccurred())
By("choose a node with at least one pod - we will block some network traffic on this node")
options := api.ListOptions{LabelSelector: label}
pods, err := c.Core().Pods(ns).List(options) // list pods after all have been scheduled
Expect(err).NotTo(HaveOccurred())
nodeName := pods.Items[0].Spec.NodeName
node, err := c.Core().Nodes().Get(nodeName)
Expect(err).NotTo(HaveOccurred())
// This creates a temporary network partition, verifies that the job has 'parallelism' number of
// running pods after the node-controller detects node unreachable.
By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
err := framework.WaitForPodToDisappear(c, ns, pods.Items[0].Name, label, 20*time.Second, 10*time.Minute)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
By(fmt.Sprintf("verifying that there are now %v running pods", parallelism))
_, err = framework.PodsCreatedByLabel(c, ns, job.Name, parallelism, label)
Expect(err).NotTo(HaveOccurred())
})
framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
})
})
})

View File

@ -18,8 +18,6 @@ package e2e
import (
"fmt"
"net"
"net/url"
"os/exec"
"regexp"
"strings"
@ -29,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/test/e2e/framework"
@ -37,12 +34,7 @@ import (
"github.com/aws/aws-sdk-go/service/autoscaling"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/client/cache"
awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
testutils "k8s.io/kubernetes/test/utils"
)
const (
@ -53,17 +45,8 @@ const (
podNotReadyTimeout = 1 * time.Minute
podReadyTimeout = 2 * time.Minute
testPort = 9376
// TODO(justinsb): Avoid hardcoding this.
awsMasterIP = "172.20.0.9"
)
type Address struct {
internalIP string
externalIP string
hostname string
}
func ResizeGroup(group string, size int32) error {
if framework.TestContext.ReportDir != "" {
framework.CoreDump(framework.TestContext.ReportDir)
@ -176,57 +159,26 @@ func newSVCByName(c clientset.Interface, ns, name string) error {
return err
}
func podOnNode(podName, nodeName string, image string) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Labels: map[string]string{
"name": podName,
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: podName,
Image: image,
Ports: []api.ContainerPort{{ContainerPort: 9376}},
},
},
NodeName: nodeName,
RestartPolicy: api.RestartPolicyNever,
},
}
}
func rcByNamePort(name string, replicas int32, image string, port int, protocol api.Protocol,
labels map[string]string, gracePeriod *int64) *api.ReplicationController {
func newPodOnNode(c clientset.Interface, namespace, podName, nodeName string) error {
pod, err := c.Core().Pods(namespace).Create(podOnNode(podName, nodeName, serveHostnameImage))
if err == nil {
framework.Logf("Created pod %s on node %s", pod.ObjectMeta.Name, nodeName)
} else {
framework.Logf("Failed to create pod %s on node %s: %v", podName, nodeName, err)
}
return err
}
func rcByName(name string, replicas int32, image string, labels map[string]string) *api.ReplicationController {
return rcByNameContainer(name, replicas, image, labels, api.Container{
Name: name,
Image: image,
})
}
func rcByNamePort(name string, replicas int32, image string, port int, protocol api.Protocol, labels map[string]string) *api.ReplicationController {
return rcByNameContainer(name, replicas, image, labels, api.Container{
Name: name,
Image: image,
Ports: []api.ContainerPort{{ContainerPort: int32(port), Protocol: protocol}},
})
}, gracePeriod)
}
func rcByNameContainer(name string, replicas int32, image string, labels map[string]string, c api.Container) *api.ReplicationController {
func rcByNameContainer(name string, replicas int32, image string, labels map[string]string, c api.Container,
gracePeriod *int64) *api.ReplicationController {
zeroGracePeriod := int64(0)
// Add "name": name to the labels, overwriting if it exists.
labels["name"] = name
gracePeriod := int64(0)
if gracePeriod == nil {
gracePeriod = &zeroGracePeriod
}
return &api.ReplicationController{
TypeMeta: unversioned.TypeMeta{
Kind: "ReplicationController",
@ -246,7 +198,7 @@ func rcByNameContainer(name string, replicas int32, image string, labels map[str
},
Spec: api.PodSpec{
Containers: []api.Container{c},
TerminationGracePeriodSeconds: &gracePeriod,
TerminationGracePeriodSeconds: gracePeriod,
},
},
},
@ -254,10 +206,10 @@ func rcByNameContainer(name string, replicas int32, image string, labels map[str
}
// newRCByName creates a replication controller with a selector matching its name.
func newRCByName(c clientset.Interface, ns, name string, replicas int32) (*api.ReplicationController, error) {
func newRCByName(c clientset.Interface, ns, name string, replicas int32, gracePeriod *int64) (*api.ReplicationController, error) {
By(fmt.Sprintf("creating replication controller %s", name))
return c.Core().ReplicationControllers(ns).Create(rcByNamePort(
name, replicas, serveHostnameImage, 9376, api.ProtocolTCP, map[string]string{}))
name, replicas, serveHostnameImage, 9376, api.ProtocolTCP, map[string]string{}, gracePeriod))
}
func resizeRC(c clientset.Interface, ns, name string, replicas int32) error {
@ -270,131 +222,6 @@ func resizeRC(c clientset.Interface, ns, name string, replicas int32) error {
return err
}
// getMaster populates the externalIP, internalIP and hostname fields of the master.
// If any of these is unavailable, it is set to "".
func getMaster(c clientset.Interface) Address {
master := Address{}
// Populate the internal IP.
eps, err := c.Core().Endpoints(api.NamespaceDefault).Get("kubernetes")
if err != nil {
framework.Failf("Failed to get kubernetes endpoints: %v", err)
}
if len(eps.Subsets) != 1 || len(eps.Subsets[0].Addresses) != 1 {
framework.Failf("There are more than 1 endpoints for kubernetes service: %+v", eps)
}
master.internalIP = eps.Subsets[0].Addresses[0].IP
// Populate the external IP/hostname.
url, err := url.Parse(framework.TestContext.Host)
if err != nil {
framework.Failf("Failed to parse hostname: %v", err)
}
if net.ParseIP(url.Host) != nil {
// TODO: Check that it is external IP (not having a reserved IP address as per RFC1918).
master.externalIP = url.Host
} else {
master.hostname = url.Host
}
return master
}
// getMasterAddress returns the hostname/external IP/internal IP as appropriate for e2e tests on a particular provider,
// i.e. the address of the interface used for communication with the kubelet.
func getMasterAddress(c clientset.Interface) string {
master := getMaster(c)
switch framework.TestContext.Provider {
case "gce", "gke":
return master.externalIP
case "aws":
return awsMasterIP
default:
framework.Failf("This test is not supported for provider %s and should be disabled", framework.TestContext.Provider)
}
return ""
}
// Return node external IP concatenated with port 22 for ssh
// e.g. 1.2.3.4:22
func getNodeExternalIP(node *api.Node) string {
framework.Logf("Getting external IP address for %s", node.Name)
host := ""
for _, a := range node.Status.Addresses {
if a.Type == api.NodeExternalIP {
host = a.Address + ":22"
break
}
}
if host == "" {
framework.Failf("Couldn't get the external IP of host %s with addresses %v", node.Name, node.Status.Addresses)
}
return host
}
// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
// that belongs to replication controller 'rcName', really disappeared.
// Finally, it checks that the replication controller recreates the
// pods on another node and that now the number of replicas is equal 'replicas'.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func performTemporaryNetworkFailure(c clientset.Interface, ns, rcName string, replicas int32, podNameToDisappear string, node *api.Node) {
host := getNodeExternalIP(node)
master := getMasterAddress(c)
By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
defer func() {
// This code will execute even if setting the iptables rule failed.
// It is on purpose because we may have an error even if the new rule
// had been inserted. (yes, we could look at the error code and ssh error
// separately, but I prefer to stay on the safe side).
By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
framework.UnblockNetwork(host, master)
}()
framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBe(c, node.Name, api.NodeReady, true, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
framework.BlockNetwork(host, master)
framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
if !framework.WaitForNodeToBe(c, node.Name, api.NodeReady, false, resizeNodeNotReadyTimeout) {
framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
}
framework.Logf("Waiting for pod %s to be removed", podNameToDisappear)
err := framework.WaitForRCPodToDisappear(c, ns, rcName, podNameToDisappear)
Expect(err).NotTo(HaveOccurred())
By("verifying whether the pod from the unreachable node is recreated")
err = framework.VerifyPods(c, ns, rcName, true, replicas)
Expect(err).NotTo(HaveOccurred())
// network traffic is unblocked in a deferred function
}
func expectNodeReadiness(isReady bool, newNode chan *api.Node) {
timeout := false
expected := false
timer := time.After(nodeReadinessTimeout)
for !expected && !timeout {
select {
case n := <-newNode:
if framework.IsNodeConditionSetAsExpected(n, api.NodeReady, isReady) {
expected = true
} else {
framework.Logf("Observed node ready status is NOT %v as expected", isReady)
}
case <-timer:
timeout = true
}
}
if !expected {
framework.Failf("Failed to observe node ready status change to %v", isReady)
}
}
var _ = framework.KubeDescribe("Nodes [Disruptive]", func() {
f := framework.NewDefaultFramework("resize-nodes")
var systemPodsNo int32
@ -468,7 +295,7 @@ var _ = framework.KubeDescribe("Nodes [Disruptive]", func() {
// The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
name := "my-hostname-delete-node"
replicas := int32(framework.TestContext.CloudConfig.NumNodes)
newRCByName(c, ns, name, replicas)
newRCByName(c, ns, name, replicas, nil)
err := framework.VerifyPods(c, ns, name, true, replicas)
Expect(err).NotTo(HaveOccurred())
@ -480,6 +307,9 @@ var _ = framework.KubeDescribe("Nodes [Disruptive]", func() {
err = framework.WaitForClusterSize(c, int(replicas-1), 10*time.Minute)
Expect(err).NotTo(HaveOccurred())
By("waiting for podGC to remove/recreate any pods scheduled on the now non-existent node")
framework.WaitForPodAddition(c, ns, 2*time.Minute)
By("verifying whether the pods from the removed node are recreated")
err = framework.VerifyPods(c, ns, name, true, replicas)
Expect(err).NotTo(HaveOccurred())
@ -492,7 +322,7 @@ var _ = framework.KubeDescribe("Nodes [Disruptive]", func() {
name := "my-hostname-add-node"
newSVCByName(c, ns, name)
replicas := int32(framework.TestContext.CloudConfig.NumNodes)
newRCByName(c, ns, name, replicas)
newRCByName(c, ns, name, replicas, nil)
err := framework.VerifyPods(c, ns, name, true, replicas)
Expect(err).NotTo(HaveOccurred())
@ -511,166 +341,4 @@ var _ = framework.KubeDescribe("Nodes [Disruptive]", func() {
Expect(err).NotTo(HaveOccurred())
})
})
framework.KubeDescribe("Network", func() {
Context("when a node becomes unreachable", func() {
BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke", "aws")
framework.SkipUnlessNodeCountIsAtLeast(2)
})
// TODO marekbiskup 2015-06-19 #10085
// This test has nothing to do with resizing nodes so it should be moved elsewhere.
// Two things are tested here:
// 1. pods from uncontactable nodes are rescheduled
// 2. when a node joins the cluster, it can host new pods.
// Factor out the cases into two separate tests.
It("[replication controller] recreates pods scheduled on the unreachable node "+
"AND allows scheduling of pods on a node after it rejoins the cluster", func() {
// Create a replication controller for a service that serves its hostname.
// The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
name := "my-hostname-net"
newSVCByName(c, ns, name)
replicas := int32(framework.TestContext.CloudConfig.NumNodes)
newRCByName(c, ns, name, replicas)
err := framework.VerifyPods(c, ns, name, true, replicas)
Expect(err).NotTo(HaveOccurred(), "Each pod should start running and responding")
By("choose a node with at least one pod - we will block some network traffic on this node")
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
options := api.ListOptions{LabelSelector: label}
pods, err := c.Core().Pods(ns).List(options) // list pods after all have been scheduled
Expect(err).NotTo(HaveOccurred())
nodeName := pods.Items[0].Spec.NodeName
node, err := c.Core().Nodes().Get(nodeName)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("block network traffic from node %s", node.Name))
performTemporaryNetworkFailure(c, ns, name, replicas, pods.Items[0].Name, node)
framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
if !framework.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
}
// sleep a bit, to allow Watch in NodeController to catch up.
time.Sleep(5 * time.Second)
By("verify whether new pods can be created on the re-attached node")
// increasing the RC size is not a valid way to test this
// since we have no guarantees the pod will be scheduled on our node.
additionalPod := "additionalpod"
err = newPodOnNode(c, ns, additionalPod, node.Name)
Expect(err).NotTo(HaveOccurred())
err = framework.VerifyPods(c, ns, additionalPod, true, 1)
Expect(err).NotTo(HaveOccurred())
// verify that it is really on the requested node
{
pod, err := c.Core().Pods(ns).Get(additionalPod)
Expect(err).NotTo(HaveOccurred())
if pod.Spec.NodeName != node.Name {
framework.Logf("Pod %s found on invalid node: %s instead of %s", pod.Name, pod.Spec.NodeName, node.Name)
}
}
})
// What happens in this test:
// Network traffic from a node to master is cut off to simulate network partition
// Expect to observe:
// 1. Node is marked NotReady after timeout by nodecontroller (40 seconds)
// 2. All pods on node are marked NotReady shortly after #1
// 3. Node and pods return to Ready after connectivity recovers
It("All pods on the unreachable node should be marked as NotReady upon the node turning NotReady "+
"AND all pods should be marked back to Ready when the node gets back to Ready before pod eviction timeout", func() {
By("choose a node - we will block all network traffic on this node")
var podOpts api.ListOptions
nodeOpts := api.ListOptions{}
nodes, err := c.Core().Nodes().List(nodeOpts)
Expect(err).NotTo(HaveOccurred())
framework.FilterNodes(nodes, func(node api.Node) bool {
if !framework.IsNodeConditionSetAsExpected(&node, api.NodeReady, true) {
return false
}
podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name)}
pods, err := c.Core().Pods(api.NamespaceAll).List(podOpts)
if err != nil || len(pods.Items) <= 0 {
return false
}
return true
})
if len(nodes.Items) <= 0 {
framework.Failf("No eligible node were found: %d", len(nodes.Items))
}
node := nodes.Items[0]
podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name)}
if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReady); err != nil {
framework.Failf("Pods on node %s are not ready and running within %v: %v", node.Name, podReadyTimeout, err)
}
By("Set up watch on node status")
nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
stopCh := make(chan struct{})
newNode := make(chan *api.Node)
var controller *cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = nodeSelector
obj, err := f.ClientSet.Core().Nodes().List(options)
return runtime.Object(obj), err
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = nodeSelector
return f.ClientSet.Core().Nodes().Watch(options)
},
},
&api.Node{},
0,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
n, ok := newObj.(*api.Node)
Expect(ok).To(Equal(true))
newNode <- n
},
},
)
defer func() {
// Will not explicitly close newNode channel here due to
// race condition where stopCh and newNode are closed but informer onUpdate still executes.
close(stopCh)
}()
go controller.Run(stopCh)
By(fmt.Sprintf("Block traffic from node %s to the master", node.Name))
host := getNodeExternalIP(&node)
master := getMasterAddress(c)
defer func() {
By(fmt.Sprintf("Unblock traffic from node %s to the master", node.Name))
framework.UnblockNetwork(host, master)
if CurrentGinkgoTestDescription().Failed {
return
}
By("Expect to observe node and pod status change from NotReady to Ready after network connectivity recovers")
expectNodeReadiness(true, newNode)
if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, testutils.PodRunningReady); err != nil {
framework.Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err)
}
}()
framework.BlockNetwork(host, master)
By("Expect to observe node and pod status change from Ready to NotReady after network partition")
expectNodeReadiness(false, newNode)
if err = framework.WaitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, testutils.PodNotReady); err != nil {
framework.Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err)
}
})
})
})
})

View File

@ -1062,7 +1062,7 @@ var _ = framework.KubeDescribe("Services", func() {
},
},
},
})
}, nil)
By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
_, err := t.createRC(rcSpec)
@ -2585,7 +2585,7 @@ func (t *ServiceTestFixture) BuildServiceSpec() *api.Service {
// CreateWebserverRC creates rc-backed pods with the well-known webserver
// configuration and records it for cleanup.
func (t *ServiceTestFixture) CreateWebserverRC(replicas int32) *api.ReplicationController {
rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolTCP, t.Labels)
rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolTCP, t.Labels, nil)
rcAct, err := t.createRC(rcSpec)
if err != nil {
framework.Failf("Failed to create rc %s: %v", rcSpec.Name, err)

View File

@ -255,6 +255,12 @@ Namespaces should always delete fast (ALL of 100 namespaces in 150 seconds),rmmh
Namespaces should delete fast enough (90 percent of 100 namespaces in 150 seconds),kevin-wangzefeng,1
Namespaces should ensure that all pods are removed when a namespace is deleted.,xiang90,1
Namespaces should ensure that all services are removed when a namespace is deleted.,pmorie,1
Network Partition *,foxish,0
Network Partition Pods should return to running and ready state after network partition is healed *,foxish,0
Network Partition should come back up if node goes down,foxish,0
Network Partition should create new pods when node is partitioned,foxish,0
Network Partition should eagerly create replacement pod during network partition when termination grace is non-zero,foxish,0
Network Partition should not reschedule pets if there is a network partition,foxish,0
Networking Granular Checks: Pods should function for intra-pod communication: http,stts,0
Networking Granular Checks: Pods should function for intra-pod communication: udp,freehan,0
Networking Granular Checks: Pods should function for node-pod communication: http,spxtr,1
@ -275,7 +281,6 @@ Networking should provide Internet connection for containers,sttts,0
"Networking should provide unchanging, static URL paths for kubernetes api services",freehan,0
NodeOutOfDisk runs out of disk space,vishh,0
NodeProblemDetector KernelMonitor should generate node condition and events for corresponding errors,Random-Liu,0
Nodes Network when a node becomes unreachable *,alex-mohr,1
Nodes Resize should be able to add nodes,piosz,1
Nodes Resize should be able to delete nodes,zmerlynn,1
Opaque resources should account opaque integer resources in pods with multiple containers.,ConnorDoyle,0
