diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8b191d3291b..01836c18b84 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -36,7 +36,6 @@ import ( "github.com/golang/glog" cadvisorApi "github.com/google/cadvisor/info/v1" "k8s.io/kubernetes/pkg/api" - apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/client/cache" @@ -66,7 +65,6 @@ import ( "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/sets" - "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" @@ -77,9 +75,6 @@ const ( // Max amount of time to wait for the container runtime to come up. maxWaitForContainerRuntime = 5 * time.Minute - // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. - nodeStatusUpdateRetry = 5 - // Location of container logs. containerLogsDir = "/var/log/containers" @@ -261,7 +256,6 @@ func NewMainKubelet( readinessManager: readinessManager, httpClient: &http.Client{}, sourcesReady: sourcesReady, - registerNode: registerNode, standaloneMode: standaloneMode, clusterDomain: clusterDomain, clusterDNS: clusterDNS, @@ -281,19 +275,16 @@ func NewMainKubelet( volumeManager: volumeManager, cloud: cloud, nodeRef: nodeRef, - nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, resourceContainer: resourceContainer, os: osInterface, oomWatcher: oomWatcher, cgroupRoot: cgroupRoot, mounter: mounter, configureCBR0: configureCBR0, - podCIDR: podCIDR, pods: pods, syncLoopMonitor: util.AtomicValue{}, resolverConfig: resolverConfig, cpuCFSQuota: cpuCFSQuota, - daemonEndpoints: daemonEndpoints, } if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil { @@ -302,7 +293,7 @@ func NewMainKubelet( klet.networkPlugin = plug } - machineInfo, err := klet.GetCachedMachineInfo() + machineInfo, err := klet.GetMachineInfo() if err != nil { return nil, err } @@ -364,11 +355,10 @@ func NewMainKubelet( } klet.containerManager = containerManager + klet.nodeManager = newRealNodeManager(kubeClient, cloud, registerNode, nodeStatusUpdateFrequency, recorder, nodeName, hostname, podCIDR, pods, klet, daemonEndpoints, nodeRef) + klet.nodeManager.Start() + go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop) - if klet.kubeClient != nil { - // Start syncing node status immediately, this may set up things the runtime needs to run. - go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop) - } // Wait for the runtime to be up with a timeout. if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil { @@ -445,8 +435,6 @@ type Kubelet struct { // cAdvisor used for container information. cadvisor cadvisor.Interface - // Set to true to have the node register itself with the apiserver. - registerNode bool // for internal book keeping; access only from within registerWithApiserver registrationCompleted bool @@ -501,13 +489,18 @@ type Kubelet struct { // Cached MachineInfo returned by cadvisor. machineInfo *cadvisorApi.MachineInfo + // nodeManager handles interaction of api.Node object with apiserver. This + // includes watching the nodes, registering itself to the apiserver, and + // sync node status. + nodeManager nodeManager + // Syncs pods statuses with apiserver; also used as a cache of statuses. 
statusManager status.Manager // Manager for the volume maps for the pods. volumeManager *volumeManager - //Cloud provider interface + // Cloud provider interface cloud cloudprovider.Interface // Reference to this node. @@ -516,19 +509,6 @@ type Kubelet struct { // Container runtime. containerRuntime kubecontainer.Runtime - // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master. - // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod - // in nodecontroller. There are several constraints: - // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where - // N means number of retries allowed for kubelet to post node status. It is pointless - // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there - // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. - // The constant must be less than podEvictionTimeout. - // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node - // status. Kubelet may fail to update node status reliably if the value is too small, - // as it takes time to gather all necessary node information. - nodeStatusUpdateFrequency time.Duration - // The name of the resource-only container to run the Kubelet in (empty for no container). // Name must be absolute. resourceContainer string @@ -550,7 +530,6 @@ type Kubelet struct { // Whether or not kubelet should take responsibility for keeping cbr0 in // the correct state. configureCBR0 bool - podCIDR string // Number of Pods which can be run by this Kubelet pods int @@ -574,9 +553,6 @@ type Kubelet struct { // True if container cpu limits should be enforced via cgroup CFS quota cpuCFSQuota bool - - // Information about the ports which are opened by daemons on Node running this Kubelet server. - daemonEndpoints *api.NodeDaemonEndpoints } // getRootDir returns the full path to the directory under which kubelet can @@ -789,117 +765,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { kl.syncLoop(updates, kl) } -func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { - node := &api.Node{ - ObjectMeta: api.ObjectMeta{ - Name: kl.nodeName, - Labels: map[string]string{"kubernetes.io/hostname": kl.hostname}, - }, - } - if kl.cloud != nil { - instances, ok := kl.cloud.Instances() - if !ok { - return nil, fmt.Errorf("failed to get instances from cloud provider") - } - - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? - // TODO: ExternalID is deprecated, we'll have to drop this code - externalID, err := instances.ExternalID(kl.nodeName) - if err != nil { - return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err) - } - node.Spec.ExternalID = externalID - - // TODO: We can't assume that the node has credentials to talk to the - // cloudprovider from arbitrary nodes. At most, we should talk to a - // local metadata server here. - node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName) - if err != nil { - return nil, err - } - } else { - node.Spec.ExternalID = kl.hostname - } - if err := kl.setNodeStatus(node); err != nil { - return nil, err - } - return node, nil -} - -// registerWithApiserver registers the node with the cluster master. It is safe -// to call multiple times, but not concurrently (kl.registrationCompleted is -// not locked). 
-func (kl *Kubelet) registerWithApiserver() { - if kl.registrationCompleted { - return - } - step := 100 * time.Millisecond - for { - time.Sleep(step) - step = step * 2 - if step >= 7*time.Second { - step = 7 * time.Second - } - - node, err := kl.initialNodeStatus() - if err != nil { - glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) - continue - } - glog.V(2).Infof("Attempting to register node %s", node.Name) - if _, err := kl.kubeClient.Nodes().Create(node); err != nil { - if !apierrors.IsAlreadyExists(err) { - glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) - continue - } - currentNode, err := kl.kubeClient.Nodes().Get(kl.nodeName) - if err != nil { - glog.Errorf("error getting node %q: %v", kl.nodeName, err) - continue - } - if currentNode == nil { - glog.Errorf("no node instance returned for %q", kl.nodeName) - continue - } - if currentNode.Spec.ExternalID == node.Spec.ExternalID { - glog.Infof("Node %s was previously registered", node.Name) - kl.registrationCompleted = true - return - } - glog.Errorf( - "Previously %q had externalID %q; now it is %q; will delete and recreate.", - kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID, - ) - if err := kl.kubeClient.Nodes().Delete(node.Name); err != nil { - glog.Errorf("Unable to delete old node: %v", err) - } else { - glog.Errorf("Deleted old node object %q", kl.nodeName) - } - continue - } - glog.Infof("Successfully registered node %s", node.Name) - kl.registrationCompleted = true - return - } -} - -// syncNodeStatus should be called periodically from a goroutine. -// It synchronizes node status to master, registering the kubelet first if -// necessary. -func (kl *Kubelet) syncNodeStatus() { - if kl.kubeClient == nil { - return - } - if kl.registerNode { - // This will exit immediately if it doesn't need to do anything. - kl.registerWithApiserver() - } - if err := kl.updateNodeStatus(); err != nil { - glog.Errorf("Unable to update node status: %v", err) - } -} - func makeMounts(container *api.Container, podVolumes kubecontainer.VolumeMap) (mounts []kubecontainer.Mount) { for _, mount := range container.VolumeMounts { vol, ok := podVolumes[mount.Name] @@ -1777,7 +1642,7 @@ func hasHostPortConflicts(pods []*api.Pod) bool { // TODO: Consider integrate disk space into this function, and returns a // suitable reason and message per resource type. func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) { - info, err := kl.GetCachedMachineInfo() + info, err := kl.GetMachineInfo() if err != nil { glog.Errorf("error getting machine info: %v", err) // TODO: Should we admit the pod when machine info is unavailable? @@ -1867,12 +1732,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { glog.Info("Starting kubelet main sync loop.") var housekeepingTimestamp time.Time for { - if !kl.containerRuntimeUp() { + if !kl.ContainerRuntimeUp() { time.Sleep(5 * time.Second) glog.Infof("Skipping pod synchronization, container runtime is not up.") continue } - if !kl.doneNetworkConfigure() { + if !kl.NetworkConfigured() { time.Sleep(5 * time.Second) glog.Infof("Skipping pod synchronization, network is not configured") continue @@ -2032,6 +1897,10 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time { return val.(time.Time) } +func (kl *Kubelet) GetVersionInfo() (*cadvisorApi.VersionInfo, error) { + return kl.cadvisor.VersionInfo() +} + // Returns the container runtime version for this Kubelet. 
func (kl *Kubelet) GetContainerRuntimeVersion() (kubecontainer.Version, error) { if kl.containerRuntime == nil { @@ -2186,25 +2055,6 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error { return kl.shaper.ReconcileInterface() } -// updateNodeStatus updates node status to master with retries. -func (kl *Kubelet) updateNodeStatus() error { - for i := 0; i < nodeStatusUpdateRetry; i++ { - if err := kl.tryUpdateNodeStatus(); err != nil { - glog.Errorf("Error updating node status, will retry: %v", err) - } else { - return nil - } - } - return fmt.Errorf("update node status exceeds retry count") -} - -func (kl *Kubelet) recordNodeStatusEvent(event string) { - glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName) - // TODO: This requires a transaction, either both node status is updated - // and event is recorded or neither should happen, see issue #6055. - kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.nodeName, event) -} - // Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus() var oldNodeUnschedulable bool @@ -2218,10 +2068,10 @@ func (kl *Kubelet) syncNetworkStatus() { networkConfigured = false glog.Errorf("Error on adding ip table rules: %v", err) } - if len(kl.podCIDR) == 0 { + if len(kl.nodeManager.GetPodCIDR()) == 0 { glog.Warningf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now") networkConfigured = false - } else if err := kl.reconcileCBR0(kl.podCIDR); err != nil { + } else if err := kl.reconcileCBR0(kl.nodeManager.GetPodCIDR()); err != nil { networkConfigured = false glog.Errorf("Error configuring cbr0: %v", err) } @@ -2229,214 +2079,18 @@ func (kl *Kubelet) syncNetworkStatus() { kl.networkConfigured = networkConfigured } -// setNodeStatus fills in the Status fields of the given Node, overwriting -// any fields that are currently set. -func (kl *Kubelet) setNodeStatus(node *api.Node) error { - // Set addresses for the node. - if kl.cloud != nil { - instances, ok := kl.cloud.Instances() - if !ok { - return fmt.Errorf("failed to get instances from cloud provider") - } - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? - // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface - nodeAddresses, err := instances.NodeAddresses(kl.nodeName) - if err != nil { - return fmt.Errorf("failed to get node address from cloud provider: %v", err) - } - node.Status.Addresses = nodeAddresses - } else { - addr := net.ParseIP(kl.hostname) - if addr != nil { - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: addr.String()}, - {Type: api.NodeInternalIP, Address: addr.String()}, - } - } else { - addrs, err := net.LookupIP(node.Name) - if err != nil { - return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err) - } else if len(addrs) == 0 { - return fmt.Errorf("no ip address for node %v", node.Name) - } else { - // check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address. - // If no match is found, it uses the IP of the interface with gateway on it. 
- for _, ip := range addrs { - if ip.IsLoopback() { - continue - } - - if ip.To4() != nil { - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: ip.String()}, - {Type: api.NodeInternalIP, Address: ip.String()}, - } - break - } - } - - if len(node.Status.Addresses) == 0 { - ip, err := util.ChooseHostInterface() - if err != nil { - return err - } - - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: ip.String()}, - {Type: api.NodeInternalIP, Address: ip.String()}, - } - } - } - } - } - - // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start - // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. - info, err := kl.GetCachedMachineInfo() - if err != nil { - // TODO(roberthbailey): This is required for test-cmd.sh to pass. - // See if the test should be updated instead. - node.Status.Capacity = api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), - api.ResourceMemory: resource.MustParse("0Gi"), - api.ResourcePods: *resource.NewQuantity(int64(kl.pods), resource.DecimalSI), - } - glog.Errorf("Error getting machine info: %v", err) - } else { - node.Status.NodeInfo.MachineID = info.MachineID - node.Status.NodeInfo.SystemUUID = info.SystemUUID - node.Status.Capacity = CapacityFromMachineInfo(info) - node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( - int64(kl.pods), resource.DecimalSI) - if node.Status.NodeInfo.BootID != "" && - node.Status.NodeInfo.BootID != info.BootID { - // TODO: This requires a transaction, either both node status is updated - // and event is recorded or neither should happen, see issue #6055. - kl.recorder.Eventf(kl.nodeRef, "Rebooted", - "Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID) - } - node.Status.NodeInfo.BootID = info.BootID - } - - verinfo, err := kl.cadvisor.VersionInfo() - if err != nil { - glog.Errorf("Error getting version info: %v", err) - } else { - node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion - node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion - // TODO: Determine the runtime is docker or rocket - node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion - node.Status.NodeInfo.KubeletVersion = version.Get().String() - // TODO: kube-proxy might be different version from kubelet in the future - node.Status.NodeInfo.KubeProxyVersion = version.Get().String() - } - - node.Status.DaemonEndpoints = *kl.daemonEndpoints - - // Check whether container runtime can be reported as up. 
- containerRuntimeUp := kl.containerRuntimeUp() - // Check whether network is configured properly - networkConfigured := kl.doneNetworkConfigure() - - currentTime := util.Now() - var newNodeReadyCondition api.NodeCondition - var oldNodeReadyConditionStatus api.ConditionStatus - if containerRuntimeUp && networkConfigured { - newNodeReadyCondition = api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: "kubelet is posting ready status", - LastHeartbeatTime: currentTime, - } - } else { - var reasons []string - var messages []string - if !containerRuntimeUp { - messages = append(messages, "container runtime is down") - } - if !networkConfigured { - messages = append(reasons, "network not configured correctly") - } - newNodeReadyCondition = api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionFalse, - Reason: "KubeletNotReady", - Message: strings.Join(messages, ","), - LastHeartbeatTime: currentTime, - } - } - - updated := false - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == api.NodeReady { - oldNodeReadyConditionStatus = node.Status.Conditions[i].Status - if oldNodeReadyConditionStatus == newNodeReadyCondition.Status { - newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime - } else { - newNodeReadyCondition.LastTransitionTime = currentTime - } - node.Status.Conditions[i] = newNodeReadyCondition - updated = true - } - } - if !updated { - newNodeReadyCondition.LastTransitionTime = currentTime - node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition) - } - if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status { - if newNodeReadyCondition.Status == api.ConditionTrue { - kl.recordNodeStatusEvent("NodeReady") - } else { - kl.recordNodeStatusEvent("NodeNotReady") - } - } - if oldNodeUnschedulable != node.Spec.Unschedulable { - if node.Spec.Unschedulable { - kl.recordNodeStatusEvent("NodeNotSchedulable") - } else { - kl.recordNodeStatusEvent("NodeSchedulable") - } - oldNodeUnschedulable = node.Spec.Unschedulable - } - return nil -} - -func (kl *Kubelet) containerRuntimeUp() bool { +func (kl *Kubelet) ContainerRuntimeUp() bool { kl.runtimeMutex.Lock() defer kl.runtimeMutex.Unlock() return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now()) } -func (kl *Kubelet) doneNetworkConfigure() bool { +func (kl *Kubelet) NetworkConfigured() bool { kl.networkConfigMutex.Lock() defer kl.networkConfigMutex.Unlock() return kl.networkConfigured } -// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 -// is set, this function will also confirm that cbr0 is configured correctly. -func (kl *Kubelet) tryUpdateNodeStatus() error { - node, err := kl.kubeClient.Nodes().Get(kl.nodeName) - if err != nil { - return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) - } - if node == nil { - return fmt.Errorf("no node instance returned for %q", kl.nodeName) - } - kl.networkConfigMutex.Lock() - kl.podCIDR = node.Spec.PodCIDR - kl.networkConfigMutex.Unlock() - - if err := kl.setNodeStatus(node); err != nil { - return err - } - // Update the current status on the API server - _, err = kl.kubeClient.Nodes().UpdateStatus(node) - return err -} - // GetPhase returns the phase of a pod given its container info. // This func is exported to simplify integration with 3rd party kubelet // integrations like kubernetes-mesos. 
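// An illustrative aside rather than part of the patch: with GetMachineInfo,
// GetVersionInfo, ContainerRuntimeUp and NetworkConfigured all exposed on
// *Kubelet by this change, the Kubelet satisfies the infoGetter interface
// introduced in pkg/kubelet/node_manager.go below, which is why NewMainKubelet
// can pass klet straight into newRealNodeManager. Within package kubelet this
// could be asserted at compile time as:
//
//	var _ infoGetter = (*Kubelet)(nil)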
@@ -2734,8 +2388,8 @@ func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorApi.Co } } -// GetCachedMachineInfo assumes that the machine info can't change without a reboot -func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) { +// GetMachineInfo assumes that the machine info can't change without a reboot +func (kl *Kubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) { if kl.machineInfo == nil { info, err := kl.cadvisor.MachineInfo() if err != nil { diff --git a/pkg/kubelet/node_manager.go b/pkg/kubelet/node_manager.go new file mode 100644 index 00000000000..e6c506e398c --- /dev/null +++ b/pkg/kubelet/node_manager.go @@ -0,0 +1,461 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +// Note: if you change code in this file, you might need to change code in +// contrib/mesos/pkg/executor/. + +import ( + "fmt" + "net" + "strings" + "sync" + "time" + + "github.com/golang/glog" + cadvisorApi "github.com/google/cadvisor/info/v1" + "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/version" +) + +const ( + // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. + nodeStatusUpdateRetry = 5 +) + +type infoGetter interface { + GetMachineInfo() (*cadvisorApi.MachineInfo, error) + ContainerRuntimeUp() bool + NetworkConfigured() bool + GetVersionInfo() (*cadvisorApi.VersionInfo, error) +} + +type nodeManager interface { + Start() + GetPodCIDR() string +} + +type realNodeManager struct { + // apiserver client. + client client.Interface + + // Set to true to have the node register itself with the apiserver. + registerNode bool + + // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master. + // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod + // in nodecontroller. There are several constraints: + // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where + // N means number of retries allowed for kubelet to post node status. It is pointless + // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there + // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. + // The constant must be less than podEvictionTimeout. + // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node + // status. Kubelet may fail to update node status reliably if the value is too small, + // as it takes time to gather all necessary node information. + nodeStatusUpdateFrequency time.Duration + + // Cloud provider interface + cloud cloudprovider.Interface + + nodeName string + hostname string + + // Number of Pods which can be run by this Kubelet. 
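+	// Used to populate Node.Status.Capacity[api.ResourcePods] in setNodeStatus.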
+ pods int + + // The EventRecorder to use + recorder record.EventRecorder + + // Information about the ports which are opened by daemons on Node running this Kubelet server. + daemonEndpoints *api.NodeDaemonEndpoints + + // Interface to get machine and version info. + infoGetter infoGetter + + // Reference to this node. + nodeRef *api.ObjectReference + + // podCIDR may be updated by node.Spec. + podCIDR string + + // for internal book keeping; access only from within registerWithApiserver + registrationCompleted bool + + lock sync.RWMutex +} + +func newRealNodeManager(client client.Interface, cloud cloudprovider.Interface, registerNode bool, + nodeStatusUpdateFrequency time.Duration, recorder record.EventRecorder, nodeName, hostname, podCIDR string, + pods int, infoGetter infoGetter, daemonEndpoints *api.NodeDaemonEndpoints, nodeRef *api.ObjectReference) *realNodeManager { + return &realNodeManager{ + client: client, + cloud: cloud, + registerNode: registerNode, + nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, + recorder: recorder, + nodeName: nodeName, + hostname: hostname, + podCIDR: podCIDR, + pods: pods, + infoGetter: infoGetter, + daemonEndpoints: daemonEndpoints, + nodeRef: nodeRef, + } +} + +func (nm *realNodeManager) Start() { + if nm.client != nil { + go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop) + } +} + +func (nm *realNodeManager) GetPodCIDR() string { + nm.lock.RLock() + defer nm.lock.RUnlock() + return nm.podCIDR +} + +// syncNodeStatus should be called periodically from a goroutine. +// It synchronizes node status to master, registering the kubelet first if +// necessary. +func (nm *realNodeManager) syncNodeStatus() { + + if nm.registerNode { + // This will exit immediately if it doesn't need to do anything. + nm.registerWithApiserver() + } + if err := nm.updateNodeStatus(); err != nil { + glog.Errorf("Unable to update node status: %v", err) + } +} + +func (nm *realNodeManager) initialNodeStatus() (*api.Node, error) { + node := &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: nm.nodeName, + Labels: map[string]string{"kubernetes.io/hostname": nm.hostname}, + }, + } + if nm.cloud != nil { + instances, ok := nm.cloud.Instances() + if !ok { + return nil, fmt.Errorf("failed to get instances from cloud provider") + } + + // TODO(roberthbailey): Can we do this without having credentials to talk + // to the cloud provider? + // TODO: ExternalID is deprecated, we'll have to drop this code + externalID, err := instances.ExternalID(nm.nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err) + } + node.Spec.ExternalID = externalID + + // TODO: We can't assume that the node has credentials to talk to the + // cloudprovider from arbitrary nodes. At most, we should talk to a + // local metadata server here. + node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(nm.cloud, nm.nodeName) + if err != nil { + return nil, err + } + } else { + node.Spec.ExternalID = nm.hostname + } + if err := nm.setNodeStatus(node); err != nil { + return nil, err + } + return node, nil +} + +// registerWithApiserver registers the node with the cluster master. It is safe +// to call multiple times, but not concurrently (nm.registrationCompleted is +// not locked). 
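+// Registration is retried in a loop with exponential backoff (starting at
+// 100ms and capped at 7s); the function returns only once the node has been
+// created, or an existing registration with a matching ExternalID has been
+// confirmed.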
+func (nm *realNodeManager) registerWithApiserver() { + if nm.registrationCompleted { + return + } + step := 100 * time.Millisecond + for { + time.Sleep(step) + step = step * 2 + if step >= 7*time.Second { + step = 7 * time.Second + } + + node, err := nm.initialNodeStatus() + if err != nil { + glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) + continue + } + glog.V(2).Infof("Attempting to register node %s", node.Name) + if _, err := nm.client.Nodes().Create(node); err != nil { + if !apierrors.IsAlreadyExists(err) { + glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) + continue + } + currentNode, err := nm.client.Nodes().Get(nm.nodeName) + if err != nil { + glog.Errorf("error getting node %q: %v", nm.nodeName, err) + continue + } + if currentNode == nil { + glog.Errorf("no node instance returned for %q", nm.nodeName) + continue + } + if currentNode.Spec.ExternalID == node.Spec.ExternalID { + glog.Infof("Node %s was previously registered", node.Name) + nm.registrationCompleted = true + return + } + glog.Errorf( + "Previously %q had externalID %q; now it is %q; will delete and recreate.", + nm.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID, + ) + if err := nm.client.Nodes().Delete(node.Name); err != nil { + glog.Errorf("Unable to delete old node: %v", err) + } else { + glog.Errorf("Deleted old node object %q", nm.nodeName) + } + continue + } + glog.Infof("Successfully registered node %s", node.Name) + nm.registrationCompleted = true + return + } +} + +// setNodeStatus fills in the Status fields of the given Node, overwriting +// any fields that are currently set. +func (nm *realNodeManager) setNodeStatus(node *api.Node) error { + // Set addresses for the node. + if nm.cloud != nil { + instances, ok := nm.cloud.Instances() + if !ok { + return fmt.Errorf("failed to get instances from cloud provider") + } + // TODO(roberthbailey): Can we do this without having credentials to talk + // to the cloud provider? + // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface + nodeAddresses, err := instances.NodeAddresses(nm.nodeName) + if err != nil { + return fmt.Errorf("failed to get node address from cloud provider: %v", err) + } + node.Status.Addresses = nodeAddresses + } else { + addr := net.ParseIP(nm.hostname) + if addr != nil { + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: addr.String()}, + {Type: api.NodeInternalIP, Address: addr.String()}, + } + } else { + addrs, err := net.LookupIP(node.Name) + if err != nil { + return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err) + } else if len(addrs) == 0 { + return fmt.Errorf("no ip address for node %v", node.Name) + } else { + // check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address. + // If no match is found, it uses the IP of the interface with gateway on it. 
+ for _, ip := range addrs { + if ip.IsLoopback() { + continue + } + + if ip.To4() != nil { + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: ip.String()}, + {Type: api.NodeInternalIP, Address: ip.String()}, + } + break + } + } + + if len(node.Status.Addresses) == 0 { + ip, err := util.ChooseHostInterface() + if err != nil { + return err + } + + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: ip.String()}, + {Type: api.NodeInternalIP, Address: ip.String()}, + } + } + } + } + } + + // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start + // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. + info, err := nm.infoGetter.GetMachineInfo() + if err != nil { + // TODO(roberthbailey): This is required for test-cmd.sh to pass. + // See if the test should be updated instead. + node.Status.Capacity = api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), + api.ResourceMemory: resource.MustParse("0Gi"), + api.ResourcePods: *resource.NewQuantity(int64(nm.pods), resource.DecimalSI), + } + glog.Errorf("Error getting machine info: %v", err) + } else { + node.Status.NodeInfo.MachineID = info.MachineID + node.Status.NodeInfo.SystemUUID = info.SystemUUID + node.Status.Capacity = CapacityFromMachineInfo(info) + node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( + int64(nm.pods), resource.DecimalSI) + if node.Status.NodeInfo.BootID != "" && + node.Status.NodeInfo.BootID != info.BootID { + // TODO: This requires a transaction, either both node status is updated + // and event is recorded or neither should happen, see issue #6055. + nm.recorder.Eventf(nm.nodeRef, "Rebooted", + "Node %s has been rebooted, boot id: %s", nm.nodeName, info.BootID) + } + node.Status.NodeInfo.BootID = info.BootID + } + + verinfo, err := nm.infoGetter.GetVersionInfo() + if err != nil { + glog.Errorf("Error getting version info: %v", err) + } else { + node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion + node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion + // TODO: Determine the runtime is docker or rocket + node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion + node.Status.NodeInfo.KubeletVersion = version.Get().String() + // TODO: kube-proxy might be different version from kubelet in the future + node.Status.NodeInfo.KubeProxyVersion = version.Get().String() + } + + node.Status.DaemonEndpoints = *nm.daemonEndpoints + + // Check whether container runtime can be reported as up. 
+ containerRuntimeUp := nm.infoGetter.ContainerRuntimeUp() + // Check whether network is configured properly + networkConfigured := nm.infoGetter.NetworkConfigured() + + currentTime := util.Now() + var newNodeReadyCondition api.NodeCondition + var oldNodeReadyConditionStatus api.ConditionStatus + if containerRuntimeUp && networkConfigured { + newNodeReadyCondition = api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: "kubelet is posting ready status", + LastHeartbeatTime: currentTime, + } + } else { + var reasons []string + var messages []string + if !containerRuntimeUp { + messages = append(messages, "container runtime is down") + } + if !networkConfigured { + messages = append(reasons, "network not configured correctly") + } + newNodeReadyCondition = api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionFalse, + Reason: "KubeletNotReady", + Message: strings.Join(messages, ","), + LastHeartbeatTime: currentTime, + } + } + + updated := false + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == api.NodeReady { + oldNodeReadyConditionStatus = node.Status.Conditions[i].Status + if oldNodeReadyConditionStatus == newNodeReadyCondition.Status { + newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime + } else { + newNodeReadyCondition.LastTransitionTime = currentTime + } + node.Status.Conditions[i] = newNodeReadyCondition + updated = true + } + } + if !updated { + newNodeReadyCondition.LastTransitionTime = currentTime + node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition) + } + if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status { + if newNodeReadyCondition.Status == api.ConditionTrue { + nm.recordNodeStatusEvent("NodeReady") + } else { + nm.recordNodeStatusEvent("NodeNotReady") + } + } + if oldNodeUnschedulable != node.Spec.Unschedulable { + if node.Spec.Unschedulable { + nm.recordNodeStatusEvent("NodeNotSchedulable") + } else { + nm.recordNodeStatusEvent("NodeSchedulable") + } + oldNodeUnschedulable = node.Spec.Unschedulable + } + return nil +} + +// updateNodeStatus updates node status to master with retries. +func (nm *realNodeManager) updateNodeStatus() error { + for i := 0; i < nodeStatusUpdateRetry; i++ { + if err := nm.tryUpdateNodeStatus(); err != nil { + glog.Errorf("Error updating node status, will retry: %v", err) + } else { + return nil + } + } + return fmt.Errorf("update node status exceeds retry count") +} + +func (nm *realNodeManager) recordNodeStatusEvent(event string) { + glog.V(2).Infof("Recording %s event message for node %s", event, nm.nodeName) + // TODO: This requires a transaction, either both node status is updated + // and event is recorded or neither should happen, see issue #6055. + nm.recorder.Eventf(nm.nodeRef, event, "Node %s status is now: %s", nm.nodeName, event) +} + +// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 +// is set, this function will also confirm that cbr0 is configured correctly. 
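+// Before pushing the status it also refreshes the cached podCIDR from
+// node.Spec.PodCIDR under nm.lock, so that GetPodCIDR reflects the value most
+// recently assigned by the apiserver.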
+func (nm *realNodeManager) tryUpdateNodeStatus() error { + node, err := nm.client.Nodes().Get(nm.nodeName) + if err != nil { + return fmt.Errorf("error getting node %q: %v", nm.nodeName, err) + } + if node == nil { + return fmt.Errorf("no node instance returned for %q", nm.nodeName) + } + nm.lock.Lock() + defer nm.lock.Unlock() + nm.podCIDR = node.Spec.PodCIDR + + if err := nm.setNodeStatus(node); err != nil { + return err + } + // Update the current status on the API server + _, err = nm.client.Nodes().UpdateStatus(node) + return err +} diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index f8075832f28..52f33ad0ceb 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -95,7 +95,7 @@ type HostInterface interface { GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) GetContainerRuntimeVersion() (kubecontainer.Version, error) GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error) - GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) + GetMachineInfo() (*cadvisorApi.MachineInfo, error) GetPods() []*api.Pod GetRunningPods() ([]*api.Pod, error) GetPodByName(namespace, name string) (*api.Pod, bool) @@ -407,7 +407,7 @@ func (s *Server) getLogs(request *restful.Request, response *restful.Response) { // getSpec handles spec requests against the Kubelet. func (s *Server) getSpec(request *restful.Request, response *restful.Response) { - info, err := s.host.GetCachedMachineInfo() + info, err := s.host.GetMachineInfo() if err != nil { response.WriteError(http.StatusInternalServerError, err) return