diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 5c504980fa0..907f72a1cb3 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -210,7 +210,7 @@ func startComponents(manifestURL string) (apiServerURL string) { nodeResources := &api.NodeResources{} nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute) - nodeController.Run(5*time.Second, true, true) + nodeController.Run(5*time.Second, true, false) // Kubelet (localhost) testRootDir := makeTempDirOrDie("kubelet_integ_1.") diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 2d3849d6a44..a687a7d69e5 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -76,7 +76,7 @@ func NewCMServer() *CMServer { NodeMilliCPU: 1000, NodeMemory: resource.MustParse("3Gi"), SyncNodeList: true, - SyncNodeStatus: true, + SyncNodeStatus: false, KubeletConfig: client.KubeletConfig{ Port: ports.KubeletPort, EnableHttps: false, diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 4b6e8d3e4a2..d1b9a028f2d 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -53,6 +53,7 @@ type KubeletServer struct { SyncFrequency time.Duration FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration + StatusUpdateFrequency time.Duration ManifestURL string EnableServer bool Address util.IP @@ -82,12 +83,13 @@ type KubeletServer struct { // NewKubeletServer will create a new KubeletServer with default values. func NewKubeletServer() *KubeletServer { return &KubeletServer{ - SyncFrequency: 10 * time.Second, - FileCheckFrequency: 20 * time.Second, - HTTPCheckFrequency: 20 * time.Second, - EnableServer: true, - Address: util.IP(net.ParseIP("127.0.0.1")), - Port: ports.KubeletPort, + SyncFrequency: 10 * time.Second, + FileCheckFrequency: 20 * time.Second, + HTTPCheckFrequency: 20 * time.Second, + StatusUpdateFrequency: 20 * time.Second, + EnableServer: true, + Address: util.IP(net.ParseIP("127.0.0.1")), + Port: ports.KubeletPort, PodInfraContainerImage: kubelet.PodInfraContainerImage, RootDirectory: defaultRootDir, RegistryBurst: 10, @@ -104,6 +106,7 @@ func NewKubeletServer() *KubeletServer { func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files") fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config") + fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master") fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data") fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data") fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest") @@ -157,6 +160,7 @@ func (s *KubeletServer) Run(_ []string) error { RootDirectory: s.RootDirectory, ConfigFile: s.Config, ManifestURL: s.ManifestURL, + StatusUpdateFrequency: s.StatusUpdateFrequency, FileCheckFrequency: s.FileCheckFrequency, HTTPCheckFrequency: s.HTTPCheckFrequency, PodInfraContainerImage: s.PodInfraContainerImage, @@ -250,6 +254,7 @@ func SimpleRunKubelet(client *client.Client, Address: util.IP(net.ParseIP(address)), EnableServer: true, EnableDebuggingHandlers: true, + StatusUpdateFrequency: 3 * time.Second, SyncFrequency: 3 * time.Second, MinimumGCAge: 10 * time.Second, MaxContainerCount: 5, @@ -345,6 +350,7 @@ type KubeletConfig struct { RootDirectory string ConfigFile string ManifestURL string + StatusUpdateFrequency time.Duration FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration Hostname string @@ -408,7 +414,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub kc.VolumePlugins, kc.StreamingConnectionIdleTimeout, kc.Recorder, - cadvisorInterface) + cadvisorInterface, + kc.StatusUpdateFrequency) if err != nil { return nil, err diff --git a/pkg/api/types.go b/pkg/api/types.go index e0bd74b28ac..285c2a2297b 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -903,7 +903,7 @@ type Node struct { Status NodeStatus `json:"status,omitempty"` } -// NodeList is a list of minions. +// NodeList is a list of nodes. type NodeList struct { TypeMeta `json:",inline"` ListMeta `json:"metadata,omitempty"` diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0e3e9eef30b..c187a82b91a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -32,6 +32,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -56,7 +57,7 @@ import ( ) const ( - // taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc + // Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc minShares = 2 sharesPerCPU = 1024 milliCPUToCPU = 1000 @@ -67,6 +68,14 @@ const ( // Max amount of time to wait for the Docker daemon to come up. maxWaitForDocker = 5 * time.Minute + + // Initial node status update frequency and incremental frequency, for faster cluster startup. + // The update frequency will be increameted linearly, until it reaches status_update_frequency. + initialNodeStatusUpdateFrequency = 100 * time.Millisecond + nodeStatusUpdateFrequencyInc = 500 * time.Millisecond + + // The retry count for updating node status at each sync period. + nodeStatusUpdateRetry = 5 ) var ( @@ -109,7 +118,8 @@ func NewMainKubelet( volumePlugins []volume.Plugin, streamingConnectionIdleTimeout time.Duration, recorder record.EventRecorder, - cadvisorInterface cadvisor.Interface) (*Kubelet, error) { + cadvisorInterface cadvisor.Interface, + statusUpdateFrequency time.Duration) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) } @@ -159,6 +169,7 @@ func NewMainKubelet( etcdClient: etcdClient, kubeClient: kubeClient, rootDirectory: rootDirectory, + statusUpdateFrequency: statusUpdateFrequency, resyncInterval: resyncInterval, podInfraContainerImage: podInfraContainerImage, dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, @@ -218,6 +229,7 @@ type Kubelet struct { rootDirectory string podInfraContainerImage string podWorkers *podWorkers + statusUpdateFrequency time.Duration resyncInterval time.Duration sourcesReady SourcesReadyFn @@ -520,9 +532,36 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { if kl.dockerPuller == nil { kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) } + if kl.kubeClient == nil { + glog.Warning("No api server defined - no node status update will be sent.") + } + go kl.syncNodeStatus() kl.syncLoop(updates, kl) } +// syncNodeStatus periodically synchronizes node status to master. +func (kl *Kubelet) syncNodeStatus() { + if kl.kubeClient == nil { + return + } + for feq := initialNodeStatusUpdateFrequency; feq < kl.statusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc { + select { + case <-time.After(feq): + if err := kl.updateNodeStatus(); err != nil { + glog.Errorf("Unable to update node status: %v", err) + } + } + } + for { + select { + case <-time.After(kl.statusUpdateFrequency): + if err := kl.updateNodeStatus(); err != nil { + glog.Errorf("Unable to update node status: %v", err) + } + } + } +} + func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { binds := []string{} for _, mount := range container.VolumeMounts { @@ -538,6 +577,7 @@ func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap } return binds } + func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) { exposedPorts := map[docker.Port]struct{}{} portBindings := map[docker.Port][]docker.PortBinding{} @@ -1679,7 +1719,7 @@ func (kl *Kubelet) GetHostname() string { return kl.hostname } -// GetBoundPods returns all pods bound to the kubelet and their spec +// GetBoundPods returns all pods bound to the kubelet and their spec. func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) { kl.podLock.RLock() defer kl.podLock.RUnlock() @@ -1699,6 +1739,68 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) { return nil, false } +// updateNodeStatus updates node status to master with retries. +func (kl *Kubelet) updateNodeStatus() error { + for i := 0; i < nodeStatusUpdateRetry; i++ { + err := kl.tryUpdateNodeStatus() + if err != nil { + glog.Errorf("error updating node status, will retry: %v", err) + } else { + return nil + } + } + return fmt.Errorf("Update node status exceeds retry count") +} + +// tryUpdateNodeStatus tries to update node status to master. +func (kl *Kubelet) tryUpdateNodeStatus() error { + node, err := kl.kubeClient.Nodes().Get(kl.hostname) + if err != nil { + return fmt.Errorf("error getting node %s: %v", kl.hostname, err) + } + if node == nil { + return fmt.Errorf("no node instance returned for %v", kl.hostname) + } + + // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start + // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. + info, err := kl.GetMachineInfo() + if err != nil { + glog.Error("error getting machine info: %v", err) + } else { + node.Status.NodeInfo.MachineID = info.MachineID + node.Status.NodeInfo.SystemUUID = info.SystemUUID + node.Spec.Capacity = api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity( + int64(info.NumCores*1000), + resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity( + info.MemoryCapacity, + resource.BinarySI), + } + } + + newCondition := api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("kubelet is posting ready status"), + LastProbeTime: util.Now(), + } + updated := false + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == api.NodeReady { + node.Status.Conditions[i] = newCondition + updated = true + } + } + if !updated { + node.Status.Conditions = append(node.Status.Conditions, newCondition) + } + + _, err = kl.kubeClient.Nodes().Update(node) + return err +} + // getPhase returns the phase of a pod given its container info. func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase { running := 0 diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 2479b4ff884..48f0decb9ec 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -33,6 +33,8 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" @@ -50,18 +52,26 @@ func init() { util.ReallyCrash = true } -// TODO(vmarmol): Consider compacting these return types of handling this better. -func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *sync.WaitGroup, *cadvisor.Mock) { - fakeDocker := &dockertools.FakeDockerClient{ - RemovedImages: util.StringSet{}, - } +type TestKubelet struct { + kubelet *Kubelet + fakeDocker *dockertools.FakeDockerClient + fakeCadvisor *cadvisor.Mock + fakeKubeClient *client.Fake + waitGroup *sync.WaitGroup +} + +func newTestKubelet(t *testing.T) *TestKubelet { + fakeDocker := &dockertools.FakeDockerClient{RemovedImages: util.StringSet{}} fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) - recorder := &record.FakeRecorder{} + fakeRecorder := &record.FakeRecorder{} + fakeKubeClient := &client.Fake{} kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker kubelet.dockerCache = fakeDockerCache + kubelet.kubeClient = fakeKubeClient kubelet.dockerPuller = &dockertools.FakeDockerPuller{} + kubelet.hostname = "testnode" if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { t.Fatalf("can't make a temp rootdir: %v", err) } else { @@ -78,12 +88,12 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn waitGroup.Done() return err }, - recorder) + fakeRecorder) kubelet.sourcesReady = func() bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.readiness = newReadinessStates() - kubelet.recorder = recorder + kubelet.recorder = fakeRecorder kubelet.podStatuses = map[string]api.PodStatus{} if err := kubelet.setupDataDirs(); err != nil { t.Fatalf("can't initialize kubelet data dirs: %v", err) @@ -91,7 +101,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn mockCadvisor := &cadvisor.Mock{} kubelet.cadvisor = mockCadvisor - return kubelet, fakeDocker, waitGroup, mockCadvisor + return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup} } func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { @@ -143,7 +153,8 @@ func verifyBoolean(t *testing.T, expected, value bool) { } func TestKubeletDirs(t *testing.T) { - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet root := kubelet.rootDirectory var exp, got string @@ -204,7 +215,8 @@ func TestKubeletDirs(t *testing.T) { } func TestKubeletDirsCompat(t *testing.T) { - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet root := kubelet.rootDirectory if err := os.MkdirAll(root, 0750); err != nil { t.Fatalf("can't mkdir(%q): %s", root, err) @@ -310,7 +322,8 @@ func TestKillContainerWithError(t *testing.T) { Err: fmt.Errorf("sample error"), ContainerList: append([]docker.APIContainers{}, containers...), } - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet for _, c := range fakeDocker.ContainerList { kubelet.readiness.set(c.ID, true) } @@ -341,7 +354,9 @@ func TestKillContainer(t *testing.T) { Names: []string{"/k8s_bar_qux_5678_42"}, }, } - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker fakeDocker.ContainerList = append([]docker.APIContainers{}, containers...) fakeDocker.Container = &docker.Container{ Name: "foobar", @@ -394,7 +409,11 @@ func (cr *channelReader) GetList() [][]api.BoundPod { var emptyPodUIDs map[types.UID]metrics.SyncPodType func TestSyncPodsDoesNothing(t *testing.T) { - kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup + container := api.Container{Name: "bar"} fakeDocker.ContainerList = []docker.APIContainers{ { @@ -432,7 +451,10 @@ func TestSyncPodsDoesNothing(t *testing.T) { } func TestSyncPodsWithTerminationLog(t *testing.T) { - kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup container := api.Container{ Name: "bar", TerminationMessagePath: "/dev/somepath", @@ -481,7 +503,10 @@ func matchString(t *testing.T, pattern, str string) bool { } func TestSyncPodsCreatesNetAndContainer(t *testing.T) { - kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup kubelet.podInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} kubelet.pods = []api.BoundPod{ @@ -529,7 +554,10 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { } func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { - kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{} kubelet.podInfraContainerImage = "custom_image_name" @@ -573,7 +601,10 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { } func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { - kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup fakeDocker.ContainerList = []docker.APIContainers{ { // pod infra container @@ -614,7 +645,10 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { } func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { - kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup fakeHttp := fakeHTTP{} kubelet.httpClient = &fakeHttp fakeDocker.ContainerList = []docker.APIContainers{ @@ -671,7 +705,10 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { } func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { - kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup fakeDocker.ContainerList = []docker.APIContainers{ { // format is // k8s___ @@ -717,7 +754,9 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ready := false - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.sourcesReady = func() bool { return ready } fakeDocker.ContainerList = []docker.APIContainers{ @@ -759,7 +798,9 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { } func TestSyncPodsDeletes(t *testing.T) { - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker fakeDocker.ContainerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container @@ -797,7 +838,9 @@ func TestSyncPodsDeletes(t *testing.T) { } func TestSyncPodDeletesDuplicate(t *testing.T) { - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -841,7 +884,9 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { } func TestSyncPodBadHash(t *testing.T) { - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -889,7 +934,9 @@ func TestSyncPodBadHash(t *testing.T) { } func TestSyncPodUnhealthy(t *testing.T) { - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container @@ -938,7 +985,8 @@ func TestSyncPodUnhealthy(t *testing.T) { } func TestMountExternalVolumes(t *testing.T) { - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{&volume.FakePlugin{"fake", nil}}, &volumeHost{kubelet}) pod := api.BoundPod{ @@ -972,7 +1020,8 @@ func TestMountExternalVolumes(t *testing.T) { } func TestGetPodVolumesFromDisk(t *testing.T) { - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet plug := &volume.FakePlugin{"fake", nil} kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{plug}, &volumeHost{kubelet}) @@ -1194,9 +1243,13 @@ func TestGetContainerInfo(t *testing.T) { }, } - kubelet, fakeDocker, _, mockCadvisor := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + mockCadvisor := testKubelet.fakeCadvisor cadvisorReq := &cadvisorApi.ContainerInfoRequest{} mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil) + fakeDocker.ContainerList = []docker.APIContainers{ { ID: containerID, @@ -1246,7 +1299,10 @@ func TestGetRootInfo(t *testing.T) { func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { containerID := "ab2cdf" - kubelet, fakeDocker, _, mockCadvisor := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + mockCadvisor := testKubelet.fakeCadvisor cadvisorApiFailure := fmt.Errorf("cAdvisor failure") containerInfo := cadvisorApi.ContainerInfo{} cadvisorReq := &cadvisorApi.ContainerInfoRequest{} @@ -1275,7 +1331,10 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { } func TestGetContainerInfoOnNonExistContainer(t *testing.T) { - kubelet, fakeDocker, _, mockCadvisor := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + mockCadvisor := testKubelet.fakeCadvisor fakeDocker.ContainerList = []docker.APIContainers{} stats, _ := kubelet.GetContainerInfo("qux", "", "foo", nil) @@ -1286,7 +1345,9 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) { } func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { - kubelet, _, _, mockCadvisor := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + mockCadvisor := testKubelet.fakeCadvisor expectedErr := fmt.Errorf("List containers error") kubelet.dockerClient = &errorTestingDockerClient{listContainersError: expectedErr} @@ -1304,7 +1365,9 @@ func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { } func TestGetContainerInfoWithNoContainers(t *testing.T) { - kubelet, _, _, mockCadvisor := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + mockCadvisor := testKubelet.fakeCadvisor kubelet.dockerClient = &errorTestingDockerClient{listContainersError: nil} stats, err := kubelet.GetContainerInfo("qux_ns", "", "foo", nil) @@ -1321,7 +1384,9 @@ func TestGetContainerInfoWithNoContainers(t *testing.T) { } func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) { - kubelet, _, _, mockCadvisor := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + mockCadvisor := testKubelet.fakeCadvisor containerList := []docker.APIContainers{ { @@ -1385,7 +1450,9 @@ func (f *fakeContainerCommandRunner) PortForward(podInfraContainerID string, por func TestRunInContainerNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -1407,7 +1474,9 @@ func TestRunInContainerNoSuchPod(t *testing.T) { func TestRunInContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.runner = &fakeCommandRunner containerID := "abc1234" @@ -1447,7 +1516,9 @@ func TestRunInContainer(t *testing.T) { func TestRunHandlerExec(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.runner = &fakeCommandRunner containerID := "abc1234" @@ -1495,7 +1566,8 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) { func TestRunHandlerHttp(t *testing.T) { fakeHttp := fakeHTTP{} - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet kubelet.httpClient = &fakeHttp podName := "podFoo" @@ -1524,7 +1596,8 @@ func TestRunHandlerHttp(t *testing.T) { } func TestNewHandler(t *testing.T) { - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet handler := &api.Handler{ HTTPGet: &api.HTTPGetAction{ Host: "foo", @@ -1555,7 +1628,9 @@ func TestNewHandler(t *testing.T) { } func TestSyncPodEventHandlerFails(t *testing.T) { - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.httpClient = &fakeHTTP{ err: fmt.Errorf("test error"), } @@ -1743,7 +1818,9 @@ func TestKubeletGarbageCollection(t *testing.T) { }, } for _, test := range tests { - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.maxContainerCount = 2 fakeDocker.ContainerList = test.containers fakeDocker.ContainerMap = test.containerDetails @@ -1908,7 +1985,9 @@ func TestPurgeOldest(t *testing.T) { }, } for _, test := range tests { - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.maxContainerCount = 5 fakeDocker.ContainerMap = test.containerDetails kubelet.purgeOldest(test.ids) @@ -1919,7 +1998,10 @@ func TestPurgeOldest(t *testing.T) { } func TestSyncPodsWithPullPolicy(t *testing.T) { - kubelet, fakeDocker, waitGroup, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{"existing_one", "want:latest"} kubelet.podInfraContainerImage = "custom_image_name" @@ -2252,7 +2334,8 @@ func TestMakeEnvironmentVariables(t *testing.T) { } for _, tc := range testCases { - kl, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kl := testKubelet.kubelet kl.masterServiceNamespace = tc.masterServiceNamespace if tc.nilLister { kl.serviceLister = nil @@ -2689,7 +2772,9 @@ func TestGetPodReadyCondition(t *testing.T) { func TestExecInContainerNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -2716,7 +2801,9 @@ func TestExecInContainerNoSuchPod(t *testing.T) { func TestExecInContainerNoSuchContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -2768,7 +2855,9 @@ func (f *fakeReadWriteCloser) Close() error { func TestExecInContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -2826,7 +2915,9 @@ func TestExecInContainer(t *testing.T) { func TestPortForwardNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker fakeDocker.ContainerList = []docker.APIContainers{} kubelet.runner = &fakeCommandRunner @@ -2850,7 +2941,9 @@ func TestPortForwardNoSuchPod(t *testing.T) { func TestPortForwardNoSuchContainer(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -2884,7 +2977,9 @@ func TestPortForwardNoSuchContainer(t *testing.T) { func TestPortForward(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} - kubelet, fakeDocker, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker kubelet.runner = &fakeCommandRunner podName := "podFoo" @@ -2957,7 +3052,8 @@ func TestGetHostPortConflicts(t *testing.T) { // Tests that we handle port conflicts correctly by setting the failed status in status map. func TestHandlePortConflicts(t *testing.T) { - kl, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kl := testKubelet.kubelet spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}} pods := []api.BoundPod{ { @@ -3008,7 +3104,8 @@ func TestHandlePortConflicts(t *testing.T) { } func TestPurgingObsoleteStatusMapEntries(t *testing.T) { - kl, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kl := testKubelet.kubelet pods := []api.BoundPod{ {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, @@ -3026,7 +3123,8 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { } func TestValidatePodStatus(t *testing.T) { - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet testCases := []struct { podPhase api.PodPhase success bool @@ -3051,7 +3149,8 @@ func TestValidatePodStatus(t *testing.T) { } func TestValidateContainerStatus(t *testing.T) { - kubelet, _, _, _ := newTestKubelet(t) + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet containerName := "x" testCases := []struct { podInfo api.PodInfo @@ -3089,3 +3188,143 @@ func TestValidateContainerStatus(t *testing.T) { t.Errorf("expected error with invalid container name") } } + +func TestUpdateNewNodeStatus(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + mockCadvisor := testKubelet.fakeCadvisor + kubeClient.MinionsList = api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "testnode"}}, + }} + machineInfo := &cadvisorApi.MachineInfo{MachineID: "123", SystemUUID: "abc", NumCores: 2, MemoryCapacity: 1024} + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: "testnode"}, + Spec: api.NodeSpec{ + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + }, + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("kubelet is posting ready status"), + LastProbeTime: util.Time{}, + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + }, + }, + } + + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(kubeClient.Actions) != 2 { + t.Errorf("unexpected actions: %v", kubeClient.Actions) + } + updatedNode, ok := kubeClient.Actions[1].Value.(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + if updatedNode.Status.Conditions[0].LastProbeTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + updatedNode.Status.Conditions[0].LastProbeTime = util.Time{} + if !reflect.DeepEqual(expectedNode, updatedNode) { + t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) + } +} + +func TestUpdateExistingNodeStatus(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + mockCadvisor := testKubelet.fakeCadvisor + kubeClient.MinionsList = api.NodeList{Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "testnode"}, + Spec: api.NodeSpec{ + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI), + }, + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("kubelet is posting ready status"), + LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }} + machineInfo := &cadvisorApi.MachineInfo{MachineID: "123", SystemUUID: "abc", NumCores: 2, MemoryCapacity: 1024} + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: "testnode"}, + Spec: api.NodeSpec{ + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), + }, + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("kubelet is posting ready status"), + LastProbeTime: util.Time{}, // placeholder + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + }, + }, + } + + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(kubeClient.Actions) != 2 { + t.Errorf("unexpected actions: %v", kubeClient.Actions) + } + updatedNode, ok := kubeClient.Actions[1].Value.(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastProbeTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { + t.Errorf("expected \n%v\n, got \n%v", updatedNode.Status.Conditions[0].LastProbeTime, + util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) + } + updatedNode.Status.Conditions[0].LastProbeTime = util.Time{} + if !reflect.DeepEqual(expectedNode, updatedNode) { + t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) + } +} + +func TestUpdateNodeStatusError(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + // No matching node for the kubelet + kubeClient.MinionsList = api.NodeList{Items: []api.Node{}} + + if err := kubelet.updateNodeStatus(); err == nil { + t.Errorf("unexpected non error: %v") + } + if len(kubeClient.Actions) != nodeStatusUpdateRetry { + t.Errorf("unexpected actions: %v", kubeClient.Actions) + } +}