Merge pull request #14542 from vishh/kubelet-refactor

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-11-12 15:24:22 -08:00
commit e88593d788
19 changed files with 391 additions and 214 deletions

View File

@ -49,6 +49,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -216,7 +217,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
configFilePath := integration.MakeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.20"}
cm := cm.NewStubContainerManager()
kcfg := kubeletapp.SimpleKubelet(
cl,
&fakeDocker1,
@ -238,7 +239,8 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
10*time.Second, /* MinimumGCAge */
3*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
40 /* MaxPods */)
40, /* MaxPods */
cm)
kubeletapp.RunKubelet(kcfg)
// Kubelet (machine)
@ -270,7 +272,8 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
3*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
40 /* MaxPods */)
40, /* MaxPods */
cm)
kubeletapp.RunKubelet(kcfg)
return apiServer.URL, configFilePath

View File

@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
@ -409,6 +410,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
ClusterDomain: s.ClusterDomain,
ConfigFile: s.Config,
ConfigureCBR0: s.ConfigureCBR0,
ContainerManager: nil,
ContainerRuntime: s.ContainerRuntime,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
@ -474,6 +476,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
// will be ignored.
func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
var err error
if kcfg == nil {
cfg, err := s.UnsecuredKubeletConfig()
if err != nil {
@ -498,11 +501,17 @@ func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
}
if kcfg.CAdvisorInterface == nil {
ca, err := cadvisor.New(s.CAdvisorPort)
kcfg.CAdvisorInterface, err = cadvisor.New(s.CAdvisorPort)
if err != nil {
return err
}
}
if kcfg.ContainerManager == nil {
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface)
if err != nil {
return err
}
kcfg.CAdvisorInterface = ca
}
util.ReallyCrash = s.ReallyCrashForTesting
@ -670,7 +679,7 @@ func SimpleKubelet(client *client.Client,
osInterface kubecontainer.OSInterface,
fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency time.Duration,
maxPods int,
) *KubeletConfig {
containerManager cm.ContainerManager) *KubeletConfig {
imageGCPolicy := kubelet.ImageGCPolicy{
HighThresholdPercent: 90,
LowThresholdPercent: 80,
@ -686,6 +695,7 @@ func SimpleKubelet(client *client.Client,
CgroupRoot: "",
Cloud: cloud,
ConfigFile: configFilePath,
ContainerManager: containerManager,
ContainerRuntime: "docker",
CPUCFSQuota: false,
DiskSpacePolicy: diskSpacePolicy,
@ -724,8 +734,8 @@ func SimpleKubelet(client *client.Client,
SyncFrequency: syncFrequency,
SystemContainer: "",
TLSOptions: tlsOptions,
Writer: &io.StdWriter{},
VolumePlugins: volumePlugins,
Writer: &io.StdWriter{},
}
return &kcfg
}
@ -864,6 +874,7 @@ type KubeletConfig struct {
ClusterDomain string
ConfigFile string
ConfigureCBR0 bool
ContainerManager cm.ContainerManager
ContainerRuntime string
CPUCFSQuota bool
DiskSpacePolicy kubelet.DiskSpacePolicy
@ -1004,6 +1015,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
daemonEndpoints,
kc.OOMAdjuster,
kc.SerializeImagePulls,
kc.ContainerManager,
)
if err != nil {

View File

@ -27,6 +27,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubemark"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
@ -93,6 +94,7 @@ func main() {
if config.Morph == "kubelet" {
cadvisorInterface := new(cadvisor.Fake)
containerManager := cm.NewStubContainerManager()
fakeDockerClient := &dockertools.FakeDockerClient{}
fakeDockerClient.VersionInfo = docker.Env{"ApiVersion=1.18"}
@ -106,6 +108,7 @@ func main() {
fakeDockerClient,
config.KubeletPort,
config.KubeletReadOnlyPort,
containerManager,
)
hollowKubelet.Run()
}

View File

@ -36,6 +36,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cm"
kconfig "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -151,7 +152,6 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
if err != nil {
return k, pc, err
}
klet := k.(*kubelet.Kubelet)
s.kletLock.Lock()
@ -187,6 +187,11 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
return err
}
kcfg.CAdvisorInterface = cAdvisorInterface
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface)
if err != nil {
return err
}
go func() {
for ni := range nodeInfos {
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished

View File

@ -31,7 +31,7 @@ var _ Interface = new(Mock)
func (c *Mock) Start() error {
args := c.Called()
return args.Error(1)
return args.Error(0)
}
// ContainerInfo is a mock implementation of Interface.ContainerInfo.

View File

@ -0,0 +1,35 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cadvisor
import (
cadvisorApi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)
func CapacityFromMachineInfo(info *cadvisorApi.MachineInfo) api.ResourceList {
c := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
return c
}

View File

@ -14,20 +14,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"k8s.io/kubernetes/pkg/api"
)
// Manages the containers running on a machine.
type containerManager interface {
type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start() error
Start(NodeConfig) error
// Returns resources allocated to system containers in the machine.
// These containers include the system and Kubernetes services.
SystemContainersLimit() api.ResourceList
}
type NodeConfig struct {
DockerDaemonContainerName string
SystemContainerName string
KubeletContainerName string
}

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"fmt"
@ -73,21 +73,15 @@ func newSystemContainer(containerName string) *systemContainer {
}
}
type nodeConfig struct {
dockerDaemonContainerName string
systemContainerName string
kubeletContainerName string
}
type containerManagerImpl struct {
cadvisorInterface cadvisor.Interface
mountUtil mount.Interface
nodeConfig
NodeConfig
// External containers being managed.
systemContainers []*systemContainer
}
var _ containerManager = &containerManagerImpl{}
var _ ContainerManager = &containerManagerImpl{}
// checks if the required cgroups subsystems are mounted.
// As of now, only 'cpu' and 'memory' are required.
@ -120,15 +114,11 @@ func validateSystemRequirements(mountUtil mount.Interface) error {
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func newContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, dockerDaemonContainerName, systemContainerName, kubeletContainerName string) (containerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface) (ContainerManager, error) {
return &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
nodeConfig: nodeConfig{
dockerDaemonContainerName: dockerDaemonContainerName,
systemContainerName: systemContainerName,
kubeletContainerName: kubeletContainerName,
},
NodeConfig: NodeConfig{},
}, nil
}
@ -197,26 +187,26 @@ func (cm *containerManagerImpl) setupNode() error {
}
systemContainers := []*systemContainer{}
if cm.dockerDaemonContainerName != "" {
cont := newSystemContainer(cm.dockerDaemonContainerName)
if cm.DockerDaemonContainerName != "" {
cont := newSystemContainer(cm.DockerDaemonContainerName)
info, err := cm.cadvisorInterface.MachineInfo()
var capacity = api.ResourceList{}
if err != nil {
} else {
capacity = CapacityFromMachineInfo(info)
capacity = cadvisor.CapacityFromMachineInfo(info)
}
memoryLimit := (int64(capacity.Memory().Value() * DockerMemoryLimitThresholdPercent / 100))
if memoryLimit < MinDockerMemoryLimit {
glog.Warningf("Memory limit %d for container %s is too small, reset it to %d", memoryLimit, cm.dockerDaemonContainerName, MinDockerMemoryLimit)
glog.Warningf("Memory limit %d for container %s is too small, reset it to %d", memoryLimit, cm.DockerDaemonContainerName, MinDockerMemoryLimit)
memoryLimit = MinDockerMemoryLimit
}
glog.V(2).Infof("Configure resource-only container %s with memory limit: %d", cm.dockerDaemonContainerName, memoryLimit)
glog.V(2).Infof("Configure resource-only container %s with memory limit: %d", cm.DockerDaemonContainerName, memoryLimit)
dockerContainer := &fs.Manager{
Cgroups: &configs.Cgroup{
Name: cm.dockerDaemonContainerName,
Name: cm.DockerDaemonContainerName,
Memory: memoryLimit,
MemorySwap: -1,
AllowAllDevices: true,
@ -228,8 +218,8 @@ func (cm *containerManagerImpl) setupNode() error {
systemContainers = append(systemContainers, cont)
}
if cm.systemContainerName != "" {
if cm.systemContainerName == "/" {
if cm.SystemContainerName != "" {
if cm.SystemContainerName == "/" {
return fmt.Errorf("system container cannot be root (\"/\")")
}
@ -238,23 +228,25 @@ func (cm *containerManagerImpl) setupNode() error {
Name: "/",
},
}
manager := createManager(cm.systemContainerName)
manager := createManager(cm.SystemContainerName)
err := ensureSystemContainer(rootContainer, manager)
if err != nil {
return err
}
systemContainers = append(systemContainers, newSystemContainer(cm.systemContainerName))
systemContainers = append(systemContainers, newSystemContainer(cm.SystemContainerName))
}
if cm.kubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(cm.kubeletContainerName))
if cm.KubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(cm.KubeletContainerName))
}
cm.systemContainers = systemContainers
return nil
}
func (cm *containerManagerImpl) Start() error {
func (cm *containerManagerImpl) Start(nodeConfig NodeConfig) error {
cm.NodeConfig = nodeConfig
// Setup the node
if err := cm.setupNode(); err != nil {
return err

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"fmt"

View File

@ -0,0 +1,39 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
)
type containerManagerStub struct{}
var _ ContainerManager = &containerManagerStub{}
func (cm *containerManagerStub) Start(_ NodeConfig) error {
glog.V(2).Infof("Starting stub container manager")
return nil
}
func (cm *containerManagerStub) SystemContainersLimit() api.ResourceList {
return api.ResourceList{}
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{}
}

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"fmt"
@ -29,9 +29,9 @@ import (
type unsupportedContainerManager struct {
}
var _ containerManager = &unsupportedContainerManager{}
var _ ContainerManager = &unsupportedContainerManager{}
func (unsupportedContainerManager) Start() error {
func (unsupportedContainerManager) Start(_ NodeConfig) error {
return fmt.Errorf("Container Manager is unsupported in this build")
}
@ -39,6 +39,6 @@ func (unsupportedContainerManager) SystemContainersLimit() api.ResourceList {
return api.ResourceList{}
}
func newContainerManager(mounter mount.Interface, cadvisorInterface cadvisor.Interface, dockerDaemonContainer, systemContainer, kubeletContainer string) (containerManager, error) {
func NewContainerManager(mounter mount.Interface, cadvisorInterface cadvisor.Interface) (ContainerManager, error) {
return &unsupportedContainerManager{}, nil
}

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"fmt"

View File

@ -76,6 +76,9 @@ type realImageManager struct {
// Reference to this node.
nodeRef *api.ObjectReference
// Track initialization
initialized bool
}
// Information about the images we track.
@ -105,23 +108,24 @@ func newImageManager(runtime container.Runtime, cadvisorInterface cadvisor.Inter
cadvisor: cadvisorInterface,
recorder: recorder,
nodeRef: nodeRef,
initialized: false,
}
return im, nil
}
func (im *realImageManager) Start() error {
// Initial detection make detected time "unknown" in the past.
var zero time.Time
err := im.detectImages(zero)
if err != nil {
return err
}
go util.Until(func() {
err := im.detectImages(time.Now())
// Initial detection make detected time "unknown" in the past.
var ts time.Time
if im.initialized {
ts = time.Now()
}
err := im.detectImages(ts)
if err != nil {
glog.Warningf("[ImageManager] Failed to monitor images: %v", err)
} else {
im.initialized = true
}
}, 5*time.Minute, util.NeverStop)

View File

@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/fieldpath"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/envvars"
@ -143,7 +144,8 @@ func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error
return err
}
// New creates a new Kubelet for use in main
// New instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(
hostname string,
nodeName string,
@ -196,7 +198,9 @@ func NewMainKubelet(
daemonEndpoints *api.NodeDaemonEndpoints,
oomAdjuster *oom.OOMAdjuster,
serializeImagePulls bool,
containerManager cm.ContainerManager,
) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -278,9 +282,6 @@ func NewMainKubelet(
clusterDNS: clusterDNS,
serviceLister: serviceLister,
nodeLister: nodeLister,
runtimeMutex: sync.Mutex{},
runtimeUpThreshold: maxWaitForContainerRuntime,
lastTimestampRuntimeUp: time.Time{},
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
@ -299,13 +300,13 @@ func NewMainKubelet(
chownRunner: chownRunner,
writer: writer,
configureCBR0: configureCBR0,
podCIDR: podCIDR,
reconcileCIDR: reconcileCIDR,
pods: pods,
syncLoopMonitor: util.AtomicValue{},
resolverConfig: resolverConfig,
cpuCFSQuota: cpuCFSQuota,
daemonEndpoints: daemonEndpoints,
containerManager: containerManager,
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@ -378,6 +379,8 @@ func NewMainKubelet(
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
}
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
if err != nil {
@ -394,23 +397,12 @@ func NewMainKubelet(
// Setup container manager, can fail if the devices hierarchy is not mounted
// (it is required by Docker however).
containerManager, err := newContainerManager(mounter, cadvisorInterface, dockerDaemonContainer, systemContainer, resourceContainer)
if err != nil {
return nil, fmt.Errorf("failed to create the Container Manager: %v", err)
klet.nodeConfig = cm.NodeConfig{
DockerDaemonContainerName: dockerDaemonContainer,
SystemContainerName: systemContainer,
KubeletContainerName: resourceContainer,
}
klet.containerManager = containerManager
go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop)
if klet.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop)
}
// Wait for the runtime to be up with a timeout.
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
}
klet.lastTimestampRuntimeUp = time.Now()
klet.runtimeState.setRuntimeSync(time.Now())
klet.runner = klet.containerRuntime
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
@ -423,6 +415,10 @@ func NewMainKubelet(
containerRefManager,
recorder)
if err := klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {
return nil, err
}
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
@ -433,25 +429,8 @@ func NewMainKubelet(
// once we switch to using pod event generator.
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, klet.resyncInterval)
metrics.Register(runtimeCache)
if err = klet.setupDataDirs(); err != nil {
return nil, err
}
if err = klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {
return nil, err
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
}
}
klet.backOff = util.NewBackOff(resyncInterval, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
klet.sourcesSeen = sets.NewString()
return klet, nil
}
@ -519,13 +498,7 @@ type Kubelet struct {
// Last timestamp when runtime responded on ping.
// Mutex is used to protect this value.
runtimeMutex sync.Mutex
runtimeUpThreshold time.Duration
lastTimestampRuntimeUp time.Time
// Network Status information
networkConfigMutex sync.Mutex
networkConfigured bool
runtimeState *runtimeState
// Volume plugins.
volumePluginMgr volume.VolumePluginMgr
@ -608,12 +581,12 @@ type Kubelet struct {
writer kubeio.Writer
// Manager of non-Runtime containers.
containerManager containerManager
containerManager cm.ContainerManager
nodeConfig cm.NodeConfig
// Whether or not kubelet should take responsibility for keeping cbr0 in
// the correct state.
configureCBR0 bool
podCIDR string
reconcileCIDR bool
// Number of Pods which can be run by this Kubelet
@ -644,6 +617,9 @@ type Kubelet struct {
// A queue used to trigger pod workers.
workQueue queue.WorkQueue
// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
oneTimeInitializer sync.Once
}
func (kl *Kubelet) allSourcesReady() bool {
@ -816,16 +792,25 @@ func (kl *Kubelet) StartGarbageCollection() {
}, 5*time.Minute, util.NeverStop)
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error {
// Promethues metrics.
metrics.Register(kl.runtimeCache)
// Step 1: Setup filesystem directories.
if err := kl.setupDataDirs(); err != nil {
return err
}
// Move Kubelet to a container.
// Step 2: If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := kl.os.Mkdir(containerLogsDir, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
}
}
// Step 3: Move Kubelet to a container, if required.
if kl.resourceContainer != "" {
// Fixme: I need to reside inside ContainerManager interface.
err := util.RunInResourceContainer(kl.resourceContainer)
@ -835,26 +820,49 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
glog.Infof("Running in container %q", kl.resourceContainer)
}
// Step 4: Start the image manager.
if err := kl.imageManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, "Failed to start ImageManager %v", err)
glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
}
if err := kl.cadvisor.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, "Failed to start CAdvisor %v", err)
glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
}
if err := kl.containerManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, "Failed to start ContainerManager %v", err)
glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
// Step 5: Start container manager.
if err := kl.containerManager.Start(kl.nodeConfig); err != nil {
return fmt.Errorf("Failed to start ContainerManager %v", err)
}
// Step 6: Start out of memory watcher.
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, "Failed to start OOM watcher %v", err)
glog.Errorf("Failed to start OOM watching: %v", err)
return fmt.Errorf("Failed to start OOM watcher %v", err)
}
return nil
}
// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules() {
if err := kl.cadvisor.Start(); err != nil {
kl.runtimeState.setInternalError(fmt.Errorf("Failed to start cAdvisor %v", err))
}
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, kubecontainer.KubeletSetupFailed, err.Error())
glog.Error(err)
kl.runtimeState.setInitError(err)
}
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go util.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, util.NeverStop)
}
go util.Until(kl.syncNetworkStatus, 30*time.Second, util.NeverStop)
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
// Start a goroutine responsible for killing pods (that are not properly
@ -2029,7 +2037,7 @@ func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) {
// TODO: Should we admit the pod when machine info is unavailable?
return false, false
}
capacity := CapacityFromMachineInfo(info)
capacity := cadvisor.CapacityFromMachineInfo(info)
_, notFittingCPU, notFittingMemory := predicates.CheckPodsExceedingFreeResources(pods, capacity)
return len(notFittingCPU) > 0, len(notFittingMemory) > 0
}
@ -2117,14 +2125,9 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
syncTicker := time.NewTicker(time.Second)
housekeepingTicker := time.NewTicker(housekeepingPeriod)
for {
if !kl.containerRuntimeUp() {
if rs := kl.runtimeState.errors(); len(rs) != 0 {
glog.Infof("skipping pod synchronization - %v", rs)
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, container runtime is not up.")
continue
}
if !kl.doneNetworkConfigure() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, network is not configured")
continue
}
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) {
@ -2414,10 +2417,10 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
func (kl *Kubelet) updateRuntimeUp() {
start := time.Now()
err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond)
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
if err == nil {
kl.lastTimestampRuntimeUp = time.Now()
// Errors in initialization will be synchronized internally.
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(time.Now())
} else {
glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err)
}
@ -2468,24 +2471,22 @@ func (kl *Kubelet) recordNodeStatusEvent(event string) {
var oldNodeUnschedulable bool
func (kl *Kubelet) syncNetworkStatus() {
kl.networkConfigMutex.Lock()
defer kl.networkConfigMutex.Unlock()
networkConfigured := true
var err error
if kl.configureCBR0 {
if err := ensureIPTablesMasqRule(); err != nil {
networkConfigured = false
glog.Errorf("Error on adding ip table rules: %v", err)
err = fmt.Errorf("Error on adding ip table rules: %v", err)
glog.Error(err)
}
if len(kl.podCIDR) == 0 {
glog.Warningf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now")
networkConfigured = false
} else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
networkConfigured = false
glog.Errorf("Error configuring cbr0: %v", err)
podCIDR := kl.runtimeState.podCIDR()
if len(podCIDR) == 0 {
err = fmt.Errorf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now")
glog.Warning(err)
} else if err := kl.reconcileCBR0(podCIDR); err != nil {
err = fmt.Errorf("Error configuring cbr0: %v", err)
glog.Error(err)
}
}
kl.networkConfigured = networkConfigured
kl.runtimeState.setNetworkState(err)
}
// setNodeStatus fills in the Status fields of the given Node, overwriting
@ -2567,7 +2568,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = CapacityFromMachineInfo(info)
node.Status.Capacity = cadvisor.CapacityFromMachineInfo(info)
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(kl.pods), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&
@ -2595,17 +2596,10 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
node.Status.DaemonEndpoints = *kl.daemonEndpoints
// Check whether container runtime can be reported as up.
containerRuntimeUp := kl.containerRuntimeUp()
// Check whether network is configured properly
networkConfigured := kl.doneNetworkConfigure()
// Check whether runtime version meets the minimal requirements
containerRuntimeVersionRequirementMet := kl.containerRuntimeVersionRequirementMet()
currentTime := unversioned.Now()
var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured && containerRuntimeVersionRequirementMet {
if rs := kl.runtimeState.errors(); len(rs) == 0 {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
@ -2614,21 +2608,11 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
LastHeartbeatTime: currentTime,
}
} else {
var messages []string
if !containerRuntimeUp {
messages = append(messages, "container runtime is down")
}
if !networkConfigured {
messages = append(messages, "network not configured correctly")
}
if !containerRuntimeVersionRequirementMet {
messages = append(messages, fmt.Sprintf("container runtime version is older than %s", dockertools.MinimumDockerAPIVersion))
}
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: strings.Join(messages, ","),
Message: strings.Join(rs, ","),
LastHeartbeatTime: currentTime,
}
}
@ -2724,39 +2708,24 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
return nil
}
func (kl *Kubelet) containerRuntimeUp() bool {
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
}
func (kl *Kubelet) doneNetworkConfigure() bool {
kl.networkConfigMutex.Lock()
defer kl.networkConfigMutex.Unlock()
return kl.networkConfigured
}
func (kl *Kubelet) containerRuntimeVersionRequirementMet() bool {
// FIXME: Why not combine this with container runtime health check?
func (kl *Kubelet) isContainerRuntimeVersionCompatible() error {
switch kl.GetRuntime().Type() {
case "docker":
version, err := kl.GetContainerRuntimeVersion()
if err != nil {
return true
return nil
}
// Verify the docker version.
result, err := version.Compare(dockertools.MinimumDockerAPIVersion)
if err != nil {
glog.Errorf("Cannot compare current docker version %v with minimum support Docker version %q", version, dockertools.MinimumDockerAPIVersion)
return false
return fmt.Errorf("failed to compare current docker version %v with minimum support Docker version %q - %v", version, dockertools.MinimumDockerAPIVersion, err)
}
if result < 0 {
return fmt.Errorf("container runtime version is older than %s", dockertools.MinimumDockerAPIVersion)
}
return (result >= 0)
case "rkt":
// TODO(dawnchen): Rkt support here
return true
default:
glog.Errorf("unsupported container runtime %s specified", kl.GetRuntime().Type())
return true
}
return nil
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
@ -2769,11 +2738,9 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.nodeName)
}
kl.networkConfigMutex.Lock()
if kl.reconcileCIDR {
kl.podCIDR = node.Spec.PodCIDR
kl.runtimeState.setPodCIDR(node.Spec.PodCIDR)
}
kl.networkConfigMutex.Unlock()
if err := kl.setNodeStatus(node); err != nil {
return err

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
@ -96,7 +97,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.hostname = testKubeletHostname
kubelet.nodeName = testKubeletHostname
kubelet.runtimeUpThreshold = maxWaitForContainerRuntime
kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, "" /* Pod CIDR */, func() error { return nil })
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err)
@ -139,8 +140,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.livenessManager = proberesults.NewManager()
kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "")
kubelet.networkConfigured = true
kubelet.containerManager = cm.NewStubContainerManager()
fakeClock := &util.FakeClock{Time: time.Now()}
kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock
@ -357,8 +357,10 @@ func TestSyncLoopAbort(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
kubelet.lastTimestampRuntimeUp = time.Now()
kubelet.networkConfigured = true
kubelet.runtimeState.setRuntimeSync(time.Now())
// The syncLoop waits on time.After(resyncInterval), set it really big so that we don't race for
// the channel close
kubelet.resyncInterval = time.Second * 30
ch := make(chan kubetypes.PodUpdate)
close(ch)
@ -2527,6 +2529,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
@ -2626,7 +2629,8 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
}
func TestDockerRuntimeVersion(t *testing.T) {
// FIXME: Enable me..
func testDockerRuntimeVersion(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
@ -2799,6 +2803,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -2919,6 +2924,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -2993,8 +2999,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
},
},
}
kubelet.runtimeUpThreshold = time.Duration(0)
kubelet.runtimeState = newRuntimeState(time.Duration(0), false, "" /* Pod CIDR */, func() error { return nil })
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@ -3074,9 +3079,11 @@ func TestCreateMirrorPod(t *testing.T) {
func TestDeleteOutdatedMirrorPod(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("Start").Return(nil)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorClient
pod := &api.Pod{
@ -3127,9 +3134,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("Start").Return(nil)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorClient
orphanPods := []*api.Pod{

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
@ -56,7 +57,7 @@ func TestRunOnce(t *testing.T) {
diskSpaceManager: diskSpaceManager,
containerRuntime: fakeRuntime,
}
kb.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), cadvisor, "", "", "")
kb.containerManager = cm.NewStubContainerManager()
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if err := kb.setupDataDirs(); err != nil {

112
pkg/kubelet/runtime.go Normal file
View File

@ -0,0 +1,112 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"sync"
"time"
)
type runtimeState struct {
sync.Mutex
lastBaseRuntimeSync time.Time
baseRuntimeSyncThreshold time.Duration
networkError error
internalError error
cidr string
initError error
runtimeCompatibility func() error
}
func (s *runtimeState) setRuntimeSync(t time.Time) {
s.Lock()
defer s.Unlock()
s.lastBaseRuntimeSync = t
}
func (s *runtimeState) setInternalError(err error) {
s.Lock()
defer s.Unlock()
s.internalError = err
}
func (s *runtimeState) setNetworkState(err error) {
s.Lock()
defer s.Unlock()
s.networkError = err
}
func (s *runtimeState) setPodCIDR(cidr string) {
s.Lock()
defer s.Unlock()
s.cidr = cidr
}
func (s *runtimeState) podCIDR() string {
s.Lock()
defer s.Unlock()
return s.cidr
}
func (s *runtimeState) setInitError(err error) {
s.Lock()
defer s.Unlock()
s.initError = err
}
func (s *runtimeState) errors() []string {
s.Lock()
defer s.Unlock()
var ret []string
if s.initError != nil {
ret = append(ret, s.initError.Error())
}
if s.networkError != nil {
ret = append(ret, s.networkError.Error())
}
if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
ret = append(ret, "container runtime is down")
}
if s.internalError != nil {
ret = append(ret, s.internalError.Error())
}
if err := s.runtimeCompatibility(); err != nil {
ret = append(ret, err.Error())
}
return ret
}
func newRuntimeState(
runtimeSyncThreshold time.Duration,
configureNetwork bool,
cidr string,
runtimeCompatibility func() error,
) *runtimeState {
var networkError error = nil
if configureNetwork {
networkError = fmt.Errorf("network state unknown")
}
return &runtimeState{
lastBaseRuntimeSync: time.Time{},
baseRuntimeSyncThreshold: runtimeSyncThreshold,
networkError: networkError,
cidr: cidr,
internalError: nil,
runtimeCompatibility: runtimeCompatibility,
}
}

View File

@ -19,26 +19,12 @@ package kubelet
import (
"fmt"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/capabilities"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/securitycontext"
)
func CapacityFromMachineInfo(info *cadvisorapi.MachineInfo) api.ResourceList {
c := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
return c
}
// Check whether we have the capabilities to run the specified pod.
func canRunPod(pod *api.Pod) error {
if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork {

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/volume/empty_dir"
@ -41,6 +42,7 @@ func NewHollowKubelet(
cadvisorInterface cadvisor.Interface,
dockerClient dockertools.DockerInterface,
kubeletPort, kubeletReadOnlyPort int,
containerManager cm.ContainerManager,
) *HollowKubelet {
testRootDir := integration.MakeTempDirOrDie("hollow-kubelet.", "")
manifestFilePath := integration.MakeTempDirOrDie("manifest", testRootDir)
@ -69,6 +71,7 @@ func NewHollowKubelet(
10*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
40, /* MaxPods */
containerManager,
),
}
}