diff --git a/pkg/kubelet/fake_node_manager.go b/pkg/kubelet/fake_node_manager.go deleted file mode 100644 index f24041377c4..00000000000 --- a/pkg/kubelet/fake_node_manager.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "net" - - "k8s.io/kubernetes/pkg/api" -) - -type fakeNodeManager struct { - podCIDR string - node *api.Node - IP net.IP -} - -var _ nodeManager = &fakeNodeManager{} - -func (f *fakeNodeManager) Start() { -} - -func (f *fakeNodeManager) GetNode() (*api.Node, error) { - return f.node, nil -} - -func (f *fakeNodeManager) GetHostIP() (net.IP, error) { - return f.IP, nil -} - -func (f *fakeNodeManager) GetPodCIDR() string { - return f.podCIDR -} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f21df9c62f5..7b119673591 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -36,6 +36,7 @@ import ( "github.com/golang/glog" cadvisorApi "github.com/google/cadvisor/info/v1" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/validation" @@ -63,9 +64,11 @@ import ( utilErrors "k8s.io/kubernetes/pkg/util/errors" kubeio "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" + nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" @@ -76,6 +79,9 @@ const ( // Max amount of time to wait for the container runtime to come up. maxWaitForContainerRuntime = 5 * time.Minute + // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. + nodeStatusUpdateRetry = 5 + // Location of container logs. containerLogsDir = "/var/log/containers" @@ -200,6 +206,23 @@ func NewMainKubelet( } serviceLister := &cache.StoreToServiceLister{Store: serviceStore} + nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + if kubeClient != nil { + // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather + // than an interface. There is no way to construct a list+watcher using resource name. + fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector() + listWatch := &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return kubeClient.Nodes().List(labels.Everything(), fieldSelector) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion) + }, + } + cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() + } + nodeLister := &cache.StoreToNodeLister{Store: nodeStore} + // TODO: get the real node object of ourself, // and use the real node name and UID. // TODO: what is namespace for node? 
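A note on the reflector wiring above: because the ListWatch is pinned to this kubelet's own name with a field selector, the backing store only ever holds one Node object, which is what the new nodeLister/GetNode path reads. Below is a self-contained sketch of that list/watch/re-list pattern; `node`, `nodeAPI`, `store`, `watchOwnNode`, and `fakeAPI` are hypothetical stand-ins, not the real `cache.ListWatch`/`cache.NewReflector` types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// node is a trimmed stand-in for api.Node; nodeAPI is a hypothetical
// stand-in for the apiserver client behind cache.ListWatch.
type node struct{ Name, PodCIDR string }

type nodeAPI interface {
	List(nameSelector string) ([]node, string, error)
	Watch(nameSelector, resourceVersion string) (<-chan node, error)
}

// store plays the role of the reflector-backed cache.Store: with a name
// field selector it only ever holds the kubelet's own Node.
type store struct {
	mu sync.RWMutex
	n  node
}

func (s *store) set(n node) { s.mu.Lock(); s.n = n; s.mu.Unlock() }
func (s *store) get() node  { s.mu.RLock(); defer s.mu.RUnlock(); return s.n }

// watchOwnNode is the loop shape a reflector runs: list to seed the
// cache, then apply watch events, re-listing when the stream ends.
func watchOwnNode(api nodeAPI, nodeName string, s *store) {
	selector := "metadata.name=" + nodeName
	for {
		nodes, rv, err := api.List(selector)
		if err != nil {
			time.Sleep(time.Second) // back off, then re-list
			continue
		}
		for _, n := range nodes {
			s.set(n)
		}
		events, err := api.Watch(selector, rv)
		if err != nil {
			continue
		}
		for n := range events {
			s.set(n)
		}
	}
}

// fakeAPI serves one node and a watch stream with a single update.
type fakeAPI struct{}

func (fakeAPI) List(string) ([]node, string, error) {
	return []node{{Name: "node-1"}}, "1", nil
}
func (fakeAPI) Watch(string, string) (<-chan node, error) {
	ch := make(chan node, 1)
	ch <- node{Name: "node-1", PodCIDR: "10.0.0.0/24"}
	return ch, nil
}

func main() {
	s := &store{}
	go watchOwnNode(fakeAPI{}, "node-1", s)
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("%+v\n", s.get())
}
```

The real reflector additionally tracks resource versions across re-lists and handles watch errors more carefully; the sketch only keeps the shape.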
@@ -241,10 +264,12 @@ func NewMainKubelet( readinessManager: readinessManager, httpClient: &http.Client{}, sourcesReady: sourcesReady, + registerNode: registerNode, standaloneMode: standaloneMode, clusterDomain: clusterDomain, clusterDNS: clusterDNS, serviceLister: serviceLister, + nodeLister: nodeLister, runtimeMutex: sync.Mutex{}, runtimeUpThreshold: maxWaitForContainerRuntime, lastTimestampRuntimeUp: time.Time{}, @@ -259,6 +284,7 @@ func NewMainKubelet( volumeManager: volumeManager, cloud: cloud, nodeRef: nodeRef, + nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, resourceContainer: resourceContainer, os: osInterface, oomWatcher: oomWatcher, @@ -266,10 +292,12 @@ func NewMainKubelet( mounter: mounter, writer: writer, configureCBR0: configureCBR0, + podCIDR: podCIDR, pods: pods, syncLoopMonitor: util.AtomicValue{}, resolverConfig: resolverConfig, cpuCFSQuota: cpuCFSQuota, + daemonEndpoints: daemonEndpoints, } if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil { @@ -278,7 +306,7 @@ func NewMainKubelet( klet.networkPlugin = plug } - machineInfo, err := klet.GetMachineInfo() + machineInfo, err := klet.GetCachedMachineInfo() if err != nil { return nil, err } @@ -340,10 +368,11 @@ func NewMainKubelet( } klet.containerManager = containerManager - klet.nodeManager = newRealNodeManager(kubeClient, cloud, registerNode, nodeStatusUpdateFrequency, recorder, nodeName, hostname, podCIDR, pods, klet, daemonEndpoints, nodeRef) - klet.nodeManager.Start() - go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop) + if klet.kubeClient != nil { + // Start syncing node status immediately, this may set up things the runtime needs to run. + go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop) + } // Wait for the runtime to be up with a timeout. if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil { @@ -387,6 +416,11 @@ type serviceLister interface { List() (api.ServiceList, error) } +type nodeLister interface { + List() (machines api.NodeList, err error) + GetNodeInfo(id string) (*api.Node, error) +} + // Kubelet is the main kubelet implementation. type Kubelet struct { hostname string @@ -415,6 +449,8 @@ type Kubelet struct { // cAdvisor used for container information. cadvisor cadvisor.Interface + // Set to true to have the node register itself with the apiserver. + registerNode bool // for internal book keeping; access only from within registerWithApiserver registrationCompleted bool @@ -429,6 +465,7 @@ type Kubelet struct { masterServiceNamespace string serviceLister serviceLister + nodeLister nodeLister // Last timestamp when runtime responded on ping. // Mutex is used to protect this value. @@ -468,18 +505,13 @@ type Kubelet struct { // Cached MachineInfo returned by cadvisor. machineInfo *cadvisorApi.MachineInfo - // nodeManager handles interaction of api.Node object with apiserver. This - // includes watching the nodes, registering itself to the apiserver, and - // sync node status. - nodeManager nodeManager - // Syncs pods statuses with apiserver; also used as a cache of statuses. statusManager status.Manager // Manager for the volume maps for the pods. volumeManager *volumeManager - // Cloud provider interface + //Cloud provider interface cloud cloudprovider.Interface // Reference to this node. @@ -488,6 +520,19 @@ type Kubelet struct { // Container runtime. 
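The `go util.Until(klet.syncNodeStatus, ...)` call above is now the only driver of node status updates: syncNodeStatus runs once per nodeStatusUpdateFrequency until the process exits. For readers unfamiliar with util.Until, here is a simplified sketch of its contract (run f, wait period, stop when the channel closes); the real helper in pkg/util may differ in details such as crash handling.

```go
package main

import (
	"fmt"
	"time"
)

// until is a simplified sketch of util.Until as used above: call f,
// sleep period, repeat, returning once stop is closed.
func until(f func(), period time.Duration, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		f()
		select {
		case <-stop:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stop := make(chan struct{})
	n := 0
	go until(func() { n++; fmt.Println("sync", n) }, 10*time.Millisecond, stop)
	time.Sleep(50 * time.Millisecond)
	close(stop)
}
```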
containerRuntime kubecontainer.Runtime + // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master. + // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod + // in nodecontroller. There are several constraints: + // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where + // N means number of retries allowed for kubelet to post node status. It is pointless + // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there + // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. + // The constant must be less than podEvictionTimeout. + // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node + // status. Kubelet may fail to update node status reliably if the value is too small, + // as it takes time to gather all necessary node information. + nodeStatusUpdateFrequency time.Duration + // The name of the resource-only container to run the Kubelet in (empty for no container). // Name must be absolute. resourceContainer string @@ -512,6 +557,7 @@ type Kubelet struct { // Whether or not kubelet should take responsibility for keeping cbr0 in // the correct state. configureCBR0 bool + podCIDR string // Number of Pods which can be run by this Kubelet pods int @@ -535,6 +581,9 @@ type Kubelet struct { // True if container cpu limits should be enforced via cgroup CFS quota cpuCFSQuota bool + + // Information about the ports which are opened by daemons on Node running this Kubelet server. + daemonEndpoints *api.NodeDaemonEndpoints } // getRootDir returns the full path to the directory under which kubelet can @@ -676,6 +725,13 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) { return pods, nil } +func (kl *Kubelet) GetNode() (*api.Node, error) { + if kl.standaloneMode { + return nil, errors.New("no node entry for kubelet in standalone mode") + } + return kl.nodeLister.GetNodeInfo(kl.nodeName) +} + // Starts garbage collection threads. func (kl *Kubelet) StartGarbageCollection() { go util.Until(func() { @@ -740,6 +796,117 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { kl.syncLoop(updates, kl) } +func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { + node := &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: kl.nodeName, + Labels: map[string]string{"kubernetes.io/hostname": kl.hostname}, + }, + } + if kl.cloud != nil { + instances, ok := kl.cloud.Instances() + if !ok { + return nil, fmt.Errorf("failed to get instances from cloud provider") + } + + // TODO(roberthbailey): Can we do this without having credentials to talk + // to the cloud provider? + // TODO: ExternalID is deprecated, we'll have to drop this code + externalID, err := instances.ExternalID(kl.nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err) + } + node.Spec.ExternalID = externalID + + // TODO: We can't assume that the node has credentials to talk to the + // cloudprovider from arbitrary nodes. At most, we should talk to a + // local metadata server here. + node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName) + if err != nil { + return nil, err + } + } else { + node.Spec.ExternalID = kl.hostname + } + if err := kl.setNodeStatus(node); err != nil { + return nil, err + } + return node, nil +} + +// registerWithApiserver registers the node with the cluster master. 
It is safe +// to call multiple times, but not concurrently (kl.registrationCompleted is +// not locked). +func (kl *Kubelet) registerWithApiserver() { + if kl.registrationCompleted { + return + } + step := 100 * time.Millisecond + for { + time.Sleep(step) + step = step * 2 + if step >= 7*time.Second { + step = 7 * time.Second + } + + node, err := kl.initialNodeStatus() + if err != nil { + glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) + continue + } + glog.V(2).Infof("Attempting to register node %s", node.Name) + if _, err := kl.kubeClient.Nodes().Create(node); err != nil { + if !apierrors.IsAlreadyExists(err) { + glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) + continue + } + currentNode, err := kl.kubeClient.Nodes().Get(kl.nodeName) + if err != nil { + glog.Errorf("error getting node %q: %v", kl.nodeName, err) + continue + } + if currentNode == nil { + glog.Errorf("no node instance returned for %q", kl.nodeName) + continue + } + if currentNode.Spec.ExternalID == node.Spec.ExternalID { + glog.Infof("Node %s was previously registered", node.Name) + kl.registrationCompleted = true + return + } + glog.Errorf( + "Previously %q had externalID %q; now it is %q; will delete and recreate.", + kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID, + ) + if err := kl.kubeClient.Nodes().Delete(node.Name); err != nil { + glog.Errorf("Unable to delete old node: %v", err) + } else { + glog.Errorf("Deleted old node object %q", kl.nodeName) + } + continue + } + glog.Infof("Successfully registered node %s", node.Name) + kl.registrationCompleted = true + return + } +} + +// syncNodeStatus should be called periodically from a goroutine. +// It synchronizes node status to master, registering the kubelet first if +// necessary. +func (kl *Kubelet) syncNodeStatus() { + if kl.kubeClient == nil { + return + } + if kl.registerNode { + // This will exit immediately if it doesn't need to do anything. + kl.registerWithApiserver() + } + if err := kl.updateNodeStatus(); err != nil { + glog.Errorf("Unable to update node status: %v", err) + } +} + func makeMounts(container *api.Container, podVolumes kubecontainer.VolumeMap) (mounts []kubecontainer.Mount) { for _, mount := range container.VolumeMounts { vol, ok := podVolumes[mount.Name] @@ -1617,7 +1784,7 @@ func hasHostPortConflicts(pods []*api.Pod) bool { // TODO: Consider integrate disk space into this function, and returns a // suitable reason and message per resource type. func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) { - info, err := kl.GetMachineInfo() + info, err := kl.GetCachedMachineInfo() if err != nil { glog.Errorf("error getting machine info: %v", err) // TODO: Should we admit the pod when machine info is unavailable? 
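registerWithApiserver above retries forever with a capped exponential backoff: sleep the current step, double it, clamp at 7 seconds. The resulting schedule, pulled out into a hypothetical helper (`registrationDelays` is illustrative only, not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

// registrationDelays reproduces the backoff schedule of the loop above
// in isolation: record the step, then double it, capping it at max.
func registrationDelays(initial, max time.Duration, attempts int) []time.Duration {
	delays := make([]time.Duration, 0, attempts)
	step := initial
	for i := 0; i < attempts; i++ {
		delays = append(delays, step)
		step *= 2
		if step >= max {
			step = max
		}
	}
	return delays
}

func main() {
	// 100ms 200ms 400ms 800ms 1.6s 3.2s 6.4s 7s: once the cap is hit,
	// the kubelet keeps retrying registration every 7 seconds.
	fmt.Println(registrationDelays(100*time.Millisecond, 7*time.Second, 8))
}
```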
@@ -1657,7 +1824,7 @@ func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool { if kl.standaloneMode { return true } - node, err := kl.nodeManager.GetNode() + node, err := kl.GetNode() if err != nil { glog.Errorf("error getting node: %v", err) return true @@ -1707,12 +1874,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { glog.Info("Starting kubelet main sync loop.") var housekeepingTimestamp time.Time for { - if !kl.ContainerRuntimeUp() { + if !kl.containerRuntimeUp() { time.Sleep(5 * time.Second) glog.Infof("Skipping pod synchronization, container runtime is not up.") continue } - if !kl.NetworkConfigured() { + if !kl.doneNetworkConfigure() { time.Sleep(5 * time.Second) glog.Infof("Skipping pod synchronization, network is not configured") continue @@ -1872,10 +2039,6 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time { return val.(time.Time) } -func (kl *Kubelet) GetVersionInfo() (*cadvisorApi.VersionInfo, error) { - return kl.cadvisor.VersionInfo() -} - // Returns the container runtime version for this Kubelet. func (kl *Kubelet) GetContainerRuntimeVersion() (kubecontainer.Version, error) { if kl.containerRuntime == nil { @@ -1956,7 +2119,11 @@ func (kl *Kubelet) GetHostname() string { // Returns host IP or nil in case of error. func (kl *Kubelet) GetHostIP() (net.IP, error) { - return kl.nodeManager.GetHostIP() + node, err := kl.GetNode() + if err != nil { + return nil, fmt.Errorf("cannot get node: %v", err) + } + return nodeutil.GetNodeHostIP(node) } // GetPods returns all pods bound to the kubelet and their spec, and the mirror @@ -2026,6 +2193,25 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error { return kl.shaper.ReconcileInterface() } +// updateNodeStatus updates node status to master with retries. +func (kl *Kubelet) updateNodeStatus() error { + for i := 0; i < nodeStatusUpdateRetry; i++ { + if err := kl.tryUpdateNodeStatus(); err != nil { + glog.Errorf("Error updating node status, will retry: %v", err) + } else { + return nil + } + } + return fmt.Errorf("update node status exceeds retry count") +} + +func (kl *Kubelet) recordNodeStatusEvent(event string) { + glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName) + // TODO: This requires a transaction, either both node status is updated + // and event is recorded or neither should happen, see issue #6055. + kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.nodeName, event) +} + // Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus() var oldNodeUnschedulable bool @@ -2039,10 +2225,10 @@ func (kl *Kubelet) syncNetworkStatus() { networkConfigured = false glog.Errorf("Error on adding ip table rules: %v", err) } - if len(kl.nodeManager.GetPodCIDR()) == 0 { + if len(kl.podCIDR) == 0 { glog.Warningf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now") networkConfigured = false - } else if err := kl.reconcileCBR0(kl.nodeManager.GetPodCIDR()); err != nil { + } else if err := kl.reconcileCBR0(kl.podCIDR); err != nil { networkConfigured = false glog.Errorf("Error configuring cbr0: %v", err) } @@ -2050,18 +2236,214 @@ func (kl *Kubelet) syncNetworkStatus() { kl.networkConfigured = networkConfigured } -func (kl *Kubelet) ContainerRuntimeUp() bool { +// setNodeStatus fills in the Status fields of the given Node, overwriting +// any fields that are currently set. +func (kl *Kubelet) setNodeStatus(node *api.Node) error { + // Set addresses for the node. 
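The address-setting logic just below works through three fallbacks when no cloud provider is configured: treat the hostname as an IP literal, otherwise resolve it via DNS and prefer the first non-loopback IPv4 address, and finally fall back to util.ChooseHostInterface. A condensed sketch of the first two steps follows; `firstUsableIP` is a hypothetical helper and the interface-based fallback is elided.

```go
package main

import (
	"fmt"
	"net"
)

// firstUsableIP mirrors the fallback chain below: use the hostname if
// it already parses as an IP; otherwise resolve it and take the first
// non-loopback IPv4 address. util.ChooseHostInterface is elided here.
func firstUsableIP(hostname string) (net.IP, error) {
	if ip := net.ParseIP(hostname); ip != nil {
		return ip, nil
	}
	addrs, err := net.LookupIP(hostname)
	if err != nil {
		return nil, fmt.Errorf("can't get ip address of node %s: %v", hostname, err)
	}
	for _, ip := range addrs {
		if ip.IsLoopback() {
			continue
		}
		if ip.To4() != nil {
			return ip, nil
		}
	}
	return nil, fmt.Errorf("no usable ip address for node %s", hostname)
}

func main() {
	ip, err := firstUsableIP("localhost")
	fmt.Println(ip, err) // localhost resolves to loopback only, so this errors
}
```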
+ if kl.cloud != nil { + instances, ok := kl.cloud.Instances() + if !ok { + return fmt.Errorf("failed to get instances from cloud provider") + } + // TODO(roberthbailey): Can we do this without having credentials to talk + // to the cloud provider? + // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface + nodeAddresses, err := instances.NodeAddresses(kl.nodeName) + if err != nil { + return fmt.Errorf("failed to get node address from cloud provider: %v", err) + } + node.Status.Addresses = nodeAddresses + } else { + addr := net.ParseIP(kl.hostname) + if addr != nil { + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: addr.String()}, + {Type: api.NodeInternalIP, Address: addr.String()}, + } + } else { + addrs, err := net.LookupIP(node.Name) + if err != nil { + return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err) + } else if len(addrs) == 0 { + return fmt.Errorf("no ip address for node %v", node.Name) + } else { + // check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address. + // If no match is found, it uses the IP of the interface with gateway on it. + for _, ip := range addrs { + if ip.IsLoopback() { + continue + } + + if ip.To4() != nil { + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: ip.String()}, + {Type: api.NodeInternalIP, Address: ip.String()}, + } + break + } + } + + if len(node.Status.Addresses) == 0 { + ip, err := util.ChooseHostInterface() + if err != nil { + return err + } + + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: ip.String()}, + {Type: api.NodeInternalIP, Address: ip.String()}, + } + } + } + } + } + + // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start + // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. + info, err := kl.GetCachedMachineInfo() + if err != nil { + // TODO(roberthbailey): This is required for test-cmd.sh to pass. + // See if the test should be updated instead. + node.Status.Capacity = api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), + api.ResourceMemory: resource.MustParse("0Gi"), + api.ResourcePods: *resource.NewQuantity(int64(kl.pods), resource.DecimalSI), + } + glog.Errorf("Error getting machine info: %v", err) + } else { + node.Status.NodeInfo.MachineID = info.MachineID + node.Status.NodeInfo.SystemUUID = info.SystemUUID + node.Status.Capacity = CapacityFromMachineInfo(info) + node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( + int64(kl.pods), resource.DecimalSI) + if node.Status.NodeInfo.BootID != "" && + node.Status.NodeInfo.BootID != info.BootID { + // TODO: This requires a transaction, either both node status is updated + // and event is recorded or neither should happen, see issue #6055. 
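updateNodeStatus, shown above, wraps tryUpdateNodeStatus in a bounded retry that gives up after nodeStatusUpdateRetry consecutive failures. Its generic shape, as a sketch (`retryN` is a hypothetical helper, not in the patch):

```go
package main

import (
	"errors"
	"fmt"
)

// retryN captures the shape of updateNodeStatus: call f up to n times
// and stop on the first success.
func retryN(n int, f func() error) error {
	for i := 0; i < n; i++ {
		if err := f(); err != nil {
			fmt.Printf("attempt %d failed, will retry: %v\n", i+1, err)
			continue
		}
		return nil
	}
	return errors.New("update node status exceeds retry count")
}

func main() {
	calls := 0
	err := retryN(5, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient apiserver error")
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```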
+ kl.recorder.Eventf(kl.nodeRef, "Rebooted", + "Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID) + } + node.Status.NodeInfo.BootID = info.BootID + } + + verinfo, err := kl.cadvisor.VersionInfo() + if err != nil { + glog.Errorf("Error getting version info: %v", err) + } else { + node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion + node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion + // TODO: Determine whether the runtime is docker or rocket + node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion + node.Status.NodeInfo.KubeletVersion = version.Get().String() + // TODO: kube-proxy might be a different version from kubelet in the future + node.Status.NodeInfo.KubeProxyVersion = version.Get().String() + } + + node.Status.DaemonEndpoints = *kl.daemonEndpoints + + // Check whether container runtime can be reported as up. + containerRuntimeUp := kl.containerRuntimeUp() + // Check whether network is configured properly + networkConfigured := kl.doneNetworkConfigure() + + currentTime := unversioned.Now() + var newNodeReadyCondition api.NodeCondition + var oldNodeReadyConditionStatus api.ConditionStatus + if containerRuntimeUp && networkConfigured { + newNodeReadyCondition = api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: "kubelet is posting ready status", + LastHeartbeatTime: currentTime, + } + } else { + var messages []string + if !containerRuntimeUp { + messages = append(messages, "container runtime is down") + } + if !networkConfigured { + messages = append(messages, "network not configured correctly") + } + newNodeReadyCondition = api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionFalse, + Reason: "KubeletNotReady", + Message: strings.Join(messages, ","), + LastHeartbeatTime: currentTime, + } + } + + updated := false + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == api.NodeReady { + oldNodeReadyConditionStatus = node.Status.Conditions[i].Status + if oldNodeReadyConditionStatus == newNodeReadyCondition.Status { + newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime + } else { + newNodeReadyCondition.LastTransitionTime = currentTime + } + node.Status.Conditions[i] = newNodeReadyCondition + updated = true + } + } + if !updated { + newNodeReadyCondition.LastTransitionTime = currentTime + node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition) + } + if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status { + if newNodeReadyCondition.Status == api.ConditionTrue { + kl.recordNodeStatusEvent("NodeReady") + } else { + kl.recordNodeStatusEvent("NodeNotReady") + } + } + if oldNodeUnschedulable != node.Spec.Unschedulable { + if node.Spec.Unschedulable { + kl.recordNodeStatusEvent("NodeNotSchedulable") + } else { + kl.recordNodeStatusEvent("NodeSchedulable") + } + oldNodeUnschedulable = node.Spec.Unschedulable + } + return nil +} + +func (kl *Kubelet) containerRuntimeUp() bool { kl.runtimeMutex.Lock() defer kl.runtimeMutex.Unlock() return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now()) } -func (kl *Kubelet) NetworkConfigured() bool { +func (kl *Kubelet) doneNetworkConfigure() bool { kl.networkConfigMutex.Lock() defer kl.networkConfigMutex.Unlock() return kl.networkConfigured } +// tryUpdateNodeStatus tries to update node status to master. It also refreshes +// the locally cached pod CIDR from node.Spec.PodCIDR so that syncNetworkStatus +// can keep cbr0 configured correctly.
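One remark on the NodeReady bookkeeping above, before the body of tryUpdateNodeStatus below: every sync refreshes LastHeartbeatTime, but LastTransitionTime is carried forward from the existing condition unless the Ready status actually flips, and an event is recorded only on a flip or a first report. The merge rule reduced to simplified stand-in types (`condition` and `mergeReady` are not the patch's types):

```go
package main

import (
	"fmt"
	"time"
)

// condition is a trimmed stand-in for api.NodeCondition.
type condition struct {
	Ready              bool
	LastHeartbeatTime  time.Time
	LastTransitionTime time.Time
}

// mergeReady applies the rule above: the heartbeat always moves, the
// transition time only moves when the Ready status changes.
func mergeReady(old *condition, ready bool, now time.Time) condition {
	c := condition{Ready: ready, LastHeartbeatTime: now, LastTransitionTime: now}
	if old != nil && old.Ready == ready {
		c.LastTransitionTime = old.LastTransitionTime
	}
	return c
}

func main() {
	t0 := time.Now()
	c := mergeReady(nil, true, t0)                    // first report: transition at t0
	c = mergeReady(&c, true, t0.Add(10*time.Second))  // heartbeat moves, transition stays
	fmt.Println(c.LastTransitionTime.Equal(t0))       // true
	c = mergeReady(&c, false, t0.Add(20*time.Second)) // flip: transition moves
	fmt.Println(c.LastTransitionTime.Equal(t0))       // false
}
```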
+func (kl *Kubelet) tryUpdateNodeStatus() error { + node, err := kl.kubeClient.Nodes().Get(kl.nodeName) + if err != nil { + return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) + } + if node == nil { + return fmt.Errorf("no node instance returned for %q", kl.nodeName) + } + kl.networkConfigMutex.Lock() + kl.podCIDR = node.Spec.PodCIDR + kl.networkConfigMutex.Unlock() + + if err := kl.setNodeStatus(node); err != nil { + return err + } + // Update the current status on the API server + _, err = kl.kubeClient.Nodes().UpdateStatus(node) + return err +} + // GetPhase returns the phase of a pod given its container info. // This func is exported to simplify integration with 3rd party kubelet // integrations like kubernetes-mesos. @@ -2258,7 +2640,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { } if !kl.standaloneMode { - hostIP, err := kl.nodeManager.GetHostIP() + hostIP, err := kl.GetHostIP() if err != nil { glog.V(4).Infof("Cannot get host IP: %v", err) } else { @@ -2398,8 +2780,8 @@ func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorApi.Co } } -// GetMachineInfo assumes that the machine info can't change without a reboot -func (kl *Kubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) { +// GetCachedMachineInfo assumes that the machine info can't change without a reboot +func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) { if kl.machineInfo == nil { info, err := kl.cadvisor.MachineInfo() if err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 088338c9310..d0bf8401829 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -34,6 +34,7 @@ import ( cadvisorApi "github.com/google/cadvisor/info/v1" cadvisorApiv2 "github.com/google/cadvisor/info/v2" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" @@ -45,9 +46,11 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/status" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/bandwidth" + "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" _ "k8s.io/kubernetes/pkg/volume/host_path" ) @@ -75,7 +78,6 @@ type TestKubelet struct { fakeCadvisor *cadvisor.Mock fakeKubeClient *testclient.Fake fakeMirrorClient *fakeMirrorClient - fakeNodeManager *fakeNodeManager } func newTestKubelet(t *testing.T) *TestKubelet { @@ -102,15 +104,14 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.sourcesReady = func() bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} - + kubelet.nodeLister = testNodeLister{} kubelet.readinessManager = kubecontainer.NewReadinessManager() kubelet.recorder = fakeRecorder kubelet.statusManager = status.NewManager(fakeKubeClient) if err := kubelet.setupDataDirs(); err != nil { t.Fatalf("can't initialize kubelet data dirs: %v", err) } - fakeNodeManager := &fakeNodeManager{} - kubelet.nodeManager = fakeNodeManager + kubelet.daemonEndpoints = &api.NodeDaemonEndpoints{} mockCadvisor := &cadvisor.Mock{} kubelet.cadvisor = mockCadvisor podManager, fakeMirrorClient := newFakePodManager() @@ -136,7 +137,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.backOff = util.NewBackOff(time.Second, time.Minute) 
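tryUpdateNodeStatus at the top of this chunk is a read-modify-write against the Nodes API: fetch the current object, capture node.Spec.PodCIDR under the network mutex, recompute status, then write back through the status subresource. The same cycle with the client reduced to a hypothetical interface; `nodesClient`, `nodeObj`, `syncStatus`, and `fakeNodes` are all stand-ins, not the patch's types.

```go
package main

import (
	"errors"
	"fmt"
)

type nodeObj struct {
	Name    string
	PodCIDR string
	Status  string
}

// nodesClient is a hypothetical stand-in for kubeClient.Nodes().
type nodesClient interface {
	Get(name string) (*nodeObj, error)
	UpdateStatus(n *nodeObj) (*nodeObj, error)
}

// syncStatus mirrors tryUpdateNodeStatus: fetch the current object,
// fill in status via the callback, and write only the status back.
func syncStatus(c nodesClient, name string, setStatus func(*nodeObj) error) error {
	n, err := c.Get(name)
	if err != nil {
		return fmt.Errorf("error getting node %q: %v", name, err)
	}
	if n == nil {
		return errors.New("no node instance returned")
	}
	if err := setStatus(n); err != nil {
		return err
	}
	_, err = c.UpdateStatus(n)
	return err
}

// fakeNodes is an in-memory stand-in used only to exercise syncStatus.
type fakeNodes struct{ n nodeObj }

func (f *fakeNodes) Get(name string) (*nodeObj, error)         { c := f.n; return &c, nil }
func (f *fakeNodes) UpdateStatus(n *nodeObj) (*nodeObj, error) { f.n = *n; return n, nil }

func main() {
	f := &fakeNodes{n: nodeObj{Name: "node-1", PodCIDR: "10.0.0.0/24"}}
	err := syncStatus(f, "node-1", func(n *nodeObj) error { n.Status = "Ready"; return nil })
	fmt.Println(f.n.Status, err)
}
```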
kubelet.backOff.Clock = fakeClock kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20) - return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeNodeManager} + return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} } func newTestPods(count int) []*api.Pod { @@ -962,6 +963,25 @@ func (ls testServiceLister) List() (api.ServiceList, error) { }, nil } +type testNodeLister struct { + nodes []api.Node +} + +func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) { + for _, node := range ls.nodes { + if node.Name == id { + return &node, nil + } + } + return nil, fmt.Errorf("Node with name: %s does not exist", id) +} + +func (ls testNodeLister) List() (api.NodeList, error) { + return api.NodeList{ + Items: ls.nodes, + }, nil +} + type envs []kubecontainer.EnvVar func (e envs) Len() int { @@ -2154,10 +2174,10 @@ func TestHandlePortConflicts(t *testing.T) { // Tests that we handle not matching labels selector correctly by setting the failed status in status map. func TestHandleNodeSelector(t *testing.T) { testKubelet := newTestKubelet(t) - testKubelet.fakeNodeManager.node = &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}} - kl := testKubelet.kubelet + kl.nodeLister = testNodeLister{nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}}, + }} testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorApiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorApiv2.FsInfo{}, nil) @@ -2368,6 +2388,309 @@ func TestValidateContainerStatus(t *testing.T) { } } +func TestUpdateNewNodeStatus(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, + }}).ReactionChain + machineInfo := &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OsImage: "Debian GNU/Linux 7 (wheezy)", + ContainerRuntimeVersion: "docker://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: 
[]api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + }, + } + + kubelet.updateRuntimeUp() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } + updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{} + if !reflect.DeepEqual(expectedNode, updatedNode) { + t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) + } +} + +func TestUpdateExistingNodeStatus(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + }, + }, + }}).ReactionChain + mockCadvisor := testKubelet.fakeCadvisor + machineInfo := &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: unversioned.Time{}, // placeholder + LastTransitionTime: unversioned.Time{}, // placeholder + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OsImage: "Debian GNU/Linux 7 (wheezy)", + ContainerRuntimeVersion: "docker://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + 
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + }, + } + + kubelet.updateRuntimeUp() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Errorf("unexpected actions: %v", actions) + } + updateAction, ok := actions[1].(testclient.UpdateAction) + if !ok { + t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) + } + updatedNode, ok := updateAction.GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + // Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same. + if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastHeartbeatTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) { + t.Errorf("expected \n%v\n, got \n%v", unversioned.Now(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) + } + if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) { + t.Errorf("expected \n%#v\n, got \n%#v", updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy(), + unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) + } + updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{} + if !reflect.DeepEqual(expectedNode, updatedNode) { + t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) + } +} + +func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + fakeRuntime := testKubelet.fakeRuntime + // This causes returning an error from GetContainerRuntimeVersion() which + // simulates that container runtime is down. 
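About the runtime-down simulation being set up here: containerRuntimeUp, added earlier in the patch, reports the runtime as up only while lastTimestampRuntimeUp plus runtimeUpThreshold is still in the future, and the test below also zeroes runtimeUpThreshold so the NotReady branch is taken deterministically. The check in isolation:

```go
package main

import (
	"fmt"
	"time"
)

// runtimeUp reproduces the containerRuntimeUp predicate: the runtime
// counts as up while the last successful ping is younger than the
// threshold. A zero threshold forces the "runtime down" path.
func runtimeUp(lastPing time.Time, threshold time.Duration) bool {
	return lastPing.Add(threshold).After(time.Now())
}

func main() {
	now := time.Now()
	fmt.Println(runtimeUp(now, 5*time.Minute)) // true: fresh ping
	fmt.Println(runtimeUp(now, 0))             // false: zero threshold, as in the test
}
```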
+ fakeRuntime.VersionInfo = "" + + kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, + }}).ReactionChain + mockCadvisor := testKubelet.fakeCadvisor + machineInfo := &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFalse, + Reason: "KubeletNotReady", + Message: fmt.Sprintf("container runtime is down"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OsImage: "Debian GNU/Linux 7 (wheezy)", + ContainerRuntimeVersion: "docker://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + }, + } + + kubelet.runtimeUpThreshold = time.Duration(0) + kubelet.updateRuntimeUp() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected action type. 
expected UpdateAction, got %#v", actions[1]) + } + + if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } + updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{} + if !reflect.DeepEqual(expectedNode, updatedNode) { + t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) + } +} + +func TestUpdateNodeStatusError(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + // No matching node for the kubelet + testKubelet.fakeKubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{}}).ReactionChain + + if err := kubelet.updateNodeStatus(); err == nil { + t.Errorf("expected an error from updateNodeStatus, got nil") + } + if len(testKubelet.fakeKubeClient.Actions()) != nodeStatusUpdateRetry { + t.Errorf("unexpected actions: %v", testKubelet.fakeKubeClient.Actions()) + } +} + func TestCreateMirrorPod(t *testing.T) { for _, updateType := range []SyncPodType{SyncPodCreate, SyncPodUpdate} { testKubelet := newTestKubelet(t) @@ -2743,6 +3066,55 @@ func TestFilterOutTerminatedPods(t *testing.T) { } } +func TestRegisterExistingNodeWithApiserver(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + kubeClient.AddReactor("create", "nodes", func(action testclient.Action) (bool, runtime.Object, error) { + // Return an error on create. + return true, &api.Node{}, &apierrors.StatusError{ + ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists}, + } + }) + kubeClient.AddReactor("get", "nodes", func(action testclient.Action) (bool, runtime.Object, error) { + // Return an existing (matching) node on get. + return true, &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{ExternalID: testKubeletHostname}, + }, nil + }) + kubeClient.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("no reaction implemented for %s", action) + }) + machineInfo := &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + done := make(chan struct{}) + go func() { + kubelet.registerWithApiserver() + done <- struct{}{} + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("timed out waiting for registration") + case <-done: + return + } +} + func TestMakePortMappings(t *testing.T) { tests := []struct { container *api.Container diff --git a/pkg/kubelet/node_manager.go b/pkg/kubelet/node_manager.go deleted file mode 100644 index 773672b179e..00000000000 --- a/pkg/kubelet/node_manager.go +++ /dev/null @@ -1,513 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License.
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "errors" - "fmt" - "net" - "strings" - "sync" - "time" - - "github.com/golang/glog" - cadvisorApi "github.com/google/cadvisor/info/v1" - "k8s.io/kubernetes/pkg/api" - apierrors "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" - nodeutil "k8s.io/kubernetes/pkg/util/node" - "k8s.io/kubernetes/pkg/version" - "k8s.io/kubernetes/pkg/watch" -) - -const ( - // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. - nodeStatusUpdateRetry = 5 -) - -type infoGetter interface { - GetMachineInfo() (*cadvisorApi.MachineInfo, error) - ContainerRuntimeUp() bool - NetworkConfigured() bool - GetVersionInfo() (*cadvisorApi.VersionInfo, error) -} - -type nodeManager interface { - Start() - GetNode() (*api.Node, error) - GetPodCIDR() string - GetHostIP() (net.IP, error) -} - -type realNodeManager struct { - // apiserver client. - client client.Interface - - nodeLister nodeLister - - // Set to true to have the node register itself with the apiserver. - registerNode bool - - // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master. - // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod - // in nodecontroller. There are several constraints: - // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where - // N means number of retries allowed for kubelet to post node status. It is pointless - // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there - // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. - // The constant must be less than podEvictionTimeout. - // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node - // status. Kubelet may fail to update node status reliably if the value is too small, - // as it takes time to gather all necessary node information. - nodeStatusUpdateFrequency time.Duration - - // Cloud provider interface - cloud cloudprovider.Interface - - nodeName string - hostname string - - // Number of Pods which can be run by this Kubelet. - pods int - - // The EventRecorder to use - recorder record.EventRecorder - - // Information about the ports which are opened by daemons on Node running this Kubelet server. - daemonEndpoints *api.NodeDaemonEndpoints - - // Interface to get machine and version info. - infoGetter infoGetter - - // Reference to this node. - nodeRef *api.ObjectReference - - // podCIDR may be updated by node.Spec. 
- podCIDR string - - // for internal book keeping; access only from within registerWithApiserver - registrationCompleted bool - - lock sync.RWMutex -} - -func newRealNodeManager(client client.Interface, cloud cloudprovider.Interface, registerNode bool, - nodeStatusUpdateFrequency time.Duration, recorder record.EventRecorder, nodeName, hostname, podCIDR string, - pods int, infoGetter infoGetter, daemonEndpoints *api.NodeDaemonEndpoints, nodeRef *api.ObjectReference) *realNodeManager { - return &realNodeManager{ - client: client, - cloud: cloud, - registerNode: registerNode, - nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, - recorder: recorder, - nodeName: nodeName, - hostname: hostname, - podCIDR: podCIDR, - pods: pods, - infoGetter: infoGetter, - daemonEndpoints: daemonEndpoints, - nodeRef: nodeRef, - } -} - -type nodeLister interface { - List() (machines api.NodeList, err error) - GetNodeInfo(id string) (*api.Node, error) -} - -func (nm *realNodeManager) Start() { - if nm.client == nil { - return - } - nm.setNodeLister() - go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop) -} - -func (nm *realNodeManager) GetPodCIDR() string { - nm.lock.RLock() - defer nm.lock.RUnlock() - return nm.podCIDR -} - -func (nm *realNodeManager) GetNode() (*api.Node, error) { - if nm.client == nil { - return nil, errors.New("unable to get node entry because apiserver client is nil") - } - return nm.nodeLister.GetNodeInfo(nm.nodeName) -} - -// Returns host IP or nil in case of error. -func (nm *realNodeManager) GetHostIP() (net.IP, error) { - if nm.client == nil { - return nil, errors.New("unable to get node entry because apiserver client is nil") - } - node, err := nm.GetNode() - if err != nil { - return nil, fmt.Errorf("cannot get node: %v", err) - } - return nodeutil.GetNodeHostIP(node) -} - -func (nm *realNodeManager) setNodeLister() { - nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather - // than an interface. There is no way to construct a list+watcher using resource name. - fieldSelector := fields.Set{client.ObjectNameField: nm.nodeName}.AsSelector() - listWatch := &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return nm.client.Nodes().List(labels.Everything(), fieldSelector) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return nm.client.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion) - }, - } - cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() - nm.nodeLister = &cache.StoreToNodeLister{Store: nodeStore} -} - -// syncNodeStatus should be called periodically from a goroutine. -// It synchronizes node status to master, registering the kubelet first if -// necessary. -func (nm *realNodeManager) syncNodeStatus() { - - if nm.registerNode { - // This will exit immediately if it doesn't need to do anything. 
- nm.registerWithApiserver() - } - if err := nm.updateNodeStatus(); err != nil { - glog.Errorf("Unable to update node status: %v", err) - } -} - -func (nm *realNodeManager) initialNodeStatus() (*api.Node, error) { - node := &api.Node{ - ObjectMeta: api.ObjectMeta{ - Name: nm.nodeName, - Labels: map[string]string{"kubernetes.io/hostname": nm.hostname}, - }, - } - if nm.cloud != nil { - instances, ok := nm.cloud.Instances() - if !ok { - return nil, fmt.Errorf("failed to get instances from cloud provider") - } - - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? - // TODO: ExternalID is deprecated, we'll have to drop this code - externalID, err := instances.ExternalID(nm.nodeName) - if err != nil { - return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err) - } - node.Spec.ExternalID = externalID - - // TODO: We can't assume that the node has credentials to talk to the - // cloudprovider from arbitrary nodes. At most, we should talk to a - // local metadata server here. - node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(nm.cloud, nm.nodeName) - if err != nil { - return nil, err - } - } else { - node.Spec.ExternalID = nm.hostname - } - if err := nm.setNodeStatus(node); err != nil { - return nil, err - } - return node, nil -} - -// registerWithApiserver registers the node with the cluster master. It is safe -// to call multiple times, but not concurrently (nm.registrationCompleted is -// not locked). -func (nm *realNodeManager) registerWithApiserver() { - if nm.registrationCompleted { - return - } - step := 100 * time.Millisecond - for { - time.Sleep(step) - step = step * 2 - if step >= 7*time.Second { - step = 7 * time.Second - } - - node, err := nm.initialNodeStatus() - if err != nil { - glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) - continue - } - glog.V(2).Infof("Attempting to register node %s", node.Name) - if _, err := nm.client.Nodes().Create(node); err != nil { - if !apierrors.IsAlreadyExists(err) { - glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) - continue - } - currentNode, err := nm.client.Nodes().Get(nm.nodeName) - if err != nil { - glog.Errorf("error getting node %q: %v", nm.nodeName, err) - continue - } - if currentNode == nil { - glog.Errorf("no node instance returned for %q", nm.nodeName) - continue - } - if currentNode.Spec.ExternalID == node.Spec.ExternalID { - glog.Infof("Node %s was previously registered", node.Name) - nm.registrationCompleted = true - return - } - glog.Errorf( - "Previously %q had externalID %q; now it is %q; will delete and recreate.", - nm.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID, - ) - if err := nm.client.Nodes().Delete(node.Name); err != nil { - glog.Errorf("Unable to delete old node: %v", err) - } else { - glog.Errorf("Deleted old node object %q", nm.nodeName) - } - continue - } - glog.Infof("Successfully registered node %s", node.Name) - nm.registrationCompleted = true - return - } -} - -// setNodeStatus fills in the Status fields of the given Node, overwriting -// any fields that are currently set. -func (nm *realNodeManager) setNodeStatus(node *api.Node) error { - // Set addresses for the node. - if nm.cloud != nil { - instances, ok := nm.cloud.Instances() - if !ok { - return fmt.Errorf("failed to get instances from cloud provider") - } - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? 
- // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface - nodeAddresses, err := instances.NodeAddresses(nm.nodeName) - if err != nil { - return fmt.Errorf("failed to get node address from cloud provider: %v", err) - } - node.Status.Addresses = nodeAddresses - } else { - addr := net.ParseIP(nm.hostname) - if addr != nil { - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: addr.String()}, - {Type: api.NodeInternalIP, Address: addr.String()}, - } - } else { - addrs, err := net.LookupIP(node.Name) - if err != nil { - return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err) - } else if len(addrs) == 0 { - return fmt.Errorf("no ip address for node %v", node.Name) - } else { - // check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address. - // If no match is found, it uses the IP of the interface with gateway on it. - for _, ip := range addrs { - if ip.IsLoopback() { - continue - } - - if ip.To4() != nil { - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: ip.String()}, - {Type: api.NodeInternalIP, Address: ip.String()}, - } - break - } - } - - if len(node.Status.Addresses) == 0 { - ip, err := util.ChooseHostInterface() - if err != nil { - return err - } - - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: ip.String()}, - {Type: api.NodeInternalIP, Address: ip.String()}, - } - } - } - } - } - - // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start - // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. - info, err := nm.infoGetter.GetMachineInfo() - if err != nil { - // TODO(roberthbailey): This is required for test-cmd.sh to pass. - // See if the test should be updated instead. - node.Status.Capacity = api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), - api.ResourceMemory: resource.MustParse("0Gi"), - api.ResourcePods: *resource.NewQuantity(int64(nm.pods), resource.DecimalSI), - } - glog.Errorf("Error getting machine info: %v", err) - } else { - node.Status.NodeInfo.MachineID = info.MachineID - node.Status.NodeInfo.SystemUUID = info.SystemUUID - node.Status.Capacity = CapacityFromMachineInfo(info) - node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( - int64(nm.pods), resource.DecimalSI) - if node.Status.NodeInfo.BootID != "" && - node.Status.NodeInfo.BootID != info.BootID { - // TODO: This requires a transaction, either both node status is updated - // and event is recorded or neither should happen, see issue #6055. - nm.recorder.Eventf(nm.nodeRef, "Rebooted", - "Node %s has been rebooted, boot id: %s", nm.nodeName, info.BootID) - } - node.Status.NodeInfo.BootID = info.BootID - } - - verinfo, err := nm.infoGetter.GetVersionInfo() - if err != nil { - glog.Errorf("Error getting version info: %v", err) - } else { - node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion - node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion - // TODO: Determine the runtime is docker or rocket - node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion - node.Status.NodeInfo.KubeletVersion = version.Get().String() - // TODO: kube-proxy might be different version from kubelet in the future - node.Status.NodeInfo.KubeProxyVersion = version.Get().String() - } - - node.Status.DaemonEndpoints = *nm.daemonEndpoints - - // Check whether container runtime can be reported as up. 
- containerRuntimeUp := nm.infoGetter.ContainerRuntimeUp() - // Check whether network is configured properly - networkConfigured := nm.infoGetter.NetworkConfigured() - - currentTime := unversioned.Now() - var newNodeReadyCondition api.NodeCondition - var oldNodeReadyConditionStatus api.ConditionStatus - if containerRuntimeUp && networkConfigured { - newNodeReadyCondition = api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: "kubelet is posting ready status", - LastHeartbeatTime: currentTime, - } - } else { - var reasons []string - var messages []string - if !containerRuntimeUp { - messages = append(messages, "container runtime is down") - } - if !networkConfigured { - messages = append(reasons, "network not configured correctly") - } - newNodeReadyCondition = api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionFalse, - Reason: "KubeletNotReady", - Message: strings.Join(messages, ","), - LastHeartbeatTime: currentTime, - } - } - - updated := false - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == api.NodeReady { - oldNodeReadyConditionStatus = node.Status.Conditions[i].Status - if oldNodeReadyConditionStatus == newNodeReadyCondition.Status { - newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime - } else { - newNodeReadyCondition.LastTransitionTime = currentTime - } - node.Status.Conditions[i] = newNodeReadyCondition - updated = true - } - } - if !updated { - newNodeReadyCondition.LastTransitionTime = currentTime - node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition) - } - if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status { - if newNodeReadyCondition.Status == api.ConditionTrue { - nm.recordNodeStatusEvent("NodeReady") - } else { - nm.recordNodeStatusEvent("NodeNotReady") - } - } - if oldNodeUnschedulable != node.Spec.Unschedulable { - if node.Spec.Unschedulable { - nm.recordNodeStatusEvent("NodeNotSchedulable") - } else { - nm.recordNodeStatusEvent("NodeSchedulable") - } - oldNodeUnschedulable = node.Spec.Unschedulable - } - return nil -} - -// updateNodeStatus updates node status to master with retries. -func (nm *realNodeManager) updateNodeStatus() error { - for i := 0; i < nodeStatusUpdateRetry; i++ { - if err := nm.tryUpdateNodeStatus(); err != nil { - glog.Errorf("Error updating node status, will retry: %v", err) - } else { - return nil - } - } - return fmt.Errorf("update node status exceeds retry count") -} - -func (nm *realNodeManager) recordNodeStatusEvent(event string) { - glog.V(2).Infof("Recording %s event message for node %s", event, nm.nodeName) - // TODO: This requires a transaction, either both node status is updated - // and event is recorded or neither should happen, see issue #6055. - nm.recorder.Eventf(nm.nodeRef, event, "Node %s status is now: %s", nm.nodeName, event) -} - -// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 -// is set, this function will also confirm that cbr0 is configured correctly. 
-// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
-// is set, this function will also confirm that cbr0 is configured correctly.
-func (nm *realNodeManager) tryUpdateNodeStatus() error {
-	node, err := nm.client.Nodes().Get(nm.nodeName)
-	if err != nil {
-		return fmt.Errorf("error getting node %q: %v", nm.nodeName, err)
-	}
-	if node == nil {
-		return fmt.Errorf("no node instance returned for %q", nm.nodeName)
-	}
-	nm.lock.Lock()
-	defer nm.lock.Unlock()
-	nm.podCIDR = node.Spec.PodCIDR
-
-	if err := nm.setNodeStatus(node); err != nil {
-		return err
-	}
-	// Update the current status on the API server.
-	_, err = nm.client.Nodes().UpdateStatus(node)
-	return err
-}
diff --git a/pkg/kubelet/node_manager_test.go b/pkg/kubelet/node_manager_test.go
deleted file mode 100644
index 0c5f0966b2c..00000000000
--- a/pkg/kubelet/node_manager_test.go
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package kubelet
-
-import (
-	"fmt"
-	"reflect"
-	"testing"
-	"time"
-
-	cadvisorApi "github.com/google/cadvisor/info/v1"
-	"k8s.io/kubernetes/pkg/api"
-	apierrors "k8s.io/kubernetes/pkg/api/errors"
-	"k8s.io/kubernetes/pkg/api/resource"
-	"k8s.io/kubernetes/pkg/api/unversioned"
-	"k8s.io/kubernetes/pkg/client/record"
-	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
-	"k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/util"
-	"k8s.io/kubernetes/pkg/version"
-)
-
-type fakeInfoGetter struct {
-	machineInfo       *cadvisorApi.MachineInfo
-	versionInfo       *cadvisorApi.VersionInfo
-	runtimeUp         bool
-	networkConfigured bool
-}
-
-func (f *fakeInfoGetter) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
-	return f.machineInfo, nil
-}
-
-func (f *fakeInfoGetter) GetVersionInfo() (*cadvisorApi.VersionInfo, error) {
-	return f.versionInfo, nil
-}
-
-func (f *fakeInfoGetter) ContainerRuntimeUp() bool {
-	return f.runtimeUp
-}
-
-func (f *fakeInfoGetter) NetworkConfigured() bool {
-	return f.networkConfigured
-}
-
-var _ infoGetter = &fakeInfoGetter{}
-
-type testNodeLister struct {
-	nodes []api.Node
-}
-
-func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
-	for _, node := range ls.nodes {
-		if node.Name == id {
-			return &node, nil
-		}
-	}
-	return nil, fmt.Errorf("node with name %q does not exist", id)
-}
-
-func (ls testNodeLister) List() (api.NodeList, error) {
-	return api.NodeList{
-		Items: ls.nodes,
-	}, nil
-}
-
-type testNodeManager struct {
-	fakeClient     *testclient.Fake
-	fakeInfoGetter *fakeInfoGetter
-	nodeManager    *realNodeManager
-}
-
-func newTestNodeManager() *testNodeManager {
-	fakeRecorder := &record.FakeRecorder{}
-	fakeClient := &testclient.Fake{}
-	fakeInfoGetter := &fakeInfoGetter{}
-	nodeManager := newRealNodeManager(fakeClient, nil, true, time.Second, fakeRecorder, testKubeletHostname,
-		testKubeletHostname, "", 0, fakeInfoGetter, &api.NodeDaemonEndpoints{}, nil)
-	nodeManager.nodeLister = &testNodeLister{}
-	return &testNodeManager{fakeClient: fakeClient, fakeInfoGetter: fakeInfoGetter, nodeManager: nodeManager}
-}
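The fixture above wires every dependency in through an interface (the info getter, the node lister, the fake client), which is what lets the tests below script machine info, version info, and runtime state independently. A stripped-down, self-contained illustration of that test seam — hypothetical names, not the kubelet's types:

package example

import "testing"

// infoSource is the seam: production code depends on this small interface,
// and tests substitute a scriptable fake, just as fakeInfoGetter does above.
type infoSource interface {
	RuntimeUp() bool
}

type fakeInfoSource struct{ up bool }

func (f *fakeInfoSource) RuntimeUp() bool { return f.up }

// Compile-time check that the fake satisfies the interface, mirroring
// `var _ infoGetter = &fakeInfoGetter{}` in the deleted test file.
var _ infoSource = &fakeInfoSource{}

// readyMessage is a stand-in unit under test.
func readyMessage(src infoSource) string {
	if src.RuntimeUp() {
		return "kubelet is posting ready status"
	}
	return "container runtime is down"
}

func TestReadyMessage(t *testing.T) {
	if got := readyMessage(&fakeInfoSource{up: true}); got != "kubelet is posting ready status" {
		t.Errorf("unexpected message: %q", got)
	}
	if got := readyMessage(&fakeInfoSource{up: false}); got != "container runtime is down" {
		t.Errorf("unexpected message: %q", got)
	}
}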
-func TestUpdateNewNodeStatus(t *testing.T) {
-	testNodeManager := newTestNodeManager()
-	nodeManager := testNodeManager.nodeManager
-	client := testNodeManager.fakeClient
-	fakeInfoGetter := testNodeManager.fakeInfoGetter
-
-	client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
-		{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
-	}}).ReactionChain
-
-	fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{
-		MachineID:      "123",
-		SystemUUID:     "abc",
-		BootID:         "1b3",
-		NumCores:       2,
-		MemoryCapacity: 1024,
-	}
-	fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{
-		KernelVersion:      "3.16.0-0.bpo.4-amd64",
-		ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
-		DockerVersion:      "1.5.0",
-	}
-
-	expectedNode := &api.Node{
-		ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
-		Spec:       api.NodeSpec{},
-		Status: api.NodeStatus{
-			Conditions: []api.NodeCondition{
-				{
-					Type:               api.NodeReady,
-					Status:             api.ConditionTrue,
-					Reason:             "KubeletReady",
-					Message:            "kubelet is posting ready status",
-					LastHeartbeatTime:  unversioned.Time{},
-					LastTransitionTime: unversioned.Time{},
-				},
-			},
-			NodeInfo: api.NodeSystemInfo{
-				MachineID:               "123",
-				SystemUUID:              "abc",
-				BootID:                  "1b3",
-				KernelVersion:           "3.16.0-0.bpo.4-amd64",
-				OsImage:                 "Debian GNU/Linux 7 (wheezy)",
-				ContainerRuntimeVersion: "docker://1.5.0",
-				KubeletVersion:          version.Get().String(),
-				KubeProxyVersion:        version.Get().String(),
-			},
-			Capacity: api.ResourceList{
-				api.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
-				api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
-				api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI),
-			},
-			Addresses: []api.NodeAddress{
-				{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
-				{Type: api.NodeInternalIP, Address: "127.0.0.1"},
-			},
-		},
-	}
-	fakeInfoGetter.runtimeUp = true
-	fakeInfoGetter.networkConfigured = true
-
-	if err := nodeManager.updateNodeStatus(); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	actions := client.Actions()
-	if len(actions) != 2 {
-		t.Fatalf("unexpected actions: %v", actions)
-	}
-	if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
-		t.Fatalf("unexpected actions: %v", actions)
-	}
-	updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
-	if !ok {
-		t.Errorf("unexpected object type")
-	}
-	if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
-		t.Errorf("unexpected zero last heartbeat timestamp")
-	}
-	if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
-		t.Errorf("unexpected zero last transition timestamp")
-	}
-	updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
-	updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
-	if !reflect.DeepEqual(expectedNode, updatedNode) {
-		t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
-	}
-}
-
-func TestUpdateExistingNodeStatus(t *testing.T) {
-	testNodeManager := newTestNodeManager()
-	nodeManager := testNodeManager.nodeManager
-	client := testNodeManager.fakeClient
-	fakeInfoGetter := testNodeManager.fakeInfoGetter
-
-	client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
-		{
-			ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
-			Spec:       api.NodeSpec{},
-			Status: api.NodeStatus{
-				Conditions: []api.NodeCondition{
-					{
-						Type:               api.NodeReady,
-						Status:             api.ConditionTrue,
-						Reason:             "KubeletReady",
-						Message:            "kubelet is posting ready status",
-						LastHeartbeatTime:  unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
-						LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
-					},
-				},
-				Capacity: api.ResourceList{
-					api.ResourceCPU:    *resource.NewMilliQuantity(3000, resource.DecimalSI),
-					api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI),
-					api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI),
-				},
-			},
-		},
-	}}).ReactionChain
-	fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{
-		MachineID:      "123",
-		SystemUUID:     "abc",
-		BootID:         "1b3",
-		NumCores:       2,
-		MemoryCapacity: 1024,
-	}
-	fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{
-		KernelVersion:      "3.16.0-0.bpo.4-amd64",
-		ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
-		DockerVersion:      "1.5.0",
-	}
-
-	expectedNode := &api.Node{
-		ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
-		Spec:       api.NodeSpec{},
-		Status: api.NodeStatus{
-			Conditions: []api.NodeCondition{
-				{
-					Type:               api.NodeReady,
-					Status:             api.ConditionTrue,
-					Reason:             "KubeletReady",
-					Message:            "kubelet is posting ready status",
-					LastHeartbeatTime:  unversioned.Time{}, // placeholder
-					LastTransitionTime: unversioned.Time{}, // placeholder
-				},
-			},
-			NodeInfo: api.NodeSystemInfo{
-				MachineID:               "123",
-				SystemUUID:              "abc",
-				BootID:                  "1b3",
-				KernelVersion:           "3.16.0-0.bpo.4-amd64",
-				OsImage:                 "Debian GNU/Linux 7 (wheezy)",
-				ContainerRuntimeVersion: "docker://1.5.0",
-				KubeletVersion:          version.Get().String(),
-				KubeProxyVersion:        version.Get().String(),
-			},
-			Capacity: api.ResourceList{
-				api.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
-				api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
-				api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI),
-			},
-			Addresses: []api.NodeAddress{
-				{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
-				{Type: api.NodeInternalIP, Address: "127.0.0.1"},
-			},
-		},
-	}
-	fakeInfoGetter.runtimeUp = true
-	fakeInfoGetter.networkConfigured = true
-
-	if err := nodeManager.updateNodeStatus(); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	actions := client.Actions()
-	if len(actions) != 2 {
-		t.Errorf("unexpected actions: %v", actions)
-	}
-	updateAction, ok := actions[1].(testclient.UpdateAction)
-	if !ok {
-		t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
-	}
-	updatedNode, ok := updateAction.GetObject().(*api.Node)
-	if !ok {
-		t.Errorf("unexpected object type")
-	}
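A pattern worth noting in the assertions that follow: time-dependent fields are checked separately (updated vs. preserved), then zeroed out so the rest of the struct can be compared with reflect.DeepEqual. A self-contained sketch of that normalize-then-compare idiom, using illustrative types rather than the kubelet's:

package example

import (
	"reflect"
	"testing"
	"time"
)

type condition struct {
	Status        string
	HeartbeatTime time.Time
}

func TestNormalizeThenCompare(t *testing.T) {
	got := condition{Status: "Ready", HeartbeatTime: time.Now()}
	want := condition{Status: "Ready"} // zero time is a placeholder

	// Assert the time-dependent field separately...
	if got.HeartbeatTime.IsZero() {
		t.Errorf("unexpected zero heartbeat timestamp")
	}
	// ...then zero it out so DeepEqual only sees deterministic fields.
	got.HeartbeatTime = time.Time{}
	if !reflect.DeepEqual(want, got) {
		t.Errorf("expected %#v, got %#v", want, got)
	}
}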
-	// Expect LastHeartbeatTime to be updated to now, while LastTransitionTime remains the same.
-	if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastHeartbeatTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) {
-		t.Errorf("expected \n%v\n, got \n%v", unversioned.Now(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
-	}
-	if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) {
-		t.Errorf("expected \n%#v\n, got \n%#v", updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy(),
-			unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
-	}
-	updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
-	updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
-	if !reflect.DeepEqual(expectedNode, updatedNode) {
-		t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
-	}
-}
-
-func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
-	testNodeManager := newTestNodeManager()
-	nodeManager := testNodeManager.nodeManager
-	client := testNodeManager.fakeClient
-	fakeInfoGetter := testNodeManager.fakeInfoGetter
-
-	client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
-		{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
-	}}).ReactionChain
-
-	fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{
-		MachineID:      "123",
-		SystemUUID:     "abc",
-		BootID:         "1b3",
-		NumCores:       2,
-		MemoryCapacity: 1024,
-	}
-	fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{
-		KernelVersion:      "3.16.0-0.bpo.4-amd64",
-		ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
-		DockerVersion:      "1.5.0",
-	}
-
-	expectedNode := &api.Node{
-		ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
-		Spec:       api.NodeSpec{},
-		Status: api.NodeStatus{
-			Conditions: []api.NodeCondition{
-				{
-					Type:               api.NodeReady,
-					Status:             api.ConditionFalse,
-					Reason:             "KubeletNotReady",
-					Message:            "container runtime is down",
-					LastHeartbeatTime:  unversioned.Time{},
-					LastTransitionTime: unversioned.Time{},
-				},
-			},
-			NodeInfo: api.NodeSystemInfo{
-				MachineID:               "123",
-				SystemUUID:              "abc",
-				BootID:                  "1b3",
-				KernelVersion:           "3.16.0-0.bpo.4-amd64",
-				OsImage:                 "Debian GNU/Linux 7 (wheezy)",
-				ContainerRuntimeVersion: "docker://1.5.0",
-				KubeletVersion:          version.Get().String(),
-				KubeProxyVersion:        version.Get().String(),
-			},
-			Capacity: api.ResourceList{
-				api.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI),
-				api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
-				api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI),
-			},
-			Addresses: []api.NodeAddress{
-				{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
-				{Type: api.NodeInternalIP, Address: "127.0.0.1"},
-			},
-		},
-	}
-	// Pretend that the container runtime is down.
-	fakeInfoGetter.runtimeUp = false
-	fakeInfoGetter.networkConfigured = true
-
-	if err := nodeManager.updateNodeStatus(); err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	actions := client.Actions()
-	if len(actions) != 2 {
-		t.Fatalf("unexpected actions: %v", actions)
-	}
-	if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
-		t.Fatalf("unexpected actions: %v", actions)
-	}
-	updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
-	if !ok {
-		t.Errorf("unexpected object type. expected *api.Node, got %#v", actions[1])
-	}
-
-	if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
-		t.Errorf("unexpected zero last heartbeat timestamp")
-	}
-	if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
-		t.Errorf("unexpected zero last transition timestamp")
-	}
-	updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
-	updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
-	if !reflect.DeepEqual(expectedNode, updatedNode) {
-		t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
-	}
-}
-
-func TestUpdateNodeStatusError(t *testing.T) {
-	testNodeManager := newTestNodeManager()
-	nodeManager := testNodeManager.nodeManager
-	client := testNodeManager.fakeClient
-	// No matching node for the kubelet.
-	client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{}}).ReactionChain
-
-	if err := nodeManager.updateNodeStatus(); err == nil {
-		t.Errorf("expected an error from updateNodeStatus, got nil")
-	}
-	if len(client.Actions()) != nodeStatusUpdateRetry {
-		t.Errorf("unexpected actions: %v", client.Actions())
-	}
-}
-
-func TestRegisterExistingNodeWithApiserver(t *testing.T) {
-	testNodeManager := newTestNodeManager()
-	nodeManager := testNodeManager.nodeManager
-	client := testNodeManager.fakeClient
-	fakeInfoGetter := testNodeManager.fakeInfoGetter
-	client.AddReactor("create", "nodes", func(action testclient.Action) (bool, runtime.Object, error) {
-		// Return an AlreadyExists error on create.
-		return true, &api.Node{}, &apierrors.StatusError{
-			ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists},
-		}
-	})
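The reactors above script the apiserver to reject the create with AlreadyExists and then serve the existing object on get, which is exactly the shape of the registration path this test exercises. A minimal sketch of that create-or-fetch pattern, against a hypothetical in-memory store rather than the kubelet's client:

package example

import (
	"errors"
	"fmt"
)

var errAlreadyExists = errors.New("already exists")

// store is a stand-in for the apiserver.
type store struct{ nodes map[string]string }

func newStore() *store { return &store{nodes: map[string]string{}} }

func (s *store) create(name, spec string) error {
	if _, ok := s.nodes[name]; ok {
		return errAlreadyExists
	}
	s.nodes[name] = spec
	return nil
}

func (s *store) get(name string) (string, error) {
	spec, ok := s.nodes[name]
	if !ok {
		return "", fmt.Errorf("node %q not found", name)
	}
	return spec, nil
}

// register tries to create the node and, on AlreadyExists, falls back to
// fetching the existing object so the caller can reconcile against it.
func register(s *store, name, spec string) (string, error) {
	err := s.create(name, spec)
	if err == nil {
		return spec, nil
	}
	if !errors.Is(err, errAlreadyExists) {
		return "", err
	}
	return s.get(name)
}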
-	client.AddReactor("get", "nodes", func(action testclient.Action) (bool, runtime.Object, error) {
-		// Return an existing (matching) node on get.
-		return true, &api.Node{
-			ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
-			Spec:       api.NodeSpec{ExternalID: testKubeletHostname},
-		}, nil
-	})
-	client.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) {
-		return true, nil, fmt.Errorf("no reaction implemented for %s", action)
-	})
-	fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{
-		MachineID:      "123",
-		SystemUUID:     "abc",
-		BootID:         "1b3",
-		NumCores:       2,
-		MemoryCapacity: 1024,
-	}
-	fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{
-		KernelVersion:      "3.16.0-0.bpo.4-amd64",
-		ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
-		DockerVersion:      "1.5.0",
-	}
-
-	done := make(chan struct{})
-	go func() {
-		nodeManager.registerWithApiserver()
-		done <- struct{}{}
-	}()
-	select {
-	case <-time.After(5 * time.Second):
-		t.Errorf("timed out waiting for registration")
-	case <-done:
-		return
-	}
-}
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index e5c25b10fff..f236ebfa6cd 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -83,11 +83,11 @@ func TestRunOnce(t *testing.T) {
 		rootDirectory:       "/tmp/kubelet",
 		recorder:            &record.FakeRecorder{},
 		cadvisor:            cadvisor,
+		nodeLister:          testNodeLister{},
 		statusManager:       status.NewManager(nil),
 		containerRefManager: kubecontainer.NewRefManager(),
 		readinessManager:    kubecontainer.NewReadinessManager(),
 		podManager:          podManager,
-		nodeManager:         &fakeNodeManager{},
 		os:                  kubecontainer.FakeOS{},
 		volumeManager:       newVolumeManager(),
 		diskSpaceManager:    diskSpaceManager,
diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go
index 250ebadacc7..b91fc972385 100644
--- a/pkg/kubelet/server.go
+++ b/pkg/kubelet/server.go
@@ -100,7 +100,7 @@ type HostInterface interface {
 	GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
 	GetContainerRuntimeVersion() (kubecontainer.Version, error)
 	GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error)
-	GetMachineInfo() (*cadvisorApi.MachineInfo, error)
+	GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
 	GetPods() []*api.Pod
 	GetRunningPods() ([]*api.Pod, error)
 	GetPodByName(namespace, name string) (*api.Pod, bool)
@@ -440,7 +440,7 @@ func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
 
 // getSpec handles spec requests against the Kubelet.
 func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
-	info, err := s.host.GetMachineInfo()
+	info, err := s.host.GetCachedMachineInfo()
 	if err != nil {
 		response.WriteError(http.StatusInternalServerError, err)
 		return
diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go
index d4e130e9ed4..b1301d87add 100644
--- a/pkg/kubelet/server_test.go
+++ b/pkg/kubelet/server_test.go
@@ -86,7 +86,7 @@ func (fk *fakeKubelet) GetContainerRuntimeVersion() (kubecontainer.Version, erro
 	return fk.containerVersionFunc()
 }
 
-func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
+func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
 	return fk.machineInfoFunc()
 }
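The GetMachineInfo to GetCachedMachineInfo rename in the server.go hunk makes the contract explicit: callers of the HTTP spec endpoint get a cached snapshot rather than a fresh cAdvisor query on every request. The diff itself does not show the kubelet's implementation of that cache; one plausible shape, sketched with hypothetical types, is a fetch-once getter:

package example

import "sync"

// machineInfo stands in for cadvisorApi.MachineInfo.
type machineInfo struct {
	NumCores       int
	MemoryCapacity int64
}

// cachedInfoGetter fetches machine info once and serves the snapshot on
// every subsequent call; machine capacity rarely changes while running.
type cachedInfoGetter struct {
	once  sync.Once
	fetch func() (*machineInfo, error)
	info  *machineInfo
	err   error
}

func (c *cachedInfoGetter) GetCachedMachineInfo() (*machineInfo, error) {
	c.once.Do(func() {
		c.info, c.err = c.fetch()
	})
	return c.info, c.err
}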