From 98fc4a107a8e6da008669b2792044a154f4a75a7 Mon Sep 17 00:00:00 2001 From: Zhen Wang Date: Mon, 1 Oct 2018 11:32:56 -0700 Subject: [PATCH] Update kubelet node status report logic with node lease feature When node lease feature is enabled, kubelet reports node status to api server only if there is some change or it didn't report over last report interval. --- .../testdata/conversion/master/internal.yaml | 1 + .../testdata/conversion/master/v1alpha3.yaml | 1 + .../testdata/conversion/master/v1beta1.yaml | 1 + .../testdata/defaulting/master/defaulted.yaml | 1 + pkg/kubelet/BUILD | 2 + pkg/kubelet/apis/config/fuzzer/fuzzer.go | 1 + pkg/kubelet/apis/config/helpers_test.go | 1 + pkg/kubelet/apis/config/types.go | 12 +- pkg/kubelet/apis/config/v1beta1/defaults.go | 10 + .../config/v1beta1/zz_generated.conversion.go | 2 + .../apis/config/zz_generated.deepcopy.go | 1 + pkg/kubelet/kubelet.go | 17 +- pkg/kubelet/kubelet_network.go | 12 +- pkg/kubelet/kubelet_node_status.go | 77 +++- pkg/kubelet/kubelet_node_status_test.go | 390 ++++++++++++++++++ .../k8s.io/kubelet/config/v1beta1/types.go | 18 +- .../config/v1beta1/zz_generated.deepcopy.go | 1 + test/e2e/common/BUILD | 1 + test/e2e/common/node_lease.go | 98 ++++- 19 files changed, 615 insertions(+), 32 deletions(-) diff --git a/cmd/kubeadm/app/util/config/testdata/conversion/master/internal.yaml b/cmd/kubeadm/app/util/config/testdata/conversion/master/internal.yaml index a94f6b1b841..af08a33e5e5 100644 --- a/cmd/kubeadm/app/util/config/testdata/conversion/master/internal.yaml +++ b/cmd/kubeadm/app/util/config/testdata/conversion/master/internal.yaml @@ -141,6 +141,7 @@ ComponentConfigs: MaxOpenFiles: 1000000 MaxPods: 110 NodeLeaseDurationSeconds: 40 + NodeStatusReportFrequency: 1m0s NodeStatusUpdateFrequency: 10s OOMScoreAdj: -999 PodCIDR: "" diff --git a/cmd/kubeadm/app/util/config/testdata/conversion/master/v1alpha3.yaml b/cmd/kubeadm/app/util/config/testdata/conversion/master/v1alpha3.yaml index 9a47c24bd95..f6ad739d51e 
100644 --- a/cmd/kubeadm/app/util/config/testdata/conversion/master/v1alpha3.yaml +++ b/cmd/kubeadm/app/util/config/testdata/conversion/master/v1alpha3.yaml @@ -148,6 +148,7 @@ makeIPTablesUtilChains: true maxOpenFiles: 1000000 maxPods: 110 nodeLeaseDurationSeconds: 40 +nodeStatusReportFrequency: 1m0s nodeStatusUpdateFrequency: 10s oomScoreAdj: -999 podPidsLimit: -1 diff --git a/cmd/kubeadm/app/util/config/testdata/conversion/master/v1beta1.yaml b/cmd/kubeadm/app/util/config/testdata/conversion/master/v1beta1.yaml index c15ea283fea..92758f57c30 100644 --- a/cmd/kubeadm/app/util/config/testdata/conversion/master/v1beta1.yaml +++ b/cmd/kubeadm/app/util/config/testdata/conversion/master/v1beta1.yaml @@ -152,6 +152,7 @@ makeIPTablesUtilChains: true maxOpenFiles: 1000000 maxPods: 110 nodeLeaseDurationSeconds: 40 +nodeStatusReportFrequency: 1m0s nodeStatusUpdateFrequency: 10s oomScoreAdj: -999 podPidsLimit: -1 diff --git a/cmd/kubeadm/app/util/config/testdata/defaulting/master/defaulted.yaml b/cmd/kubeadm/app/util/config/testdata/defaulting/master/defaulted.yaml index 520c9dfb993..1df4353c21f 100644 --- a/cmd/kubeadm/app/util/config/testdata/defaulting/master/defaulted.yaml +++ b/cmd/kubeadm/app/util/config/testdata/defaulting/master/defaulted.yaml @@ -139,6 +139,7 @@ makeIPTablesUtilChains: true maxOpenFiles: 1000000 maxPods: 110 nodeLeaseDurationSeconds: 40 +nodeStatusReportFrequency: 1m0s nodeStatusUpdateFrequency: 10s oomScoreAdj: -999 podPidsLimit: -1 diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index f75821e08e7..b70e9933aaa 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -117,6 +117,7 @@ go_library( "//pkg/volume/validation:go_default_library", "//staging/src/k8s.io/api/authentication/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", 
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -234,6 +235,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/pkg/kubelet/apis/config/fuzzer/fuzzer.go b/pkg/kubelet/apis/config/fuzzer/fuzzer.go index 1154789c420..4efd0f45afd 100644 --- a/pkg/kubelet/apis/config/fuzzer/fuzzer.go +++ b/pkg/kubelet/apis/config/fuzzer/fuzzer.go @@ -62,6 +62,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} { obj.MaxPods = 110 obj.PodPidsLimit = -1 obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second} + obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute} obj.NodeLeaseDurationSeconds = 40 obj.CPUManagerPolicy = "none" obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency diff --git a/pkg/kubelet/apis/config/helpers_test.go b/pkg/kubelet/apis/config/helpers_test.go index 412975b98bf..0ba6dad1a5b 100644 --- a/pkg/kubelet/apis/config/helpers_test.go +++ b/pkg/kubelet/apis/config/helpers_test.go @@ -197,6 +197,7 @@ var ( "MaxOpenFiles", "MaxPods", "NodeStatusUpdateFrequency.Duration", + "NodeStatusReportFrequency.Duration", "NodeLeaseDurationSeconds", "OOMScoreAdj", "PodCIDR", diff --git a/pkg/kubelet/apis/config/types.go b/pkg/kubelet/apis/config/types.go index ab232157493..941d8374866 100644 --- a/pkg/kubelet/apis/config/types.go +++ b/pkg/kubelet/apis/config/types.go @@ -151,10 +151,16 @@ type KubeletConfiguration struct { // streamingConnectionIdleTimeout is the maximum 
time a streaming connection // can be idle before the connection is automatically closed. StreamingConnectionIdleTimeout metav1.Duration - // nodeStatusUpdateFrequency is the frequency that kubelet posts node - // status to master. Note: be cautious when changing the constant, it - // must work with nodeMonitorGracePeriod in nodecontroller. + // nodeStatusUpdateFrequency is the frequency that kubelet computes node + // status. If node lease feature is not enabled, it is also the frequency that + // kubelet posts node status to master. In that case, be cautious when + // changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. NodeStatusUpdateFrequency metav1.Duration + // nodeStatusReportFrequency is the frequency that kubelet posts node + // status to master if node status does not change. Kubelet will ignore this + // frequency and post node status immediately if any change is detected. It is + // only used when node lease feature is enabled. + NodeStatusReportFrequency metav1.Duration // nodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease. NodeLeaseDurationSeconds int32 // imageMinimumGCAge is the minimum age for an unused image before it is diff --git a/pkg/kubelet/apis/config/v1beta1/defaults.go b/pkg/kubelet/apis/config/v1beta1/defaults.go index d5a12f338b0..0aebb8447b8 100644 --- a/pkg/kubelet/apis/config/v1beta1/defaults.go +++ b/pkg/kubelet/apis/config/v1beta1/defaults.go @@ -107,6 +107,16 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura if obj.StreamingConnectionIdleTimeout == zeroDuration { obj.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 4 * time.Hour} } + if obj.NodeStatusReportFrequency == zeroDuration { + // For backward compatibility, NodeStatusReportFrequency's default value is + // set to NodeStatusUpdateFrequency if NodeStatusUpdateFrequency is set + // explicitly. 
+ if obj.NodeStatusUpdateFrequency == zeroDuration { + obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute} + } else { + obj.NodeStatusReportFrequency = obj.NodeStatusUpdateFrequency + } + } if obj.NodeStatusUpdateFrequency == zeroDuration { obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second} } diff --git a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go index 9e67bf9e9a9..b264f2423b2 100644 --- a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go +++ b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go @@ -251,6 +251,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in out.ClusterDNS = *(*[]string)(unsafe.Pointer(&in.ClusterDNS)) out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency + out.NodeStatusReportFrequency = in.NodeStatusReportFrequency out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds out.ImageMinimumGCAge = in.ImageMinimumGCAge if err := v1.Convert_Pointer_int32_To_int32(&in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent, s); err != nil { @@ -380,6 +381,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in out.ClusterDNS = *(*[]string)(unsafe.Pointer(&in.ClusterDNS)) out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency + out.NodeStatusReportFrequency = in.NodeStatusReportFrequency out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds out.ImageMinimumGCAge = in.ImageMinimumGCAge if err := v1.Convert_int32_To_Pointer_int32(&in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent, s); err != nil { diff --git a/pkg/kubelet/apis/config/zz_generated.deepcopy.go b/pkg/kubelet/apis/config/zz_generated.deepcopy.go index 266a862943f..1644f968c56 100644 --- 
a/pkg/kubelet/apis/config/zz_generated.deepcopy.go +++ b/pkg/kubelet/apis/config/zz_generated.deepcopy.go @@ -112,6 +112,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { } out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency + out.NodeStatusReportFrequency = in.NodeStatusReportFrequency out.ImageMinimumGCAge = in.ImageMinimumGCAge out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index fe7352f07c2..8fe90c2da71 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -510,6 +510,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nodeRef: nodeRef, nodeLabels: nodeLabels, nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration, + nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration, os: kubeDeps.OSInterface, oomWatcher: oomWatcher, cgroupsPerQOS: kubeCfg.CgroupsPerQOS, @@ -716,7 +717,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{}) klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime) klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy) - if err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil { + if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil { glog.Errorf("Pod CIDR update failed %v", err) } @@ -1035,8 +1036,9 @@ type Kubelet struct { // used for generating ContainerStatus. reasonCache *ReasonCache - // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master. - // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod + // nodeStatusUpdateFrequency specifies how often kubelet computes node status. 
If node lease + // feature is not enabled, it is also the frequency that kubelet posts node status to master. + // In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod // in nodecontroller. There are several constraints: // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where // N means number of retries allowed for kubelet to post node status. It is pointless @@ -1048,6 +1050,13 @@ type Kubelet struct { // as it takes time to gather all necessary node information. nodeStatusUpdateFrequency time.Duration + // nodeStatusReportFrequency is the frequency that kubelet posts node + // status to master. It is only used when node lease feature is enabled. + nodeStatusReportFrequency time.Duration + + // lastStatusReportTime is the time when node status was last reported. + lastStatusReportTime time.Time + // syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe. // This lock is used by Kublet.syncNodeStatus function and shouldn't be used anywhere else. syncNodeStatusMux sync.Mutex @@ -2236,7 +2245,7 @@ func (kl *Kubelet) fastStatusUpdateOnce() { continue } if node.Spec.PodCIDR != "" { - if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil { + if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil { glog.Errorf("Pod CIDR update failed %v", err) continue } diff --git a/pkg/kubelet/kubelet_network.go b/pkg/kubelet/kubelet_network.go index 0528d2c1e57..aa791280cb1 100644 --- a/pkg/kubelet/kubelet_network.go +++ b/pkg/kubelet/kubelet_network.go @@ -55,26 +55,28 @@ func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool { } // updatePodCIDR updates the pod CIDR in the runtime state if it is different -// from the current CIDR. -func (kl *Kubelet) updatePodCIDR(cidr string) error { +// from the current CIDR. Return true if pod CIDR is actually changed.
+func (kl *Kubelet) updatePodCIDR(cidr string) (bool, error) { kl.updatePodCIDRMux.Lock() defer kl.updatePodCIDRMux.Unlock() podCIDR := kl.runtimeState.podCIDR() if podCIDR == cidr { - return nil + return false, nil } // kubelet -> generic runtime -> runtime shim -> network plugin // docker/non-cri implementations have a passthrough UpdatePodCIDR if err := kl.getRuntime().UpdatePodCIDR(cidr); err != nil { - return fmt.Errorf("failed to update pod CIDR: %v", err) + // If updatePodCIDR would fail, theoretically pod CIDR could not change. + // But it is better to be on the safe side to still return true here. + return true, fmt.Errorf("failed to update pod CIDR: %v", err) } glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr) kl.runtimeState.setPodCIDR(cidr) - return nil + return true, nil } // GetPodDNS returns DNS settings for the pod. diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 7e5dac7964d..5dbbed729d7 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -21,11 +21,13 @@ import ( "fmt" "net" goruntime "runtime" + "sort" "time" "github.com/golang/glog" "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -348,8 +350,8 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) { } // syncNodeStatus should be called periodically from a goroutine. -// It synchronizes node status to master, registering the kubelet first if -// necessary. +// It synchronizes node status to master if there is any change or enough time +// passed from the last sync, registering the kubelet first if necessary. func (kl *Kubelet) syncNodeStatus() { kl.syncNodeStatusMux.Lock() defer kl.syncNodeStatusMux.Unlock() @@ -366,7 +368,8 @@ func (kl *Kubelet) syncNodeStatus() { } } -// updateNodeStatus updates node status to master with retries. 
+// updateNodeStatus updates node status to master with retries if there is any +// change or enough time passed from the last sync. func (kl *Kubelet) updateNodeStatus() error { glog.V(5).Infof("Updating node status") for i := 0; i < nodeStatusUpdateRetry; i++ { @@ -382,7 +385,8 @@ func (kl *Kubelet) updateNodeStatus() error { return fmt.Errorf("update node status exceeds retry count") } -// tryUpdateNodeStatus tries to update node status to master. +// tryUpdateNodeStatus tries to update node status to master if there is any +// change or enough time passed from the last sync. func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { // In large clusters, GET and PUT operations on Node objects coming // from here are the majority of load on apiserver and etcd. @@ -404,18 +408,31 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { return fmt.Errorf("nil %q node object", kl.nodeName) } + podCIDRChanged := false if node.Spec.PodCIDR != "" { - if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil { + // Pod CIDR could have been updated before, so we cannot rely on + // node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is + // actually changed. 
+ if podCIDRChanged, err = kl.updatePodCIDR(node.Spec.PodCIDR); err != nil { glog.Errorf(err.Error()) } } kl.setNodeStatus(node) + + now := kl.clock.Now() + if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) { + if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) { + return nil + } + } + // Patch the current status on the API server updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node) if err != nil { return err } + kl.lastStatusReportTime = now kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses) // If update finishes successfully, mark the volumeInUse as reportedInUse to indicate // those volumes are already updated in the node's status @@ -553,3 +570,53 @@ func validateNodeIP(nodeIP net.IP) error { } return fmt.Errorf("Node IP: %q not found in the host's network interfaces", nodeIP.String()) } + +// nodeStatusHasChanged compares the original node and current node's status and +// returns true if any change happens. The heartbeat timestamp is ignored. +func nodeStatusHasChanged(originalStatus *v1.NodeStatus, status *v1.NodeStatus) bool { + if originalStatus == nil && status == nil { + return false + } + if originalStatus == nil || status == nil { + return true + } + + // Compare node conditions here because we need to ignore the heartbeat timestamp. + if nodeConditionsHaveChanged(originalStatus.Conditions, status.Conditions) { + return true + } + + // Compare other fields of NodeStatus. + originalStatusCopy := originalStatus.DeepCopy() + statusCopy := status.DeepCopy() + originalStatusCopy.Conditions = nil + statusCopy.Conditions = nil + return !apiequality.Semantic.DeepEqual(originalStatusCopy, statusCopy) +} + +// nodeConditionsHaveChanged compares the original node and current node's +// conditions and returns true if any change happens. 
The heartbeat timestamp is +// ignored. +func nodeConditionsHaveChanged(originalConditions []v1.NodeCondition, conditions []v1.NodeCondition) bool { + if len(originalConditions) != len(conditions) { + return true + } + + originalConditionsCopy := make([]v1.NodeCondition, 0, len(originalConditions)) + originalConditionsCopy = append(originalConditionsCopy, originalConditions...) + conditionsCopy := make([]v1.NodeCondition, 0, len(conditions)) + conditionsCopy = append(conditionsCopy, conditions...) + + sort.SliceStable(originalConditionsCopy, func(i, j int) bool { return originalConditionsCopy[i].Type < originalConditionsCopy[j].Type }) + sort.SliceStable(conditionsCopy, func(i, j int) bool { return conditionsCopy[i].Type < conditionsCopy[j].Type }) + + replacedheartbeatTime := metav1.Time{} + for i := range conditionsCopy { + originalConditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime + conditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime + if !apiequality.Semantic.DeepEqual(&originalConditionsCopy[i], &conditionsCopy[i]) { + return true + } + } + return false +} diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index af73cd5f378..caeda4c0ff0 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" @@ -795,6 +796,239 @@ func TestUpdateNodeStatusError(t *testing.T) { assert.Len(t, testKubelet.fakeKubeClient.Actions(), nodeStatusUpdateRetry) } +func TestUpdateNodeStatusWithLease(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, true)() + + testKubelet := newTestKubelet(t, false /* 
controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + clock := testKubelet.fakeClock + kubelet := testKubelet.kubelet + kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test + kubelet.kubeClient = nil // ensure only the heartbeat client is used + kubelet.containerManager = &localCM{ + ContainerManager: cm.NewStubContainerManager(), + allocatableReservation: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), + }, + capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + } + // Since this test retroactively overrides the stub container manager, + // we have to regenerate default status setters. + kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() + kubelet.nodeStatusReportFrequency = time.Minute + + kubeClient := testKubelet.fakeKubeClient + existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*existingNode}}).ReactionChain + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 20E9, + } + kubelet.machineInfo = machineInfo + + now := metav1.NewTime(clock.Now()).Rfc3339Copy() + expectedNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeMemoryPressure, + Status: v1.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: now, + LastTransitionTime: now, + }, + { + Type: v1.NodeDiskPressure, + 
Status: v1.ConditionFalse, + Reason: "KubeletHasNoDiskPressure", + Message: fmt.Sprintf("kubelet has no disk pressure"), + LastHeartbeatTime: now, + LastTransitionTime: now, + }, + { + Type: v1.NodePIDPressure, + Status: v1.ConditionFalse, + Reason: "KubeletHasSufficientPID", + Message: fmt.Sprintf("kubelet has sufficient PID available"), + LastHeartbeatTime: now, + LastTransitionTime: now, + }, + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: now, + LastTransitionTime: now, + }, + }, + NodeInfo: v1.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: cadvisortest.FakeKernelVersion, + OSImage: cadvisortest.FakeContainerOsVersion, + OperatingSystem: goruntime.GOOS, + Architecture: goruntime.GOARCH, + ContainerRuntimeVersion: "test://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + Addresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "127.0.0.1"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + // images will be sorted from max to min in node status. 
+ Images: []v1.ContainerImage{ + { + Names: []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"}, + SizeBytes: 123, + }, + { + Names: []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"}, + SizeBytes: 456, + }, + }, + }, + } + + // Update node status when node status is created. + // Report node status. + kubelet.updateRuntimeUp() + assert.NoError(t, kubelet.updateNodeStatus()) + + actions := kubeClient.Actions() + assert.Len(t, actions, 2) + assert.IsType(t, core.GetActionImpl{}, actions[0]) + assert.IsType(t, core.PatchActionImpl{}, actions[1]) + patchAction := actions[1].(core.PatchActionImpl) + + updatedNode, err := applyNodeStatusPatch(existingNode, patchAction.GetPatch()) + require.NoError(t, err) + for _, cond := range updatedNode.Status.Conditions { + cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy() + cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy() + } + assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode)) + + // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 + assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type, + "NodeReady should be the last condition") + + // Update node status again when nothing is changed (except heartbeat time). + // Report node status if it has exceeded the duration of nodeStatusReportFrequency. + clock.Step(time.Minute) + assert.NoError(t, kubelet.updateNodeStatus()) + + // 2 more actions (There were 2 actions before).
+ actions = kubeClient.Actions() + assert.Len(t, actions, 4) + assert.IsType(t, core.GetActionImpl{}, actions[2]) + assert.IsType(t, core.PatchActionImpl{}, actions[3]) + patchAction = actions[3].(core.PatchActionImpl) + + updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch()) + require.NoError(t, err) + for _, cond := range updatedNode.Status.Conditions { + cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy() + cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy() + } + + // Expect LastHeartbeat updated, other things unchanged. + for i, cond := range expectedNode.Status.Conditions { + expectedNode.Status.Conditions[i].LastHeartbeatTime = metav1.NewTime(cond.LastHeartbeatTime.Time.Add(time.Minute)).Rfc3339Copy() + } + assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode)) + + // Update node status again when nothing is changed (except heartbeat time). + // Do not report node status if it is within the duration of nodeStatusReportFrequency. + clock.Step(10 * time.Second) + assert.NoError(t, kubelet.updateNodeStatus()) + + // Only 1 more action (There were 4 actions before). + actions = kubeClient.Actions() + assert.Len(t, actions, 5) + assert.IsType(t, core.GetActionImpl{}, actions[4]) + + // Update node status again when something is changed. + // Report node status even if it is still within the duration of nodeStatusReportFrequency. + clock.Step(10 * time.Second) + var newMemoryCapacity int64 = 40E9 + kubelet.machineInfo.MemoryCapacity = uint64(newMemoryCapacity) + assert.NoError(t, kubelet.updateNodeStatus()) + + // 2 more actions (There were 5 actions before).
+ actions = kubeClient.Actions() + assert.Len(t, actions, 7) + assert.IsType(t, core.GetActionImpl{}, actions[5]) + assert.IsType(t, core.PatchActionImpl{}, actions[6]) + patchAction = actions[6].(core.PatchActionImpl) + + updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch()) + require.NoError(t, err) + memCapacity, _ := updatedNode.Status.Capacity[v1.ResourceMemory] + updatedMemoryCapacity, _ := (&memCapacity).AsInt64() + assert.Equal(t, newMemoryCapacity, updatedMemoryCapacity, "Memory capacity") + + now = metav1.NewTime(clock.Now()).Rfc3339Copy() + for _, cond := range updatedNode.Status.Conditions { + // Expect LastHeartbeat updated, while LastTransitionTime unchanged. + assert.Equal(t, now, cond.LastHeartbeatTime.Rfc3339Copy(), + "LastHeartbeatTime for condition %v", cond.Type) + assert.Equal(t, now, metav1.NewTime(cond.LastTransitionTime.Time.Add(time.Minute+20*time.Second)).Rfc3339Copy(), + "LastTransitionTime for condition %v", cond.Type) + } + + // Update node status when changing pod CIDR. + // Report node status even if it is still within the duration of nodeStatusReportFrequency. + clock.Step(10 * time.Second) + assert.Equal(t, "", kubelet.runtimeState.podCIDR(), "Pod CIDR should be empty") + podCIDR := "10.0.0.0/24" + updatedNode.Spec.PodCIDR = podCIDR + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain + assert.NoError(t, kubelet.updateNodeStatus()) + assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now") + // 2 more actions (There were 7 actions before). + actions = kubeClient.Actions() + assert.Len(t, actions, 9) + assert.IsType(t, core.GetActionImpl{}, actions[7]) + assert.IsType(t, core.PatchActionImpl{}, actions[8]) + patchAction = actions[8].(core.PatchActionImpl) + + // Update node status when keeping the pod CIDR. + // Do not report node status if it is within the duration of nodeStatusReportFrequency.
+ clock.Step(10 * time.Second) + assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated") + assert.NoError(t, kubelet.updateNodeStatus()) + // Only 1 more action (There were 9 actions before). + actions = kubeClient.Actions() + assert.Len(t, actions, 10) + assert.IsType(t, core.GetActionImpl{}, actions[9]) +} + func TestRegisterWithApiServer(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -1529,3 +1763,159 @@ func TestRegisterWithApiServerWithTaint(t *testing.T) { return }) } + +func TestNodeStatusHasChanged(t *testing.T) { + fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) + fakeFuture := metav1.Time{Time: fakeNow.Time.Add(time.Minute)} + readyCondition := v1.NodeCondition{ + Type: v1.NodeReady, + Status: v1.ConditionTrue, + LastHeartbeatTime: fakeNow, + LastTransitionTime: fakeNow, + } + readyConditionAtDiffHearbeatTime := v1.NodeCondition{ + Type: v1.NodeReady, + Status: v1.ConditionTrue, + LastHeartbeatTime: fakeFuture, + LastTransitionTime: fakeNow, + } + readyConditionAtDiffTransitionTime := v1.NodeCondition{ + Type: v1.NodeReady, + Status: v1.ConditionTrue, + LastHeartbeatTime: fakeFuture, + LastTransitionTime: fakeFuture, + } + notReadyCondition := v1.NodeCondition{ + Type: v1.NodeReady, + Status: v1.ConditionFalse, + LastHeartbeatTime: fakeNow, + LastTransitionTime: fakeNow, + } + memoryPressureCondition := v1.NodeCondition{ + Type: v1.NodeMemoryPressure, + Status: v1.ConditionFalse, + LastHeartbeatTime: fakeNow, + LastTransitionTime: fakeNow, + } + testcases := []struct { + name string + originalStatus *v1.NodeStatus + status *v1.NodeStatus + expectChange bool + }{ + { + name: "Node status does not change with nil status.", + originalStatus: nil, + status: nil, + expectChange: false, + }, + { + name: "Node status does not change with default status.", + originalStatus: &v1.NodeStatus{}, + status: &v1.NodeStatus{}, + expectChange: 
false, + }, + { + name: "Node status changes with nil and default status.", + originalStatus: nil, + status: &v1.NodeStatus{}, + expectChange: true, + }, + { + name: "Node status changes with nil and status.", + originalStatus: nil, + status: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, + }, + expectChange: true, + }, + { + name: "Node status does not change with empty conditions.", + originalStatus: &v1.NodeStatus{Conditions: []v1.NodeCondition{}}, + status: &v1.NodeStatus{Conditions: []v1.NodeCondition{}}, + expectChange: false, + }, + { + name: "Node status does not change", + originalStatus: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, + }, + status: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, + }, + expectChange: false, + }, + { + name: "Node status does not change even if heartbeat time changes.", + originalStatus: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, + }, + status: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyConditionAtDiffHearbeatTime, memoryPressureCondition}, + }, + expectChange: false, + }, + { + name: "Node status does not change even if the orders of conditions are different.", + originalStatus: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, + }, + status: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{memoryPressureCondition, readyConditionAtDiffHearbeatTime}, + }, + expectChange: false, + }, + { + name: "Node status changes if condition status differs.", + originalStatus: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, + }, + status: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{notReadyCondition, memoryPressureCondition}, + }, + expectChange: true, + }, + { + name: "Node status changes if transition time changes.", + originalStatus: &v1.NodeStatus{ + 
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, + }, + status: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyConditionAtDiffTransitionTime, memoryPressureCondition}, + }, + expectChange: true, + }, + { + name: "Node status changes with different number of conditions.", + originalStatus: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyCondition}, + }, + status: &v1.NodeStatus{ + Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition}, + }, + expectChange: true, + }, + { + name: "Node status changes with different phase.", + originalStatus: &v1.NodeStatus{ + Phase: v1.NodePending, + Conditions: []v1.NodeCondition{readyCondition}, + }, + status: &v1.NodeStatus{ + Phase: v1.NodeRunning, + Conditions: []v1.NodeCondition{readyCondition}, + }, + expectChange: true, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + originalStatusCopy := tc.originalStatus.DeepCopy() + statusCopy := tc.status.DeepCopy() + changed := nodeStatusHasChanged(tc.originalStatus, tc.status) + assert.Equal(t, tc.expectChange, changed, "Expect node status change to be %t, but got %t.", tc.expectChange, changed) + assert.True(t, apiequality.Semantic.DeepEqual(originalStatusCopy, tc.originalStatus), "%s", diff.ObjectDiff(originalStatusCopy, tc.originalStatus)) + assert.True(t, apiequality.Semantic.DeepEqual(statusCopy, tc.status), "%s", diff.ObjectDiff(statusCopy, tc.status)) + }) + } +} diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/types.go b/staging/src/k8s.io/kubelet/config/v1beta1/types.go index 281298f029d..4abe387773d 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/types.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/types.go @@ -292,9 +292,11 @@ type KubeletConfiguration struct { // Default: "4h" // +optional StreamingConnectionIdleTimeout metav1.Duration `json:"streamingConnectionIdleTimeout,omitempty"` - // nodeStatusUpdateFrequency is the frequency that kubelet posts node - // status to 
master. Note: be cautious when changing the constant, it - // must work with nodeMonitorGracePeriod in nodecontroller. + // nodeStatusUpdateFrequency is the frequency that kubelet computes node + // status. If the node lease feature is not enabled, it is also the frequency that + // kubelet posts node status to master. + // Note: When the node lease feature is not enabled, be cautious when changing the + // constant; it must work with nodeMonitorGracePeriod in nodecontroller. // Dynamic Kubelet Config (beta): If dynamically updating this field, consider that // it may impact node scalability, and also that the node controller's // nodeMonitorGracePeriod must be set to N*NodeStatusUpdateFrequency, @@ -303,6 +305,16 @@ type KubeletConfiguration struct { // Default: "10s" // +optional NodeStatusUpdateFrequency metav1.Duration `json:"nodeStatusUpdateFrequency,omitempty"` + // nodeStatusReportFrequency is the frequency that kubelet posts node + // status to master if node status does not change. Kubelet will ignore this + // frequency and post node status immediately if any change is detected. It is + // only used when the node lease feature is enabled. nodeStatusReportFrequency's + // default value is 1m. However, if nodeStatusUpdateFrequency is set explicitly, + // nodeStatusReportFrequency's default value will be set to + // nodeStatusUpdateFrequency for backward compatibility. + // Default: "1m" + // +optional + NodeStatusReportFrequency metav1.Duration `json:"nodeStatusReportFrequency,omitempty"` // nodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease, // when the NodeLease feature is enabled. 
This feature provides an indicator of node // health by having the Kublet create and periodically renew a lease, named after the node, diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go index 37be9cb1843..a924fac2ec3 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go @@ -143,6 +143,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { } out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency + out.NodeStatusReportFrequency = in.NodeStatusReportFrequency out.ImageMinimumGCAge = in.ImageMinimumGCAge if in.ImageGCHighThresholdPercent != nil { in, out := &in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent diff --git a/test/e2e/common/BUILD b/test/e2e/common/BUILD index 495e99258a4..87fc58f4c1a 100644 --- a/test/e2e/common/BUILD +++ b/test/e2e/common/BUILD @@ -44,6 +44,7 @@ go_library( ], importpath = "k8s.io/kubernetes/test/e2e/common", deps = [ + "//pkg/api/v1/node:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", diff --git a/test/e2e/common/node_lease.go b/test/e2e/common/node_lease.go index 66a37c58bb1..633b4e4f4c2 100644 --- a/test/e2e/common/node_lease.go +++ b/test/e2e/common/node_lease.go @@ -23,7 +23,9 @@ import ( coordv1beta1 "k8s.io/api/coordination/v1beta1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + v1node "k8s.io/kubernetes/pkg/api/v1/node" "k8s.io/kubernetes/test/e2e/framework" . 
"github.com/onsi/ginkgo" @@ -31,33 +33,41 @@ import ( ) var _ = framework.KubeDescribe("[Feature:NodeLease][NodeAlphaFeature:NodeLease]", func() { + var nodeName string f := framework.NewDefaultFramework("node-lease-test") + + BeforeEach(func() { + nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet) + Expect(len(nodes.Items)).NotTo(BeZero()) + nodeName = nodes.Items[0].ObjectMeta.Name + }) + Context("when the NodeLease feature is enabled", func() { - It("the Kubelet should create and update a lease in the kube-node-lease namespace", func() { + It("the kubelet should create and update a lease in the kube-node-lease namespace", func() { leaseClient := f.ClientSet.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease) var ( err error lease *coordv1beta1.Lease ) - // check that lease for this Kubelet exists in the kube-node-lease namespace + By("check that lease for this Kubelet exists in the kube-node-lease namespace") Eventually(func() error { - lease, err = leaseClient.Get(framework.TestContext.NodeName, metav1.GetOptions{}) + lease, err = leaseClient.Get(nodeName, metav1.GetOptions{}) if err != nil { return err } return nil }, 5*time.Minute, 5*time.Second).Should(BeNil()) // check basic expectations for the lease - Expect(expectLease(lease)).To(BeNil()) - // ensure that at least one lease renewal happens within the - // lease duration by checking for a change to renew time + Expect(expectLease(lease, nodeName)).To(BeNil()) + + By("check that node lease is updated at least once within the lease duration") Eventually(func() error { - newLease, err := leaseClient.Get(framework.TestContext.NodeName, metav1.GetOptions{}) + newLease, err := leaseClient.Get(nodeName, metav1.GetOptions{}) if err != nil { return err } // check basic expectations for the latest lease - if err := expectLease(newLease); err != nil { + if err := expectLease(newLease, nodeName); err != nil { return err } // check that RenewTime has been updated on the latest lease @@ -68,12 +78,76 @@ 
var _ = framework.KubeDescribe("[Feature:NodeLease][NodeAlphaFeature:NodeLease]" } return nil }, time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second, - time.Duration(*lease.Spec.LeaseDurationSeconds/3)*time.Second) + time.Duration(*lease.Spec.LeaseDurationSeconds/4)*time.Second) + }) + + It("the kubelet should report node status infrequently", func() { + By("wait until node is ready") + framework.WaitForNodeToBeReady(f.ClientSet, nodeName, 5*time.Minute) + + By("wait until there is node lease") + var err error + var lease *coordv1beta1.Lease + Eventually(func() error { + lease, err = f.ClientSet.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease).Get(nodeName, metav1.GetOptions{}) + if err != nil { + return err + } + return nil + }, 5*time.Minute, 5*time.Second).Should(BeNil()) + // check basic expectations for the lease + Expect(expectLease(lease, nodeName)).To(BeNil()) + leaseDuration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second + + By("verify NodeStatus report period is longer than lease duration") + // NodeStatus is reported from node to master when there is some change or + // enough time has passed. So for here, keep checking the time diff + // between 2 NodeStatus report, until it is longer than lease duration ( + // the same as nodeMonitorGracePeriod). + heartbeatTime := getNextReadyConditionHeartbeatTime(f.ClientSet, nodeName, metav1.Time{}) + Eventually(func() error { + nextHeartbeatTime := getNextReadyConditionHeartbeatTime(f.ClientSet, nodeName, heartbeatTime) + + if nextHeartbeatTime.Time.After(heartbeatTime.Time.Add(leaseDuration)) { + return nil + } + heartbeatTime = nextHeartbeatTime + return fmt.Errorf("node status report period is shorter than lease duration") + + // Enter next round immediately. 
+ }, 5*time.Minute, time.Nanosecond).Should(BeNil()) + + By("verify node is still in ready status even though node status report is infrequent") + // This check on node status is only meaningful when this e2e test is + // running as cluster e2e test, because node e2e test does not create and + // run controller manager, i.e., no node lifecycle controller. + node, err := f.ClientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) + Expect(err).To(BeNil()) + _, readyCondition := v1node.GetNodeCondition(&node.Status, corev1.NodeReady) + Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue)) }) }) }) -func expectLease(lease *coordv1beta1.Lease) error { +func getNextReadyConditionHeartbeatTime(clientSet clientset.Interface, nodeName string, prevHeartbeatTime metav1.Time) metav1.Time { + var newHeartbeatTime metav1.Time + Eventually(func() error { + node, err := clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) + if err != nil { + return err + } + _, readyCondition := v1node.GetNodeCondition(&node.Status, corev1.NodeReady) + Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue)) + newHeartbeatTime = readyCondition.LastHeartbeatTime + if prevHeartbeatTime.Before(&newHeartbeatTime) { + return nil + } + return fmt.Errorf("heartbeat has not changed yet") + }, 5*time.Minute, 5*time.Second).Should(BeNil()) + return newHeartbeatTime +} + +func expectLease(lease *coordv1beta1.Lease, nodeName string) error { // expect values for HolderIdentity, LeaseDurationSeconds, and RenewTime if lease.Spec.HolderIdentity == nil { return fmt.Errorf("Spec.HolderIdentity should not be nil") @@ -85,8 +159,8 @@ func expectLease(lease *coordv1beta1.Lease) error { return fmt.Errorf("Spec.RenewTime should not be nil") } // ensure that the HolderIdentity matches the node name - if *lease.Spec.HolderIdentity != framework.TestContext.NodeName { - return fmt.Errorf("Spec.HolderIdentity (%v) should match the node name (%v)", *lease.Spec.HolderIdentity, 
framework.TestContext.NodeName) + if *lease.Spec.HolderIdentity != nodeName { + return fmt.Errorf("Spec.HolderIdentity (%v) should match the node name (%v)", *lease.Spec.HolderIdentity, nodeName) } return nil }