Merge pull request #5265 from ddysher/kubelet-post-status

kubelet post node status to master
This commit is contained in:
Dawn Chen
2015-03-13 15:29:22 -07:00
6 changed files with 414 additions and 66 deletions


@@ -32,6 +32,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@@ -55,7 +56,7 @@ import (
)
const (
// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
minShares = 2
sharesPerCPU = 1024
milliCPUToCPU = 1000
@@ -66,6 +67,14 @@ const (
// Max amount of time to wait for the Docker daemon to come up.
maxWaitForDocker = 5 * time.Minute
// Initial node status update frequency and its per-iteration increment, for faster cluster startup.
// The update frequency is incremented linearly until it reaches status_update_frequency.
initialNodeStatusUpdateFrequency = 100 * time.Millisecond
nodeStatusUpdateFrequencyInc = 500 * time.Millisecond
// The retry count for updating node status at each sync period.
nodeStatusUpdateRetry = 5
)
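As a rough illustration (not part of this patch), the startup ramp these constants define looks like the following; the 2s statusUpdateFrequency below is a hypothetical flag value:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical value of the kubelet's status update frequency flag.
	statusUpdateFrequency := 2 * time.Second
	// During startup the kubelet posts at 100ms, 600ms, 1.1s, 1.6s, ...,
	// then settles at statusUpdateFrequency.
	for freq := 100 * time.Millisecond; freq < statusUpdateFrequency; freq += 500 * time.Millisecond {
		fmt.Println(freq)
	}
}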
var (
@@ -107,7 +116,8 @@ func NewMainKubelet(
volumePlugins []volume.Plugin,
streamingConnectionIdleTimeout time.Duration,
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface,
statusUpdateFrequency time.Duration) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@@ -156,6 +166,7 @@ func NewMainKubelet(
dockerClient: dockerClient,
kubeClient: kubeClient,
rootDirectory: rootDirectory,
statusUpdateFrequency: statusUpdateFrequency,
resyncInterval: resyncInterval,
podInfraContainerImage: podInfraContainerImage,
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
@@ -215,6 +226,7 @@ type Kubelet struct {
rootDirectory string
podInfraContainerImage string
podWorkers *podWorkers
statusUpdateFrequency time.Duration
resyncInterval time.Duration
sourcesReady SourcesReadyFn
@@ -552,9 +564,36 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
go kl.syncNodeStatus()
kl.syncLoop(updates, kl)
}
// syncNodeStatus periodically synchronizes node status to master.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
for freq := initialNodeStatusUpdateFrequency; freq < kl.statusUpdateFrequency; freq += nodeStatusUpdateFrequencyInc {
select {
case <-time.After(freq):
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
}
for {
select {
case <-time.After(kl.statusUpdateFrequency):
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
}
}
func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{}
for _, mount := range container.VolumeMounts {
@@ -570,6 +609,7 @@ func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap
}
return binds
}
func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
exposedPorts := map[docker.Port]struct{}{}
portBindings := map[docker.Port][]docker.PortBinding{}
@@ -1795,7 +1835,7 @@ func (kl *Kubelet) GetHostname() string {
return kl.hostname
}
// GetBoundPods returns all pods bound to the kubelet and their spec.
func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
@@ -1815,6 +1855,68 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
return nil, false
}
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
err := kl.tryUpdateNodeStatus()
if err != nil {
glog.Errorf("error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("Update node status exceeds retry count")
}
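The bounded-retry pattern above, extracted into a self-contained sketch (withRetry is a hypothetical helper, not part of the patch):

package main

import (
	"errors"
	"fmt"
)

// withRetry calls fn up to attempts times and stops at the first success,
// mirroring the loop in updateNodeStatus.
func withRetry(attempts int, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return fmt.Errorf("exceeded %d attempts, last error: %v", attempts, err)
}

func main() {
	calls := 0
	err := withRetry(5, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}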
// tryUpdateNodeStatus tries to update node status to master.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %s: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %v", kl.hostname)
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetMachineInfo()
if err != nil {
glog.Error("error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
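// Derive capacity from cAdvisor's machine info: CPU in milli-CPU units, memory in bytes.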
node.Spec.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
}
newCondition := api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFull,
Reason: "kubelet is posting ready status",
LastProbeTime: util.Now(),
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
node.Status.Conditions[i] = newCondition
updated = true
}
}
if !updated {
node.Status.Conditions = append(node.Status.Conditions, newCondition)
}
_, err = kl.kubeClient.Nodes().Update(node)
return err
}
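The Ready-condition handling above is a find-or-append over the node's condition list; the same pattern as a standalone sketch (setCondition and the Condition type are stand-ins, not part of the patch):

package main

import "fmt"

// Condition stands in for api.NodeCondition.
type Condition struct {
	Type   string
	Status string
}

// setCondition overwrites an existing condition of the same type,
// or appends the new one, as tryUpdateNodeStatus does for NodeReady.
func setCondition(conds []Condition, c Condition) []Condition {
	for i := range conds {
		if conds[i].Type == c.Type {
			conds[i] = c
			return conds
		}
	}
	return append(conds, c)
}

func main() {
	conds := []Condition{{Type: "Ready", Status: "Unknown"}}
	conds = setCondition(conds, Condition{Type: "Ready", Status: "Full"})
	fmt.Println(conds) // [{Ready Full}]
}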
// getPhase returns the phase of a pod given its container info.
func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase {
running := 0