diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index ebfdd22ae0f..41e648b4ee2 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -22,7 +22,6 @@ import (
 	"math"
 	"net"
 	goruntime "runtime"
-	"strings"
 	"time"
 
 	"github.com/golang/glog"
@@ -646,84 +645,6 @@ func (kl *Kubelet) setNodeStatusInfo(node *v1.Node) {
 	}
 }
 
-// Set Ready condition for the node.
-func (kl *Kubelet) setNodeReadyCondition(node *v1.Node) {
-	// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
-	// This is due to an issue with version skewed kubelet and master components.
-	// ref: https://github.com/kubernetes/kubernetes/issues/16961
-	currentTime := metav1.NewTime(kl.clock.Now())
-	newNodeReadyCondition := v1.NodeCondition{
-		Type:              v1.NodeReady,
-		Status:            v1.ConditionTrue,
-		Reason:            "KubeletReady",
-		Message:           "kubelet is posting ready status",
-		LastHeartbeatTime: currentTime,
-	}
-	rs := append(kl.runtimeState.runtimeErrors(), kl.runtimeState.networkErrors()...)
-	requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
-	if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
-		requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
-	}
-	missingCapacities := []string{}
-	for _, resource := range requiredCapacities {
-		if _, found := node.Status.Capacity[resource]; !found {
-			missingCapacities = append(missingCapacities, string(resource))
-		}
-	}
-	if len(missingCapacities) > 0 {
-		rs = append(rs, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
-	}
-	if len(rs) > 0 {
-		newNodeReadyCondition = v1.NodeCondition{
-			Type:              v1.NodeReady,
-			Status:            v1.ConditionFalse,
-			Reason:            "KubeletNotReady",
-			Message:           strings.Join(rs, ","),
-			LastHeartbeatTime: currentTime,
-		}
-	}
-	// Append AppArmor status if it's enabled.
-	// TODO(tallclair): This is a temporary message until node feature reporting is added.
-	if newNodeReadyCondition.Status == v1.ConditionTrue &&
-		kl.appArmorValidator != nil && kl.appArmorValidator.ValidateHost() == nil {
-		newNodeReadyCondition.Message = fmt.Sprintf("%s. AppArmor enabled", newNodeReadyCondition.Message)
-	}
-
-	// Record any soft requirements that were not met in the container manager.
-	status := kl.containerManager.Status()
-	if status.SoftRequirements != nil {
-		newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
-	}
-
-	readyConditionUpdated := false
-	needToRecordEvent := false
-	for i := range node.Status.Conditions {
-		if node.Status.Conditions[i].Type == v1.NodeReady {
-			if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
-				newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
-			} else {
-				newNodeReadyCondition.LastTransitionTime = currentTime
-				needToRecordEvent = true
-			}
-			node.Status.Conditions[i] = newNodeReadyCondition
-			readyConditionUpdated = true
-			break
-		}
-	}
-	if !readyConditionUpdated {
-		newNodeReadyCondition.LastTransitionTime = currentTime
-		node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
-	}
-	if needToRecordEvent {
-		if newNodeReadyCondition.Status == v1.ConditionTrue {
-			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeReady)
-		} else {
-			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotReady)
-			glog.Infof("Node became not ready: %+v", newNodeReadyCondition)
-		}
-	}
-}
-
 // record if node schedulable change.
 func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) {
 	kl.lastNodeUnschedulableLock.Lock()
@@ -786,6 +707,10 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
 	if kl.cloud != nil {
 		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
 	}
+	var validateHostFunc func() error
+	if kl.appArmorValidator != nil {
+		validateHostFunc = kl.appArmorValidator.ValidateHost
+	}
 	return []func(*v1.Node) error{
 		nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
 		withoutError(kl.setNodeStatusInfo),
@@ -793,7 +718,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
 		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
 		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
 		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
-		withoutError(kl.setNodeReadyCondition),
+		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
 		withoutError(kl.setNodeVolumesInUseStatus),
 		withoutError(kl.recordNodeSchedulableEvent),
 	}
diff --git a/pkg/kubelet/nodestatus/BUILD b/pkg/kubelet/nodestatus/BUILD
index 0fe196b85b9..a405e615a5c 100644
--- a/pkg/kubelet/nodestatus/BUILD
+++ b/pkg/kubelet/nodestatus/BUILD
@@ -7,10 +7,14 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/cloudprovider:go_default_library",
+        "//pkg/features:go_default_library",
        "//pkg/kubelet/apis:go_default_library",
+        "//pkg/kubelet/cm:go_default_library",
+        "//pkg/kubelet/events:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
     ],
 )
