kubelet: move all node status related methods to a separate file

The methods for registering a node and syncing node status to the apiserver
have grown large enough that it makes sense for them to live in a separate
place. This change adds a nodeManager to handle that interaction with the
apiserver.
Yu-Ju Hong 2015-09-16 14:55:52 -07:00
parent 4c46bc3243
commit e7d1e47f31
3 changed files with 486 additions and 371 deletions
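At a glance, the seam this commit introduces is the small interface the kubelet now programs against; a minimal sketch restating the declarations from the diff below:

// nodeManager is the kubelet's view of node/apiserver interaction after
// this change; realNodeManager (added below) is its only implementation.
type nodeManager interface {
	// Start kicks off the periodic node status sync in a goroutine
	// (a no-op when the kubelet runs without an apiserver client).
	Start()
	// GetPodCIDR returns the pod CIDR most recently observed on node.Spec.
	GetPodCIDR() string
}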

pkg/kubelet/kubelet.go

@@ -36,7 +36,6 @@ import (
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/client/cache"
@@ -66,7 +65,6 @@ import (
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@@ -77,9 +75,6 @@ const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 5 * time.Minute
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status fails.
nodeStatusUpdateRetry = 5
// Location of container logs.
containerLogsDir = "/var/log/containers"
@@ -261,7 +256,6 @@ func NewMainKubelet(
readinessManager: readinessManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
registerNode: registerNode,
standaloneMode: standaloneMode,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
@@ -281,19 +275,16 @@ func NewMainKubelet(
volumeManager: volumeManager,
cloud: cloud,
nodeRef: nodeRef,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
oomWatcher: oomWatcher,
cgroupRoot: cgroupRoot,
mounter: mounter,
configureCBR0: configureCBR0,
podCIDR: podCIDR,
pods: pods,
syncLoopMonitor: util.AtomicValue{},
resolverConfig: resolverConfig,
cpuCFSQuota: cpuCFSQuota,
daemonEndpoints: daemonEndpoints,
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@@ -302,7 +293,7 @@ func NewMainKubelet(
klet.networkPlugin = plug
}
machineInfo, err := klet.GetCachedMachineInfo()
machineInfo, err := klet.GetMachineInfo()
if err != nil {
return nil, err
}
@@ -364,11 +355,10 @@ func NewMainKubelet(
}
klet.containerManager = containerManager
klet.nodeManager = newRealNodeManager(kubeClient, cloud, registerNode, nodeStatusUpdateFrequency, recorder, nodeName, hostname, podCIDR, pods, klet, daemonEndpoints, nodeRef)
klet.nodeManager.Start()
go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop)
if klet.kubeClient != nil {
// Start syncing node status immediately; this may set up things the runtime needs to run.
go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop)
}
// Wait for the runtime to be up with a timeout.
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
@@ -445,8 +435,6 @@ type Kubelet struct {
// cAdvisor used for container information.
cadvisor cadvisor.Interface
// Set to true to have the node register itself with the apiserver.
registerNode bool
// for internal bookkeeping; access only from within registerWithApiserver
registrationCompleted bool
@@ -501,13 +489,18 @@ type Kubelet struct {
// Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorApi.MachineInfo
// nodeManager handles interaction of the api.Node object with the apiserver.
// This includes watching the node, registering it with the apiserver, and
// syncing node status.
nodeManager nodeManager
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager status.Manager
// Manager for the volume maps for the pods.
volumeManager *volumeManager
//Cloud provider interface
// Cloud provider interface
cloud cloudprovider.Interface
// Reference to this node.
@@ -516,19 +509,6 @@ type Kubelet struct {
// Container runtime.
containerRuntime kubecontainer.Runtime
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant; it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
@@ -550,7 +530,6 @@ type Kubelet struct {
// Whether or not kubelet should take responsibility for keeping cbr0 in
// the correct state.
configureCBR0 bool
podCIDR string
// Number of Pods which can be run by this Kubelet
pods int
@@ -574,9 +553,6 @@ type Kubelet struct {
// True if container cpu limits should be enforced via cgroup CFS quota
cpuCFSQuota bool
// Information about the ports opened by daemons on the node running this Kubelet server.
daemonEndpoints *api.NodeDaemonEndpoints
}
// getRootDir returns the full path to the directory under which kubelet can
@@ -789,117 +765,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.syncLoop(updates, kl)
}
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: kl.nodeName,
Labels: map[string]string{"kubernetes.io/hostname": kl.hostname},
},
}
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(kl.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
if err != nil {
return nil, err
}
} else {
node.Spec.ExternalID = kl.hostname
}
if err := kl.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithApiserver() {
if kl.registrationCompleted {
return
}
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := kl.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
currentNode, err := kl.kubeClient.Nodes().Get(kl.nodeName)
if err != nil {
glog.Errorf("error getting node %q: %v", kl.nodeName, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.nodeName)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
kl.registrationCompleted = true
return
}
glog.Errorf(
"Previously %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
)
if err := kl.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete old node: %v", err)
} else {
glog.Errorf("Deleted old node object %q", kl.nodeName)
}
continue
}
glog.Infof("Successfully registered node %s", node.Name)
kl.registrationCompleted = true
return
}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithApiserver()
}
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
func makeMounts(container *api.Container, podVolumes kubecontainer.VolumeMap) (mounts []kubecontainer.Mount) {
for _, mount := range container.VolumeMounts {
vol, ok := podVolumes[mount.Name]
@@ -1777,7 +1642,7 @@ func hasHostPortConflicts(pods []*api.Pod) bool {
// TODO: Consider integrating disk space into this function, and return a
// suitable reason and message per resource type.
func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) {
info, err := kl.GetCachedMachineInfo()
info, err := kl.GetMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
// TODO: Should we admit the pod when machine info is unavailable?
@@ -1867,12 +1732,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
var housekeepingTimestamp time.Time
for {
if !kl.containerRuntimeUp() {
if !kl.ContainerRuntimeUp() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, container runtime is not up.")
continue
}
if !kl.doneNetworkConfigure() {
if !kl.NetworkConfigured() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, network is not configured")
continue
@@ -2032,6 +1897,10 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
return val.(time.Time)
}
func (kl *Kubelet) GetVersionInfo() (*cadvisorApi.VersionInfo, error) {
return kl.cadvisor.VersionInfo()
}
// Returns the container runtime version for this Kubelet.
func (kl *Kubelet) GetContainerRuntimeVersion() (kubecontainer.Version, error) {
if kl.containerRuntime == nil {
@@ -2186,25 +2055,6 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
return kl.shaper.ReconcileInterface()
}
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}
func (kl *Kubelet) recordNodeStatusEvent(event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName)
// TODO: This requires a transaction: either both the node status update
// and the event recording should happen, or neither should; see issue #6055.
kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.nodeName, event)
}
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
@@ -2218,10 +2068,10 @@ func (kl *Kubelet) syncNetworkStatus() {
networkConfigured = false
glog.Errorf("Error on adding ip table rules: %v", err)
}
if len(kl.podCIDR) == 0 {
if len(kl.nodeManager.GetPodCIDR()) == 0 {
glog.Warningf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now")
networkConfigured = false
} else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
} else if err := kl.reconcileCBR0(kl.nodeManager.GetPodCIDR()); err != nil {
networkConfigured = false
glog.Errorf("Error configuring cbr0: %v", err)
}
@@ -2229,214 +2079,18 @@ func (kl *Kubelet) syncNetworkStatus() {
kl.networkConfigured = networkConfigured
}
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func (kl *Kubelet) setNodeStatus(node *api.Node) error {
// Set addresses for the node.
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
nodeAddresses, err := instances.NodeAddresses(kl.nodeName)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
addr := net.ParseIP(kl.hostname)
if addr != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: addr.String()},
{Type: api.NodeInternalIP, Address: addr.String()},
}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
// Check all IP addresses for this node.Name and try to find the first non-loopback IPv4 address.
// If no match is found, fall back to the IP of the interface that has the default gateway.
for _, ip := range addrs {
if ip.IsLoopback() {
continue
}
if ip.To4() != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
break
}
}
if len(node.Status.Addresses) == 0 {
ip, err := util.ChooseHostInterface()
if err != nil {
return err
}
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
}
}
}
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
api.ResourcePods: *resource.NewQuantity(int64(kl.pods), resource.DecimalSI),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = CapacityFromMachineInfo(info)
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(kl.pods), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction: either both the node status update
// and the event recording should happen, or neither should; see issue #6055.
kl.recorder.Eventf(kl.nodeRef, "Rebooted",
"Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
}
verinfo, err := kl.cadvisor.VersionInfo()
if err != nil {
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion
// TODO: Determine whether the runtime is docker or rocket
node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion
node.Status.NodeInfo.KubeletVersion = version.Get().String()
// TODO: kube-proxy might be different version from kubelet in the future
node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
}
node.Status.DaemonEndpoints = *kl.daemonEndpoints
// Check whether container runtime can be reported as up.
containerRuntimeUp := kl.containerRuntimeUp()
// Check whether network is configured properly
networkConfigured := kl.doneNetworkConfigure()
currentTime := util.Now()
var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
} else {
var messages []string
if !containerRuntimeUp {
messages = append(messages, "container runtime is down")
}
if !networkConfigured {
messages = append(messages, "network not configured correctly")
}
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: strings.Join(messages, ","),
LastHeartbeatTime: currentTime,
}
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
oldNodeReadyConditionStatus = node.Status.Conditions[i].Status
if oldNodeReadyConditionStatus == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
}
node.Status.Conditions[i] = newNodeReadyCondition
updated = true
}
}
if !updated {
newNodeReadyCondition.LastTransitionTime = currentTime
node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
}
if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status {
if newNodeReadyCondition.Status == api.ConditionTrue {
kl.recordNodeStatusEvent("NodeReady")
} else {
kl.recordNodeStatusEvent("NodeNotReady")
}
}
if oldNodeUnschedulable != node.Spec.Unschedulable {
if node.Spec.Unschedulable {
kl.recordNodeStatusEvent("NodeNotSchedulable")
} else {
kl.recordNodeStatusEvent("NodeSchedulable")
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
return nil
}
func (kl *Kubelet) containerRuntimeUp() bool {
func (kl *Kubelet) ContainerRuntimeUp() bool {
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
}
func (kl *Kubelet) doneNetworkConfigure() bool {
func (kl *Kubelet) NetworkConfigured() bool {
kl.networkConfigMutex.Lock()
defer kl.networkConfigMutex.Unlock()
return kl.networkConfigured
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.nodeName)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.nodeName)
}
kl.networkConfigMutex.Lock()
kl.podCIDR = node.Spec.PodCIDR
kl.networkConfigMutex.Unlock()
if err := kl.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
_, err = kl.kubeClient.Nodes().UpdateStatus(node)
return err
}
// GetPhase returns the phase of a pod given its container info.
// This func is exported to simplify integration with 3rd party kubelet
// integrations like kubernetes-mesos.
@@ -2734,8 +2388,8 @@ func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorApi.Co
}
}
// GetCachedMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
// GetMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
if kl.machineInfo == nil {
info, err := kl.cadvisor.MachineInfo()
if err != nil {

pkg/kubelet/node_manager.go (new file, 461 lines)

@@ -0,0 +1,461 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
// Note: if you change code in this file, you might need to change code in
// contrib/mesos/pkg/executor/.
import (
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/version"
)
const (
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status fails.
nodeStatusUpdateRetry = 5
)
type infoGetter interface {
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
ContainerRuntimeUp() bool
NetworkConfigured() bool
GetVersionInfo() (*cadvisorApi.VersionInfo, error)
}
type nodeManager interface {
Start()
GetPodCIDR() string
}
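Because realNodeManager reaches back into the kubelet only through the narrow infoGetter interface above, it can be exercised without constructing a full Kubelet. A hypothetical test stub (fakeInfoGetter is illustrative, not part of this commit) could satisfy it like so:

// fakeInfoGetter is a hypothetical stand-in for the Kubelet in unit tests.
type fakeInfoGetter struct {
	machineInfo *cadvisorApi.MachineInfo
	versionInfo *cadvisorApi.VersionInfo
	runtimeUp   bool
	networkOK   bool
}

func (f *fakeInfoGetter) GetMachineInfo() (*cadvisorApi.MachineInfo, error) { return f.machineInfo, nil }
func (f *fakeInfoGetter) GetVersionInfo() (*cadvisorApi.VersionInfo, error) { return f.versionInfo, nil }
func (f *fakeInfoGetter) ContainerRuntimeUp() bool                          { return f.runtimeUp }
func (f *fakeInfoGetter) NetworkConfigured() bool                           { return f.networkOK }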
type realNodeManager struct {
// apiserver client.
client client.Interface
// Set to true to have the node register itself with the apiserver.
registerNode bool
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant; it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// Cloud provider interface
cloud cloudprovider.Interface
nodeName string
hostname string
// Number of Pods which can be run by this Kubelet.
pods int
// The EventRecorder to use
recorder record.EventRecorder
// Information about the ports opened by daemons on the node running this Kubelet server.
daemonEndpoints *api.NodeDaemonEndpoints
// Interface to get machine and version info.
infoGetter infoGetter
// Reference to this node.
nodeRef *api.ObjectReference
// podCIDR may be updated from node.Spec.
podCIDR string
// for internal bookkeeping; access only from within registerWithApiserver
registrationCompleted bool
lock sync.RWMutex
}
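To make the constraint described in the comment above concrete, here is the arithmetic with nodeStatusUpdateRetry = 5 from this file and an assumed 10-second update frequency (an example value, not something this commit sets):

// minGracePeriodExample illustrates the nodeMonitorGracePeriod constraint;
// the 10s frequency is an assumed example value.
func minGracePeriodExample() time.Duration {
	frequency := 10 * time.Second // example nodeStatusUpdateFrequency
	// nodeMonitorGracePeriod should be at least retries * frequency, so one
	// status update can fail all of its retries without the node being
	// marked unready, while still staying below podEvictionTimeout.
	return time.Duration(nodeStatusUpdateRetry) * frequency // 5 * 10s = 50s
}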
func newRealNodeManager(client client.Interface, cloud cloudprovider.Interface, registerNode bool,
nodeStatusUpdateFrequency time.Duration, recorder record.EventRecorder, nodeName, hostname, podCIDR string,
pods int, infoGetter infoGetter, daemonEndpoints *api.NodeDaemonEndpoints, nodeRef *api.ObjectReference) *realNodeManager {
return &realNodeManager{
client: client,
cloud: cloud,
registerNode: registerNode,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
recorder: recorder,
nodeName: nodeName,
hostname: hostname,
podCIDR: podCIDR,
pods: pods,
infoGetter: infoGetter,
daemonEndpoints: daemonEndpoints,
nodeRef: nodeRef,
}
}
func (nm *realNodeManager) Start() {
if nm.client != nil {
go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop)
}
}
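Start relies on util.Until to drive the sync loop: the function runs, the loop sleeps for the period, and everything repeats until the stop channel closes — never, here, since util.NeverStop is passed. Roughly what that amounts to, as a sketch under a hypothetical name (the real util.Until also checks the stop channel around each run):

// startSketch shows approximately what Start does when the stop channel
// never closes; illustrative only, not code from this commit.
func (nm *realNodeManager) startSketch() {
	go func() {
		for {
			nm.syncNodeStatus()
			time.Sleep(nm.nodeStatusUpdateFrequency)
		}
	}()
}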
func (nm *realNodeManager) GetPodCIDR() string {
nm.lock.RLock()
defer nm.lock.RUnlock()
return nm.podCIDR
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
func (nm *realNodeManager) syncNodeStatus() {
if nm.registerNode {
// This will exit immediately if it doesn't need to do anything.
nm.registerWithApiserver()
}
if err := nm.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
func (nm *realNodeManager) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: nm.nodeName,
Labels: map[string]string{"kubernetes.io/hostname": nm.hostname},
},
}
if nm.cloud != nil {
instances, ok := nm.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(nm.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(nm.cloud, nm.nodeName)
if err != nil {
return nil, err
}
} else {
node.Spec.ExternalID = nm.hostname
}
if err := nm.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (nm.registrationCompleted is
// not locked).
func (nm *realNodeManager) registerWithApiserver() {
if nm.registrationCompleted {
return
}
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := nm.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := nm.client.Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
currentNode, err := nm.client.Nodes().Get(nm.nodeName)
if err != nil {
glog.Errorf("error getting node %q: %v", nm.nodeName, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", nm.nodeName)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
nm.registrationCompleted = true
return
}
glog.Errorf(
"Previously %q had externalID %q; now it is %q; will delete and recreate.",
nm.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
)
if err := nm.client.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete old node: %v", err)
} else {
glog.Errorf("Deleted old node object %q", nm.nodeName)
}
continue
}
glog.Infof("Successfully registered node %s", node.Name)
nm.registrationCompleted = true
return
}
}
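The retry loop above implements a capped exponential backoff: the wait starts at 100ms, doubles on every attempt, and is clamped at 7 seconds. A self-contained sketch of just the schedule (not code from this commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Registration backoff schedule: 100ms, doubling per attempt, capped at 7s.
	step := 100 * time.Millisecond
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, step)
		step *= 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}
	}
	// attempts 1-7 wait 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s;
	// attempt 8 and all later attempts wait the 7s cap.
}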
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func (nm *realNodeManager) setNodeStatus(node *api.Node) error {
// Set addresses for the node.
if nm.cloud != nil {
instances, ok := nm.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
nodeAddresses, err := instances.NodeAddresses(nm.nodeName)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
addr := net.ParseIP(nm.hostname)
if addr != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: addr.String()},
{Type: api.NodeInternalIP, Address: addr.String()},
}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
// Check all IP addresses for this node.Name and try to find the first non-loopback IPv4 address.
// If no match is found, fall back to the IP of the interface that has the default gateway.
for _, ip := range addrs {
if ip.IsLoopback() {
continue
}
if ip.To4() != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
break
}
}
if len(node.Status.Addresses) == 0 {
ip, err := util.ChooseHostInterface()
if err != nil {
return err
}
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
}
}
}
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := nm.infoGetter.GetMachineInfo()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
api.ResourcePods: *resource.NewQuantity(int64(nm.pods), resource.DecimalSI),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = CapacityFromMachineInfo(info)
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(nm.pods), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction: either both the node status update
// and the event recording should happen, or neither should; see issue #6055.
nm.recorder.Eventf(nm.nodeRef, "Rebooted",
"Node %s has been rebooted, boot id: %s", nm.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
}
verinfo, err := nm.infoGetter.GetVersionInfo()
if err != nil {
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion
// TODO: Determine whether the runtime is docker or rocket
node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion
node.Status.NodeInfo.KubeletVersion = version.Get().String()
// TODO: kube-proxy might be different version from kubelet in the future
node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
}
node.Status.DaemonEndpoints = *nm.daemonEndpoints
// Check whether container runtime can be reported as up.
containerRuntimeUp := nm.infoGetter.ContainerRuntimeUp()
// Check whether network is configured properly
networkConfigured := nm.infoGetter.NetworkConfigured()
currentTime := util.Now()
var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
} else {
var messages []string
if !containerRuntimeUp {
messages = append(messages, "container runtime is down")
}
if !networkConfigured {
messages = append(messages, "network not configured correctly")
}
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: strings.Join(messages, ","),
LastHeartbeatTime: currentTime,
}
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
oldNodeReadyConditionStatus = node.Status.Conditions[i].Status
if oldNodeReadyConditionStatus == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
}
node.Status.Conditions[i] = newNodeReadyCondition
updated = true
}
}
if !updated {
newNodeReadyCondition.LastTransitionTime = currentTime
node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
}
if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status {
if newNodeReadyCondition.Status == api.ConditionTrue {
nm.recordNodeStatusEvent("NodeReady")
} else {
nm.recordNodeStatusEvent("NodeNotReady")
}
}
if oldNodeUnschedulable != node.Spec.Unschedulable {
if node.Spec.Unschedulable {
nm.recordNodeStatusEvent("NodeNotSchedulable")
} else {
nm.recordNodeStatusEvent("NodeSchedulable")
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
return nil
}
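A detail of setNodeStatus worth calling out: LastHeartbeatTime is refreshed on every sync, while LastTransitionTime only moves when the Ready status actually flips, so consumers can tell how long the node has been in its current state. Restated as an illustrative timeline (assumed clock values):

// Illustrative timeline for the NodeReady condition:
//   10:00:00 sync, Ready=True  -> heartbeat 10:00:00, transition 10:00:00
//   10:00:10 sync, Ready=True  -> heartbeat 10:00:10, transition 10:00:00 (carried over)
//   10:00:20 sync, Ready=False -> heartbeat 10:00:20, transition 10:00:20 (status flipped)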
// updateNodeStatus updates node status to master with retries.
func (nm *realNodeManager) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := nm.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}
func (nm *realNodeManager) recordNodeStatusEvent(event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, nm.nodeName)
// TODO: This requires a transaction: either both the node status update
// and the event recording should happen, or neither should; see issue #6055.
nm.recorder.Eventf(nm.nodeRef, event, "Node %s status is now: %s", nm.nodeName, event)
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (nm *realNodeManager) tryUpdateNodeStatus() error {
node, err := nm.client.Nodes().Get(nm.nodeName)
if err != nil {
return fmt.Errorf("error getting node %q: %v", nm.nodeName, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", nm.nodeName)
}
nm.lock.Lock()
defer nm.lock.Unlock()
nm.podCIDR = node.Spec.PodCIDR
if err := nm.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
_, err = nm.client.Nodes().UpdateStatus(node)
return err
}

pkg/kubelet/server.go

@@ -95,7 +95,7 @@ type HostInterface interface {
GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
GetContainerRuntimeVersion() (kubecontainer.Version, error)
GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error)
GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
GetPods() []*api.Pod
GetRunningPods() ([]*api.Pod, error)
GetPodByName(namespace, name string) (*api.Pod, bool)
@@ -407,7 +407,7 @@ func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
// getSpec handles spec requests against the Kubelet.
func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
info, err := s.host.GetCachedMachineInfo()
info, err := s.host.GetMachineInfo()
if err != nil {
response.WriteError(http.StatusInternalServerError, err)
return