diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 914fccb036f..c5474872841 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -10,7 +10,6 @@ go_library( name = "go_default_library", srcs = [ "active_deadline.go", - "cloud_request_manager.go", "doc.go", "kubelet.go", "kubelet_getters.go", @@ -48,6 +47,7 @@ go_library( "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/certificate:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/cloudresource:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/configmap:go_default_library", @@ -66,6 +66,7 @@ go_library( "//pkg/kubelet/metrics/collectors:go_default_library", "//pkg/kubelet/mountpod:go_default_library", "//pkg/kubelet/network/dns:go_default_library", + "//pkg/kubelet/nodestatus:go_default_library", "//pkg/kubelet/pleg:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/preemption:go_default_library", @@ -102,7 +103,6 @@ go_library( "//pkg/util/node:go_default_library", "//pkg/util/oom:go_default_library", "//pkg/util/removeall:go_default_library", - "//pkg/version:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/csi:go_default_library", "//pkg/volume/util:go_default_library", @@ -119,7 +119,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", @@ -147,7 +146,6 @@ go_test( name = "go_default_test", srcs = [ "active_deadline_test.go", - "cloud_request_manager_test.go", "kubelet_getters_test.go", "kubelet_network_test.go", "kubelet_node_status_test.go", @@ -166,7 +164,6 @@ go_test( deps = [ "//pkg/apis/core/install:go_default_library", "//pkg/capabilities:go_default_library", - "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/cadvisor/testing:go_default_library", @@ -180,6 +177,7 @@ go_test( "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/logs:go_default_library", "//pkg/kubelet/network/dns:go_default_library", + "//pkg/kubelet/nodestatus:go_default_library", "//pkg/kubelet/pleg:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/pod/testing:go_default_library", @@ -255,6 +253,7 @@ filegroup( "//pkg/kubelet/checkpoint:all-srcs", "//pkg/kubelet/checkpointmanager:all-srcs", "//pkg/kubelet/client:all-srcs", + "//pkg/kubelet/cloudresource:all-srcs", "//pkg/kubelet/cm:all-srcs", "//pkg/kubelet/config:all-srcs", "//pkg/kubelet/configmap:all-srcs", @@ -273,6 +272,7 @@ filegroup( "//pkg/kubelet/metrics:all-srcs", "//pkg/kubelet/mountpod:all-srcs", "//pkg/kubelet/network:all-srcs", + "//pkg/kubelet/nodestatus:all-srcs", "//pkg/kubelet/pleg:all-srcs", "//pkg/kubelet/pod:all-srcs", "//pkg/kubelet/preemption:all-srcs", diff --git a/pkg/kubelet/cloudresource/BUILD b/pkg/kubelet/cloudresource/BUILD new file mode 100644 index 00000000000..ec93978d78b --- /dev/null +++ b/pkg/kubelet/cloudresource/BUILD @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", 
"go_test") + +go_library( + name = "go_default_library", + srcs = ["cloud_request_manager.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/cloudresource", + visibility = ["//visibility:public"], + deps = [ + "//pkg/cloudprovider:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["cloud_request_manager_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/cloudprovider/providers/fake:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/cloud_request_manager.go b/pkg/kubelet/cloudresource/cloud_request_manager.go similarity index 89% rename from pkg/kubelet/cloud_request_manager.go rename to pkg/kubelet/cloudresource/cloud_request_manager.go index 58752bf19d3..f7ba8a197c4 100644 --- a/pkg/kubelet/cloud_request_manager.go +++ b/pkg/kubelet/cloudresource/cloud_request_manager.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kubelet +package cloudresource import ( "context" @@ -32,6 +32,14 @@ import ( var nodeAddressesRetryPeriod = 5 * time.Second +// SyncManager is an interface for making requests to a cloud provider +type SyncManager interface { + Run(stopCh <-chan struct{}) + NodeAddresses() ([]v1.NodeAddress, error) +} + +var _ SyncManager = &cloudResourceSyncManager{} + type cloudResourceSyncManager struct { // Cloud provider interface. cloud cloudprovider.Interface @@ -45,9 +53,9 @@ type cloudResourceSyncManager struct { nodeName types.NodeName } -// NewCloudResourceSyncManager creates a manager responsible for collecting resources +// NewSyncManager creates a manager responsible for collecting resources // from a cloud provider through requests that are sensitive to timeouts and hanging -func NewCloudResourceSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) *cloudResourceSyncManager { +func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager { return &cloudResourceSyncManager{ cloud: cloud, syncPeriod: syncPeriod, diff --git a/pkg/kubelet/cloud_request_manager_test.go b/pkg/kubelet/cloudresource/cloud_request_manager_test.go similarity index 96% rename from pkg/kubelet/cloud_request_manager_test.go rename to pkg/kubelet/cloudresource/cloud_request_manager_test.go index f29293c504f..a0777f5bf5c 100644 --- a/pkg/kubelet/cloud_request_manager_test.go +++ b/pkg/kubelet/cloudresource/cloud_request_manager_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package kubelet +package cloudresource import ( "fmt" @@ -64,7 +64,7 @@ func TestNodeAddressesRequest(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - manager := NewCloudResourceSyncManager(cloud, "defaultNode", syncPeriod) + manager := NewSyncManager(cloud, "defaultNode", syncPeriod).(*cloudResourceSyncManager) go manager.Run(stopCh) nodeAddresses, err := collectNodeAddresses(manager) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b4877b2ec96..8f0828b748a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -61,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cloudresource" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/configmap" @@ -521,7 +522,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, } if klet.cloud != nil { - klet.cloudResourceSyncManager = NewCloudResourceSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency) + klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency) } var secretManager secret.Manager @@ -789,7 +790,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity) - klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs() // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock) @@ -836,6 +836,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, // Finally, put the most recent version of the config on the Kubelet, so // people can see how it was configured. klet.kubeletConfiguration = *kubeCfg + + // Generating the status funcs should be the last thing we do, + // since this relies on the rest of the Kubelet having been constructed. + klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs() + return klet, nil } @@ -962,7 +967,7 @@ type Kubelet struct { // Cloud provider interface. 
cloud cloudprovider.Interface // Handles requests to cloud provider with timeout - cloudResourceSyncManager *cloudResourceSyncManager + cloudResourceSyncManager cloudresource.SyncManager // Indicates that the node initialization happens in an external cloud controller externalCloudProvider bool diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 21d6d82ee52..ce55bb53ece 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -19,10 +19,8 @@ package kubelet import ( "context" "fmt" - "math" "net" goruntime "runtime" - "strings" "time" "github.com/golang/glog" @@ -31,28 +29,20 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - utilnet "k8s.io/apimachinery/pkg/util/net" utilfeature "k8s.io/apiserver/pkg/util/feature" k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/nodestatus" "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/scheduler/algorithm" nodeutil "k8s.io/kubernetes/pkg/util/node" - "k8s.io/kubernetes/pkg/version" volutil "k8s.io/kubernetes/pkg/volume/util" ) -const ( - // maxNamesPerImageInNodeStatus is max number of names per image stored in - // the node status. - maxNamesPerImageInNodeStatus = 5 -) - // registerWithAPIServer registers the node with the cluster master. It is safe // to call multiple times, but not concurrently (kl.registrationCompleted is // not locked). @@ -342,30 +332,6 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) { return node, nil } -// setVolumeLimits updates volume limits on the node -func (kl *Kubelet) setVolumeLimits(node *v1.Node) { - if node.Status.Capacity == nil { - node.Status.Capacity = v1.ResourceList{} - } - - if node.Status.Allocatable == nil { - node.Status.Allocatable = v1.ResourceList{} - } - - pluginWithLimits := kl.volumePluginMgr.ListVolumePluginWithLimits() - for _, volumePlugin := range pluginWithLimits { - attachLimits, err := volumePlugin.GetVolumeLimits() - if err != nil { - glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName()) - continue - } - for limitKey, value := range attachLimits { - node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) - node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) - } - } -} - // syncNodeStatus should be called periodically from a goroutine. // It synchronizes node status to master, registering the kubelet first if // necessary. @@ -398,8 +364,7 @@ func (kl *Kubelet) updateNodeStatus() error { return fmt.Errorf("update node status exceeds retry count") } -// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 -// is set, this function will also confirm that cbr0 is configured correctly. +// tryUpdateNodeStatus tries to update node status to master. func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { // In large clusters, GET and PUT operations on Node objects coming // from here are the majority of load on apiserver and etcd. 
@@ -447,591 +412,13 @@ func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) { kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event) } -// Set IP and hostname addresses for the node. -func (kl *Kubelet) setNodeAddress(node *v1.Node) error { - if kl.nodeIP != nil { - if err := kl.nodeIPValidator(kl.nodeIP); err != nil { - return fmt.Errorf("failed to validate nodeIP: %v", err) - } - glog.V(2).Infof("Using node IP: %q", kl.nodeIP.String()) - } - - if kl.externalCloudProvider { - if kl.nodeIP != nil { - if node.ObjectMeta.Annotations == nil { - node.ObjectMeta.Annotations = make(map[string]string) - } - node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = kl.nodeIP.String() - } - // We rely on the external cloud provider to supply the addresses. - return nil - } - if kl.cloud != nil { - nodeAddresses, err := kl.cloudResourceSyncManager.NodeAddresses() - if err != nil { - return err - } - - if kl.nodeIP != nil { - enforcedNodeAddresses := []v1.NodeAddress{} - - var nodeIPType v1.NodeAddressType - for _, nodeAddress := range nodeAddresses { - if nodeAddress.Address == kl.nodeIP.String() { - enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address}) - nodeIPType = nodeAddress.Type - break - } - } - if len(enforcedNodeAddresses) > 0 { - for _, nodeAddress := range nodeAddresses { - if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName { - enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address}) - } - } - - enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: kl.GetHostname()}) - node.Status.Addresses = enforcedNodeAddresses - return nil - } - return fmt.Errorf("failed to get node address from cloud provider that matches ip: %v", kl.nodeIP) - } - - // Only add a NodeHostName address if the cloudprovider did not specify any addresses. - // (we assume the cloudprovider is authoritative if it specifies any addresses) - if len(nodeAddresses) == 0 { - nodeAddresses = []v1.NodeAddress{{Type: v1.NodeHostName, Address: kl.GetHostname()}} - } - node.Status.Addresses = nodeAddresses - } else { - var ipAddr net.IP - var err error - - // 1) Use nodeIP if set - // 2) If the user has specified an IP to HostnameOverride, use it - // 3) Lookup the IP from node name by DNS and use the first valid IPv4 address. - // If the node does not have a valid IPv4 address, use the first valid IPv6 address. - // 4) Try to get the IP from the network interface used as default gateway - if kl.nodeIP != nil { - ipAddr = kl.nodeIP - } else if addr := net.ParseIP(kl.hostname); addr != nil { - ipAddr = addr - } else { - var addrs []net.IP - addrs, _ = net.LookupIP(node.Name) - for _, addr := range addrs { - if err = kl.nodeIPValidator(addr); err == nil { - if addr.To4() != nil { - ipAddr = addr - break - } - if addr.To16() != nil && ipAddr == nil { - ipAddr = addr - } - } - } - - if ipAddr == nil { - ipAddr, err = utilnet.ChooseHostInterface() - } - } - - if ipAddr == nil { - // We tried everything we could, but the IP address wasn't fetchable; error out - return fmt.Errorf("can't get ip address of node %s. 
error: %v", node.Name, err) - } - node.Status.Addresses = []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: ipAddr.String()}, - {Type: v1.NodeHostName, Address: kl.GetHostname()}, - } - } - return nil -} - -func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { - // Note: avoid blindly overwriting the capacity in case opaque - // resources are being advertised. - if node.Status.Capacity == nil { - node.Status.Capacity = v1.ResourceList{} - } - - var devicePluginAllocatable v1.ResourceList - var devicePluginCapacity v1.ResourceList - var removedDevicePlugins []string - - // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start - // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. - info, err := kl.GetCachedMachineInfo() - if err != nil { - // TODO(roberthbailey): This is required for test-cmd.sh to pass. - // See if the test should be updated instead. - node.Status.Capacity[v1.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI) - node.Status.Capacity[v1.ResourceMemory] = resource.MustParse("0Gi") - node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI) - glog.Errorf("Error getting machine info: %v", err) - } else { - node.Status.NodeInfo.MachineID = info.MachineID - node.Status.NodeInfo.SystemUUID = info.SystemUUID - - for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) { - node.Status.Capacity[rName] = rCap - } - - if kl.podsPerCore > 0 { - node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity( - int64(math.Min(float64(info.NumCores*kl.podsPerCore), float64(kl.maxPods))), resource.DecimalSI) - } else { - node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity( - int64(kl.maxPods), resource.DecimalSI) - } - - if node.Status.NodeInfo.BootID != "" && - node.Status.NodeInfo.BootID != info.BootID { - // TODO: This requires a transaction, either both node status is updated - // and event is recorded or neither should happen, see issue #6055. - kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.NodeRebooted, - "Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID) - } - node.Status.NodeInfo.BootID = info.BootID - - if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) { - // TODO: all the node resources should use GetCapacity instead of deriving the - // capacity for every node status request - initialCapacity := kl.containerManager.GetCapacity() - if initialCapacity != nil { - node.Status.Capacity[v1.ResourceEphemeralStorage] = initialCapacity[v1.ResourceEphemeralStorage] - } - } - - devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity() - if devicePluginCapacity != nil { - for k, v := range devicePluginCapacity { - if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() { - glog.V(2).Infof("Update capacity for %s to %d", k, v.Value()) - } - node.Status.Capacity[k] = v - } - } - - for _, removedResource := range removedDevicePlugins { - glog.V(2).Infof("Set capacity for %s to 0 on device removal", removedResource) - // Set the capacity of the removed resource to 0 instead of - // removing the resource from the node status. This is to indicate - // that the resource is managed by device plugin and had been - // registered before. - // - // This is required to differentiate the device plugin managed - // resources and the cluster-level resources, which are absent in - // node status. 
- node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI) - } - } - - // Set Allocatable. - if node.Status.Allocatable == nil { - node.Status.Allocatable = make(v1.ResourceList) - } - // Remove extended resources from allocatable that are no longer - // present in capacity. - for k := range node.Status.Allocatable { - _, found := node.Status.Capacity[k] - if !found && v1helper.IsExtendedResourceName(k) { - delete(node.Status.Allocatable, k) - } - } - allocatableReservation := kl.containerManager.GetNodeAllocatableReservation() - for k, v := range node.Status.Capacity { - value := *(v.Copy()) - if res, exists := allocatableReservation[k]; exists { - value.Sub(res) - } - if value.Sign() < 0 { - // Negative Allocatable resources don't make sense. - value.Set(0) - } - node.Status.Allocatable[k] = value - } - - if devicePluginAllocatable != nil { - for k, v := range devicePluginAllocatable { - if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() { - glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value()) - } - node.Status.Allocatable[k] = v - } - } - // for every huge page reservation, we need to remove it from allocatable memory - for k, v := range node.Status.Capacity { - if v1helper.IsHugePageResourceName(k) { - allocatableMemory := node.Status.Allocatable[v1.ResourceMemory] - value := *(v.Copy()) - allocatableMemory.Sub(value) - if allocatableMemory.Sign() < 0 { - // Negative Allocatable resources don't make sense. - allocatableMemory.Set(0) - } - node.Status.Allocatable[v1.ResourceMemory] = allocatableMemory - } - } -} - -// Set versioninfo for the node. -func (kl *Kubelet) setNodeStatusVersionInfo(node *v1.Node) { - verinfo, err := kl.cadvisor.VersionInfo() - if err != nil { - glog.Errorf("Error getting version info: %v", err) - return - } - - node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion - node.Status.NodeInfo.OSImage = verinfo.ContainerOsVersion - - runtimeVersion := "Unknown" - if runtimeVer, err := kl.containerRuntime.Version(); err == nil { - runtimeVersion = runtimeVer.String() - } - node.Status.NodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", kl.containerRuntime.Type(), runtimeVersion) - - node.Status.NodeInfo.KubeletVersion = version.Get().String() - // TODO: kube-proxy might be different version from kubelet in the future - node.Status.NodeInfo.KubeProxyVersion = version.Get().String() -} - -// Set daemonEndpoints for the node. -func (kl *Kubelet) setNodeStatusDaemonEndpoints(node *v1.Node) { - node.Status.DaemonEndpoints = *kl.daemonEndpoints -} - -// Set images list for the node -func (kl *Kubelet) setNodeStatusImages(node *v1.Node) { - // Update image list of this node - var imagesOnNode []v1.ContainerImage - containerImages, err := kl.imageManager.GetImageList() - if err != nil { - glog.Errorf("Error getting image list: %v", err) - node.Status.Images = imagesOnNode - return - } - // sort the images from max to min, and only set top N images into the node status. - if int(kl.nodeStatusMaxImages) > -1 && - int(kl.nodeStatusMaxImages) < len(containerImages) { - containerImages = containerImages[0:kl.nodeStatusMaxImages] - } - - for _, image := range containerImages { - names := append(image.RepoDigests, image.RepoTags...) - // Report up to maxNamesPerImageInNodeStatus names per image. 
- if len(names) > maxNamesPerImageInNodeStatus { - names = names[0:maxNamesPerImageInNodeStatus] - } - imagesOnNode = append(imagesOnNode, v1.ContainerImage{ - Names: names, - SizeBytes: image.Size, - }) - } - - node.Status.Images = imagesOnNode -} - -// Set the GOOS and GOARCH for this node -func (kl *Kubelet) setNodeStatusGoRuntime(node *v1.Node) { - node.Status.NodeInfo.OperatingSystem = goruntime.GOOS - node.Status.NodeInfo.Architecture = goruntime.GOARCH -} - -// Set status for the node. -func (kl *Kubelet) setNodeStatusInfo(node *v1.Node) { - kl.setNodeStatusMachineInfo(node) - kl.setNodeStatusVersionInfo(node) - kl.setNodeStatusDaemonEndpoints(node) - kl.setNodeStatusImages(node) - kl.setNodeStatusGoRuntime(node) - if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { - kl.setVolumeLimits(node) - } -} - -// Set Ready condition for the node. -func (kl *Kubelet) setNodeReadyCondition(node *v1.Node) { - // NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions. - // This is due to an issue with version skewed kubelet and master components. - // ref: https://github.com/kubernetes/kubernetes/issues/16961 - currentTime := metav1.NewTime(kl.clock.Now()) - newNodeReadyCondition := v1.NodeCondition{ - Type: v1.NodeReady, - Status: v1.ConditionTrue, - Reason: "KubeletReady", - Message: "kubelet is posting ready status", - LastHeartbeatTime: currentTime, - } - rs := append(kl.runtimeState.runtimeErrors(), kl.runtimeState.networkErrors()...) - requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods} - if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) { - requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage) - } - missingCapacities := []string{} - for _, resource := range requiredCapacities { - if _, found := node.Status.Capacity[resource]; !found { - missingCapacities = append(missingCapacities, string(resource)) - } - } - if len(missingCapacities) > 0 { - rs = append(rs, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", "))) - } - if len(rs) > 0 { - newNodeReadyCondition = v1.NodeCondition{ - Type: v1.NodeReady, - Status: v1.ConditionFalse, - Reason: "KubeletNotReady", - Message: strings.Join(rs, ","), - LastHeartbeatTime: currentTime, - } - } - // Append AppArmor status if it's enabled. - // TODO(tallclair): This is a temporary message until node feature reporting is added. - if newNodeReadyCondition.Status == v1.ConditionTrue && - kl.appArmorValidator != nil && kl.appArmorValidator.ValidateHost() == nil { - newNodeReadyCondition.Message = fmt.Sprintf("%s. AppArmor enabled", newNodeReadyCondition.Message) - } - - // Record any soft requirements that were not met in the container manager. - status := kl.containerManager.Status() - if status.SoftRequirements != nil { - newNodeReadyCondition.Message = fmt.Sprintf("%s. 
WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error()) - } - - readyConditionUpdated := false - needToRecordEvent := false - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == v1.NodeReady { - if node.Status.Conditions[i].Status == newNodeReadyCondition.Status { - newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime - } else { - newNodeReadyCondition.LastTransitionTime = currentTime - needToRecordEvent = true - } - node.Status.Conditions[i] = newNodeReadyCondition - readyConditionUpdated = true - break - } - } - if !readyConditionUpdated { - newNodeReadyCondition.LastTransitionTime = currentTime - node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition) - } - if needToRecordEvent { - if newNodeReadyCondition.Status == v1.ConditionTrue { - kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeReady) - } else { - kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotReady) - glog.Infof("Node became not ready: %+v", newNodeReadyCondition) - } - } -} - -// setNodeMemoryPressureCondition for the node. -// TODO: this needs to move somewhere centralized... -func (kl *Kubelet) setNodeMemoryPressureCondition(node *v1.Node) { - currentTime := metav1.NewTime(kl.clock.Now()) - var condition *v1.NodeCondition - - // Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update. - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == v1.NodeMemoryPressure { - condition = &node.Status.Conditions[i] - } - } - - newCondition := false - // If the NodeMemoryPressure condition doesn't exist, create one - if condition == nil { - condition = &v1.NodeCondition{ - Type: v1.NodeMemoryPressure, - Status: v1.ConditionUnknown, - } - // cannot be appended to node.Status.Conditions here because it gets - // copied to the slice. So if we append to the slice here none of the - // updates we make below are reflected in the slice. - newCondition = true - } - - // Update the heartbeat time - condition.LastHeartbeatTime = currentTime - - // Note: The conditions below take care of the case when a new NodeMemoryPressure condition is - // created and as well as the case when the condition already exists. When a new condition - // is created its status is set to v1.ConditionUnknown which matches either - // condition.Status != v1.ConditionTrue or - // condition.Status != v1.ConditionFalse in the conditions below depending on whether - // the kubelet is under memory pressure or not. - if kl.evictionManager.IsUnderMemoryPressure() { - if condition.Status != v1.ConditionTrue { - condition.Status = v1.ConditionTrue - condition.Reason = "KubeletHasInsufficientMemory" - condition.Message = "kubelet has insufficient memory available" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasInsufficientMemory") - } - } else if condition.Status != v1.ConditionFalse { - condition.Status = v1.ConditionFalse - condition.Reason = "KubeletHasSufficientMemory" - condition.Message = "kubelet has sufficient memory available" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasSufficientMemory") - } - - if newCondition { - node.Status.Conditions = append(node.Status.Conditions, *condition) - } -} - -// setNodePIDPressureCondition for the node. -// TODO: this needs to move somewhere centralized... 
-func (kl *Kubelet) setNodePIDPressureCondition(node *v1.Node) { - currentTime := metav1.NewTime(kl.clock.Now()) - var condition *v1.NodeCondition - - // Check if NodePIDPressure condition already exists and if it does, just pick it up for update. - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == v1.NodePIDPressure { - condition = &node.Status.Conditions[i] - } - } - - newCondition := false - // If the NodePIDPressure condition doesn't exist, create one - if condition == nil { - condition = &v1.NodeCondition{ - Type: v1.NodePIDPressure, - Status: v1.ConditionUnknown, - } - // cannot be appended to node.Status.Conditions here because it gets - // copied to the slice. So if we append to the slice here none of the - // updates we make below are reflected in the slice. - newCondition = true - } - - // Update the heartbeat time - condition.LastHeartbeatTime = currentTime - - // Note: The conditions below take care of the case when a new NodePIDPressure condition is - // created and as well as the case when the condition already exists. When a new condition - // is created its status is set to v1.ConditionUnknown which matches either - // condition.Status != v1.ConditionTrue or - // condition.Status != v1.ConditionFalse in the conditions below depending on whether - // the kubelet is under PID pressure or not. - if kl.evictionManager.IsUnderPIDPressure() { - if condition.Status != v1.ConditionTrue { - condition.Status = v1.ConditionTrue - condition.Reason = "KubeletHasInsufficientPID" - condition.Message = "kubelet has insufficient PID available" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasInsufficientPID") - } - } else if condition.Status != v1.ConditionFalse { - condition.Status = v1.ConditionFalse - condition.Reason = "KubeletHasSufficientPID" - condition.Message = "kubelet has sufficient PID available" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasSufficientPID") - } - - if newCondition { - node.Status.Conditions = append(node.Status.Conditions, *condition) - } -} - -// setNodeDiskPressureCondition for the node. -// TODO: this needs to move somewhere centralized... -func (kl *Kubelet) setNodeDiskPressureCondition(node *v1.Node) { - currentTime := metav1.NewTime(kl.clock.Now()) - var condition *v1.NodeCondition - - // Check if NodeDiskPressure condition already exists and if it does, just pick it up for update. - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == v1.NodeDiskPressure { - condition = &node.Status.Conditions[i] - } - } - - newCondition := false - // If the NodeDiskPressure condition doesn't exist, create one - if condition == nil { - condition = &v1.NodeCondition{ - Type: v1.NodeDiskPressure, - Status: v1.ConditionUnknown, - } - // cannot be appended to node.Status.Conditions here because it gets - // copied to the slice. So if we append to the slice here none of the - // updates we make below are reflected in the slice. - newCondition = true - } - - // Update the heartbeat time - condition.LastHeartbeatTime = currentTime - - // Note: The conditions below take care of the case when a new NodeDiskPressure condition is - // created and as well as the case when the condition already exists. 
When a new condition - // is created its status is set to v1.ConditionUnknown which matches either - // condition.Status != v1.ConditionTrue or - // condition.Status != v1.ConditionFalse in the conditions below depending on whether - // the kubelet is under disk pressure or not. - if kl.evictionManager.IsUnderDiskPressure() { - if condition.Status != v1.ConditionTrue { - condition.Status = v1.ConditionTrue - condition.Reason = "KubeletHasDiskPressure" - condition.Message = "kubelet has disk pressure" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasDiskPressure") - } - } else if condition.Status != v1.ConditionFalse { - condition.Status = v1.ConditionFalse - condition.Reason = "KubeletHasNoDiskPressure" - condition.Message = "kubelet has no disk pressure" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasNoDiskPressure") - } - - if newCondition { - node.Status.Conditions = append(node.Status.Conditions, *condition) - } -} - -// Set OODCondition for the node. -func (kl *Kubelet) setNodeOODCondition(node *v1.Node) { - currentTime := metav1.NewTime(kl.clock.Now()) - var nodeOODCondition *v1.NodeCondition - - // Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update. - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == v1.NodeOutOfDisk { - nodeOODCondition = &node.Status.Conditions[i] - } - } - - newOODCondition := nodeOODCondition == nil - if newOODCondition { - nodeOODCondition = &v1.NodeCondition{} - } - if nodeOODCondition.Status != v1.ConditionFalse { - nodeOODCondition.Type = v1.NodeOutOfDisk - nodeOODCondition.Status = v1.ConditionFalse - nodeOODCondition.Reason = "KubeletHasSufficientDisk" - nodeOODCondition.Message = "kubelet has sufficient disk space available" - nodeOODCondition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasSufficientDisk") - } - - // Update the heartbeat time irrespective of all the conditions. - nodeOODCondition.LastHeartbeatTime = currentTime - - if newOODCondition { - node.Status.Conditions = append(node.Status.Conditions, *nodeOODCondition) - } +// recordEvent records an event for this node, the Kubelet's nodeRef is passed to the recorder +func (kl *Kubelet) recordEvent(eventType, event, message string) { + kl.recorder.Eventf(kl.nodeRef, eventType, event, message) } // record if node schedulable change. -func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) { +func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) error { kl.lastNodeUnschedulableLock.Lock() defer kl.lastNodeUnschedulableLock.Unlock() if kl.lastNodeUnschedulable != node.Spec.Unschedulable { @@ -1042,15 +429,7 @@ func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) { } kl.lastNodeUnschedulable = node.Spec.Unschedulable } -} - -// Update VolumesInUse field in Node Status only after states are synced up at least once -// in volume reconciler. 
-func (kl *Kubelet) setNodeVolumesInUseStatus(node *v1.Node) { - // Make sure to only update node status after reconciler starts syncing up states - if kl.volumeManager.ReconcilerStatesHasBeenSynced() { - node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse() - } + return nil } // setNodeStatus fills in the Status fields of the given Node, overwriting @@ -1080,24 +459,42 @@ func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress { // defaultNodeStatusFuncs is a factory that generates the default set of // setNodeStatus funcs func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error { - // initial set of node status update handlers, can be modified by Option's - withoutError := func(f func(*v1.Node)) func(*v1.Node) error { - return func(n *v1.Node) error { - f(n) - return nil - } + // if cloud is not nil, we expect the cloud resource sync manager to exist + var nodeAddressesFunc func() ([]v1.NodeAddress, error) + if kl.cloud != nil { + nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses } - return []func(*v1.Node) error{ - kl.setNodeAddress, - withoutError(kl.setNodeStatusInfo), - withoutError(kl.setNodeOODCondition), - withoutError(kl.setNodeMemoryPressureCondition), - withoutError(kl.setNodeDiskPressureCondition), - withoutError(kl.setNodePIDPressureCondition), - withoutError(kl.setNodeReadyCondition), - withoutError(kl.setNodeVolumesInUseStatus), - withoutError(kl.recordNodeSchedulableEvent), + var validateHostFunc func() error + if kl.appArmorValidator != nil { + validateHostFunc = kl.appArmorValidator.ValidateHost } + var setters []func(n *v1.Node) error + setters = append(setters, + nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc), + nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity, + kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent), + nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version), + nodestatus.DaemonEndpoints(kl.daemonEndpoints), + nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList), + nodestatus.GoRuntime(), + ) + if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { + setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits)) + } + setters = append(setters, + nodestatus.OutOfDiskCondition(kl.clock.Now, kl.recordNodeStatusEvent), + nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent), + nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent), + nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent), + nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent), + nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse), + // TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event + // and record state back to the Kubelet runtime object. In the future, I'd like to isolate + // these side-effects by decoupling the decisions to send events and partial status recording + // from the Node setters. 
+ kl.recordNodeSchedulableEvent, + ) + return setters } // Validate given node IP belongs to the current host diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 6cdc26e9e8f..229feece4fa 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -46,11 +46,11 @@ import ( v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" core "k8s.io/client-go/testing" - fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/nodestatus" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume/util" @@ -85,7 +85,7 @@ func makeExpectedImageList(imageList []kubecontainer.Image, maxImages int) []v1. var expectedImageList []v1.ContainerImage for _, kubeImage := range imageList { apiImage := v1.ContainerImage{ - Names: kubeImage.RepoTags[0:maxNamesPerImageInNodeStatus], + Names: kubeImage.RepoTags[0:nodestatus.MaxNamesPerImageInNodeStatus], SizeBytes: kubeImage.Size, } @@ -100,9 +100,9 @@ func makeExpectedImageList(imageList []kubecontainer.Image, maxImages int) []v1. func generateImageTags() []string { var tagList []string - // Generate > maxNamesPerImageInNodeStatus tags so that the test can verify - // that kubelet report up to maxNamesPerImageInNodeStatus tags. - count := rand.IntnRange(maxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1) + // Generate > MaxNamesPerImageInNodeStatus tags so that the test can verify + // that kubelet report up to MaxNamesPerImageInNodeStatus tags. 
+ count := rand.IntnRange(nodestatus.MaxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1) for ; count > 0; count-- { tagList = append(tagList, "k8s.gcr.io:v"+strconv.Itoa(count)) } @@ -140,160 +140,6 @@ func (lcm *localCM) GetCapacity() v1.ResourceList { return lcm.capacity } -func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - defer testKubelet.Cleanup() - kubelet := testKubelet.kubelet - kubelet.kubeClient = nil // ensure only the heartbeat client is used - kubelet.hostname = testKubeletHostname - - cases := []struct { - name string - nodeIP net.IP - nodeAddresses []v1.NodeAddress - expectedAddresses []v1.NodeAddress - shouldError bool - }{ - { - name: "A single InternalIP", - nodeIP: net.ParseIP("10.1.1.1"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - name: "NodeIP is external", - nodeIP: net.ParseIP("55.55.55.55"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - // Accommodating #45201 and #49202 - name: "InternalIP and ExternalIP are the same", - nodeIP: net.ParseIP("55.55.55.55"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - name: "An Internal/ExternalIP, an Internal/ExternalDNS", - nodeIP: net.ParseIP("10.1.1.1"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"}, - {Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"}, - {Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - name: "An Internal with multiple internal IPs", - nodeIP: net.ParseIP("10.1.1.1"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeInternalIP, Address: "10.2.2.2"}, - {Type: v1.NodeInternalIP, Address: "10.3.3.3"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - 
{Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - name: "An InternalIP that isn't valid: should error", - nodeIP: net.ParseIP("10.2.2.2"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: nil, - shouldError: true, - }, - } - for _, testCase := range cases { - // testCase setup - existingNode := v1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)}, - Spec: v1.NodeSpec{}, - } - - kubelet.nodeIP = testCase.nodeIP - - fakeCloud := &fakecloud.FakeCloud{ - Addresses: testCase.nodeAddresses, - Err: nil, - } - kubelet.cloud = fakeCloud - kubelet.cloudResourceSyncManager = NewCloudResourceSyncManager(kubelet.cloud, kubelet.nodeName, kubelet.nodeStatusUpdateFrequency) - stopCh := make(chan struct{}) - go kubelet.cloudResourceSyncManager.Run(stopCh) - kubelet.nodeIPValidator = func(nodeIP net.IP) error { - return nil - } - - // execute method - err := kubelet.setNodeAddress(&existingNode) - close(stopCh) - if err != nil && !testCase.shouldError { - t.Errorf("Unexpected error for test %s: %q", testCase.name, err) - continue - } else if err != nil && testCase.shouldError { - // expected an error - continue - } - - // Sort both sets for consistent equality - sortNodeAddresses(testCase.expectedAddresses) - sortNodeAddresses(existingNode.Status.Addresses) - - assert.True( - t, - apiequality.Semantic.DeepEqual( - testCase.expectedAddresses, - existingNode.Status.Addresses, - ), - fmt.Sprintf("Test %s failed %%s", testCase.name), - diff.ObjectDiff(testCase.expectedAddresses, existingNode.Status.Addresses), - ) - } -} - // sortableNodeAddress is a type for sorting []v1.NodeAddress type sortableNodeAddress []v1.NodeAddress @@ -350,6 +196,10 @@ func TestUpdateNewNodeStatus(t *testing.T) { v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, } + // Since this test retroactively overrides the stub container manager, + // we have to regenerate default status setters. + kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() + kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain @@ -483,6 +333,9 @@ func TestUpdateExistingNodeStatus(t *testing.T) { v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), }, } + // Since this test retroactively overrides the stub container manager, + // we have to regenerate default status setters. + kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ @@ -749,6 +602,9 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { v1.ResourceEphemeralStorage: *resource.NewQuantity(20E9, resource.BinarySI), }, } + // Since this test retroactively overrides the stub container manager, + // we have to regenerate default status setters. 
+ kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() clock := testKubelet.fakeClock kubeClient := testKubelet.fakeKubeClient @@ -1190,6 +1046,10 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) { v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI), }, } + // Since this test retroactively overrides the stub container manager, + // we have to regenerate default status setters. + kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() + kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain @@ -1641,85 +1501,3 @@ func TestValidateNodeIPParam(t *testing.T) { } } } - -func TestSetVolumeLimits(t *testing.T) { - testKubelet := newTestKubeletWithoutFakeVolumePlugin(t, false /* controllerAttachDetachEnabled */) - defer testKubelet.Cleanup() - kubelet := testKubelet.kubelet - kubelet.kubeClient = nil // ensure only the heartbeat client is used - kubelet.hostname = testKubeletHostname - - var testcases = []struct { - name string - cloudProviderName string - expectedVolumeKey string - expectedLimit int64 - }{ - { - name: "For default GCE cloudprovider", - cloudProviderName: "gce", - expectedVolumeKey: util.GCEVolumeLimitKey, - expectedLimit: 16, - }, - { - name: "For default AWS Cloudprovider", - cloudProviderName: "aws", - expectedVolumeKey: util.EBSVolumeLimitKey, - expectedLimit: 39, - }, - { - name: "for default Azure cloudprovider", - cloudProviderName: "azure", - expectedVolumeKey: util.AzureVolumeLimitKey, - expectedLimit: 16, - }, - { - name: "when no cloudprovider is present", - cloudProviderName: "", - expectedVolumeKey: util.AzureVolumeLimitKey, - expectedLimit: -1, - }, - } - for _, test := range testcases { - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)}, - Spec: v1.NodeSpec{}, - } - - if test.cloudProviderName != "" { - fakeCloud := &fakecloud.FakeCloud{ - Provider: test.cloudProviderName, - Err: nil, - } - kubelet.cloud = fakeCloud - } else { - kubelet.cloud = nil - } - - kubelet.setVolumeLimits(node) - nodeLimits := []v1.ResourceList{} - nodeLimits = append(nodeLimits, node.Status.Allocatable) - nodeLimits = append(nodeLimits, node.Status.Capacity) - for _, volumeLimits := range nodeLimits { - if test.expectedLimit == -1 { - _, ok := volumeLimits[v1.ResourceName(test.expectedVolumeKey)] - if ok { - t.Errorf("Expected no volume limit found for %s", test.expectedVolumeKey) - } - } else { - fl, ok := volumeLimits[v1.ResourceName(test.expectedVolumeKey)] - - if !ok { - t.Errorf("Expected to found volume limit for %s found none", test.expectedVolumeKey) - } - foundLimit, _ := fl.AsInt64() - expectedValue := resource.NewQuantity(test.expectedLimit, resource.DecimalSI) - if expectedValue.Cmp(fl) != 0 { - t.Errorf("Expected volume limit for %s to be %v found %v", test.expectedVolumeKey, test.expectedLimit, foundLimit) - } - } - - } - - } -} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index e5416c4a71e..bdf54d6d873 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -292,7 +292,6 @@ func newTestKubeletWithImageList( // Relist period does not affect the tests. 
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, clock.RealClock{}) kubelet.clock = fakeClock - kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() nodeRef := &v1.ObjectReference{ Kind: "Node", @@ -338,6 +337,8 @@ func newTestKubeletWithImageList( false, /* experimentalCheckNodeCapabilitiesBeforeMount*/ false /* keepTerminatedPodVolumes */) + kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() + // enable active deadline handler activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock) require.NoError(t, err, "Can't initialize active deadline handler") diff --git a/pkg/kubelet/nodestatus/BUILD b/pkg/kubelet/nodestatus/BUILD new file mode 100644 index 00000000000..d8c5f632d8e --- /dev/null +++ b/pkg/kubelet/nodestatus/BUILD @@ -0,0 +1,68 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["setters.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/nodestatus", + visibility = ["//visibility:public"], + deps = [ + "//pkg/apis/core/v1/helper:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/features:go_default_library", + "//pkg/kubelet/apis:go_default_library", + "//pkg/kubelet/cadvisor:go_default_library", + "//pkg/kubelet/cm:go_default_library", + "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/events:go_default_library", + "//pkg/version:go_default_library", + "//pkg/volume:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/google/cadvisor/info/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["setters_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/cloudprovider/providers/fake:go_default_library", + "//pkg/kubelet/cm:go_default_library", + "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/container/testing:go_default_library", + "//pkg/kubelet/events:go_default_library", + "//pkg/kubelet/util/sliceutils:go_default_library", + "//pkg/version:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/testing:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", + "//vendor/github.com/google/cadvisor/info/v1:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + ], +) diff --git 
a/pkg/kubelet/nodestatus/setters.go b/pkg/kubelet/nodestatus/setters.go new file mode 100644 index 00000000000..abe6970ba24 --- /dev/null +++ b/pkg/kubelet/nodestatus/setters.go @@ -0,0 +1,739 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodestatus + +import ( + "fmt" + "math" + "net" + goruntime "runtime" + "strings" + "time" + + cadvisorapiv1 "github.com/google/cadvisor/info/v1" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilnet "k8s.io/apimachinery/pkg/util/net" + utilfeature "k8s.io/apiserver/pkg/util/feature" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/features" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + "k8s.io/kubernetes/pkg/kubelet/cadvisor" + "k8s.io/kubernetes/pkg/kubelet/cm" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/version" + "k8s.io/kubernetes/pkg/volume" + + "github.com/golang/glog" +) + +const ( + // MaxNamesPerImageInNodeStatus is max number of names + // per image stored in the node status. + MaxNamesPerImageInNodeStatus = 5 +) + +// Setter modifies the node in-place, and returns an error if the modification failed. +// Setters may partially mutate the node before returning an error. +type Setter func(node *v1.Node) error + +// NodeAddress returns a Setter that updates address-related information on the node. +func NodeAddress(nodeIP net.IP, // typically Kubelet.nodeIP + validateNodeIPFunc func(net.IP) error, // typically Kubelet.nodeIPValidator + hostname string, // typically Kubelet.hostname + externalCloudProvider bool, // typically Kubelet.externalCloudProvider + cloud cloudprovider.Interface, // typically Kubelet.cloud + nodeAddressesFunc func() ([]v1.NodeAddress, error), // typically Kubelet.cloudResourceSyncManager.NodeAddresses +) Setter { + return func(node *v1.Node) error { + if nodeIP != nil { + if err := validateNodeIPFunc(nodeIP); err != nil { + return fmt.Errorf("failed to validate nodeIP: %v", err) + } + glog.V(2).Infof("Using node IP: %q", nodeIP.String()) + } + + if externalCloudProvider { + if nodeIP != nil { + if node.ObjectMeta.Annotations == nil { + node.ObjectMeta.Annotations = make(map[string]string) + } + node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = nodeIP.String() + } + // We rely on the external cloud provider to supply the addresses. 
+ return nil + } + if cloud != nil { + nodeAddresses, err := nodeAddressesFunc() + if err != nil { + return err + } + if nodeIP != nil { + enforcedNodeAddresses := []v1.NodeAddress{} + + var nodeIPType v1.NodeAddressType + for _, nodeAddress := range nodeAddresses { + if nodeAddress.Address == nodeIP.String() { + enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address}) + nodeIPType = nodeAddress.Type + break + } + } + if len(enforcedNodeAddresses) > 0 { + for _, nodeAddress := range nodeAddresses { + if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName { + enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address}) + } + } + + enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: hostname}) + node.Status.Addresses = enforcedNodeAddresses + return nil + } + return fmt.Errorf("failed to get node address from cloud provider that matches ip: %v", nodeIP) + } + + // Only add a NodeHostName address if the cloudprovider did not specify any addresses. + // (we assume the cloudprovider is authoritative if it specifies any addresses) + if len(nodeAddresses) == 0 { + nodeAddresses = []v1.NodeAddress{{Type: v1.NodeHostName, Address: hostname}} + } + node.Status.Addresses = nodeAddresses + } else { + var ipAddr net.IP + var err error + + // 1) Use nodeIP if set + // 2) If the user has specified an IP to HostnameOverride, use it + // 3) Lookup the IP from node name by DNS and use the first valid IPv4 address. + // If the node does not have a valid IPv4 address, use the first valid IPv6 address. + // 4) Try to get the IP from the network interface used as default gateway + if nodeIP != nil { + ipAddr = nodeIP + } else if addr := net.ParseIP(hostname); addr != nil { + ipAddr = addr + } else { + var addrs []net.IP + addrs, _ = net.LookupIP(node.Name) + for _, addr := range addrs { + if err = validateNodeIPFunc(addr); err == nil { + if addr.To4() != nil { + ipAddr = addr + break + } + if addr.To16() != nil && ipAddr == nil { + ipAddr = addr + } + } + } + + if ipAddr == nil { + ipAddr, err = utilnet.ChooseHostInterface() + } + } + + if ipAddr == nil { + // We tried everything we could, but the IP address wasn't fetchable; error out + return fmt.Errorf("can't get ip address of node %s. error: %v", node.Name, err) + } + node.Status.Addresses = []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: ipAddr.String()}, + {Type: v1.NodeHostName, Address: hostname}, + } + } + return nil + } +} + +// MachineInfo returns a Setter that updates machine-related information on the node. +func MachineInfo(nodeName string, + maxPods int, + podsPerCore int, + machineInfoFunc func() (*cadvisorapiv1.MachineInfo, error), // typically Kubelet.GetCachedMachineInfo + capacityFunc func() v1.ResourceList, // typically Kubelet.containerManager.GetCapacity + devicePluginResourceCapacityFunc func() (v1.ResourceList, v1.ResourceList, []string), // typically Kubelet.containerManager.GetDevicePluginResourceCapacity + nodeAllocatableReservationFunc func() v1.ResourceList, // typically Kubelet.containerManager.GetNodeAllocatableReservation + recordEventFunc func(eventType, event, message string), // typically Kubelet.recordEvent +) Setter { + return func(node *v1.Node) error { + // Note: avoid blindly overwriting the capacity in case opaque + // resources are being advertised. 
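// (Editorial note: "opaque resources" here refers to extended resources that are
// advertised outside this setter, for example by device plugins or by patching
// the Node object directly. Rebuilding Capacity from scratch would silently drop
// them, which is why the map is only initialized when nil and otherwise merged
// into rather than replaced.)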
+ if node.Status.Capacity == nil { + node.Status.Capacity = v1.ResourceList{} + } + + var devicePluginAllocatable v1.ResourceList + var devicePluginCapacity v1.ResourceList + var removedDevicePlugins []string + + // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start + // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. + info, err := machineInfoFunc() + if err != nil { + // TODO(roberthbailey): This is required for test-cmd.sh to pass. + // See if the test should be updated instead. + node.Status.Capacity[v1.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI) + node.Status.Capacity[v1.ResourceMemory] = resource.MustParse("0Gi") + node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(int64(maxPods), resource.DecimalSI) + glog.Errorf("Error getting machine info: %v", err) + } else { + node.Status.NodeInfo.MachineID = info.MachineID + node.Status.NodeInfo.SystemUUID = info.SystemUUID + + for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) { + node.Status.Capacity[rName] = rCap + } + + if podsPerCore > 0 { + node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity( + int64(math.Min(float64(info.NumCores*podsPerCore), float64(maxPods))), resource.DecimalSI) + } else { + node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity( + int64(maxPods), resource.DecimalSI) + } + + if node.Status.NodeInfo.BootID != "" && + node.Status.NodeInfo.BootID != info.BootID { + // TODO: This requires a transaction, either both node status is updated + // and event is recorded or neither should happen, see issue #6055. + recordEventFunc(v1.EventTypeWarning, events.NodeRebooted, + fmt.Sprintf("Node %s has been rebooted, boot id: %s", nodeName, info.BootID)) + } + node.Status.NodeInfo.BootID = info.BootID + + if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) { + // TODO: all the node resources should use ContainerManager.GetCapacity instead of deriving the + // capacity for every node status request + initialCapacity := capacityFunc() + if initialCapacity != nil { + node.Status.Capacity[v1.ResourceEphemeralStorage] = initialCapacity[v1.ResourceEphemeralStorage] + } + } + + devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = devicePluginResourceCapacityFunc() + if devicePluginCapacity != nil { + for k, v := range devicePluginCapacity { + if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() { + glog.V(2).Infof("Update capacity for %s to %d", k, v.Value()) + } + node.Status.Capacity[k] = v + } + } + + for _, removedResource := range removedDevicePlugins { + glog.V(2).Infof("Set capacity for %s to 0 on device removal", removedResource) + // Set the capacity of the removed resource to 0 instead of + // removing the resource from the node status. This is to indicate + // that the resource is managed by device plugin and had been + // registered before. + // + // This is required to differentiate the device plugin managed + // resources and the cluster-level resources, which are absent in + // node status. + node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI) + } + } + + // Set Allocatable. + if node.Status.Allocatable == nil { + node.Status.Allocatable = make(v1.ResourceList) + } + // Remove extended resources from allocatable that are no longer + // present in capacity. 
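// (Editorial note: only extended resources are pruned here; standard resources
// such as cpu, memory, and pods are unconditionally recomputed below, whereas an
// extended resource present in Allocatable but absent from Capacity is stale and
// would otherwise linger in node status indefinitely.)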
+ for k := range node.Status.Allocatable { + _, found := node.Status.Capacity[k] + if !found && v1helper.IsExtendedResourceName(k) { + delete(node.Status.Allocatable, k) + } + } + allocatableReservation := nodeAllocatableReservationFunc() + for k, v := range node.Status.Capacity { + value := *(v.Copy()) + if res, exists := allocatableReservation[k]; exists { + value.Sub(res) + } + if value.Sign() < 0 { + // Negative Allocatable resources don't make sense. + value.Set(0) + } + node.Status.Allocatable[k] = value + } + + if devicePluginAllocatable != nil { + for k, v := range devicePluginAllocatable { + if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() { + glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value()) + } + node.Status.Allocatable[k] = v + } + } + // for every huge page reservation, we need to remove it from allocatable memory + for k, v := range node.Status.Capacity { + if v1helper.IsHugePageResourceName(k) { + allocatableMemory := node.Status.Allocatable[v1.ResourceMemory] + value := *(v.Copy()) + allocatableMemory.Sub(value) + if allocatableMemory.Sign() < 0 { + // Negative Allocatable resources don't make sense. + allocatableMemory.Set(0) + } + node.Status.Allocatable[v1.ResourceMemory] = allocatableMemory + } + } + return nil + } +} + +// VersionInfo returns a Setter that updates version-related information on the node. +func VersionInfo(versionInfoFunc func() (*cadvisorapiv1.VersionInfo, error), // typically Kubelet.cadvisor.VersionInfo + runtimeTypeFunc func() string, // typically Kubelet.containerRuntime.Type + runtimeVersionFunc func() (kubecontainer.Version, error), // typically Kubelet.containerRuntime.Version +) Setter { + return func(node *v1.Node) error { + verinfo, err := versionInfoFunc() + if err != nil { + // TODO(mtaufen): consider removing this log line, since returned error will be logged + glog.Errorf("Error getting version info: %v", err) + return fmt.Errorf("error getting version info: %v", err) + } + + node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion + node.Status.NodeInfo.OSImage = verinfo.ContainerOsVersion + + runtimeVersion := "Unknown" + if runtimeVer, err := runtimeVersionFunc(); err == nil { + runtimeVersion = runtimeVer.String() + } + node.Status.NodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", runtimeTypeFunc(), runtimeVersion) + + node.Status.NodeInfo.KubeletVersion = version.Get().String() + // TODO: kube-proxy might be different version from kubelet in the future + node.Status.NodeInfo.KubeProxyVersion = version.Get().String() + return nil + } +} + +// DaemonEndpoints returns a Setter that updates the daemon endpoints on the node. +func DaemonEndpoints(daemonEndpoints *v1.NodeDaemonEndpoints) Setter { + return func(node *v1.Node) error { + node.Status.DaemonEndpoints = *daemonEndpoints + return nil + } +} + +// Images returns a Setter that updates the images on the node. +// imageListFunc is expected to return a list of images sorted in descending order by image size. +// nodeStatusMaxImages is ignored if set to -1. 
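// (Editorial note, describing the code below rather than new behavior: because
// the name list is built as append(RepoDigests, RepoTags...) and then truncated
// to MaxNamesPerImageInNodeStatus, repo digests are preferentially retained over
// tags when an image has more than five names.)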
+func Images(nodeStatusMaxImages int32, + imageListFunc func() ([]kubecontainer.Image, error), // typically Kubelet.imageManager.GetImageList +) Setter { + return func(node *v1.Node) error { + // Update image list of this node + var imagesOnNode []v1.ContainerImage + containerImages, err := imageListFunc() + if err != nil { + // TODO(mtaufen): consider removing this log line, since returned error will be logged + glog.Errorf("Error getting image list: %v", err) + node.Status.Images = imagesOnNode + return fmt.Errorf("error getting image list: %v", err) + } + // we expect imageListFunc to return a sorted list, so we just need to truncate + if int(nodeStatusMaxImages) > -1 && + int(nodeStatusMaxImages) < len(containerImages) { + containerImages = containerImages[0:nodeStatusMaxImages] + } + + for _, image := range containerImages { + names := append(image.RepoDigests, image.RepoTags...) + // Report up to MaxNamesPerImageInNodeStatus names per image. + if len(names) > MaxNamesPerImageInNodeStatus { + names = names[0:MaxNamesPerImageInNodeStatus] + } + imagesOnNode = append(imagesOnNode, v1.ContainerImage{ + Names: names, + SizeBytes: image.Size, + }) + } + + node.Status.Images = imagesOnNode + return nil + } +} + +// GoRuntime returns a Setter that sets GOOS and GOARCH on the node. +func GoRuntime() Setter { + return func(node *v1.Node) error { + node.Status.NodeInfo.OperatingSystem = goruntime.GOOS + node.Status.NodeInfo.Architecture = goruntime.GOARCH + return nil + } +} + +// ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node. +func ReadyCondition( + nowFunc func() time.Time, // typically Kubelet.clock.Now + runtimeErrorsFunc func() []string, // typically Kubelet.runtimeState.runtimeErrors + networkErrorsFunc func() []string, // typically Kubelet.runtimeState.networkErrors + appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator + cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status + recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent +) Setter { + return func(node *v1.Node) error { + // NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions. + // This is due to an issue with version skewed kubelet and master components. + // ref: https://github.com/kubernetes/kubernetes/issues/16961 + currentTime := metav1.NewTime(nowFunc()) + newNodeReadyCondition := v1.NodeCondition{ + Type: v1.NodeReady, + Status: v1.ConditionTrue, + Reason: "KubeletReady", + Message: "kubelet is posting ready status", + LastHeartbeatTime: currentTime, + } + rs := append(runtimeErrorsFunc(), networkErrorsFunc()...) 
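// (Editorial note: rs accumulates every reason the node should be reported
// NotReady: runtime errors, network errors, and, just below, any missing
// required capacities. If rs ends up non-empty, the ConditionTrue/KubeletReady
// condition constructed above is replaced with a ConditionFalse/KubeletNotReady
// one whose message joins all of the accumulated reasons.)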
+ requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods} + if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) { + requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage) + } + missingCapacities := []string{} + for _, resource := range requiredCapacities { + if _, found := node.Status.Capacity[resource]; !found { + missingCapacities = append(missingCapacities, string(resource)) + } + } + if len(missingCapacities) > 0 { + rs = append(rs, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", "))) + } + if len(rs) > 0 { + newNodeReadyCondition = v1.NodeCondition{ + Type: v1.NodeReady, + Status: v1.ConditionFalse, + Reason: "KubeletNotReady", + Message: strings.Join(rs, ","), + LastHeartbeatTime: currentTime, + } + } + // Append AppArmor status if it's enabled. + // TODO(tallclair): This is a temporary message until node feature reporting is added. + if appArmorValidateHostFunc != nil && newNodeReadyCondition.Status == v1.ConditionTrue { + if err := appArmorValidateHostFunc(); err == nil { + newNodeReadyCondition.Message = fmt.Sprintf("%s. AppArmor enabled", newNodeReadyCondition.Message) + } + } + + // Record any soft requirements that were not met in the container manager. + status := cmStatusFunc() + if status.SoftRequirements != nil { + newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error()) + } + + readyConditionUpdated := false + needToRecordEvent := false + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == v1.NodeReady { + if node.Status.Conditions[i].Status == newNodeReadyCondition.Status { + newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime + } else { + newNodeReadyCondition.LastTransitionTime = currentTime + needToRecordEvent = true + } + node.Status.Conditions[i] = newNodeReadyCondition + readyConditionUpdated = true + break + } + } + if !readyConditionUpdated { + newNodeReadyCondition.LastTransitionTime = currentTime + node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition) + } + if needToRecordEvent { + if newNodeReadyCondition.Status == v1.ConditionTrue { + recordEventFunc(v1.EventTypeNormal, events.NodeReady) + } else { + recordEventFunc(v1.EventTypeNormal, events.NodeNotReady) + glog.Infof("Node became not ready: %+v", newNodeReadyCondition) + } + } + return nil + } +} + +// MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node. +func MemoryPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now + pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderMemoryPressure + recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent +) Setter { + return func(node *v1.Node) error { + currentTime := metav1.NewTime(nowFunc()) + var condition *v1.NodeCondition + + // Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update. 
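// (Editorial note: the loop below takes a pointer into node.Status.Conditions,
// so the heartbeat and transition updates made through `condition` mutate the
// stored entry in place; only a freshly created condition needs the explicit
// append at the end of the setter.)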
+ for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == v1.NodeMemoryPressure { + condition = &node.Status.Conditions[i] + } + } + + newCondition := false + // If the NodeMemoryPressure condition doesn't exist, create one + if condition == nil { + condition = &v1.NodeCondition{ + Type: v1.NodeMemoryPressure, + Status: v1.ConditionUnknown, + } + // cannot be appended to node.Status.Conditions here because it gets + // copied to the slice. So if we append to the slice here none of the + // updates we make below are reflected in the slice. + newCondition = true + } + + // Update the heartbeat time + condition.LastHeartbeatTime = currentTime + + // Note: The conditions below take care of the case when a new NodeMemoryPressure condition is + // created and as well as the case when the condition already exists. When a new condition + // is created its status is set to v1.ConditionUnknown which matches either + // condition.Status != v1.ConditionTrue or + // condition.Status != v1.ConditionFalse in the conditions below depending on whether + // the kubelet is under memory pressure or not. + if pressureFunc() { + if condition.Status != v1.ConditionTrue { + condition.Status = v1.ConditionTrue + condition.Reason = "KubeletHasInsufficientMemory" + condition.Message = "kubelet has insufficient memory available" + condition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasInsufficientMemory") + } + } else if condition.Status != v1.ConditionFalse { + condition.Status = v1.ConditionFalse + condition.Reason = "KubeletHasSufficientMemory" + condition.Message = "kubelet has sufficient memory available" + condition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientMemory") + } + + if newCondition { + node.Status.Conditions = append(node.Status.Conditions, *condition) + } + return nil + } +} + +// PIDPressureCondition returns a Setter that updates the v1.NodePIDPressure condition on the node. +func PIDPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now + pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderPIDPressure + recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent +) Setter { + return func(node *v1.Node) error { + currentTime := metav1.NewTime(nowFunc()) + var condition *v1.NodeCondition + + // Check if NodePIDPressure condition already exists and if it does, just pick it up for update. + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == v1.NodePIDPressure { + condition = &node.Status.Conditions[i] + } + } + + newCondition := false + // If the NodePIDPressure condition doesn't exist, create one + if condition == nil { + condition = &v1.NodeCondition{ + Type: v1.NodePIDPressure, + Status: v1.ConditionUnknown, + } + // cannot be appended to node.Status.Conditions here because it gets + // copied to the slice. So if we append to the slice here none of the + // updates we make below are reflected in the slice. + newCondition = true + } + + // Update the heartbeat time + condition.LastHeartbeatTime = currentTime + + // Note: The conditions below take care of the case when a new NodePIDPressure condition is + // created and as well as the case when the condition already exists. 
When a new condition + // is created its status is set to v1.ConditionUnknown which matches either + // condition.Status != v1.ConditionTrue or + // condition.Status != v1.ConditionFalse in the conditions below depending on whether + // the kubelet is under PID pressure or not. + if pressureFunc() { + if condition.Status != v1.ConditionTrue { + condition.Status = v1.ConditionTrue + condition.Reason = "KubeletHasInsufficientPID" + condition.Message = "kubelet has insufficient PID available" + condition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasInsufficientPID") + } + } else if condition.Status != v1.ConditionFalse { + condition.Status = v1.ConditionFalse + condition.Reason = "KubeletHasSufficientPID" + condition.Message = "kubelet has sufficient PID available" + condition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientPID") + } + + if newCondition { + node.Status.Conditions = append(node.Status.Conditions, *condition) + } + return nil + } +} + +// DiskPressureCondition returns a Setter that updates the v1.NodeDiskPressure condition on the node. +func DiskPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now + pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderDiskPressure + recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent +) Setter { + return func(node *v1.Node) error { + currentTime := metav1.NewTime(nowFunc()) + var condition *v1.NodeCondition + + // Check if NodeDiskPressure condition already exists and if it does, just pick it up for update. + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == v1.NodeDiskPressure { + condition = &node.Status.Conditions[i] + } + } + + newCondition := false + // If the NodeDiskPressure condition doesn't exist, create one + if condition == nil { + condition = &v1.NodeCondition{ + Type: v1.NodeDiskPressure, + Status: v1.ConditionUnknown, + } + // cannot be appended to node.Status.Conditions here because it gets + // copied to the slice. So if we append to the slice here none of the + // updates we make below are reflected in the slice. + newCondition = true + } + + // Update the heartbeat time + condition.LastHeartbeatTime = currentTime + + // Note: The conditions below take care of the case when a new NodeDiskPressure condition is + // created and as well as the case when the condition already exists. When a new condition + // is created its status is set to v1.ConditionUnknown which matches either + // condition.Status != v1.ConditionTrue or + // condition.Status != v1.ConditionFalse in the conditions below depending on whether + // the kubelet is under disk pressure or not. 
+ if pressureFunc() { + if condition.Status != v1.ConditionTrue { + condition.Status = v1.ConditionTrue + condition.Reason = "KubeletHasDiskPressure" + condition.Message = "kubelet has disk pressure" + condition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasDiskPressure") + } + } else if condition.Status != v1.ConditionFalse { + condition.Status = v1.ConditionFalse + condition.Reason = "KubeletHasNoDiskPressure" + condition.Message = "kubelet has no disk pressure" + condition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasNoDiskPressure") + } + + if newCondition { + node.Status.Conditions = append(node.Status.Conditions, *condition) + } + return nil + } +} + +// OutOfDiskCondition returns a Setter that updates the v1.NodeOutOfDisk condition on the node. +// TODO(#65658): remove this condition +func OutOfDiskCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now + recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent +) Setter { + return func(node *v1.Node) error { + currentTime := metav1.NewTime(nowFunc()) + var nodeOODCondition *v1.NodeCondition + + // Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update. + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == v1.NodeOutOfDisk { + nodeOODCondition = &node.Status.Conditions[i] + } + } + + newOODCondition := nodeOODCondition == nil + if newOODCondition { + nodeOODCondition = &v1.NodeCondition{} + } + if nodeOODCondition.Status != v1.ConditionFalse { + nodeOODCondition.Type = v1.NodeOutOfDisk + nodeOODCondition.Status = v1.ConditionFalse + nodeOODCondition.Reason = "KubeletHasSufficientDisk" + nodeOODCondition.Message = "kubelet has sufficient disk space available" + nodeOODCondition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientDisk") + } + + // Update the heartbeat time irrespective of all the conditions. + nodeOODCondition.LastHeartbeatTime = currentTime + + if newOODCondition { + node.Status.Conditions = append(node.Status.Conditions, *nodeOODCondition) + } + return nil + } +} + +// VolumesInUse returns a Setter that updates the volumes in use on the node. +func VolumesInUse(syncedFunc func() bool, // typically Kubelet.volumeManager.ReconcilerStatesHasBeenSynced + volumesInUseFunc func() []v1.UniqueVolumeName, // typically Kubelet.volumeManager.GetVolumesInUse +) Setter { + return func(node *v1.Node) error { + // Make sure to only update node status after reconciler starts syncing up states + if syncedFunc() { + node.Status.VolumesInUse = volumesInUseFunc() + } + return nil + } +} + +// VolumeLimits returns a Setter that updates the volume limits on the node. 
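// (Editorial note, an illustrative sketch rather than part of this patch: like
// every other Setter in this file, VolumeLimits is intended to be composed into
// the slice of per-field setters that the kubelet builds in
// defaultNodeStatusFuncs and runs in order on each node status update, roughly:
//
//	setters := []Setter{
//		NodeAddress( /* ... */ ),
//		MachineInfo( /* ... */ ),
//		VersionInfo( /* ... */ ),
//		VolumeLimits( /* ... */ ),
//	}
//	for _, set := range setters {
//		if err := set(node); err != nil {
//			// log and keep going so one failing setter does not block the rest
//			glog.Errorf("failed to set node status field: %v", err)
//		}
//	}
//
// The error handling shown here is illustrative; the actual wiring lives in
// kubelet_node_status.go.)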
+func VolumeLimits(volumePluginListFunc func() []volume.VolumePluginWithAttachLimits, // typically Kubelet.volumePluginMgr.ListVolumePluginWithLimits +) Setter { + return func(node *v1.Node) error { + if node.Status.Capacity == nil { + node.Status.Capacity = v1.ResourceList{} + } + if node.Status.Allocatable == nil { + node.Status.Allocatable = v1.ResourceList{} + } + + pluginWithLimits := volumePluginListFunc() + for _, volumePlugin := range pluginWithLimits { + attachLimits, err := volumePlugin.GetVolumeLimits() + if err != nil { + glog.V(4).Infof("Error getting volume limit for plugin %s", volumePlugin.GetPluginName()) + continue + } + for limitKey, value := range attachLimits { + node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) + node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI) + } + } + return nil + } +} diff --git a/pkg/kubelet/nodestatus/setters_test.go b/pkg/kubelet/nodestatus/setters_test.go new file mode 100644 index 00000000000..2952d2db098 --- /dev/null +++ b/pkg/kubelet/nodestatus/setters_test.go @@ -0,0 +1,1589 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodestatus + +import ( + "fmt" + "net" + "sort" + "strconv" + "testing" + "time" + + cadvisorapiv1 "github.com/google/cadvisor/info/v1" + + "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/uuid" + fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" + "k8s.io/kubernetes/pkg/kubelet/cm" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + kubecontainertest "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" + "k8s.io/kubernetes/pkg/version" + "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testKubeletHostname = "127.0.0.1" +) + +// TODO(mtaufen): below is ported from the old kubelet_node_status_test.go code, potentially add more test coverage for NodeAddress setter in future +func TestNodeAddress(t *testing.T) { + cases := []struct { + name string + nodeIP net.IP + nodeAddresses []v1.NodeAddress + expectedAddresses []v1.NodeAddress + shouldError bool + }{ + { + name: "A single InternalIP", + nodeIP: net.ParseIP("10.1.1.1"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + name: "NodeIP is external", + nodeIP: net.ParseIP("55.55.55.55"), + nodeAddresses: 
[]v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + // Accommodating #45201 and #49202 + name: "InternalIP and ExternalIP are the same", + nodeIP: net.ParseIP("55.55.55.55"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + name: "An Internal/ExternalIP, an Internal/ExternalDNS", + nodeIP: net.ParseIP("10.1.1.1"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"}, + {Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"}, + {Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + name: "An Internal with multiple internal IPs", + nodeIP: net.ParseIP("10.1.1.1"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeInternalIP, Address: "10.2.2.2"}, + {Type: v1.NodeInternalIP, Address: "10.3.3.3"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + name: "An InternalIP that isn't valid: should error", + nodeIP: net.ParseIP("10.2.2.2"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: nil, + shouldError: true, + }, + } + for _, testCase := range cases { + t.Run(testCase.name, func(t *testing.T) { + // testCase setup + existingNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)}, + Spec: v1.NodeSpec{}, + } + + nodeIP := testCase.nodeIP + nodeIPValidator := func(nodeIP net.IP) error { + return nil + } + hostname := testKubeletHostname + externalCloudProvider := false + cloud := &fakecloud.FakeCloud{ + Addresses: testCase.nodeAddresses, + Err: nil, + } + nodeAddressesFunc := func() ([]v1.NodeAddress, error) { + return testCase.nodeAddresses, nil + } + + // construct setter + setter := NodeAddress(nodeIP, + nodeIPValidator, + hostname, + externalCloudProvider, + cloud, + 
nodeAddressesFunc) + + // call setter on existing node + err := setter(existingNode) + if err != nil && !testCase.shouldError { + t.Fatalf("unexpected error: %v", err) + } else if err != nil && testCase.shouldError { + // expected an error, and got one, so just return early here + return + } + + // Sort both sets for consistent equality + sortNodeAddresses(testCase.expectedAddresses) + sortNodeAddresses(existingNode.Status.Addresses) + + assert.True(t, apiequality.Semantic.DeepEqual(testCase.expectedAddresses, existingNode.Status.Addresses), + "Diff: %s", diff.ObjectDiff(testCase.expectedAddresses, existingNode.Status.Addresses)) + }) + } +} + +func TestMachineInfo(t *testing.T) { + const nodeName = "test-node" + + type dprc struct { + capacity v1.ResourceList + allocatable v1.ResourceList + inactive []string + } + + cases := []struct { + desc string + node *v1.Node + maxPods int + podsPerCore int + machineInfo *cadvisorapiv1.MachineInfo + machineInfoError error + capacity v1.ResourceList + devicePluginResourceCapacity dprc + nodeAllocatableReservation v1.ResourceList + expectNode *v1.Node + expectEvents []testEvent + }{ + { + desc: "machine identifiers, basic capacity and allocatable", + node: &v1.Node{}, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + MachineID: "MachineID", + SystemUUID: "SystemUUID", + NumCores: 2, + MemoryCapacity: 1024, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + MachineID: "MachineID", + SystemUUID: "SystemUUID", + }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "podsPerCore greater than zero, but less than maxPods/cores", + node: &v1.Node{}, + maxPods: 10, + podsPerCore: 4, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(8, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(8, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "podsPerCore greater than maxPods/cores", + node: &v1.Node{}, + maxPods: 10, + podsPerCore: 6, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "allocatable 
should equal capacity minus reservations", + node: &v1.Node{}, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, + nodeAllocatableReservation: v1.ResourceList{ + // reserve 1 unit for each resource + v1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI), + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1999, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1023, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(109, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "allocatable memory does not double-count hugepages reservations", + node: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + // it's impossible on any real system to reserve 1 byte, + // but we just need to test that the setter does the math + v1.ResourceHugePagesPrefix + "test": *resource.NewQuantity(1, resource.BinarySI), + }, + }, + }, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourceHugePagesPrefix + "test": *resource.NewQuantity(1, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + // memory has 1-unit difference for hugepages reservation + v1.ResourceMemory: *resource.NewQuantity(1023, resource.BinarySI), + v1.ResourceHugePagesPrefix + "test": *resource.NewQuantity(1, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "negative capacity resources should be set to 0 in allocatable", + node: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + "negative-resource": *resource.NewQuantity(-1, resource.BinarySI), + }, + }, + }, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + "negative-resource": *resource.NewQuantity(-1, resource.BinarySI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + "negative-resource": *resource.NewQuantity(0, resource.BinarySI), + }, + }, + }, + }, + { + desc: "ephemeral storage is reflected in capacity and allocatable", + node: &v1.Node{}, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, 
+ capacity: v1.ResourceList{ + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + }, + }, + }, + { + desc: "device plugin resources are reflected in capacity and allocatable", + node: &v1.Node{}, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, + devicePluginResourceCapacity: dprc{ + capacity: v1.ResourceList{ + "device-plugin": *resource.NewQuantity(1, resource.BinarySI), + }, + allocatable: v1.ResourceList{ + "device-plugin": *resource.NewQuantity(1, resource.BinarySI), + }, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + "device-plugin": *resource.NewQuantity(1, resource.BinarySI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + "device-plugin": *resource.NewQuantity(1, resource.BinarySI), + }, + }, + }, + }, + { + desc: "inactive device plugin resources should have their capacity set to 0", + node: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + "inactive": *resource.NewQuantity(1, resource.BinarySI), + }, + }, + }, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, + devicePluginResourceCapacity: dprc{ + inactive: []string{"inactive"}, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + "inactive": *resource.NewQuantity(0, resource.BinarySI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + "inactive": *resource.NewQuantity(0, resource.BinarySI), + }, + }, + }, + }, + { + desc: "extended resources not present in capacity are removed from allocatable", + node: &v1.Node{ + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + "example.com/extended": *resource.NewQuantity(1, resource.BinarySI), + }, + }, + }, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + NumCores: 2, + MemoryCapacity: 1024, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: 
*resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "on failure to get machine info, allocatable and capacity for memory and cpu are set to 0, pods to maxPods", + node: &v1.Node{}, + maxPods: 110, + // podsPerCore is not accounted for when getting machine info fails + podsPerCore: 1, + machineInfoError: fmt.Errorf("foo"), + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), + v1.ResourceMemory: resource.MustParse("0Gi"), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), + v1.ResourceMemory: resource.MustParse("0Gi"), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "node reboot event is recorded", + node: &v1.Node{ + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + BootID: "foo", + }, + }, + }, + maxPods: 110, + machineInfo: &cadvisorapiv1.MachineInfo{ + BootID: "bar", + NumCores: 2, + MemoryCapacity: 1024, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + BootID: "bar", + }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + }, + }, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeWarning, + event: events.NodeRebooted, + message: fmt.Sprintf("Node %s has been rebooted, boot id: %s", nodeName, "bar"), + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + machineInfoFunc := func() (*cadvisorapiv1.MachineInfo, error) { + return tc.machineInfo, tc.machineInfoError + } + capacityFunc := func() v1.ResourceList { + return tc.capacity + } + devicePluginResourceCapacityFunc := func() (v1.ResourceList, v1.ResourceList, []string) { + c := tc.devicePluginResourceCapacity + return c.capacity, c.allocatable, c.inactive + } + nodeAllocatableReservationFunc := func() v1.ResourceList { + return tc.nodeAllocatableReservation + } + + events := []testEvent{} + recordEventFunc := func(eventType, event, message string) { + events = append(events, testEvent{ + eventType: eventType, + event: event, + message: message, + }) + } + // construct setter + setter := MachineInfo(nodeName, tc.maxPods, tc.podsPerCore, machineInfoFunc, capacityFunc, + devicePluginResourceCapacityFunc, nodeAllocatableReservationFunc, recordEventFunc) + // call setter on node + if err := setter(tc.node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected node + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectNode, tc.node), + "Diff: %s", diff.ObjectDiff(tc.expectNode, tc.node)) + // check expected events + require.Equal(t, len(tc.expectEvents), len(events)) + for i := range tc.expectEvents { + 
assert.Equal(t, tc.expectEvents[i], events[i]) + } + }) + } + +} + +func TestVersionInfo(t *testing.T) { + cases := []struct { + desc string + node *v1.Node + versionInfo *cadvisorapiv1.VersionInfo + versionInfoError error + runtimeType string + runtimeVersion kubecontainer.Version + runtimeVersionError error + expectNode *v1.Node + expectError error + }{ + { + desc: "versions set in node info", + node: &v1.Node{}, + versionInfo: &cadvisorapiv1.VersionInfo{ + KernelVersion: "KernelVersion", + ContainerOsVersion: "ContainerOSVersion", + }, + runtimeType: "RuntimeType", + runtimeVersion: &kubecontainertest.FakeVersion{ + Version: "RuntimeVersion", + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + KernelVersion: "KernelVersion", + OSImage: "ContainerOSVersion", + ContainerRuntimeVersion: "RuntimeType://RuntimeVersion", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + }, + }, + }, + { + desc: "error getting version info", + node: &v1.Node{}, + versionInfoError: fmt.Errorf("foo"), + expectNode: &v1.Node{}, + expectError: fmt.Errorf("error getting version info: foo"), + }, + { + desc: "error getting runtime version results in Unknown runtime", + node: &v1.Node{}, + versionInfo: &cadvisorapiv1.VersionInfo{}, + runtimeType: "RuntimeType", + runtimeVersionError: fmt.Errorf("foo"), + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + ContainerRuntimeVersion: "RuntimeType://Unknown", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + versionInfoFunc := func() (*cadvisorapiv1.VersionInfo, error) { + return tc.versionInfo, tc.versionInfoError + } + runtimeTypeFunc := func() string { + return tc.runtimeType + } + runtimeVersionFunc := func() (kubecontainer.Version, error) { + return tc.runtimeVersion, tc.runtimeVersionError + } + // construct setter + setter := VersionInfo(versionInfoFunc, runtimeTypeFunc, runtimeVersionFunc) + // call setter on node + err := setter(tc.node) + require.Equal(t, tc.expectError, err) + // check expected node + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectNode, tc.node), + "Diff: %s", diff.ObjectDiff(tc.expectNode, tc.node)) + }) + } +} + +func TestImages(t *testing.T) { + const ( + minImageSize = 23 * 1024 * 1024 + maxImageSize = 1000 * 1024 * 1024 + ) + + cases := []struct { + desc string + maxImages int32 + imageList []kubecontainer.Image + imageListError error + expectError error + }{ + { + desc: "max images enforced", + maxImages: 1, + imageList: makeImageList(2, 1, minImageSize, maxImageSize), + }, + { + desc: "no max images cap for -1", + maxImages: -1, + imageList: makeImageList(2, 1, minImageSize, maxImageSize), + }, + { + desc: "max names per image enforced", + maxImages: -1, + imageList: makeImageList(1, MaxNamesPerImageInNodeStatus+1, minImageSize, maxImageSize), + }, + { + desc: "images are sorted by size, descending", + maxImages: -1, + // makeExpectedImageList will sort them for expectedNode when the test case is run + imageList: []kubecontainer.Image{{Size: 3}, {Size: 1}, {Size: 4}, {Size: 2}}, + }, + { + desc: "repo digests and tags both show up in image names", + maxImages: -1, + // makeExpectedImageList will use both digests and tags + imageList: []kubecontainer.Image{ + { + RepoDigests: []string{"foo", "bar"}, + RepoTags: []string{"baz", "quux"}, + }, + }, + }, + { + desc: "error getting image list, image 
list on node is reset to empty", + maxImages: -1, + imageListError: fmt.Errorf("foo"), + expectError: fmt.Errorf("error getting image list: foo"), + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + imageListFunc := func() ([]kubecontainer.Image, error) { + // today, imageListFunc is expected to return a sorted list, + // but we may choose to sort in the setter at some future point + // (e.g. if the image cache stopped sorting for us) + sort.Sort(sliceutils.ByImageSize(tc.imageList)) + return tc.imageList, tc.imageListError + } + // construct setter + setter := Images(tc.maxImages, imageListFunc) + // call setter on node + node := &v1.Node{} + err := setter(node) + require.Equal(t, tc.expectError, err) + // check expected node, image list should be reset to empty when there is an error + expectNode := &v1.Node{} + if err == nil { + expectNode.Status.Images = makeExpectedImageList(tc.imageList, tc.maxImages, MaxNamesPerImageInNodeStatus) + } + assert.True(t, apiequality.Semantic.DeepEqual(expectNode, node), + "Diff: %s", diff.ObjectDiff(expectNode, node)) + }) + } + +} + +func TestReadyCondition(t *testing.T) { + now := time.Now() + before := now.Add(-time.Second) + nowFunc := func() time.Time { return now } + + withCapacity := &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(100, resource.DecimalSI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + }, + } + + cases := []struct { + desc string + node *v1.Node + runtimeErrors []string + networkErrors []string + appArmorValidateHostFunc func() error + cmStatus cm.Status + expectConditions []v1.NodeCondition + expectEvents []testEvent + }{ + { + desc: "new, ready", + node: withCapacity.DeepCopy(), + expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)}, + // TODO(mtaufen): The current behavior is that we don't send an event for the initial NodeReady condition, + // the reason for this is unclear, so we may want to actually send an event, and change these test cases + // to ensure an event is sent. + }, + { + desc: "new, ready: apparmor validator passed", + node: withCapacity.DeepCopy(), + appArmorValidateHostFunc: func() error { return nil }, + expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. AppArmor enabled", now, now)}, + }, + { + desc: "new, ready: apparmor validator failed", + node: withCapacity.DeepCopy(), + appArmorValidateHostFunc: func() error { return fmt.Errorf("foo") }, + // absence of an additional message is understood to mean that AppArmor is disabled + expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)}, + }, + { + desc: "new, ready: soft requirement warning", + node: withCapacity.DeepCopy(), + cmStatus: cm.Status{ + SoftRequirements: fmt.Errorf("foo"), + }, + expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. 
WARNING: foo", now, now)}, + }, + { + desc: "new, not ready: runtime errors", + node: withCapacity.DeepCopy(), + runtimeErrors: []string{"foo", "bar"}, + expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo,bar", now, now)}, + }, + { + desc: "new, not ready: network errors", + node: withCapacity.DeepCopy(), + networkErrors: []string{"foo", "bar"}, + expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo,bar", now, now)}, + }, + { + desc: "new, not ready: runtime and network errors", + node: withCapacity.DeepCopy(), + runtimeErrors: []string{"runtime"}, + networkErrors: []string{"network"}, + expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "runtime,network", now, now)}, + }, + { + desc: "new, not ready: missing capacities", + node: &v1.Node{}, + expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "Missing node capacity for resources: cpu, memory, pods, ephemeral-storage", now, now)}, + }, + // the transition tests ensure timestamps are set correctly, no need to test the entire condition matrix in this section + { + desc: "transition to ready", + node: func() *v1.Node { + node := withCapacity.DeepCopy() + node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(false, "", before, before)} + return node + }(), + expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: events.NodeReady, + }, + }, + }, + { + desc: "transition to not ready", + node: func() *v1.Node { + node := withCapacity.DeepCopy() + node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(true, "", before, before)} + return node + }(), + runtimeErrors: []string{"foo"}, + expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo", now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: events.NodeNotReady, + }, + }, + }, + { + desc: "ready, no transition", + node: func() *v1.Node { + node := withCapacity.DeepCopy() + node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(true, "", before, before)} + return node + }(), + expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", before, now)}, + expectEvents: []testEvent{}, + }, + { + desc: "not ready, no transition", + node: func() *v1.Node { + node := withCapacity.DeepCopy() + node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(false, "", before, before)} + return node + }(), + runtimeErrors: []string{"foo"}, + expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo", before, now)}, + expectEvents: []testEvent{}, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + runtimeErrorsFunc := func() []string { + return tc.runtimeErrors + } + networkErrorsFunc := func() []string { + return tc.networkErrors + } + cmStatusFunc := func() cm.Status { + return tc.cmStatus + } + events := []testEvent{} + recordEventFunc := func(eventType, event string) { + events = append(events, testEvent{ + eventType: eventType, + event: event, + }) + } + // construct setter + setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc) + // call setter on node + if err := setter(tc.node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected condition + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectConditions, tc.node.Status.Conditions), + "Diff: %s", 
diff.ObjectDiff(tc.expectConditions, tc.node.Status.Conditions)) + // check expected events + require.Equal(t, len(tc.expectEvents), len(events)) + for i := range tc.expectEvents { + assert.Equal(t, tc.expectEvents[i], events[i]) + } + }) + } +} + +func TestMemoryPressureCondition(t *testing.T) { + now := time.Now() + before := now.Add(-time.Second) + nowFunc := func() time.Time { return now } + + cases := []struct { + desc string + node *v1.Node + pressure bool + expectConditions []v1.NodeCondition + expectEvents []testEvent + }{ + { + desc: "new, no pressure", + node: &v1.Node{}, + pressure: false, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasSufficientMemory", + }, + }, + }, + { + desc: "new, pressure", + node: &v1.Node{}, + pressure: true, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasInsufficientMemory", + }, + }, + }, + { + desc: "transition to pressure", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, before)}, + }, + }, + pressure: true, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasInsufficientMemory", + }, + }, + }, + { + desc: "transition to no pressure", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, before)}, + }, + }, + pressure: false, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasSufficientMemory", + }, + }, + }, + { + desc: "pressure, no transition", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, before)}, + }, + }, + pressure: true, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, now)}, + expectEvents: []testEvent{}, + }, + { + desc: "no pressure, no transition", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, before)}, + }, + }, + pressure: false, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, now)}, + expectEvents: []testEvent{}, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + events := []testEvent{} + recordEventFunc := func(eventType, event string) { + events = append(events, testEvent{ + eventType: eventType, + event: event, + }) + } + pressureFunc := func() bool { + return tc.pressure + } + // construct setter + setter := MemoryPressureCondition(nowFunc, pressureFunc, recordEventFunc) + // call setter on node + if err := setter(tc.node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected condition + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectConditions, tc.node.Status.Conditions), + "Diff: %s", diff.ObjectDiff(tc.expectConditions, tc.node.Status.Conditions)) + // check expected events + require.Equal(t, len(tc.expectEvents), len(events)) + for i := range tc.expectEvents { + assert.Equal(t, tc.expectEvents[i], events[i]) + } + }) + } +} + +func TestPIDPressureCondition(t *testing.T) { + now := time.Now() + before := now.Add(-time.Second) + nowFunc := func() 
time.Time { return now } + + cases := []struct { + desc string + node *v1.Node + pressure bool + expectConditions []v1.NodeCondition + expectEvents []testEvent + }{ + { + desc: "new, no pressure", + node: &v1.Node{}, + pressure: false, + expectConditions: []v1.NodeCondition{*makePIDPressureCondition(false, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasSufficientPID", + }, + }, + }, + { + desc: "new, pressure", + node: &v1.Node{}, + pressure: true, + expectConditions: []v1.NodeCondition{*makePIDPressureCondition(true, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasInsufficientPID", + }, + }, + }, + { + desc: "transition to pressure", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makePIDPressureCondition(false, before, before)}, + }, + }, + pressure: true, + expectConditions: []v1.NodeCondition{*makePIDPressureCondition(true, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasInsufficientPID", + }, + }, + }, + { + desc: "transition to no pressure", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makePIDPressureCondition(true, before, before)}, + }, + }, + pressure: false, + expectConditions: []v1.NodeCondition{*makePIDPressureCondition(false, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasSufficientPID", + }, + }, + }, + { + desc: "pressure, no transition", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makePIDPressureCondition(true, before, before)}, + }, + }, + pressure: true, + expectConditions: []v1.NodeCondition{*makePIDPressureCondition(true, before, now)}, + expectEvents: []testEvent{}, + }, + { + desc: "no pressure, no transition", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makePIDPressureCondition(false, before, before)}, + }, + }, + pressure: false, + expectConditions: []v1.NodeCondition{*makePIDPressureCondition(false, before, now)}, + expectEvents: []testEvent{}, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + events := []testEvent{} + recordEventFunc := func(eventType, event string) { + events = append(events, testEvent{ + eventType: eventType, + event: event, + }) + } + pressureFunc := func() bool { + return tc.pressure + } + // construct setter + setter := PIDPressureCondition(nowFunc, pressureFunc, recordEventFunc) + // call setter on node + if err := setter(tc.node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected condition + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectConditions, tc.node.Status.Conditions), + "Diff: %s", diff.ObjectDiff(tc.expectConditions, tc.node.Status.Conditions)) + // check expected events + require.Equal(t, len(tc.expectEvents), len(events)) + for i := range tc.expectEvents { + assert.Equal(t, tc.expectEvents[i], events[i]) + } + }) + } +} + +func TestDiskPressureCondition(t *testing.T) { + now := time.Now() + before := now.Add(-time.Second) + nowFunc := func() time.Time { return now } + + cases := []struct { + desc string + node *v1.Node + pressure bool + expectConditions []v1.NodeCondition + expectEvents []testEvent + }{ + { + desc: "new, no pressure", + node: &v1.Node{}, + pressure: false, + expectConditions: []v1.NodeCondition{*makeDiskPressureCondition(false, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasNoDiskPressure", + 
}, + }, + }, + { + desc: "new, pressure", + node: &v1.Node{}, + pressure: true, + expectConditions: []v1.NodeCondition{*makeDiskPressureCondition(true, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasDiskPressure", + }, + }, + }, + { + desc: "transition to pressure", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeDiskPressureCondition(false, before, before)}, + }, + }, + pressure: true, + expectConditions: []v1.NodeCondition{*makeDiskPressureCondition(true, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasDiskPressure", + }, + }, + }, + { + desc: "transition to no pressure", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeDiskPressureCondition(true, before, before)}, + }, + }, + pressure: false, + expectConditions: []v1.NodeCondition{*makeDiskPressureCondition(false, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasNoDiskPressure", + }, + }, + }, + { + desc: "pressure, no transition", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeDiskPressureCondition(true, before, before)}, + }, + }, + pressure: true, + expectConditions: []v1.NodeCondition{*makeDiskPressureCondition(true, before, now)}, + expectEvents: []testEvent{}, + }, + { + desc: "no pressure, no transition", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeDiskPressureCondition(false, before, before)}, + }, + }, + pressure: false, + expectConditions: []v1.NodeCondition{*makeDiskPressureCondition(false, before, now)}, + expectEvents: []testEvent{}, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + events := []testEvent{} + recordEventFunc := func(eventType, event string) { + events = append(events, testEvent{ + eventType: eventType, + event: event, + }) + } + pressureFunc := func() bool { + return tc.pressure + } + // construct setter + setter := DiskPressureCondition(nowFunc, pressureFunc, recordEventFunc) + // call setter on node + if err := setter(tc.node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected condition + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectConditions, tc.node.Status.Conditions), + "Diff: %s", diff.ObjectDiff(tc.expectConditions, tc.node.Status.Conditions)) + // check expected events + require.Equal(t, len(tc.expectEvents), len(events)) + for i := range tc.expectEvents { + assert.Equal(t, tc.expectEvents[i], events[i]) + } + }) + } +} + +func TestVolumesInUse(t *testing.T) { + withVolumesInUse := &v1.Node{ + Status: v1.NodeStatus{ + VolumesInUse: []v1.UniqueVolumeName{"foo"}, + }, + } + + cases := []struct { + desc string + node *v1.Node + synced bool + volumesInUse []v1.UniqueVolumeName + expectVolumesInUse []v1.UniqueVolumeName + }{ + { + desc: "synced", + node: withVolumesInUse.DeepCopy(), + synced: true, + volumesInUse: []v1.UniqueVolumeName{"bar"}, + expectVolumesInUse: []v1.UniqueVolumeName{"bar"}, + }, + { + desc: "not synced", + node: withVolumesInUse.DeepCopy(), + synced: false, + volumesInUse: []v1.UniqueVolumeName{"bar"}, + expectVolumesInUse: []v1.UniqueVolumeName{"foo"}, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + syncedFunc := func() bool { + return tc.synced + } + volumesInUseFunc := func() []v1.UniqueVolumeName { + return tc.volumesInUse + } + // construct setter + setter := VolumesInUse(syncedFunc, volumesInUseFunc) + // call 
setter on node + if err := setter(tc.node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected volumes + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectVolumesInUse, tc.node.Status.VolumesInUse), + "Diff: %s", diff.ObjectDiff(tc.expectVolumesInUse, tc.node.Status.VolumesInUse)) + }) + } +} + +func TestVolumeLimits(t *testing.T) { + const ( + volumeLimitKey = "attachable-volumes-fake-provider" + volumeLimitVal = 16 + ) + + var cases = []struct { + desc string + volumePluginList []volume.VolumePluginWithAttachLimits + expectNode *v1.Node + }{ + { + desc: "translate limits to capacity and allocatable for plugins that return successfully from GetVolumeLimits", + volumePluginList: []volume.VolumePluginWithAttachLimits{ + &volumetest.FakeVolumePlugin{ + VolumeLimits: map[string]int64{volumeLimitKey: volumeLimitVal}, + }, + }, + expectNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + volumeLimitKey: *resource.NewQuantity(volumeLimitVal, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + volumeLimitKey: *resource.NewQuantity(volumeLimitVal, resource.DecimalSI), + }, + }, + }, + }, + { + desc: "skip plugins that return errors from GetVolumeLimits", + volumePluginList: []volume.VolumePluginWithAttachLimits{ + &volumetest.FakeVolumePlugin{ + VolumeLimitsError: fmt.Errorf("foo"), + }, + }, + expectNode: &v1.Node{}, + }, + { + desc: "no plugins", + expectNode: &v1.Node{}, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + volumePluginListFunc := func() []volume.VolumePluginWithAttachLimits { + return tc.volumePluginList + } + // construct setter + setter := VolumeLimits(volumePluginListFunc) + // call setter on node + node := &v1.Node{} + if err := setter(node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected node + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectNode, node), + "Diff: %s", diff.ObjectDiff(tc.expectNode, node)) + }) + } +} + +// Test Helpers: + +// sortableNodeAddress is a type for sorting []v1.NodeAddress +type sortableNodeAddress []v1.NodeAddress + +func (s sortableNodeAddress) Len() int { return len(s) } +func (s sortableNodeAddress) Less(i, j int) bool { + return (string(s[i].Type) + s[i].Address) < (string(s[j].Type) + s[j].Address) +} +func (s sortableNodeAddress) Swap(i, j int) { s[j], s[i] = s[i], s[j] } + +func sortNodeAddresses(addrs sortableNodeAddress) { + sort.Sort(addrs) +} + +// testEvent is used to record events for tests +type testEvent struct { + eventType string + event string + message string +} + +// makeImageList randomly generates a list of images with the given count +func makeImageList(numImages, numTags, minSize, maxSize int32) []kubecontainer.Image { + images := make([]kubecontainer.Image, numImages) + for i := range images { + image := &images[i] + image.ID = string(uuid.NewUUID()) + image.RepoTags = makeImageTags(numTags) + image.Size = rand.Int63nRange(int64(minSize), int64(maxSize+1)) + } + return images +} + +func makeExpectedImageList(imageList []kubecontainer.Image, maxImages, maxNames int32) []v1.ContainerImage { + // copy the imageList, we do not want to mutate it in-place and accidentally edit a test case + images := make([]kubecontainer.Image, len(imageList)) + copy(images, imageList) + // sort images by size + sort.Sort(sliceutils.ByImageSize(images)) + // convert to []v1.ContainerImage and truncate the list of names + expectedImages := make([]v1.ContainerImage, len(images)) + for i := range images { + image := 
&images[i] + expectedImage := &expectedImages[i] + names := append(image.RepoDigests, image.RepoTags...) + if len(names) > int(maxNames) { + names = names[0:maxNames] + } + expectedImage.Names = names + expectedImage.SizeBytes = image.Size + } + // -1 means no limit, truncate result list if necessary. + if maxImages > -1 && + int(maxImages) < len(expectedImages) { + return expectedImages[0:maxImages] + } + return expectedImages +} + +func makeImageTags(num int32) []string { + tags := make([]string, num) + for i := range tags { + tags[i] = "k8s.gcr.io:v" + strconv.Itoa(i) + } + return tags +} + +func makeReadyCondition(ready bool, message string, transition, heartbeat time.Time) *v1.NodeCondition { + if ready { + return &v1.NodeCondition{ + Type: v1.NodeReady, + Status: v1.ConditionTrue, + Reason: "KubeletReady", + Message: message, + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } + } + return &v1.NodeCondition{ + Type: v1.NodeReady, + Status: v1.ConditionFalse, + Reason: "KubeletNotReady", + Message: message, + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } +} + +func makeMemoryPressureCondition(pressure bool, transition, heartbeat time.Time) *v1.NodeCondition { + if pressure { + return &v1.NodeCondition{ + Type: v1.NodeMemoryPressure, + Status: v1.ConditionTrue, + Reason: "KubeletHasInsufficientMemory", + Message: "kubelet has insufficient memory available", + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } + } + return &v1.NodeCondition{ + Type: v1.NodeMemoryPressure, + Status: v1.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: "kubelet has sufficient memory available", + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } +} + +func makePIDPressureCondition(pressure bool, transition, heartbeat time.Time) *v1.NodeCondition { + if pressure { + return &v1.NodeCondition{ + Type: v1.NodePIDPressure, + Status: v1.ConditionTrue, + Reason: "KubeletHasInsufficientPID", + Message: "kubelet has insufficient PID available", + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } + } + return &v1.NodeCondition{ + Type: v1.NodePIDPressure, + Status: v1.ConditionFalse, + Reason: "KubeletHasSufficientPID", + Message: "kubelet has sufficient PID available", + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } +} + +func makeDiskPressureCondition(pressure bool, transition, heartbeat time.Time) *v1.NodeCondition { + if pressure { + return &v1.NodeCondition{ + Type: v1.NodeDiskPressure, + Status: v1.ConditionTrue, + Reason: "KubeletHasDiskPressure", + Message: "kubelet has disk pressure", + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } + } + return &v1.NodeCondition{ + Type: v1.NodeDiskPressure, + Status: v1.ConditionFalse, + Reason: "KubeletHasNoDiskPressure", + Message: "kubelet has no disk pressure", + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } +} diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index c102e551957..17aa4106351 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -223,6 +223,9 @@ type FakeVolumePlugin struct { LastProvisionerOptions VolumeOptions NewAttacherCallCount int NewDetacherCallCount int + VolumeLimits 
map[string]int64 + VolumeLimitsError error + LimitKey string Mounters []*FakeVolume Unmounters []*FakeVolume @@ -238,6 +241,7 @@ var _ RecyclableVolumePlugin = &FakeVolumePlugin{} var _ DeletableVolumePlugin = &FakeVolumePlugin{} var _ ProvisionableVolumePlugin = &FakeVolumePlugin{} var _ AttachableVolumePlugin = &FakeVolumePlugin{} +var _ VolumePluginWithAttachLimits = &FakeVolumePlugin{} func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume { volume := &FakeVolume{} @@ -448,6 +452,14 @@ func (plugin *FakeVolumePlugin) RequiresFSResize() bool { return true } +func (plugin *FakeVolumePlugin) GetVolumeLimits() (map[string]int64, error) { + return plugin.VolumeLimits, plugin.VolumeLimitsError +} + +func (plugin *FakeVolumePlugin) VolumeLimitKey(spec *Spec) string { + return plugin.LimitKey +} + type FakeFileVolumePlugin struct { }
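
Note on the condition tests above: they all exercise the same Setter shape. A setter is built from injected closures (a clock, pressure/error sources, an event recorder), applied to a *v1.Node, and then the resulting condition and the recorded events are checked. The "no transition" cases pin down the timestamp rule: the heartbeat time always advances to now, while the transition time only moves when the condition's status actually flips. The following is a minimal, self-contained sketch of that pattern for one pressure condition; it is not the PR's implementation, and the helper name pressureCondition and its exact messages are invented for illustration.

// Sketch only: a hypothetical pressure-condition setter in the style exercised
// by the tests above. NOT the PR's code; pressureCondition is an invented name.
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// setter mirrors the signature the tests rely on: mutate the node, return an error.
type setter func(node *v1.Node) error

// pressureCondition builds a setter from injected dependencies, the same
// dependency-injection style the tests use (nowFunc, pressureFunc, recordEventFunc).
func pressureCondition(nowFunc func() time.Time, pressureFunc func() bool, recordEvent func(eventType, event string)) setter {
	return func(node *v1.Node) error {
		now := metav1.NewTime(nowFunc())

		// Find an existing MemoryPressure condition, if any.
		var existing *v1.NodeCondition
		for i := range node.Status.Conditions {
			if node.Status.Conditions[i].Type == v1.NodeMemoryPressure {
				existing = &node.Status.Conditions[i]
				break
			}
		}

		status := v1.ConditionFalse
		reason, message, event := "KubeletHasSufficientMemory", "kubelet has sufficient memory available", "NodeHasSufficientMemory"
		if pressureFunc() {
			status = v1.ConditionTrue
			reason, message, event = "KubeletHasInsufficientMemory", "kubelet has insufficient memory available", "NodeHasInsufficientMemory"
		}

		if existing == nil {
			// New condition: both timestamps are "now" and an event is recorded.
			node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
				Type: v1.NodeMemoryPressure, Status: status, Reason: reason, Message: message,
				LastTransitionTime: now, LastHeartbeatTime: now,
			})
			recordEvent(v1.EventTypeNormal, event)
			return nil
		}

		// Existing condition: the heartbeat always advances; the transition time
		// (and an event) only changes when the status actually flips.
		existing.LastHeartbeatTime = now
		if existing.Status != status {
			existing.Status, existing.Reason, existing.Message = status, reason, message
			existing.LastTransitionTime = now
			recordEvent(v1.EventTypeNormal, event)
		}
		return nil
	}
}

func main() {
	node := &v1.Node{}
	set := pressureCondition(time.Now, func() bool { return false }, func(t, e string) { fmt.Println(t, e) })
	if err := set(node); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", node.Status.Conditions)
}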
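
The TestVolumeLimits cases and the new FakeVolumePlugin fields together describe the contract: a plugin implementing VolumePluginWithAttachLimits reports a map of limit-key to count from GetVolumeLimits, and the setter copies each entry into both Capacity and Allocatable, skipping plugins that return an error and leaving the node untouched when no plugin reports anything. A hedged sketch of that translation follows; limitReporter and applyVolumeLimits are invented stand-ins, not kubelet APIs, and only the map-to-ResourceList translation mirrors what the test asserts.

// Sketch only: folding per-plugin attach limits into a node's Capacity and
// Allocatable, matching what TestVolumeLimits checks. Invented helper names.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

type limitReporter interface {
	// GetVolumeLimits returns limit-key -> maximum attachable volumes, or an error.
	GetVolumeLimits() (map[string]int64, error)
}

type fakeReporter struct {
	limits map[string]int64
	err    error
}

func (f fakeReporter) GetVolumeLimits() (map[string]int64, error) { return f.limits, f.err }

// applyVolumeLimits writes each reported limit into both Capacity and Allocatable.
// Plugins that fail to report limits are skipped, as in the error test case; if
// nothing is reported, the node is left unchanged, as in the "no plugins" case.
func applyVolumeLimits(node *v1.Node, plugins []limitReporter) {
	for _, p := range plugins {
		limits, err := p.GetVolumeLimits()
		if err != nil {
			continue // skip plugins that cannot report limits
		}
		for key, val := range limits {
			if node.Status.Capacity == nil {
				node.Status.Capacity = v1.ResourceList{}
			}
			if node.Status.Allocatable == nil {
				node.Status.Allocatable = v1.ResourceList{}
			}
			q := *resource.NewQuantity(val, resource.DecimalSI)
			node.Status.Capacity[v1.ResourceName(key)] = q
			node.Status.Allocatable[v1.ResourceName(key)] = q
		}
	}
}

func main() {
	node := &v1.Node{}
	applyVolumeLimits(node, []limitReporter{
		fakeReporter{limits: map[string]int64{"attachable-volumes-fake-provider": 16}},
		fakeReporter{err: fmt.Errorf("not supported")},
	})
	fmt.Printf("capacity=%v allocatable=%v\n", node.Status.Capacity, node.Status.Allocatable)
}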
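
The makeExpectedImageList helper encodes the truncation rules the image-list expectations follow: images are ordered by size, each image's names are its digests followed by its tags capped at maxNames, and the overall list is capped at maxImages, with -1 meaning unlimited. A small standalone sketch of those rules, using minimal stand-in types; truncateImages is an illustrative helper, not the kubelet's code.

// Sketch only: the sort-and-truncate rules makeExpectedImageList encodes,
// written against a minimal stand-in image type. truncateImages is invented.
package main

import (
	"fmt"
	"sort"
)

type image struct {
	RepoDigests []string
	RepoTags    []string
	Size        int64
}

type namedImage struct {
	Names     []string
	SizeBytes int64
}

// truncateImages sorts by size (largest first), caps each image's names at
// maxNames, and caps the list at maxImages; maxImages == -1 means no limit.
func truncateImages(images []image, maxImages, maxNames int) []namedImage {
	sorted := make([]image, len(images))
	copy(sorted, images) // do not mutate the caller's slice
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Size > sorted[j].Size })

	out := make([]namedImage, 0, len(sorted))
	for _, img := range sorted {
		names := append(append([]string{}, img.RepoDigests...), img.RepoTags...)
		if len(names) > maxNames {
			names = names[:maxNames]
		}
		out = append(out, namedImage{Names: names, SizeBytes: img.Size})
	}
	if maxImages > -1 && maxImages < len(out) {
		out = out[:maxImages]
	}
	return out
}

func main() {
	imgs := []image{
		{RepoDigests: []string{"repo@sha256:aaa"}, RepoTags: []string{"repo:v1", "repo:v2"}, Size: 10},
		{RepoTags: []string{"other:v1"}, Size: 50},
	}
	fmt.Printf("%+v\n", truncateImages(imgs, 1, 2))
}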