@@ -35,8 +39,11 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/cloudprovider/providers/fake:go_default_library",
+        "//pkg/kubelet/cm:go_default_library",
+        "//pkg/kubelet/events:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
diff --git a/pkg/kubelet/nodestatus/setters.go b/pkg/kubelet/nodestatus/setters.go
index 0a547ba678f..5e0a0ecb29c 100644
--- a/pkg/kubelet/nodestatus/setters.go
+++ b/pkg/kubelet/nodestatus/setters.go
@@ -19,13 +19,18 @@ package nodestatus
 import (
 	"fmt"
 	"net"
+	"strings"
 	"time"
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/features"
 	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
+	"k8s.io/kubernetes/pkg/kubelet/cm"
+	"k8s.io/kubernetes/pkg/kubelet/events"
 
 	"github.com/golang/glog"
 )
@@ -142,6 +147,95 @@ func NodeAddress(nodeIP net.IP, // typically Kubelet.nodeIP
 	}
 }
 
+// ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node.
+func ReadyCondition(
+	nowFunc func() time.Time, // typically Kubelet.clock.Now
+	runtimeErrorsFunc func() []string, // typically Kubelet.runtimeState.runtimeErrors
+	networkErrorsFunc func() []string, // typically Kubelet.runtimeState.networkErrors
+	appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator
+	cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
+	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
+) Setter {
+	return func(node *v1.Node) error {
+		// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
+		// This is due to an issue with version skewed kubelet and master components.
+		// ref: https://github.com/kubernetes/kubernetes/issues/16961
+		currentTime := metav1.NewTime(nowFunc())
+		newNodeReadyCondition := v1.NodeCondition{
+			Type:              v1.NodeReady,
+			Status:            v1.ConditionTrue,
+			Reason:            "KubeletReady",
+			Message:           "kubelet is posting ready status",
+			LastHeartbeatTime: currentTime,
+		}
+		rs := append(runtimeErrorsFunc(), networkErrorsFunc()...)
+		requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
+		if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
+			requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
+		}
+		missingCapacities := []string{}
+		for _, resource := range requiredCapacities {
+			if _, found := node.Status.Capacity[resource]; !found {
+				missingCapacities = append(missingCapacities, string(resource))
+			}
+		}
+		if len(missingCapacities) > 0 {
+			rs = append(rs, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
+		}
+		if len(rs) > 0 {
+			newNodeReadyCondition = v1.NodeCondition{
+				Type:              v1.NodeReady,
+				Status:            v1.ConditionFalse,
+				Reason:            "KubeletNotReady",
+				Message:           strings.Join(rs, ","),
+				LastHeartbeatTime: currentTime,
+			}
+		}
+		// Append AppArmor status if it's enabled.
+		// TODO(tallclair): This is a temporary message until node feature reporting is added.
+		if appArmorValidateHostFunc != nil && newNodeReadyCondition.Status == v1.ConditionTrue {
+			if err := appArmorValidateHostFunc(); err == nil {
+				newNodeReadyCondition.Message = fmt.Sprintf("%s. AppArmor enabled", newNodeReadyCondition.Message)
+			}
+		}
+
+		// Record any soft requirements that were not met in the container manager.
+		status := cmStatusFunc()
+		if status.SoftRequirements != nil {
+			newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
+		}
+
+		readyConditionUpdated := false
+		needToRecordEvent := false
+		for i := range node.Status.Conditions {
+			if node.Status.Conditions[i].Type == v1.NodeReady {
+				if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
+					newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
+				} else {
+					newNodeReadyCondition.LastTransitionTime = currentTime
+					needToRecordEvent = true
+				}
+				node.Status.Conditions[i] = newNodeReadyCondition
+				readyConditionUpdated = true
+				break
+			}
+		}
+		if !readyConditionUpdated {
+			newNodeReadyCondition.LastTransitionTime = currentTime
+			node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
+		}
+		if needToRecordEvent {
+			if newNodeReadyCondition.Status == v1.ConditionTrue {
+				recordEventFunc(v1.EventTypeNormal, events.NodeReady)
+			} else {
+				recordEventFunc(v1.EventTypeNormal, events.NodeNotReady)
+				glog.Infof("Node became not ready: %+v", newNodeReadyCondition)
+			}
+		}
+		return nil
+	}
+}
+
 // MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node.
 func MemoryPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
 	pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderMemoryPressure
