From 7283e662b552d2d389d38dbfb2a346bee4070014 Mon Sep 17 00:00:00 2001 From: Victor Marmol Date: Fri, 29 May 2015 17:32:34 -0700 Subject: [PATCH] Introduce SystemContainer to Kubelet ContainerManager. This generalizes the handling of containers in the ContainerManager. Also introduces the ability to determine how much resources are reserved for those system containers. --- pkg/kubelet/container_manager.go | 8 + pkg/kubelet/container_manager_linux.go | 169 +++++++++++++------ pkg/kubelet/container_manager_unsupported.go | 8 +- pkg/kubelet/kubelet.go | 2 +- pkg/kubelet/kubelet_test.go | 2 + pkg/kubelet/runonce_test.go | 1 + 6 files changed, 134 insertions(+), 56 deletions(-) diff --git a/pkg/kubelet/container_manager.go b/pkg/kubelet/container_manager.go index 188646c45d3..0f533683585 100644 --- a/pkg/kubelet/container_manager.go +++ b/pkg/kubelet/container_manager.go @@ -16,10 +16,18 @@ limitations under the License. package kubelet +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + // Manages the containers running on a machine. type containerManager interface { // Runs the container manager's housekeeping. // - Ensures that the Docker daemon is in a container. // - Creates the system container where all non-containerized processes run. Start() error + + // Returns resources allocated to system containers in the machine. + // These containers include the system and Kubernetes services. + SystemContainersLimit() api.ResourceList } diff --git a/pkg/kubelet/container_manager_linux.go b/pkg/kubelet/container_manager_linux.go index af1d53a5007..5a8cdc89100 100644 --- a/pkg/kubelet/container_manager_linux.go +++ b/pkg/kubelet/container_manager_linux.go @@ -26,6 +26,8 @@ import ( "strings" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" "github.com/docker/libcontainer/cgroups" @@ -34,73 +36,132 @@ import ( "github.com/golang/glog" ) +// A non-user container tracked by the Kubelet. +type systemContainer struct { + // Absolute name of the container. + name string + + // CPU limit in millicores. + cpuMillicores int64 + + // Function that ensures the state of the container. + // m is the cgroup manager for the specified container. + ensureStateFunc func(m *fs.Manager) error + + // Manager for the cgroups of the external container. + manager *fs.Manager +} + +func newSystemContainer(containerName string) *systemContainer { + return &systemContainer{ + name: containerName, + manager: createManager(containerName), + } +} + type containerManagerImpl struct { - // Whether to create and use the specified containers. - useDockerContainer bool - useSystemContainer bool - - // OOM score for the Docker container. - dockerOomScoreAdj int - - // Managers for containers. - dockerContainer fs.Manager - systemContainer fs.Manager - rootContainer fs.Manager + // External containers being managed. + systemContainers []*systemContainer } var _ containerManager = &containerManagerImpl{} +// TODO(vmarmol): Add limits to the system containers. // Takes the absolute name of the specified containers. // Empty container name disables use of the specified container. -func newContainerManager(dockerDaemonContainer, systemContainer string) (containerManager, error) { - if systemContainer == "/" { - return nil, fmt.Errorf("system container cannot be root (\"/\")") +func newContainerManager(dockerDaemonContainerName, systemContainerName, kubeletContainerName string) (containerManager, error) { + systemContainers := []*systemContainer{} + + if dockerDaemonContainerName != "" { + cont := newSystemContainer(dockerDaemonContainerName) + cont.ensureStateFunc = func(manager *fs.Manager) error { + return ensureDockerInContainer(-900, createManager(dockerDaemonContainerName)) + } + systemContainers = append(systemContainers, cont) } - return &containerManagerImpl{ - useDockerContainer: dockerDaemonContainer != "", - useSystemContainer: systemContainer != "", - dockerOomScoreAdj: -900, - dockerContainer: fs.Manager{ - Cgroups: &configs.Cgroup{ - Name: dockerDaemonContainer, - AllowAllDevices: true, - }, - }, - systemContainer: fs.Manager{ - Cgroups: &configs.Cgroup{ - Name: systemContainer, - AllowAllDevices: true, - }, - }, - rootContainer: fs.Manager{ + if systemContainerName != "" { + if systemContainerName == "/" { + return nil, fmt.Errorf("system container cannot be root (\"/\")") + } + + rootContainer := &fs.Manager{ Cgroups: &configs.Cgroup{ Name: "/", }, - }, + } + manager := createManager(systemContainerName) + + err := ensureSystemContainer(rootContainer, manager) + if err != nil { + return nil, err + } + systemContainers = append(systemContainers, newSystemContainer(systemContainerName)) + } + + if kubeletContainerName != "" { + systemContainers = append(systemContainers, newSystemContainer(kubeletContainerName)) + } + + // TODO(vmarmol): Add Kube-proxy container. + + return &containerManagerImpl{ + systemContainers: systemContainers, }, nil } +// Create a cgroup container manager. +func createManager(containerName string) *fs.Manager { + return &fs.Manager{ + Cgroups: &configs.Cgroup{ + Name: containerName, + AllowAllDevices: true, + }, + } +} + func (cm *containerManagerImpl) Start() error { - if cm.useSystemContainer { - err := cm.ensureSystemContainer() - if err != nil { - return err + // Don't run a background thread if there are no ensureStateFuncs. + numEnsureStateFuncs := 0 + for _, cont := range cm.systemContainers { + if cont.ensureStateFunc != nil { + numEnsureStateFuncs++ } } - if cm.useDockerContainer { - go util.Until(func() { - err := cm.ensureDockerInContainer() - if err != nil { - glog.Warningf("[ContainerManager] Failed to ensure Docker is in a container: %v", err) - } - }, time.Minute, util.NeverStop) + if numEnsureStateFuncs == 0 { + return nil } + + // Run ensure state functions every minute. + go util.Until(func() { + for _, cont := range cm.systemContainers { + if cont.ensureStateFunc != nil { + err := cont.ensureStateFunc(cont.manager) + glog.Warningf("[ContainerManager] Failed to ensure state of %q: %v", cont.name, err) + } + } + }, time.Minute, util.NeverStop) + return nil } +func (cm *containerManagerImpl) SystemContainersLimit() api.ResourceList { + cpuLimit := int64(0) + + // Sum up resources of all external containers. + for _, cont := range cm.systemContainers { + cpuLimit += cont.cpuMillicores + } + + return api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity( + cpuLimit, + resource.DecimalSI), + } +} + // Ensures that the Docker daemon is in the desired container. -func (cm *containerManagerImpl) ensureDockerInContainer() error { +func ensureDockerInContainer(oomScoreAdj int, manager *fs.Manager) error { // What container is Docker in? out, err := exec.Command("pidof", "docker").Output() if err != nil { @@ -126,16 +187,16 @@ func (cm *containerManagerImpl) ensureDockerInContainer() error { errs = append(errs, fmt.Errorf("failed to find container of PID %q: %v", pid, err)) } - if cont != cm.dockerContainer.Cgroups.Name { - err = cm.dockerContainer.Apply(pid) + if cont != manager.Cgroups.Name { + err = manager.Apply(pid) if err != nil { - errs = append(errs, fmt.Errorf("failed to move PID %q (in %q) to %q", pid, cont, cm.dockerContainer.Cgroups.Name)) + errs = append(errs, fmt.Errorf("failed to move PID %q (in %q) to %q", pid, cont, manager.Cgroups.Name)) } } // Also apply oom_score_adj to processes - if err := util.ApplyOomScoreAdj(pid, cm.dockerOomScoreAdj); err != nil { - errs = append(errs, fmt.Errorf("failed to apply oom score %q to PID %q", cm.dockerOomScoreAdj, pid)) + if err := util.ApplyOomScoreAdj(pid, oomScoreAdj); err != nil { + errs = append(errs, fmt.Errorf("failed to apply oom score %q to PID %q", oomScoreAdj, pid)) } } @@ -155,7 +216,7 @@ func getContainer(pid int) (string, error) { // Ensures the system container is created and all non-kernel processes without // a container are moved to it. -func (cm *containerManagerImpl) ensureSystemContainer() error { +func ensureSystemContainer(rootContainer *fs.Manager, manager *fs.Manager) error { // Move non-kernel PIDs to the system container. attemptsRemaining := 10 var errs []error @@ -164,7 +225,7 @@ func (cm *containerManagerImpl) ensureSystemContainer() error { errs = []error{} attemptsRemaining-- - allPids, err := cm.rootContainer.GetPids() + allPids, err := rootContainer.GetPids() if err != nil { errs = append(errs, fmt.Errorf("Failed to list PIDs for root: %v", err)) continue @@ -188,16 +249,16 @@ func (cm *containerManagerImpl) ensureSystemContainer() error { glog.Infof("Moving non-kernel threads: %v", pids) for _, pid := range pids { - err := cm.systemContainer.Apply(pid) + err := manager.Apply(pid) if err != nil { - errs = append(errs, fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, cm.systemContainer.Cgroups.Name, err)) + errs = append(errs, fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, manager.Cgroups.Name, err)) continue } } } if attemptsRemaining < 0 { - errs = append(errs, fmt.Errorf("ran out of attempts to create system containers %q", cm.systemContainer.Cgroups.Name)) + errs = append(errs, fmt.Errorf("ran out of attempts to create system containers %q", manager.Cgroups.Name)) } return errors.NewAggregate(errs) diff --git a/pkg/kubelet/container_manager_unsupported.go b/pkg/kubelet/container_manager_unsupported.go index 77246f174a4..8a14b331f92 100644 --- a/pkg/kubelet/container_manager_unsupported.go +++ b/pkg/kubelet/container_manager_unsupported.go @@ -20,6 +20,8 @@ package kubelet import ( "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) type unsupportedContainerManager struct { @@ -31,6 +33,10 @@ func (unsupportedContainerManager) Start() error { return fmt.Errorf("Container Manager is unsupported in this build") } -func newContainerManager(dockerDaemonContainer, systemContainer string) (containerManager, error) { +func (unsupportedContainerManager) SystemContainersLimit() api.ResourceList { + return api.ResourceList{} +} + +func newContainerManager(dockerDaemonContainer, systemContainer, kubeletContainer string) (containerManager, error) { return &unsupportedContainerManager{}, nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ad2c09feb77..d9582347004 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -301,7 +301,7 @@ func NewMainKubelet( // Setup container manager, can fail if the devices hierarchy is not mounted // (it is required by Docker however). - containerManager, err := newContainerManager(dockerDaemonContainer, systemContainer) + containerManager, err := newContainerManager(dockerDaemonContainer, systemContainer, resourceContainer) if err != nil { return nil, fmt.Errorf("failed to create the Container Manager: %v", err) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5d5a132a259..48670fda292 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -124,6 +124,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { t: t, } kubelet.volumeManager = newVolumeManager() + kubelet.containerManager, _ = newContainerManager("", "", "") return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, fakeMirrorClient} } @@ -233,6 +234,7 @@ func newTestKubeletWithFakeRuntime(t *testing.T) *TestKubeletWithFakeRuntime { t: t, } kubelet.volumeManager = newVolumeManager() + kubelet.containerManager, _ = newContainerManager("", "", "") return &TestKubeletWithFakeRuntime{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 08cc67d5c36..9eba420ad69 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -89,6 +89,7 @@ func TestRunOnce(t *testing.T) { os: kubecontainer.FakeOS{}, volumeManager: newVolumeManager(), } + kb.containerManager, _ = newContainerManager("", "", "") kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) if err := kb.setupDataDirs(); err != nil {