Introduce SystemContainer to Kubelet ContainerManager.

This generalizes the handling of containers in the
ContainerManager.

Also introduces the ability to determine how much
resources are reserved for those system containers.
This commit is contained in:
Victor Marmol 2015-05-29 17:32:34 -07:00
parent 50b9d6284a
commit 7283e662b5
6 changed files with 134 additions and 56 deletions

View File

@ -16,10 +16,18 @@ limitations under the License.
package kubelet package kubelet
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Manages the containers running on a machine. // Manages the containers running on a machine.
type containerManager interface { type containerManager interface {
// Runs the container manager's housekeeping. // Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container. // - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run. // - Creates the system container where all non-containerized processes run.
Start() error Start() error
// Returns resources allocated to system containers in the machine.
// These containers include the system and Kubernetes services.
SystemContainersLimit() api.ResourceList
} }

View File

@ -26,6 +26,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/docker/libcontainer/cgroups" "github.com/docker/libcontainer/cgroups"
@ -34,73 +36,132 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// A non-user container tracked by the Kubelet.
type systemContainer struct {
// Absolute name of the container.
name string
// CPU limit in millicores.
cpuMillicores int64
// Function that ensures the state of the container.
// m is the cgroup manager for the specified container.
ensureStateFunc func(m *fs.Manager) error
// Manager for the cgroups of the external container.
manager *fs.Manager
}
func newSystemContainer(containerName string) *systemContainer {
return &systemContainer{
name: containerName,
manager: createManager(containerName),
}
}
type containerManagerImpl struct { type containerManagerImpl struct {
// Whether to create and use the specified containers. // External containers being managed.
useDockerContainer bool systemContainers []*systemContainer
useSystemContainer bool
// OOM score for the Docker container.
dockerOomScoreAdj int
// Managers for containers.
dockerContainer fs.Manager
systemContainer fs.Manager
rootContainer fs.Manager
} }
var _ containerManager = &containerManagerImpl{} var _ containerManager = &containerManagerImpl{}
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers. // Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container. // Empty container name disables use of the specified container.
func newContainerManager(dockerDaemonContainer, systemContainer string) (containerManager, error) { func newContainerManager(dockerDaemonContainerName, systemContainerName, kubeletContainerName string) (containerManager, error) {
if systemContainer == "/" { systemContainers := []*systemContainer{}
return nil, fmt.Errorf("system container cannot be root (\"/\")")
if dockerDaemonContainerName != "" {
cont := newSystemContainer(dockerDaemonContainerName)
cont.ensureStateFunc = func(manager *fs.Manager) error {
return ensureDockerInContainer(-900, createManager(dockerDaemonContainerName))
}
systemContainers = append(systemContainers, cont)
} }
return &containerManagerImpl{ if systemContainerName != "" {
useDockerContainer: dockerDaemonContainer != "", if systemContainerName == "/" {
useSystemContainer: systemContainer != "", return nil, fmt.Errorf("system container cannot be root (\"/\")")
dockerOomScoreAdj: -900, }
dockerContainer: fs.Manager{
Cgroups: &configs.Cgroup{ rootContainer := &fs.Manager{
Name: dockerDaemonContainer,
AllowAllDevices: true,
},
},
systemContainer: fs.Manager{
Cgroups: &configs.Cgroup{
Name: systemContainer,
AllowAllDevices: true,
},
},
rootContainer: fs.Manager{
Cgroups: &configs.Cgroup{ Cgroups: &configs.Cgroup{
Name: "/", Name: "/",
}, },
}, }
manager := createManager(systemContainerName)
err := ensureSystemContainer(rootContainer, manager)
if err != nil {
return nil, err
}
systemContainers = append(systemContainers, newSystemContainer(systemContainerName))
}
if kubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(kubeletContainerName))
}
// TODO(vmarmol): Add Kube-proxy container.
return &containerManagerImpl{
systemContainers: systemContainers,
}, nil }, nil
} }
// Create a cgroup container manager.
func createManager(containerName string) *fs.Manager {
return &fs.Manager{
Cgroups: &configs.Cgroup{
Name: containerName,
AllowAllDevices: true,
},
}
}
func (cm *containerManagerImpl) Start() error { func (cm *containerManagerImpl) Start() error {
if cm.useSystemContainer { // Don't run a background thread if there are no ensureStateFuncs.
err := cm.ensureSystemContainer() numEnsureStateFuncs := 0
if err != nil { for _, cont := range cm.systemContainers {
return err if cont.ensureStateFunc != nil {
numEnsureStateFuncs++
} }
} }
if cm.useDockerContainer { if numEnsureStateFuncs == 0 {
go util.Until(func() { return nil
err := cm.ensureDockerInContainer()
if err != nil {
glog.Warningf("[ContainerManager] Failed to ensure Docker is in a container: %v", err)
}
}, time.Minute, util.NeverStop)
} }
// Run ensure state functions every minute.
go util.Until(func() {
for _, cont := range cm.systemContainers {
if cont.ensureStateFunc != nil {
err := cont.ensureStateFunc(cont.manager)
glog.Warningf("[ContainerManager] Failed to ensure state of %q: %v", cont.name, err)
}
}
}, time.Minute, util.NeverStop)
return nil return nil
} }
func (cm *containerManagerImpl) SystemContainersLimit() api.ResourceList {
cpuLimit := int64(0)
// Sum up resources of all external containers.
for _, cont := range cm.systemContainers {
cpuLimit += cont.cpuMillicores
}
return api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
cpuLimit,
resource.DecimalSI),
}
}
// Ensures that the Docker daemon is in the desired container. // Ensures that the Docker daemon is in the desired container.
func (cm *containerManagerImpl) ensureDockerInContainer() error { func ensureDockerInContainer(oomScoreAdj int, manager *fs.Manager) error {
// What container is Docker in? // What container is Docker in?
out, err := exec.Command("pidof", "docker").Output() out, err := exec.Command("pidof", "docker").Output()
if err != nil { if err != nil {
@ -126,16 +187,16 @@ func (cm *containerManagerImpl) ensureDockerInContainer() error {
errs = append(errs, fmt.Errorf("failed to find container of PID %q: %v", pid, err)) errs = append(errs, fmt.Errorf("failed to find container of PID %q: %v", pid, err))
} }
if cont != cm.dockerContainer.Cgroups.Name { if cont != manager.Cgroups.Name {
err = cm.dockerContainer.Apply(pid) err = manager.Apply(pid)
if err != nil { if err != nil {
errs = append(errs, fmt.Errorf("failed to move PID %q (in %q) to %q", pid, cont, cm.dockerContainer.Cgroups.Name)) errs = append(errs, fmt.Errorf("failed to move PID %q (in %q) to %q", pid, cont, manager.Cgroups.Name))
} }
} }
// Also apply oom_score_adj to processes // Also apply oom_score_adj to processes
if err := util.ApplyOomScoreAdj(pid, cm.dockerOomScoreAdj); err != nil { if err := util.ApplyOomScoreAdj(pid, oomScoreAdj); err != nil {
errs = append(errs, fmt.Errorf("failed to apply oom score %q to PID %q", cm.dockerOomScoreAdj, pid)) errs = append(errs, fmt.Errorf("failed to apply oom score %q to PID %q", oomScoreAdj, pid))
} }
} }
@ -155,7 +216,7 @@ func getContainer(pid int) (string, error) {
// Ensures the system container is created and all non-kernel processes without // Ensures the system container is created and all non-kernel processes without
// a container are moved to it. // a container are moved to it.
func (cm *containerManagerImpl) ensureSystemContainer() error { func ensureSystemContainer(rootContainer *fs.Manager, manager *fs.Manager) error {
// Move non-kernel PIDs to the system container. // Move non-kernel PIDs to the system container.
attemptsRemaining := 10 attemptsRemaining := 10
var errs []error var errs []error
@ -164,7 +225,7 @@ func (cm *containerManagerImpl) ensureSystemContainer() error {
errs = []error{} errs = []error{}
attemptsRemaining-- attemptsRemaining--
allPids, err := cm.rootContainer.GetPids() allPids, err := rootContainer.GetPids()
if err != nil { if err != nil {
errs = append(errs, fmt.Errorf("Failed to list PIDs for root: %v", err)) errs = append(errs, fmt.Errorf("Failed to list PIDs for root: %v", err))
continue continue
@ -188,16 +249,16 @@ func (cm *containerManagerImpl) ensureSystemContainer() error {
glog.Infof("Moving non-kernel threads: %v", pids) glog.Infof("Moving non-kernel threads: %v", pids)
for _, pid := range pids { for _, pid := range pids {
err := cm.systemContainer.Apply(pid) err := manager.Apply(pid)
if err != nil { if err != nil {
errs = append(errs, fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, cm.systemContainer.Cgroups.Name, err)) errs = append(errs, fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, manager.Cgroups.Name, err))
continue continue
} }
} }
} }
if attemptsRemaining < 0 { if attemptsRemaining < 0 {
errs = append(errs, fmt.Errorf("ran out of attempts to create system containers %q", cm.systemContainer.Cgroups.Name)) errs = append(errs, fmt.Errorf("ran out of attempts to create system containers %q", manager.Cgroups.Name))
} }
return errors.NewAggregate(errs) return errors.NewAggregate(errs)

View File

@ -20,6 +20,8 @@ package kubelet
import ( import (
"fmt" "fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
) )
type unsupportedContainerManager struct { type unsupportedContainerManager struct {
@ -31,6 +33,10 @@ func (unsupportedContainerManager) Start() error {
return fmt.Errorf("Container Manager is unsupported in this build") return fmt.Errorf("Container Manager is unsupported in this build")
} }
func newContainerManager(dockerDaemonContainer, systemContainer string) (containerManager, error) { func (unsupportedContainerManager) SystemContainersLimit() api.ResourceList {
return api.ResourceList{}
}
func newContainerManager(dockerDaemonContainer, systemContainer, kubeletContainer string) (containerManager, error) {
return &unsupportedContainerManager{}, nil return &unsupportedContainerManager{}, nil
} }

View File

@ -301,7 +301,7 @@ func NewMainKubelet(
// Setup container manager, can fail if the devices hierarchy is not mounted // Setup container manager, can fail if the devices hierarchy is not mounted
// (it is required by Docker however). // (it is required by Docker however).
containerManager, err := newContainerManager(dockerDaemonContainer, systemContainer) containerManager, err := newContainerManager(dockerDaemonContainer, systemContainer, resourceContainer)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create the Container Manager: %v", err) return nil, fmt.Errorf("failed to create the Container Manager: %v", err)
} }

View File

@ -124,6 +124,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
t: t, t: t,
} }
kubelet.volumeManager = newVolumeManager() kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager("", "", "")
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, fakeMirrorClient} return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, fakeMirrorClient}
} }
@ -233,6 +234,7 @@ func newTestKubeletWithFakeRuntime(t *testing.T) *TestKubeletWithFakeRuntime {
t: t, t: t,
} }
kubelet.volumeManager = newVolumeManager() kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager("", "", "")
return &TestKubeletWithFakeRuntime{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} return &TestKubeletWithFakeRuntime{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
} }

View File

@ -89,6 +89,7 @@ func TestRunOnce(t *testing.T) {
os: kubecontainer.FakeOS{}, os: kubecontainer.FakeOS{},
volumeManager: newVolumeManager(), volumeManager: newVolumeManager(),
} }
kb.containerManager, _ = newContainerManager("", "", "")
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if err := kb.setupDataDirs(); err != nil { if err := kb.setupDataDirs(); err != nil {