diff --git a/pkg/kubelet/nodestatus/setters_test.go b/pkg/kubelet/nodestatus/setters_test.go
index 19a3180b97b..ee17e2de02a 100644
--- a/pkg/kubelet/nodestatus/setters_test.go
+++ b/pkg/kubelet/nodestatus/setters_test.go
@@ -17,15 +17,20 @@ limitations under the License.
 package nodestatus
 
 import (
+	"fmt"
 	"net"
 	"sort"
 	"testing"
+	"time"
 
 	"k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/diff"
 	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
+	"k8s.io/kubernetes/pkg/kubelet/cm"
+	"k8s.io/kubernetes/pkg/kubelet/events"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -185,6 +190,175 @@ func TestNodeAddress(t *testing.T) {
 	}
 }
 
+func TestReadyCondition(t *testing.T) {
+	now := time.Now()
+	before := now.Add(-time.Second)
+	nowFunc := func() time.Time { return now }
+
+	withCapacity := &v1.Node{
+		Status: v1.NodeStatus{
+			Capacity: v1.ResourceList{
+				v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
+				v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
+				v1.ResourcePods:             *resource.NewQuantity(100, resource.DecimalSI),
+				v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
+			},
+		},
+	}
+
+	cases := []struct {
+		desc                     string
+		node                     *v1.Node
+		runtimeErrors            []string
+		networkErrors            []string
+		appArmorValidateHostFunc func() error
+		cmStatus                 cm.Status
+		expectConditions         []v1.NodeCondition
+		expectEvents             []testEvent
+	}{
+		{
+			desc:             "new, ready",
+			node:             withCapacity.DeepCopy(),
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)},
+			// TODO(mtaufen): The current behavior is that we don't send an event for the initial NodeReady condition,
+			// the reason for this is unclear, so we may want to actually send an event, and change these test cases
+			// to ensure an event is sent.
+		},
+		{
+			desc:                     "new, ready: apparmor validator passed",
+			node:                     withCapacity.DeepCopy(),
+			appArmorValidateHostFunc: func() error { return nil },
+			expectConditions:         []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. AppArmor enabled", now, now)},
+		},
+		{
+			desc:                     "new, ready: apparmor validator failed",
+			node:                     withCapacity.DeepCopy(),
+			appArmorValidateHostFunc: func() error { return fmt.Errorf("foo") },
+			// absence of an additional message is understood to mean that AppArmor is disabled
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)},
+		},
+		{
+			desc: "new, ready: soft requirement warning",
+			node: withCapacity.DeepCopy(),
+			cmStatus: cm.Status{
+				SoftRequirements: fmt.Errorf("foo"),
+			},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. WARNING: foo", now, now)},
+		},
+		{
+			desc:             "new, not ready: runtime errors",
+			node:             withCapacity.DeepCopy(),
+			runtimeErrors:    []string{"foo", "bar"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo,bar", now, now)},
+		},
+		{
+			desc:             "new, not ready: network errors",
+			node:             withCapacity.DeepCopy(),
+			networkErrors:    []string{"foo", "bar"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo,bar", now, now)},
+		},
+		{
+			desc:             "new, not ready: runtime and network errors",
+			node:             withCapacity.DeepCopy(),
+			runtimeErrors:    []string{"runtime"},
+			networkErrors:    []string{"network"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "runtime,network", now, now)},
+		},
+		{
+			desc:             "new, not ready: missing capacities",
+			node:             &v1.Node{},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "Missing node capacity for resources: cpu, memory, pods, ephemeral-storage", now, now)},
+		},
+		// the transition tests ensure timestamps are set correctly, no need to test the entire condition matrix in this section
+		{
+			desc: "transition to ready",
+			node: func() *v1.Node {
+				node := withCapacity.DeepCopy()
+				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(false, "", before, before)}
+				return node
+			}(),
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)},
+			expectEvents: []testEvent{
+				{
+					eventType: v1.EventTypeNormal,
+					event:     events.NodeReady,
+				},
+			},
+		},
+		{
+			desc: "transition to not ready",
+			node: func() *v1.Node {
+				node := withCapacity.DeepCopy()
+				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(true, "", before, before)}
+				return node
+			}(),
+			runtimeErrors:    []string{"foo"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo", now, now)},
+			expectEvents: []testEvent{
+				{
+					eventType: v1.EventTypeNormal,
+					event:     events.NodeNotReady,
+				},
+			},
+		},
+		{
+			desc: "ready, no transition",
+			node: func() *v1.Node {
+				node := withCapacity.DeepCopy()
+				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(true, "", before, before)}
+				return node
+			}(),
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", before, now)},
+			expectEvents:     []testEvent{},
+		},
+		{
+			desc: "not ready, no transition",
+			node: func() *v1.Node {
+				node := withCapacity.DeepCopy()
+				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(false, "", before, before)}
+				return node
+			}(),
+			runtimeErrors:    []string{"foo"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo", before, now)},
+			expectEvents:     []testEvent{},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.desc, func(t *testing.T) {
+			runtimeErrorsFunc := func() []string {
+				return tc.runtimeErrors
+			}
+			networkErrorsFunc := func() []string {
+				return tc.networkErrors
+			}
+			cmStatusFunc := func() cm.Status {
+				return tc.cmStatus
+			}
+			events := []testEvent{}
+			recordEventFunc := func(eventType, event string) {
+				events = append(events, testEvent{
+					eventType: eventType,
+					event:     event,
+				})
+			}
+			// construct setter
+			setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc)
+			// call setter on node
+			if err := setter(tc.node); err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+			// check expected condition
+			assert.True(t, apiequality.Semantic.DeepEqual(tc.expectConditions, tc.node.Status.Conditions),
+				"Diff: %s", diff.ObjectDiff(tc.expectConditions, tc.node.Status.Conditions))
+			// check expected events
+			require.Equal(t, len(tc.expectEvents), len(events))
+			for i := range tc.expectEvents {
+				assert.Equal(t, tc.expectEvents[i], events[i])
+			}
+		})
+	}
+}
+
 func TestMemoryPressureCondition(t *testing.T) {
 	now := time.Now()
 	before := now.Add(-time.Second)
@@ -569,6 +743,27 @@ type testEvent struct {
 	event     string
 }
 
+func makeReadyCondition(ready bool, message string, transition, heartbeat time.Time) *v1.NodeCondition {
+	if ready {
+		return &v1.NodeCondition{
+			Type:               v1.NodeReady,
+			Status:             v1.ConditionTrue,
+			Reason:             "KubeletReady",
+			Message:            message,
+			LastTransitionTime: metav1.NewTime(transition),
+			LastHeartbeatTime:  metav1.NewTime(heartbeat),
+		}
+	}
+	return &v1.NodeCondition{
+		Type:               v1.NodeReady,
+		Status:             v1.ConditionFalse,
+		Reason:             "KubeletNotReady",
+		Message:            message,
+		LastTransitionTime: metav1.NewTime(transition),
+		LastHeartbeatTime:  metav1.NewTime(heartbeat),
+	}
+}
+
 func makeMemoryPressureCondition(pressure bool, transition, heartbeat time.Time) *v1.NodeCondition {
 	if pressure {
 		return &v1.NodeCondition{