Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-10-31 05:40:42 +00:00

This will make output checking easier (done in a separate commit). kubelet itself still uses the global logger.

2447 lines · 97 KiB · Go
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"net/http"
	"os"
	"path"
	sysruntime "runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/opencontainers/selinux/go-selinux"
	"k8s.io/client-go/informers"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns"
	"k8s.io/mount-utils"
	"k8s.io/utils/integer"
	netutils "k8s.io/utils/net"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/certificate"
	"k8s.io/client-go/util/flowcontrol"
	cloudprovider "k8s.io/cloud-provider"
	"k8s.io/component-helpers/apimachinery/lease"
	internalapi "k8s.io/cri-api/pkg/apis"
	"k8s.io/klog/v2"
	pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
	"k8s.io/kubernetes/pkg/features"
	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
	kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
	"k8s.io/kubernetes/pkg/kubelet/cloudresource"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/config"
	"k8s.io/kubernetes/pkg/kubelet/configmap"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/cri/remote"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	"k8s.io/kubernetes/pkg/kubelet/images"
	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/logs"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
	"k8s.io/kubernetes/pkg/kubelet/network/dns"
	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown"
	oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
	"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
	plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
	"k8s.io/kubernetes/pkg/kubelet/preemption"
	"k8s.io/kubernetes/pkg/kubelet/prober"
	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
	"k8s.io/kubernetes/pkg/kubelet/secret"
	"k8s.io/kubernetes/pkg/kubelet/server"
	servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
	serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
	"k8s.io/kubernetes/pkg/kubelet/stats"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/kubelet/sysctl"
	"k8s.io/kubernetes/pkg/kubelet/token"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/kubelet/util"
	"k8s.io/kubernetes/pkg/kubelet/util/manager"
	"k8s.io/kubernetes/pkg/kubelet/util/queue"
	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
	"k8s.io/kubernetes/pkg/security/apparmor"
	"k8s.io/kubernetes/pkg/util/oom"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/csi"
	"k8s.io/kubernetes/pkg/volume/util/hostutil"
	"k8s.io/kubernetes/pkg/volume/util/subpath"
	"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
	"k8s.io/utils/clock"
)

const (
	// Max amount of time to wait for the container runtime to come up.
	maxWaitForContainerRuntime = 30 * time.Second

	// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
	nodeStatusUpdateRetry = 5

	// ContainerLogsDir is the location of container logs.
	ContainerLogsDir = "/var/log/containers"

	// MaxContainerBackOff is the max backoff period, exported for the e2e test
	MaxContainerBackOff = 300 * time.Second

	// Period for performing global cleanup tasks.
	housekeepingPeriod = time.Second * 2

	// Duration after which a housekeeping pass is considered to have violated the
	// invariant that housekeeping should be fast to avoid blocking pod config (while
	// housekeeping is running no new pods are started or deleted).
	housekeepingWarningDuration = time.Second * 15

	// Period for performing eviction monitoring.
	// ensure this is kept in sync with internal cadvisor housekeeping.
	evictionMonitoringPeriod = time.Second * 10

	// The path in containers' filesystems where the hosts file is mounted.
	linuxEtcHostsPath   = "/etc/hosts"
	windowsEtcHostsPath = "C:\\Windows\\System32\\drivers\\etc\\hosts"

	// Capacity of the channel for receiving pod lifecycle events. This number
	// is a bit arbitrary and may be adjusted in the future.
	plegChannelCapacity = 1000

	// Generic PLEG relies on relisting for discovering container events.
	// A longer period means that kubelet will take longer to detect container
	// changes and to update pod status. On the other hand, a shorter period
	// will cause more frequent relisting (e.g., container runtime operations),
	// leading to higher cpu usage.
	// Note that even though we set the period to 1s, the relisting itself can
	// take more than 1s to finish if the container runtime responds slowly
	// and/or when there are many container changes in one cycle.
	plegRelistPeriod = time.Second * 1

	// backOffPeriod is the period to back off when pod syncing results in an
	// error. It is also used as the base period for the exponential backoff
	// container restarts and image pulls.
	backOffPeriod = time.Second * 10

	// ContainerGCPeriod is the period for performing container garbage collection.
	ContainerGCPeriod = time.Minute
	// ImageGCPeriod is the period for performing image garbage collection.
	ImageGCPeriod = 5 * time.Minute

	// Minimum number of dead containers to keep in a pod
	minDeadContainerInPod = 1

	// nodeLeaseRenewIntervalFraction is the fraction of lease duration to renew the lease
	nodeLeaseRenewIntervalFraction = 0.25
)
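
// For illustration: backOffPeriod and MaxContainerBackOff above are passed to
// flowcontrol.NewBackOff in NewMainKubelet below. Assuming the usual doubling
// behavior of k8s.io/client-go/util/flowcontrol.Backoff, a repeatedly failing
// pod sync or image pull is retried after roughly 10s, 20s, 40s, 80s, 160s,
// and then every 300s once the MaxContainerBackOff cap is reached.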

var etcHostsPath = getContainerEtcHostsPath()

func getContainerEtcHostsPath() string {
	if sysruntime.GOOS == "windows" {
		return windowsEtcHostsPath
	}
	return linuxEtcHostsPath
}

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}
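
// Context note: these handlers are driven by the kubelet sync loop (see
// syncLoopIteration further down in this file). ADD, UPDATE, REMOVE, and
// RECONCILE operations from the merged pod config sources are dispatched to
// HandlePodAdditions, HandlePodUpdates, HandlePodRemoves, and
// HandlePodReconcile, while periodic syncs and housekeeping use
// HandlePodSyncs and HandlePodCleanups.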

// Option is a functional option type for Kubelet
type Option func(*Kubelet)

// Bootstrap is a bootstrapping interface for kubelet, targets the initialization protocol
type Bootstrap interface {
	GetConfiguration() kubeletconfiginternal.KubeletConfiguration
	BirthCry()
	StartGarbageCollection()
	ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface)
	ListenAndServeReadOnly(address net.IP, port uint)
	ListenAndServePodResources()
	Run(<-chan kubetypes.PodUpdate)
	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type Dependencies struct {
	Options []Option

	// Injected Dependencies
	Auth                 server.AuthInterface
	CAdvisorInterface    cadvisor.Interface
	Cloud                cloudprovider.Interface
	ContainerManager     cm.ContainerManager
	EventClient          v1core.EventsGetter
	HeartbeatClient      clientset.Interface
	OnHeartbeatFailure   func()
	KubeClient           clientset.Interface
	Mounter              mount.Interface
	HostUtil             hostutil.HostUtils
	OOMAdjuster          *oom.OOMAdjuster
	OSInterface          kubecontainer.OSInterface
	PodConfig            *config.PodConfig
	ProbeManager         prober.Manager
	Recorder             record.EventRecorder
	Subpather            subpath.Interface
	VolumePlugins        []volume.VolumePlugin
	DynamicPluginProber  volume.DynamicPluginProber
	TLSOptions           *server.TLSOptions
	RemoteRuntimeService internalapi.RuntimeService
	RemoteImageService   internalapi.ImageManagerService
	// remove it after cadvisor.UsingLegacyCadvisorStats is dropped.
	useLegacyCadvisorStats bool
}

// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// TODO: it needs to be replaced by a proper context in the future
	ctx := context.TODO()

	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
	}

	if kubeDeps.KubeClient != nil {
		klog.InfoS("Adding apiserver pod source")
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
	}
	return cfg, nil
}

// PreInitRuntimeService will init runtime service before RunKubelet.
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	kubeDeps *Dependencies,
	remoteRuntimeEndpoint string,
	remoteImageEndpoint string) error {
	// remoteImageEndpoint is the same as remoteRuntimeEndpoint if not explicitly specified
	if remoteRuntimeEndpoint != "" && remoteImageEndpoint == "" {
		remoteImageEndpoint = remoteRuntimeEndpoint
	}

	var err error
	if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
		return err
	}
	if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
		return err
	}

	kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(remoteRuntimeEndpoint)

	return nil
}

// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	kubeDeps *Dependencies,
	crOptions *config.ContainerRuntimeOptions,
	hostname string,
	hostnameOverridden bool,
	nodeName types.NodeName,
	nodeIPs []net.IP,
	providerID string,
	cloudProvider string,
	certDirectory string,
	rootDirectory string,
	imageCredentialProviderConfigFile string,
	imageCredentialProviderBinDir string,
	registerNode bool,
	registerWithTaints []v1.Taint,
	allowedUnsafeSysctls []string,
	experimentalMounterPath string,
	kernelMemcgNotification bool,
	experimentalNodeAllocatableIgnoreEvictionThreshold bool,
	minimumGCAge metav1.Duration,
	maxPerPodContainerCount int32,
	maxContainerCount int32,
	masterServiceNamespace string,
	registerSchedulable bool,
	keepTerminatedPodVolumes bool,
	nodeLabels map[string]string,
	nodeStatusMaxImages int32,
	seccompDefault bool,
) (*Kubelet, error) {
	logger := klog.TODO()

	if rootDirectory == "" {
		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
	}
	if kubeCfg.SyncFrequency.Duration <= 0 {
		return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
	}

	if kubeCfg.MakeIPTablesUtilChains {
		if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
			return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
		}
		if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
			return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
		}
		if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
			return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
		}
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders) && cloudprovider.IsDeprecatedInternal(cloudProvider) {
		cloudprovider.DisableWarningForProvider(cloudProvider)
		return nil, fmt.Errorf("cloud provider %q was specified, but built-in cloud providers are disabled. Please set --cloud-provider=external and migrate to an external cloud provider", cloudProvider)
	}

	var nodeHasSynced cache.InformerSynced
	var nodeLister corelisters.NodeLister

	// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
	// If not nil, we are running as part of a cluster and should sync w/API
	if kubeDeps.KubeClient != nil {
		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
		}))
		nodeLister = kubeInformers.Core().V1().Nodes().Lister()
		nodeHasSynced = func() bool {
			return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
		}
		kubeInformers.Start(wait.NeverStop)
		klog.InfoS("Attempting to sync node with API server")
	} else {
		// we don't have a client to sync!
		nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
		nodeLister = corelisters.NewNodeLister(nodeIndexer)
		nodeHasSynced = func() bool { return true }
		klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
	}

	if kubeDeps.PodConfig == nil {
		var err error
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
		if err != nil {
			return nil, err
		}
	}

	containerGCPolicy := kubecontainer.GCPolicy{
		MinAge:             minimumGCAge.Duration,
		MaxPerPodContainer: int(maxPerPodContainerCount),
		MaxContainers:      int(maxContainerCount),
	}

	daemonEndpoints := &v1.NodeDaemonEndpoints{
		KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
	}

	imageGCPolicy := images.ImageGCPolicy{
		MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
		HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
		LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
	}

	enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
	if experimentalNodeAllocatableIgnoreEvictionThreshold {
		// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
		enforceNodeAllocatable = []string{}
	}
	thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
	if err != nil {
		return nil, err
	}
	evictionConfig := eviction.Config{
		PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
		MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
		Thresholds:               thresholds,
		KernelMemcgNotification:  kernelMemcgNotification,
		PodCgroupRoot:            kubeDeps.ContainerManager.GetPodCgroupRoot(),
	}

	var serviceLister corelisters.ServiceLister
	var serviceHasSynced cache.InformerSynced
	if kubeDeps.KubeClient != nil {
		kubeInformers := informers.NewSharedInformerFactory(kubeDeps.KubeClient, 0)
		serviceLister = kubeInformers.Core().V1().Services().Lister()
		serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
		kubeInformers.Start(wait.NeverStop)
	} else {
		serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
		serviceLister = corelisters.NewServiceLister(serviceIndexer)
		serviceHasSynced = func() bool { return true }
	}

	// construct a node reference used for events
	nodeRef := &v1.ObjectReference{
		Kind:      "Node",
		Name:      string(nodeName),
		UID:       types.UID(nodeName),
		Namespace: "",
	}

	oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)
	if err != nil {
		if libcontaineruserns.RunningInUserNS() {
			if utilfeature.DefaultFeatureGate.Enabled(features.KubeletInUserNamespace) {
				// oomwatcher.NewWatcher returns "open /dev/kmsg: operation not permitted" error,
				// when running in a user namespace with sysctl value `kernel.dmesg_restrict=1`.
				klog.V(2).InfoS("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err)
				oomWatcher = nil
			} else {
				klog.ErrorS(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)")
				return nil, err
			}
		} else {
			return nil, err
		}
	}

	clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
	for _, ipEntry := range kubeCfg.ClusterDNS {
		ip := netutils.ParseIPSloppy(ipEntry)
		if ip == nil {
			klog.InfoS("Invalid clusterDNS IP", "IP", ipEntry)
		} else {
			clusterDNS = append(clusterDNS, ip)
		}
	}
	httpClient := &http.Client{}

	klet := &Kubelet{
		hostname:                                hostname,
		hostnameOverridden:                      hostnameOverridden,
		nodeName:                                nodeName,
		kubeClient:                              kubeDeps.KubeClient,
		heartbeatClient:                         kubeDeps.HeartbeatClient,
		onRepeatedHeartbeatFailure:              kubeDeps.OnHeartbeatFailure,
		rootDirectory:                           rootDirectory,
		resyncInterval:                          kubeCfg.SyncFrequency.Duration,
		sourcesReady:                            config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
		registerNode:                            registerNode,
		registerWithTaints:                      registerWithTaints,
		registerSchedulable:                     registerSchedulable,
		dnsConfigurer:                           dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
		serviceLister:                           serviceLister,
		serviceHasSynced:                        serviceHasSynced,
		nodeLister:                              nodeLister,
		nodeHasSynced:                           nodeHasSynced,
		masterServiceNamespace:                  masterServiceNamespace,
		streamingConnectionIdleTimeout:          kubeCfg.StreamingConnectionIdleTimeout.Duration,
		recorder:                                kubeDeps.Recorder,
		cadvisor:                                kubeDeps.CAdvisorInterface,
		cloud:                                   kubeDeps.Cloud,
		externalCloudProvider:                   cloudprovider.IsExternal(cloudProvider),
		providerID:                              providerID,
		nodeRef:                                 nodeRef,
		nodeLabels:                              nodeLabels,
		nodeStatusUpdateFrequency:               kubeCfg.NodeStatusUpdateFrequency.Duration,
		nodeStatusReportFrequency:               kubeCfg.NodeStatusReportFrequency.Duration,
		os:                                      kubeDeps.OSInterface,
		oomWatcher:                              oomWatcher,
		cgroupsPerQOS:                           kubeCfg.CgroupsPerQOS,
		cgroupRoot:                              kubeCfg.CgroupRoot,
		mounter:                                 kubeDeps.Mounter,
		hostutil:                                kubeDeps.HostUtil,
		subpather:                               kubeDeps.Subpather,
		maxPods:                                 int(kubeCfg.MaxPods),
		podsPerCore:                             int(kubeCfg.PodsPerCore),
		syncLoopMonitor:                         atomic.Value{},
		daemonEndpoints:                         daemonEndpoints,
		containerManager:                        kubeDeps.ContainerManager,
		nodeIPs:                                 nodeIPs,
		nodeIPValidator:                         validateNodeIP,
		clock:                                   clock.RealClock{},
		enableControllerAttachDetach:            kubeCfg.EnableControllerAttachDetach,
		makeIPTablesUtilChains:                  kubeCfg.MakeIPTablesUtilChains,
		iptablesMasqueradeBit:                   int(kubeCfg.IPTablesMasqueradeBit),
		iptablesDropBit:                         int(kubeCfg.IPTablesDropBit),
		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
		keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
		nodeStatusMaxImages:                     nodeStatusMaxImages,
		lastContainerStartedTime:                newTimeCache(),
	}

	if klet.cloud != nil {
		klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
	}

	var secretManager secret.Manager
	var configMapManager configmap.Manager
	switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
	case kubeletconfiginternal.WatchChangeDetectionStrategy:
		secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval)
		configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
	case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
		secretManager = secret.NewCachingSecretManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
		configMapManager = configmap.NewCachingConfigMapManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
	case kubeletconfiginternal.GetChangeDetectionStrategy:
		secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
		configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
	default:
		return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
	}

	klet.secretManager = secretManager
	klet.configMapManager = configMapManager

	if klet.experimentalHostUserNamespaceDefaulting {
		klog.InfoS("Experimental host user namespace defaulting is enabled")
	}

	machineInfo, err := klet.cadvisor.MachineInfo()
	if err != nil {
		return nil, err
	}
	// Avoid having the collector collect it as a timestamped metric
	// See PR #95210 and #97006 for more details.
	machineInfo.Timestamp = time.Time{}
	klet.setCachedMachineInfo(machineInfo)

	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

	klet.livenessManager = proberesults.NewManager()
	klet.readinessManager = proberesults.NewManager()
	klet.startupManager = proberesults.NewManager()
	klet.podCache = kubecontainer.NewCache()

	// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
	mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
	klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)

	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)

	klet.runtimeService = kubeDeps.RemoteRuntimeService

	if kubeDeps.KubeClient != nil {
		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
	}

	// setup containerLogManager for CRI container runtime
	containerLogManager, err := logs.NewContainerLogManager(
		klet.runtimeService,
		kubeDeps.OSInterface,
		kubeCfg.ContainerLogMaxSize,
		int(kubeCfg.ContainerLogMaxFiles),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
	}
	klet.containerLogManager = containerLogManager

	klet.reasonCache = NewReasonCache()
	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
	klet.podWorkers = newPodWorkers(
		klet.syncPod,
		klet.syncTerminatingPod,
		klet.syncTerminatedPod,

		kubeDeps.Recorder,
		klet.workQueue,
		klet.resyncInterval,
		backOffPeriod,
		klet.podCache,
	)

	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
		klet.livenessManager,
		klet.readinessManager,
		klet.startupManager,
		rootDirectory,
		machineInfo,
		klet.podWorkers,
		kubeDeps.OSInterface,
		klet,
		httpClient,
		imageBackOff,
		kubeCfg.SerializeImagePulls,
		float32(kubeCfg.RegistryPullQPS),
		int(kubeCfg.RegistryBurst),
		imageCredentialProviderConfigFile,
		imageCredentialProviderBinDir,
		kubeCfg.CPUCFSQuota,
		kubeCfg.CPUCFSQuotaPeriod,
		kubeDeps.RemoteRuntimeService,
		kubeDeps.RemoteImageService,
		kubeDeps.ContainerManager.InternalContainerLifecycle(),
		klet.containerLogManager,
		klet.runtimeClassManager,
		seccompDefault,
		kubeCfg.MemorySwap.SwapBehavior,
		kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
		*kubeCfg.MemoryThrottlingFactor,
	)
	if err != nil {
		return nil, err
	}
	klet.containerRuntime = runtime
	klet.streamingRuntime = runtime
	klet.runner = runtime

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache

	// common provider to get host file system usage associated with a pod managed by kubelet
	hostStatsProvider := stats.NewHostStatsProvider(kubecontainer.RealOS{}, func(podUID types.UID) string {
		return getEtcHostsPath(klet.getPodDir(podUID))
	})
	if kubeDeps.useLegacyCadvisorStats {
		klet.StatsProvider = stats.NewCadvisorStatsProvider(
			klet.cadvisor,
			klet.resourceAnalyzer,
			klet.podManager,
			klet.runtimeCache,
			klet.containerRuntime,
			klet.statusManager,
			hostStatsProvider)
	} else {
		klet.StatsProvider = stats.NewCRIStatsProvider(
			klet.cadvisor,
			klet.resourceAnalyzer,
			klet.podManager,
			klet.runtimeCache,
			kubeDeps.RemoteRuntimeService,
			kubeDeps.RemoteImageService,
			hostStatsProvider,
			utilfeature.DefaultFeatureGate.Enabled(features.DisableAcceleratorUsageMetrics),
			utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
	}

	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
	if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
		klog.ErrorS(err, "Pod CIDR update failed")
	}

	// setup containerGC
	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
	if err != nil {
		return nil, err
	}
	klet.containerGC = containerGC
	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

	// setup imageManager
	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
	}
	klet.imageManager = imageManager

	if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
		klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
		}
		kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
			cert := klet.serverCertificateManager.Current()
			if cert == nil {
				return nil, fmt.Errorf("no serving certificate available for the kubelet")
			}
			return cert, nil
		}
	}

	if kubeDeps.ProbeManager != nil {
		klet.probeManager = kubeDeps.ProbeManager
	} else {
		klet.probeManager = prober.NewManager(
			klet.statusManager,
			klet.livenessManager,
			klet.readinessManager,
			klet.startupManager,
			klet.runner,
			kubeDeps.Recorder)
	}

	tokenManager := token.NewManager(kubeDeps.KubeClient)

	// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
	// which affects node ready status. This function must be called before Kubelet is initialized so that the Node
	// ReadyState is accurate with the storage state.
	klet.volumePluginMgr, err =
		NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
	if err != nil {
		return nil, err
	}
	klet.pluginManager = pluginmanager.NewPluginManager(
		klet.getPluginsRegistrationDir(), /* sockDir */
		kubeDeps.Recorder,
	)

	// If the experimentalMounterPathFlag is set, we do not want to
	// check node capabilities since the mount path is not the default
	if len(experimentalMounterPath) != 0 {
		// Replace the nameserver in containerized-mounter's rootfs/etc/resolv.conf with kubelet.ClusterDNS
		// so that service names can be resolved
		klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
	}

	// setup volumeManager
	klet.volumeManager = volumemanager.NewVolumeManager(
		kubeCfg.EnableControllerAttachDetach,
		nodeName,
		klet.podManager,
		klet.podWorkers,
		klet.kubeClient,
		klet.volumePluginMgr,
		klet.containerRuntime,
		kubeDeps.Mounter,
		kubeDeps.HostUtil,
		klet.getPodsDir(),
		kubeDeps.Recorder,
		keepTerminatedPodVolumes,
		volumepathhandler.NewBlockVolumePathHandler())

	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

	// setup eviction manager
	evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

	klet.evictionManager = evictionManager
	klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

	// Safe, allowed sysctls can always be used as unsafe sysctls in the spec.
	// Hence, we concatenate those two lists.
	safeAndUnsafeSysctls := append(sysctl.SafeSysctlAllowlist(), allowedUnsafeSysctls...)
	sysctlsAllowlist, err := sysctl.NewAllowlist(safeAndUnsafeSysctls)
	if err != nil {
		return nil, err
	}
	klet.admitHandlers.AddPodAdmitHandler(sysctlsAllowlist)

	// enable active deadline handler
	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
	if err != nil {
		return nil, err
	}
	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
	klet.AddPodSyncHandler(activeDeadlineHandler)

	klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())

	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
	// apply functional Option's
	for _, opt := range kubeDeps.Options {
		opt(klet)
	}

	if sysruntime.GOOS == "linux" {
		// AppArmor is a Linux kernel security module and it does not support other operating systems.
		klet.appArmorValidator = apparmor.NewValidator()
		klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
	}

	leaseDuration := time.Duration(kubeCfg.NodeLeaseDurationSeconds) * time.Second
	renewInterval := time.Duration(float64(leaseDuration) * nodeLeaseRenewIntervalFraction)
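	// For illustration: assuming the Kubernetes default NodeLeaseDurationSeconds of 40
	// (the default is set in the kubelet config defaults, not in this file),
	// leaseDuration is 40s and renewInterval is 0.25 * 40s = 10s, so the node
	// lease is renewed roughly every 10 seconds.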
	klet.nodeLeaseController = lease.NewController(
		klet.clock,
		klet.heartbeatClient,
		string(klet.nodeName),
		kubeCfg.NodeLeaseDurationSeconds,
		klet.onRepeatedHeartbeatFailure,
		renewInterval,
		v1.NamespaceNodeLease,
		util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))

	// setup node shutdown manager
	shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
		Logger:                           logger,
		ProbeManager:                     klet.probeManager,
		Recorder:                         kubeDeps.Recorder,
		NodeRef:                          nodeRef,
		GetPodsFunc:                      klet.GetActivePods,
		KillPodFunc:                      killPodNow(klet.podWorkers, kubeDeps.Recorder),
		SyncNodeStatusFunc:               klet.syncNodeStatus,
		ShutdownGracePeriodRequested:     kubeCfg.ShutdownGracePeriod.Duration,
		ShutdownGracePeriodCriticalPods:  kubeCfg.ShutdownGracePeriodCriticalPods.Duration,
		ShutdownGracePeriodByPodPriority: kubeCfg.ShutdownGracePeriodByPodPriority,
		StateDirectory:                   rootDirectory,
	})
	klet.shutdownManager = shutdownManager
	klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)

	// Finally, put the most recent version of the config on the Kubelet, so
	// people can see how it was configured.
	klet.kubeletConfiguration = *kubeCfg

	// Generating the status funcs should be the last thing we do,
	// since this relies on the rest of the Kubelet having been constructed.
	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

	return klet, nil
}

type serviceLister interface {
	List(labels.Selector) ([]*v1.Service, error)
}

// Kubelet is the main kubelet implementation.
type Kubelet struct {
	kubeletConfiguration kubeletconfiginternal.KubeletConfiguration

	// hostname is the hostname the kubelet detected or was given via flag/config
	hostname string
	// hostnameOverridden indicates the hostname was overridden via flag/config
	hostnameOverridden bool

	nodeName        types.NodeName
	runtimeCache    kubecontainer.RuntimeCache
	kubeClient      clientset.Interface
	heartbeatClient clientset.Interface
	rootDirectory   string

	lastObservedNodeAddressesMux sync.RWMutex
	lastObservedNodeAddresses    []v1.NodeAddress

	// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
	onRepeatedHeartbeatFailure func()

	// podWorkers handle syncing Pods in response to events.
	podWorkers PodWorkers

	// resyncInterval is the interval between periodic full reconciliations of
	// pods on this node.
	resyncInterval time.Duration

	// sourcesReady records the sources seen by the kubelet, it is thread-safe.
	sourcesReady config.SourcesReady

	// podManager is a facade that abstracts away the various sources of pods
	// this Kubelet services.
	podManager kubepod.Manager

	// Needed to observe and respond to situations that could impact node stability
	evictionManager eviction.Manager

	// Optional, defaults to /logs/ from /var/log
	logServer http.Handler
	// Optional, defaults to simple Docker implementation
	runner kubecontainer.CommandRunner

	// cAdvisor used for container information.
	cadvisor cadvisor.Interface

	// Set to true to have the node register itself with the apiserver.
	registerNode bool
	// List of taints to add to a node object when the kubelet registers itself.
	registerWithTaints []v1.Taint
	// Set to true to have the node register itself as schedulable.
	registerSchedulable bool
	// for internal bookkeeping; access only from within registerWithApiserver
	registrationCompleted bool

	// dnsConfigurer is used for setting up DNS resolver configuration when launching pods.
	dnsConfigurer *dns.Configurer

	// masterServiceNamespace is the namespace that the master service is exposed in.
	masterServiceNamespace string
	// serviceLister knows how to list services
	serviceLister serviceLister
	// serviceHasSynced indicates whether services have been sync'd at least once.
	// Check this before trusting a response from the lister.
	serviceHasSynced cache.InformerSynced
	// nodeLister knows how to list nodes
	nodeLister corelisters.NodeLister
	// nodeHasSynced indicates whether nodes have been sync'd at least once.
	// Check this before trusting a response from the node lister.
	nodeHasSynced cache.InformerSynced
	// a list of node labels to register
	nodeLabels map[string]string

	// Last timestamp when runtime responded on ping.
	// Mutex is used to protect this value.
	runtimeState *runtimeState

	// Volume plugins.
	volumePluginMgr *volume.VolumePluginMgr

	// Handles container probing.
	probeManager prober.Manager
	// Manages container health check results.
	livenessManager  proberesults.Manager
	readinessManager proberesults.Manager
	startupManager   proberesults.Manager

	// How long to keep idle streaming command execution/port forwarding
	// connections open before terminating them
	streamingConnectionIdleTimeout time.Duration

	// The EventRecorder to use
	recorder record.EventRecorder

	// Policy for handling garbage collection of dead containers.
	containerGC kubecontainer.GC

	// Manager for image garbage collection.
	imageManager images.ImageGCManager

	// Manager for container logs.
	containerLogManager logs.ContainerLogManager

	// Secret manager.
	secretManager secret.Manager

	// ConfigMap manager.
	configMapManager configmap.Manager

	// Cached MachineInfo returned by cadvisor.
	machineInfoLock sync.RWMutex
	machineInfo     *cadvisorapi.MachineInfo

	// Handles certificate rotations.
	serverCertificateManager certificate.Manager

	// Syncs pods statuses with apiserver; also used as a cache of statuses.
	statusManager status.Manager

	// VolumeManager runs a set of asynchronous loops that figure out which
	// volumes need to be attached/mounted/unmounted/detached based on the pods
	// scheduled on this node and makes it so.
	volumeManager volumemanager.VolumeManager

	// Cloud provider interface.
	cloud cloudprovider.Interface
	// Handles requests to cloud provider with timeout
	cloudResourceSyncManager cloudresource.SyncManager

	// Indicates that the node initialization happens in an external cloud controller
	externalCloudProvider bool
	// Reference to this node.
	nodeRef *v1.ObjectReference

	// Container runtime.
	containerRuntime kubecontainer.Runtime

	// Streaming runtime handles container streaming.
	streamingRuntime kubecontainer.StreamingRuntime

	// Container runtime service (needed by container runtime Start()).
	runtimeService internalapi.RuntimeService

	// reasonCache caches the failure reason of the last creation of all containers, which is
	// used for generating ContainerStatus.
	reasonCache *ReasonCache

	// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
	// feature is not enabled, it is also the frequency that kubelet posts node status to master.
	// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
	// in nodecontroller. There are several constraints:
	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
	//    N means number of retries allowed for kubelet to post node status. It is pointless
	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
	//    The constant must be less than podEvictionTimeout.
	// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
	//    status. Kubelet may fail to update node status reliably if the value is too small,
	//    as it takes time to gather all necessary node information.
	nodeStatusUpdateFrequency time.Duration

	// nodeStatusReportFrequency is the frequency that kubelet posts node
	// status to master. It is only used when node lease feature is enabled.
	nodeStatusReportFrequency time.Duration

	// lastStatusReportTime is the time when node status was last reported.
	lastStatusReportTime time.Time

	// lastContainerStartedTime is the time of the last ContainerStarted event observed per pod
	lastContainerStartedTime *timeCache

	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
	// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
	syncNodeStatusMux sync.Mutex

	// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
	// This lock is used by the Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
	updatePodCIDRMux sync.Mutex

	// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
	// This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
	updateRuntimeMux sync.Mutex

	// nodeLeaseController claims and renews the node lease for this Kubelet
	nodeLeaseController lease.Controller

	// Generates pod events.
	pleg pleg.PodLifecycleEventGenerator

	// Store kubecontainer.PodStatus for all pods.
	podCache kubecontainer.Cache

	// os is a facade for various syscalls that need to be mocked during testing.
	os kubecontainer.OSInterface

	// Watcher of out of memory events.
	oomWatcher oomwatcher.Watcher

	// Monitor resource usage
	resourceAnalyzer serverstats.ResourceAnalyzer

	// Whether or not we should have the QOS cgroup hierarchy for resource management
	cgroupsPerQOS bool

	// If non-empty, pass this to the container runtime as the root cgroup.
	cgroupRoot string

	// Mounter to use for volumes.
	mounter mount.Interface

	// hostutil to interact with filesystems
	hostutil hostutil.HostUtils

	// subpather to execute subpath actions
	subpather subpath.Interface

	// Manager of non-Runtime containers.
	containerManager cm.ContainerManager

	// Maximum Number of Pods which can be run by this Kubelet
	maxPods int

	// Monitor Kubelet's sync loop
	syncLoopMonitor atomic.Value

	// Container restart Backoff
	backOff *flowcontrol.Backoff

	// Information about the ports which are opened by daemons on Node running this Kubelet server.
	daemonEndpoints *v1.NodeDaemonEndpoints

	// A queue used to trigger pod workers.
	workQueue queue.WorkQueue

	// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
	oneTimeInitializer sync.Once

	// If set, use this IP address or addresses for the node
	nodeIPs []net.IP

	// use this function to validate the kubelet nodeIP
	nodeIPValidator func(net.IP) error

	// If non-nil, this is a unique identifier for the node in an external database, eg. cloudprovider
	providerID string

	// clock is an interface that provides time related functionality in a way that makes it
	// easy to test the code.
	clock clock.WithTicker

	// handlers called during the tryUpdateNodeStatus cycle
	setNodeStatusFuncs []func(*v1.Node) error

	lastNodeUnschedulableLock sync.Mutex
	// maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
	lastNodeUnschedulable bool

	// the list of handlers to call during pod admission.
	admitHandlers lifecycle.PodAdmitHandlers

	// softAdmitHandlers are applied to the pod after it is admitted by the Kubelet, but before it is
| 	// run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
 | |
| 	// rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
 | |
| 	// admission rule should be applied by a softAdmitHandler.
 | |
| 	softAdmitHandlers lifecycle.PodAdmitHandlers
 | |
| 
 | |
| 	// the list of handlers to call during pod sync loop.
 | |
| 	lifecycle.PodSyncLoopHandlers
 | |
| 
 | |
| 	// the list of handlers to call during pod sync.
 | |
| 	lifecycle.PodSyncHandlers
 | |
| 
 | |
| 	// the number of allowed pods per core
 | |
| 	podsPerCore int
 | |
| 
 | |
| 	// enableControllerAttachDetach indicates the Attach/Detach controller
 | |
| 	// should manage attachment/detachment of volumes scheduled to this node,
 | |
| 	// and disable kubelet from executing any attach/detach operations
 | |
| 	enableControllerAttachDetach bool
 | |
| 
 | |
| 	// trigger deleting containers in a pod
 | |
| 	containerDeletor *podContainerDeletor
 | |
| 
 | |
| 	// config iptables util rules
 | |
| 	makeIPTablesUtilChains bool
 | |
| 
 | |
| 	// The bit of the fwmark space to mark packets for SNAT.
 | |
| 	iptablesMasqueradeBit int
 | |
| 
 | |
| 	// The bit of the fwmark space to mark packets for dropping.
 | |
| 	iptablesDropBit int
 | |
| 
 | |
| 	// The AppArmor validator for checking whether AppArmor is supported.
 | |
| 	appArmorValidator apparmor.Validator
 | |
| 
 | |
| 	// experimentalHostUserNamespaceDefaulting sets userns=true when users request host namespaces (pid, ipc, net),
 | |
| 	// are using non-namespaced capabilities (mknod, sys_time, sys_module), the pod contains a privileged container,
 | |
| 	// or using host path volumes.
 | |
| 	// This should only be enabled when the container runtime is performing user remapping AND if the
 | |
| 	// experimental behavior is desired.
 | |
| 	experimentalHostUserNamespaceDefaulting bool
 | |
| 
 | |
| 	// StatsProvider provides the node and the container stats.
 | |
| 	StatsProvider *stats.Provider
 | |
| 
 | |
| 	// This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
 | |
| 	// This can be useful for debugging volume related issues.
 | |
| 	keepTerminatedPodVolumes bool // DEPRECATED
 | |
| 
 | |
| 	// pluginmanager runs a set of asynchronous loops that figure out which
 | |
| 	// plugins need to be registered/unregistered based on this node and makes it so.
 | |
| 	pluginManager pluginmanager.PluginManager
 | |
| 
 | |
| 	// This flag sets a maximum number of images to report in the node status.
 | |
| 	nodeStatusMaxImages int32
 | |
| 
 | |
| 	// Handles RuntimeClass objects for the Kubelet.
 | |
| 	runtimeClassManager *runtimeclass.Manager
 | |
| 
 | |
| 	// Handles node shutdown events for the Node.
 | |
| 	shutdownManager nodeshutdown.Manager
 | |
| }
 | |
| 
 | |
| // ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) ListPodStats() ([]statsapi.PodStats, error) {
 | |
| 	return kl.StatsProvider.ListPodStats()
 | |
| }
 | |
| 
 | |
| // ListPodCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
 | |
| 	return kl.StatsProvider.ListPodCPUAndMemoryStats()
 | |
| }
 | |
| 
 | |
| // ListPodStatsAndUpdateCPUNanoCoreUsage is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
 | |
| 	return kl.StatsProvider.ListPodStatsAndUpdateCPUNanoCoreUsage()
 | |
| }
 | |
| 
 | |
| // ImageFsStats is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) ImageFsStats() (*statsapi.FsStats, error) {
 | |
| 	return kl.StatsProvider.ImageFsStats()
 | |
| }
 | |
| 
 | |
| // GetCgroupStats is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
 | |
| 	return kl.StatsProvider.GetCgroupStats(cgroupName, updateStats)
 | |
| }
 | |
| 
 | |
| // GetCgroupCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
 | |
| 	return kl.StatsProvider.GetCgroupCPUAndMemoryStats(cgroupName, updateStats)
 | |
| }
 | |
| 
 | |
| // RootFsStats is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) RootFsStats() (*statsapi.FsStats, error) {
 | |
| 	return kl.StatsProvider.RootFsStats()
 | |
| }
 | |
| 
 | |
| // GetContainerInfo is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
 | |
| 	return kl.StatsProvider.GetContainerInfo(podFullName, uid, containerName, req)
 | |
| }
 | |
| 
 | |
| // GetRawContainerInfo is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error) {
 | |
| 	return kl.StatsProvider.GetRawContainerInfo(containerName, req, subcontainers)
 | |
| }
 | |
| 
 | |
| // RlimitStats is delegated to StatsProvider, which implements stats.Provider interface
 | |
| func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) {
 | |
| 	return kl.StatsProvider.RlimitStats()
 | |
| }
 | |
| 
 | |
| // setupDataDirs creates:
 | |
| // 1.  the root directory
 | |
| // 2.  the pods directory
 | |
| // 3.  the plugins directory
 | |
| // 4.  the plugins registry directory
 | |
| // 5.  the pod-resources directory
 | |
| func (kl *Kubelet) setupDataDirs() error {
 | |
| 	kl.rootDirectory = path.Clean(kl.rootDirectory)
 | |
| 	pluginRegistrationDir := kl.getPluginsRegistrationDir()
 | |
| 	pluginsDir := kl.getPluginsDir()
 | |
| 	if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
 | |
| 		return fmt.Errorf("error creating root directory: %v", err)
 | |
| 	}
 | |
| 	if err := kl.hostutil.MakeRShared(kl.getRootDir()); err != nil {
 | |
| 		return fmt.Errorf("error configuring root directory: %v", err)
 | |
| 	}
 | |
| 	if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
 | |
| 		return fmt.Errorf("error creating pods directory: %v", err)
 | |
| 	}
 | |
| 	if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
 | |
| 		return fmt.Errorf("error creating plugins directory: %v", err)
 | |
| 	}
 | |
| 	if err := os.MkdirAll(kl.getPluginsRegistrationDir(), 0750); err != nil {
 | |
| 		return fmt.Errorf("error creating plugins registry directory: %v", err)
 | |
| 	}
 | |
| 	if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
 | |
| 		return fmt.Errorf("error creating podresources directory: %v", err)
 | |
| 	}
 | |
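| 	// Label the plugin directories so that unprivileged containerized plugins can use them when SELinux is enforcing.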
| 	if selinux.GetEnabled() {
 | |
| 		err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
 | |
| 		if err != nil {
 | |
| 			klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err)
 | |
| 		}
 | |
| 		err = selinux.SetFileLabel(pluginsDir, config.KubeletPluginsDirSELinuxLabel)
 | |
| 		if err != nil {
 | |
| 			klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // StartGarbageCollection starts garbage collection threads.
 | |
| func (kl *Kubelet) StartGarbageCollection() {
 | |
| 	loggedContainerGCFailure := false
 | |
| 	go wait.Until(func() {
 | |
| 		if err := kl.containerGC.GarbageCollect(); err != nil {
 | |
| 			klog.ErrorS(err, "Container garbage collection failed")
 | |
| 			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
 | |
| 			loggedContainerGCFailure = true
 | |
| 		} else {
 | |
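| 			// On success, log at V(4); the first success after a failure is logged at V(1) so that recovery is visible at default verbosity.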
| 			var vLevel klog.Level = 4
 | |
| 			if loggedContainerGCFailure {
 | |
| 				vLevel = 1
 | |
| 				loggedContainerGCFailure = false
 | |
| 			}
 | |
| 
 | |
| 			klog.V(vLevel).InfoS("Container garbage collection succeeded")
 | |
| 		}
 | |
| 	}, ContainerGCPeriod, wait.NeverStop)
 | |
| 
 | |
| 	// when the high threshold is set to 100, stub the image GC manager
 | |
| 	if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
 | |
| 		klog.V(2).InfoS("ImageGCHighThresholdPercent is set to 100, disabling image GC")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	prevImageGCFailed := false
 | |
| 	go wait.Until(func() {
 | |
| 		if err := kl.imageManager.GarbageCollect(); err != nil {
 | |
| 			if prevImageGCFailed {
 | |
| 				klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
 | |
| 				// Only create an event for repeated failures
 | |
| 				kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
 | |
| 			} else {
 | |
| 				klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
 | |
| 			}
 | |
| 			prevImageGCFailed = true
 | |
| 		} else {
 | |
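| 			// As with container GC above, the first success after a failure is logged at V(1).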
| 			var vLevel klog.Level = 4
 | |
| 			if prevImageGCFailed {
 | |
| 				vLevel = 1
 | |
| 				prevImageGCFailed = false
 | |
| 			}
 | |
| 
 | |
| 			klog.V(vLevel).InfoS("Image garbage collection succeeded")
 | |
| 		}
 | |
| 	}, ImageGCPeriod, wait.NeverStop)
 | |
| }
 | |
| 
 | |
| // initializeModules will initialize internal modules that do not require the container runtime to be up.
 | |
| // Note that the modules here must not depend on modules that are not initialized here.
 | |
| func (kl *Kubelet) initializeModules() error {
 | |
| 	// Prometheus metrics.
 | |
| 	metrics.Register(
 | |
| 		collectors.NewVolumeStatsCollector(kl),
 | |
| 		collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
 | |
| 	)
 | |
| 	metrics.SetNodeName(kl.nodeName)
 | |
| 	servermetrics.Register()
 | |
| 
 | |
| 	// Setup filesystem directories.
 | |
| 	if err := kl.setupDataDirs(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// If the container logs directory does not exist, create it.
 | |
| 	if _, err := os.Stat(ContainerLogsDir); err != nil {
 | |
| 		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
 | |
| 			return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Start the image manager.
 | |
| 	kl.imageManager.Start()
 | |
| 
 | |
| 	// Start the certificate manager if it was enabled.
 | |
| 	if kl.serverCertificateManager != nil {
 | |
| 		kl.serverCertificateManager.Start()
 | |
| 	}
 | |
| 
 | |
| 	// Start out of memory watcher.
 | |
| 	if kl.oomWatcher != nil {
 | |
| 		if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
 | |
| 			return fmt.Errorf("failed to start OOM watcher: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Start resource analyzer
 | |
| 	kl.resourceAnalyzer.Start()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
 | |
| func (kl *Kubelet) initializeRuntimeDependentModules() {
 | |
| 	if err := kl.cadvisor.Start(); err != nil {
 | |
| 		// Fail kubelet and rely on the babysitter to retry starting kubelet.
 | |
| 		klog.ErrorS(err, "Failed to start cAdvisor")
 | |
| 		os.Exit(1)
 | |
| 	}
 | |
| 
 | |
| 	// trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
 | |
| 	// ignore any errors, since if stats collection is not successful, the container manager will fail to start below.
 | |
| 	kl.StatsProvider.GetCgroupStats("/", true)
 | |
| 	// Start container manager.
 | |
| 	node, err := kl.getNodeAnyWay()
 | |
| 	if err != nil {
 | |
| 		// Fail kubelet and rely on the babysitter to retry starting kubelet.
 | |
| 		klog.ErrorS(err, "Kubelet failed to get node info")
 | |
| 		os.Exit(1)
 | |
| 	}
 | |
| 	// containerManager must start after cAdvisor because it needs filesystem capacity information
 | |
| 	if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
 | |
| 		// Fail kubelet and rely on the babysitter to retry starting kubelet.
 | |
| 		klog.ErrorS(err, "Failed to start ContainerManager")
 | |
| 		os.Exit(1)
 | |
| 	}
 | |
| 	// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
 | |
| 	kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
 | |
| 
 | |
| 	// container log manager must start after container runtime is up to retrieve information from container runtime
 | |
| 	// and inform container to reopen log file after log rotation.
 | |
| 	kl.containerLogManager.Start()
 | |
| 	// Adding Registration Callback function for CSI Driver
 | |
| 	kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
 | |
| 	// Adding Registration Callback function for Device Manager
 | |
| 	kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
 | |
| 	// Start the plugin manager
 | |
| 	klog.V(4).InfoS("Starting plugin manager")
 | |
| 	go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
 | |
| 
 | |
| 	err = kl.shutdownManager.Start()
 | |
| 	if err != nil {
 | |
| 		// The shutdown manager is not critical for kubelet, so log failure, but don't block Kubelet startup if there was a failure starting it.
 | |
| 		klog.ErrorS(err, "Failed to start node shutdown manager")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Run starts the kubelet reacting to config updates
 | |
| func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 | |
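| 	// If no log server was injected, default to serving the node's /var/log tree under /logs/.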
| 	if kl.logServer == nil {
 | |
| 		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
 | |
| 	}
 | |
| 	if kl.kubeClient == nil {
 | |
| 		klog.InfoS("No API server defined - no node status update will be sent")
 | |
| 	}
 | |
| 
 | |
| 	// Start the cloud provider sync manager
 | |
| 	if kl.cloudResourceSyncManager != nil {
 | |
| 		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
 | |
| 	}
 | |
| 
 | |
| 	if err := kl.initializeModules(); err != nil {
 | |
| 		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
 | |
| 		klog.ErrorS(err, "Failed to initialize internal modules")
 | |
| 		os.Exit(1)
 | |
| 	}
 | |
| 
 | |
| 	// Start volume manager
 | |
| 	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
 | |
| 
 | |
| 	if kl.kubeClient != nil {
 | |
| 		// Introduce some small jittering to ensure that over time the requests won't start
 | |
| 		// accumulating at approximately the same time from the set of nodes due to priority and
 | |
| 		// fairness effect.
 | |
| 		go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
 | |
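| 		// fastStatusUpdateOnce speeds up the very first node status update after startup and then returns.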
| 		go kl.fastStatusUpdateOnce()
 | |
| 
 | |
| 		// start syncing lease
 | |
| 		go kl.nodeLeaseController.Run(wait.NeverStop)
 | |
| 	}
 | |
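| 	// Poll the container runtime status every 5 seconds; the first successful check also initializes the runtime-dependent modules.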
| 	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
 | |
| 
 | |
| 	// Set up iptables util rules
 | |
| 	if kl.makeIPTablesUtilChains {
 | |
| 		kl.initNetworkUtil()
 | |
| 	}
 | |
| 
 | |
| 	// Start component sync loops.
 | |
| 	kl.statusManager.Start()
 | |
| 
 | |
| 	// Start syncing RuntimeClasses if enabled.
 | |
| 	if kl.runtimeClassManager != nil {
 | |
| 		kl.runtimeClassManager.Start(wait.NeverStop)
 | |
| 	}
 | |
| 
 | |
| 	// Start the pod lifecycle event generator.
 | |
| 	kl.pleg.Start()
 | |
| 	kl.syncLoop(updates, kl)
 | |
| }
 | |
| 
 | |
| // syncPod is the transaction script for the sync of a single pod (setting it
 | |
| // up). This method is reentrant and expected to converge a pod towards the
 | |
| // desired state of the spec. The reverse (teardown) is handled in
 | |
| // syncTerminatingPod and syncTerminatedPod. If syncPod exits without error,
 | |
| // then the pod runtime state is in sync with the desired configuration state
 | |
| // (pod is running). If syncPod exits with a transient error, the next
 | |
| // invocation of syncPod is expected to make progress towards reaching the
 | |
| // runtime state. syncPod exits with isTerminal when the pod was detected to
 | |
| // have reached a terminal lifecycle phase due to container exits (for
 | |
| // RestartNever or RestartOnFailure) and the next method invoked will be
 | |
| // syncTerminatingPod.
 | |
| //
 | |
| // Arguments:
 | |
| //
 | |
| // updateType - whether this is a create (first time) or an update, should
 | |
| //   only be used for metrics since this method must be reentrant
 | |
| // pod - the pod that is being set up
 | |
| // mirrorPod - the mirror pod known to the kubelet for this pod, if any
 | |
| // podStatus - the most recent pod status observed for this pod which can
 | |
| //   be used to determine the set of actions that should be taken during
 | |
| //   this loop of syncPod
 | |
| //
 | |
| // The workflow is:
 | |
| // * If the pod is being created, record pod worker start latency
 | |
| // * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
 | |
| // * If the pod is being seen as running for the first time, record pod
 | |
| //   start latency
 | |
| // * Update the status of the pod in the status manager
 | |
| // * Stop the pod's containers if it should not be running due to soft
 | |
| //   admission
 | |
| // * Ensure any background tracking for a runnable pod is started
 | |
| // * Create a mirror pod if the pod is a static pod, and does not
 | |
| //   already have a mirror pod
 | |
| // * Create the data directories for the pod if they do not exist
 | |
| // * Wait for volumes to attach/mount
 | |
| // * Fetch the pull secrets for the pod
 | |
| // * Call the container runtime's SyncPod callback
 | |
| // * Update the traffic shaping for the pod's ingress and egress limits
 | |
| //
 | |
| // If any step of this workflow errors, the error is returned, and the sync is
 | |
| // retried on the next syncPod call.
 | |
| //
 | |
| // This operation writes all events that are dispatched in order to provide
 | |
| // the most accurate information possible about an error situation to aid debugging.
 | |
| // Callers should not write an event if this operation returns an error.
 | |
| func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
 | |
| 	klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 	defer func() {
 | |
| 		klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
 | |
| 	}()
 | |
| 
 | |
| 	// Latency measurements for the main workflow are relative to the
 | |
| 	// first time the pod was seen by kubelet.
 | |
| 	var firstSeenTime time.Time
 | |
| 	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
 | |
| 		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
 | |
| 	}
 | |
| 
 | |
| 	// Record pod worker start latency if being created
 | |
| 	// TODO: make pod workers record their own latencies
 | |
| 	if updateType == kubetypes.SyncPodCreate {
 | |
| 		if !firstSeenTime.IsZero() {
 | |
| 			// This is the first time we are syncing the pod. Record the latency
 | |
| 			// since kubelet first saw the pod if firstSeenTime is set.
 | |
| 			metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
 | |
| 		} else {
 | |
| 			klog.V(3).InfoS("First seen time not recorded for pod",
 | |
| 				"podUID", pod.UID,
 | |
| 				"pod", klog.KObj(pod))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Generate final API pod status with pod and status manager status
 | |
| 	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
 | |
| 	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
 | |
| 	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
 | |
| 	// set pod IP to hostIP directly in runtime.GetPodStatus
 | |
| 	podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
 | |
| 	for _, ipInfo := range apiPodStatus.PodIPs {
 | |
| 		podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
 | |
| 	}
 | |
| 	if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
 | |
| 		podStatus.IPs = []string{apiPodStatus.PodIP}
 | |
| 	}
 | |
| 
 | |
| 	// If the pod is terminal, we don't need to continue to setup the pod
 | |
| 	if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
 | |
| 		kl.statusManager.SetPodStatus(pod, apiPodStatus)
 | |
| 		isTerminal = true
 | |
| 		return isTerminal, nil
 | |
| 	}
 | |
| 
 | |
| 	// If the pod should not be running, we request the pod's containers be stopped. This is not the same
 | |
| 	// as termination (we want to stop the pod, but potentially restart it later if soft admission
 | |
| 	// allows it). Set the status and phase appropriately.
 | |
| 	runnable := kl.canRunPod(pod)
 | |
| 	if !runnable.Admit {
 | |
| 		// Pod is not runnable; update the Pod and Container statuses to explain why.
 | |
| 		if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
 | |
| 			apiPodStatus.Phase = v1.PodPending
 | |
| 		}
 | |
| 		apiPodStatus.Reason = runnable.Reason
 | |
| 		apiPodStatus.Message = runnable.Message
 | |
| 		// Containers in the Waiting state are not being created.
 | |
| 		const waitingReason = "Blocked"
 | |
| 		for _, cs := range apiPodStatus.InitContainerStatuses {
 | |
| 			if cs.State.Waiting != nil {
 | |
| 				cs.State.Waiting.Reason = waitingReason
 | |
| 			}
 | |
| 		}
 | |
| 		for _, cs := range apiPodStatus.ContainerStatuses {
 | |
| 			if cs.State.Waiting != nil {
 | |
| 				cs.State.Waiting.Reason = waitingReason
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Record the time it takes for the pod to become running
 | |
| 	// since kubelet first saw the pod if firstSeenTime is set.
 | |
| 	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
 | |
| 	if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
 | |
| 		!firstSeenTime.IsZero() {
 | |
| 		metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
 | |
| 	}
 | |
| 
 | |
| 	kl.statusManager.SetPodStatus(pod, apiPodStatus)
 | |
| 
 | |
| 	// Pods that are not runnable must be stopped - return a typed error to the pod worker
 | |
| 	if !runnable.Admit {
 | |
| 		klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
 | |
| 		var syncErr error
 | |
| 		p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 | |
| 		if err := kl.killPod(pod, p, nil); err != nil {
 | |
| 			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
 | |
| 			syncErr = fmt.Errorf("error killing pod: %v", err)
 | |
| 			utilruntime.HandleError(syncErr)
 | |
| 		} else {
 | |
| 			// There was no error killing the pod, but the pod cannot be run.
 | |
| 			// Return an error to signal that the sync loop should back off.
 | |
| 			syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
 | |
| 		}
 | |
| 		return false, syncErr
 | |
| 	}
 | |
| 
 | |
| 	// If the network plugin is not ready, only start the pod if it uses the host network
 | |
| 	if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
 | |
| 		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
 | |
| 		return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
 | |
| 	}
 | |
| 
 | |
| 	// ensure the kubelet knows about referenced secrets or configmaps used by the pod
 | |
| 	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
 | |
| 		if kl.secretManager != nil {
 | |
| 			kl.secretManager.RegisterPod(pod)
 | |
| 		}
 | |
| 		if kl.configMapManager != nil {
 | |
| 			kl.configMapManager.RegisterPod(pod)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Create Cgroups for the pod and apply resource parameters
 | |
| 	// to them if cgroups-per-qos flag is enabled.
 | |
| 	pcm := kl.containerManager.NewPodContainerManager()
 | |
| 	// If pod has already been terminated then we need not create
 | |
| 	// or update the pod's cgroup
 | |
| 	// TODO: once context cancellation is added this check can be removed
 | |
| 	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
 | |
| 		// When the kubelet is restarted with the cgroups-per-qos
 | |
| 		// flag enabled, all the pod's running containers
 | |
| 		// should be killed and brought back up
 | |
| 		// under the qos cgroup hierarchy.
 | |
| 		// Check if this is the pod's first sync
 | |
| 		firstSync := true
 | |
| 		for _, containerStatus := range apiPodStatus.ContainerStatuses {
 | |
| 			if containerStatus.State.Running != nil {
 | |
| 				firstSync = false
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 		// Don't kill containers in pod if pod's cgroups already
 | |
| 		// exist or the pod is running for the first time
 | |
| 		podKilled := false
 | |
| 		if !pcm.Exists(pod) && !firstSync {
 | |
| 			p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 | |
| 			if err := kl.killPod(pod, p, nil); err == nil {
 | |
| 				podKilled = true
 | |
| 			} else {
 | |
| 				klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
 | |
| 			}
 | |
| 		}
 | |
| 		// Create and Update pod's Cgroups
 | |
| 		// Don't create cgroups for run once pod if it was killed above
 | |
| 		// The current policy is not to restart the run once pods when
 | |
| 		// the kubelet is restarted with the new flag as run once pods are
 | |
| 		// expected to run only once and if the kubelet is restarted then
 | |
| 		// they are not expected to run again.
 | |
| 		// We don't create or apply cgroup updates if it's a run-once pod that was killed above.
 | |
| 		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
 | |
| 			if !pcm.Exists(pod) {
 | |
| 				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
 | |
| 					klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
 | |
| 				}
 | |
| 				if err := pcm.EnsureExists(pod); err != nil {
 | |
| 					kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
 | |
| 					return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Create Mirror Pod for Static Pod if it doesn't already exist
 | |
| 	if kubetypes.IsStaticPod(pod) {
 | |
| 		deleted := false
 | |
| 		if mirrorPod != nil {
 | |
| 			if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
 | |
| 				// The mirror pod is semantically different from the static pod. Remove
 | |
| 				// it. The mirror pod will get recreated later.
 | |
| 				klog.InfoS("Trying to delete mirror pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
 | |
| 				podFullName := kubecontainer.GetPodFullName(pod)
 | |
| 				var err error
 | |
| 				deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
 | |
| 				if deleted {
 | |
| 					klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
 | |
| 				} else if err != nil {
 | |
| 					klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 		if mirrorPod == nil || deleted {
 | |
| 			node, err := kl.GetNode()
 | |
| 			if err != nil || node.DeletionTimestamp != nil {
 | |
| 				klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
 | |
| 			} else {
 | |
| 				klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
 | |
| 				if err := kl.podManager.CreateMirrorPod(pod); err != nil {
 | |
| 					klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Make data directories for the pod
 | |
| 	if err := kl.makePodDataDirs(pod); err != nil {
 | |
| 		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
 | |
| 		klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
 | |
| 		return false, err
 | |
| 	}
 | |
| 
 | |
| 	// Volume manager will not mount volumes for terminating pods
 | |
| 	// TODO: once context cancellation is added this check can be removed
 | |
| 	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
 | |
| 		// Wait for volumes to attach/mount
 | |
| 		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
 | |
| 			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
 | |
| 			klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
 | |
| 			return false, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Fetch the pull secrets for the pod
 | |
| 	pullSecrets := kl.getPullSecretsForPod(pod)
 | |
| 
 | |
| 	// Ensure the pod is being probed
 | |
| 	kl.probeManager.AddPod(pod)
 | |
| 
 | |
| 	// Call the container runtime's SyncPod callback
 | |
| 	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
 | |
| 	kl.reasonCache.Update(pod.UID, result)
 | |
| 	if err := result.Error(); err != nil {
 | |
| 		// Do not return error if the only failures were pods in backoff
 | |
| 		for _, r := range result.SyncResults {
 | |
| 			if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
 | |
| 				// Do not record an event here, as we keep all event logging for sync pod failures
 | |
| 				// local to container runtime, so we get better errors.
 | |
| 				return false, err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		return false, nil
 | |
| 	}
 | |
| 
 | |
| 	return false, nil
 | |
| }
 | |
| 
 | |
| // syncTerminatingPod is expected to terminate all running containers in a pod. Once this method
 | |
| // returns without error, the pod's local state can be safely cleaned up. If runningPod is passed,
 | |
| // we perform no status updates.
 | |
| func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
 | |
| 	klog.V(4).InfoS("syncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 	defer klog.V(4).InfoS("syncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 
 | |
| 	// when we receive a runtime only pod (runningPod != nil) we don't need to update the status
 | |
| 	// manager or refresh the status of the cache, because a successful killPod will ensure we do
 | |
| 	// not get invoked again
 | |
| 	if runningPod != nil {
 | |
| 		// we kill the pod with the specified grace period since this is a termination
 | |
| 		if gracePeriod != nil {
 | |
| 			klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
 | |
| 		} else {
 | |
| 			klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
 | |
| 		}
 | |
| 		if err := kl.killPod(pod, *runningPod, gracePeriod); err != nil {
 | |
| 			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
 | |
| 			// there was an error killing the pod, so we return that error directly
 | |
| 			utilruntime.HandleError(err)
 | |
| 			return err
 | |
| 		}
 | |
| 		klog.V(4).InfoS("Pod termination stopped all running orphan containers", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
 | |
| 	if podStatusFn != nil {
 | |
| 		podStatusFn(&apiPodStatus)
 | |
| 	}
 | |
| 	kl.statusManager.SetPodStatus(pod, apiPodStatus)
 | |
| 
 | |
| 	if gracePeriod != nil {
 | |
| 		klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
 | |
| 	} else {
 | |
| 		klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
 | |
| 	}
 | |
| 
 | |
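| 	// Stop liveness and startup probing before the containers are killed; readiness probing keeps running until RemovePod below.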
| 	kl.probeManager.StopLivenessAndStartup(pod)
 | |
| 
 | |
| 	p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 | |
| 	if err := kl.killPod(pod, p, gracePeriod); err != nil {
 | |
| 		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
 | |
| 		// there was an error killing the pod, so we return that error directly
 | |
| 		utilruntime.HandleError(err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Once the containers are stopped, we can stop probing for liveness and readiness.
 | |
| 	// TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after
 | |
| 	//   the detection of a container shutdown or (for readiness) after the first failure. Tracked as
 | |
| 	//   https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing.
 | |
| 	kl.probeManager.RemovePod(pod)
 | |
| 
 | |
| 	// Guard against consistency issues in KillPod implementations by checking that there are no
 | |
| 	// running containers. This method is invoked infrequently so this is effectively free and can
 | |
| 	// catch race conditions introduced by callers updating pod status out of order.
 | |
| 	// TODO: have KillPod return the terminal status of stopped containers and write that into the
 | |
| 	//  cache immediately
 | |
| 	podStatus, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
 | |
| 	if err != nil {
 | |
| 		klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 		return err
 | |
| 	}
 | |
| 	var runningContainers []string
 | |
| 	type container struct {
 | |
| 		Name       string
 | |
| 		State      string
 | |
| 		ExitCode   int
 | |
| 		FinishedAt string
 | |
| 	}
 | |
| 	var containers []container
 | |
| 	klogV := klog.V(4)
 | |
| 	klogVEnabled := klogV.Enabled()
 | |
| 	for _, s := range podStatus.ContainerStatuses {
 | |
| 		if s.State == kubecontainer.ContainerStateRunning {
 | |
| 			runningContainers = append(runningContainers, s.ID.String())
 | |
| 		}
 | |
| 		if klogVEnabled {
 | |
| 			containers = append(containers, container{Name: s.Name, State: string(s.State), ExitCode: s.ExitCode, FinishedAt: s.FinishedAt.UTC().Format(time.RFC3339Nano)})
 | |
| 		}
 | |
| 	}
 | |
| 	if klogVEnabled {
 | |
| 		sort.Slice(containers, func(i, j int) bool { return containers[i].Name < containers[j].Name })
 | |
| 		klog.V(4).InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", containers)
 | |
| 	}
 | |
| 	if len(runningContainers) > 0 {
 | |
| 		return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
 | |
| 	}
 | |
| 
 | |
| 	// we have successfully stopped all containers, the pod is terminating, our status is "done"
 | |
| 	klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // syncTerminatedPod cleans up a pod that has terminated (has no running containers).
 | |
| // The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
 | |
| // gates pod deletion). When this method exits the pod is expected to be ready for cleanup.
 | |
| // TODO: make this method take a context and exit early
 | |
| func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
 | |
| 	klog.V(4).InfoS("syncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 	defer klog.V(4).InfoS("syncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 
 | |
| 	// generate the final status of the pod
 | |
| 	// TODO: should we simply fold this into TerminatePod? that would give a single pod update
 | |
| 	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
 | |
| 	kl.statusManager.SetPodStatus(pod, apiPodStatus)
 | |
| 
 | |
| 	// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
 | |
| 	// before syncTerminatedPod is invoked)
 | |
| 	if err := kl.volumeManager.WaitForUnmount(pod); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 
 | |
| 	// After volume unmount is complete, let the secret and configmap managers know we're done with this pod
 | |
| 	if kl.secretManager != nil {
 | |
| 		kl.secretManager.UnregisterPod(pod)
 | |
| 	}
 | |
| 	if kl.configMapManager != nil {
 | |
| 		kl.configMapManager.UnregisterPod(pod)
 | |
| 	}
 | |
| 
 | |
| 	// Note: we leave pod containers to be reclaimed in the background since dockershim requires the
 | |
| 	// container for retrieving logs and we want to make sure logs are available until the pod is
 | |
| 	// physically deleted.
 | |
| 
 | |
| 	// remove any cgroups in the hierarchy for pods that are no longer running.
 | |
| 	if kl.cgroupsPerQOS {
 | |
| 		pcm := kl.containerManager.NewPodContainerManager()
 | |
| 		name, _ := pcm.GetPodContainerName(pod)
 | |
| 		if err := pcm.Destroy(name); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 	}
 | |
| 
 | |
| 	// mark the final pod status
 | |
| 	kl.statusManager.TerminatePod(pod)
 | |
| 	klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Get pods which should be resynchronized. Currently, the following pods should be resynchronized:
 | |
| //   * pods whose work is ready.
 | |
| //   * pods for which an internal module has requested a sync.
 | |
| func (kl *Kubelet) getPodsToSync() []*v1.Pod {
 | |
| 	allPods := kl.podManager.GetPods()
 | |
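| 	// The work queue yields the UIDs of pods whose scheduled sync (or retry backoff) time has already passed.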
| 	podUIDs := kl.workQueue.GetWork()
 | |
| 	podUIDSet := sets.NewString()
 | |
| 	for _, podUID := range podUIDs {
 | |
| 		podUIDSet.Insert(string(podUID))
 | |
| 	}
 | |
| 	var podsToSync []*v1.Pod
 | |
| 	for _, pod := range allPods {
 | |
| 		if podUIDSet.Has(string(pod.UID)) {
 | |
| 			// The work of the pod is ready
 | |
| 			podsToSync = append(podsToSync, pod)
 | |
| 			continue
 | |
| 		}
 | |
| 		for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
 | |
| 			if podSyncLoopHandler.ShouldSync(pod) {
 | |
| 				podsToSync = append(podsToSync, pod)
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return podsToSync
 | |
| }
 | |
| 
 | |
| // deletePod deletes the pod from the internal state of the kubelet by:
 | |
| // 1.  stopping the associated pod worker asynchronously
 | |
| // 2.  signaling the pod worker to kill the pod by sending it a SyncPodKill update
 | |
| //
 | |
| // deletePod returns an error if the pod is nil or if not all sources are ready.
 | |
| func (kl *Kubelet) deletePod(pod *v1.Pod) error {
 | |
| 	if pod == nil {
 | |
| 		return fmt.Errorf("deletePod does not allow nil pod")
 | |
| 	}
 | |
| 	if !kl.sourcesReady.AllReady() {
 | |
| 		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
 | |
| 		// for sources that haven't reported yet.
 | |
| 		return fmt.Errorf("skipping delete because sources aren't ready yet")
 | |
| 	}
 | |
| 	klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
 | |
| 	kl.podWorkers.UpdatePod(UpdatePodOptions{
 | |
| 		Pod:        pod,
 | |
| 		UpdateType: kubetypes.SyncPodKill,
 | |
| 	})
 | |
| 	// We leave the volume/directory cleanup to the periodic cleanup routine.
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // rejectPod records an event about the pod with the given reason and message,
 | |
| // and updates the pod to the failed phase in the status manager.
 | |
| func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
 | |
| 	kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
 | |
| 	kl.statusManager.SetPodStatus(pod, v1.PodStatus{
 | |
| 		Phase:   v1.PodFailed,
 | |
| 		Reason:  reason,
 | |
| 		Message: "Pod " + message})
 | |
| }
 | |
| 
 | |
| // canAdmitPod determines if a pod can be admitted, and gives a reason if it
 | |
| // cannot. "pod" is new pod, while "pods" are all admitted pods
 | |
| // The function returns a boolean value indicating whether the pod
 | |
| // can be admitted, a brief single-word reason and a message explaining why
 | |
| // the pod cannot be admitted.
 | |
| func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
 | |
| 	// the kubelet will invoke each pod admit handler in sequence
 | |
| 	// if any handler rejects, the pod is rejected.
 | |
| 	// TODO: move out of disk check into a pod admitter
 | |
| 	// TODO: out of resource eviction should have a pod admitter call-out
 | |
| 	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
 | |
| 	for _, podAdmitHandler := range kl.admitHandlers {
 | |
| 		if result := podAdmitHandler.Admit(attrs); !result.Admit {
 | |
| 			return false, result.Reason, result.Message
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return true, "", ""
 | |
| }
 | |
| 
 | |
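| // canRunPod runs the registered soft admit handlers against the pod. A rejection here
| // means the pod's containers must be stopped, but the pod stays pending rather than being failed permanently.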
| func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
 | |
| 	attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
 | |
| 	// Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
 | |
| 	attrs.OtherPods = kl.GetActivePods()
 | |
| 
 | |
| 	for _, handler := range kl.softAdmitHandlers {
 | |
| 		if result := handler.Admit(attrs); !result.Admit {
 | |
| 			return result
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return lifecycle.PodAdmitResult{Admit: true}
 | |
| }
 | |
| 
 | |
| // syncLoop is the main loop for processing changes. It watches for changes from
 | |
| // three channels (file, apiserver, and http) and creates a union of them. For
 | |
| // any new change seen, it will run a sync against desired state and running state. If
 | |
| // no changes are seen to the configuration, it will synchronize the last known desired
 | |
| // state every sync-frequency seconds. Never returns.
 | |
| func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
 | |
| 	klog.InfoS("Starting kubelet main sync loop")
 | |
| 	// The syncTicker wakes up the kubelet to check if there are any pod workers
 | |
| 	// that need to be sync'd. A one-second period is sufficient because the
 | |
| 	// sync interval is defaulted to 10s.
 | |
| 	syncTicker := time.NewTicker(time.Second)
 | |
| 	defer syncTicker.Stop()
 | |
| 	housekeepingTicker := time.NewTicker(housekeepingPeriod)
 | |
| 	defer housekeepingTicker.Stop()
 | |
| 	plegCh := kl.pleg.Watch()
 | |
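| 	// Backoff parameters used below when the runtime reports errors: start at 100ms and double up to a 5s cap.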
| 	const (
 | |
| 		base   = 100 * time.Millisecond
 | |
| 		max    = 5 * time.Second
 | |
| 		factor = 2
 | |
| 	)
 | |
| 	duration := base
 | |
| 	// Responsible for checking limits in resolv.conf
 | |
| 	// The limits do not have anything to do with individual pods
 | |
| 	// Since this is called in syncLoop, we don't need to call it anywhere else
 | |
| 	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
 | |
| 		kl.dnsConfigurer.CheckLimitsForResolvConf()
 | |
| 	}
 | |
| 
 | |
| 	for {
 | |
| 		if err := kl.runtimeState.runtimeErrors(); err != nil {
 | |
| 			klog.ErrorS(err, "Skipping pod synchronization")
 | |
| 			// exponential backoff
 | |
| 			time.Sleep(duration)
 | |
| 			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
 | |
| 			continue
 | |
| 		}
 | |
| 		// reset backoff if we have a success
 | |
| 		duration = base
 | |
| 
 | |
| 		kl.syncLoopMonitor.Store(kl.clock.Now())
 | |
| 		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
 | |
| 			break
 | |
| 		}
 | |
| 		kl.syncLoopMonitor.Store(kl.clock.Now())
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // syncLoopIteration reads from various channels and dispatches pods to the
 | |
| // given handler.
 | |
| //
 | |
| // Arguments:
 | |
| // 1.  configCh:       a channel to read config events from
 | |
| // 2.  handler:        the SyncHandler to dispatch pods to
 | |
| // 3.  syncCh:         a channel to read periodic sync events from
 | |
| // 4.  housekeepingCh: a channel to read housekeeping events from
 | |
| // 5.  plegCh:         a channel to read PLEG updates from
 | |
| //
 | |
| // Events are also read from the kubelet liveness manager's update channel.
 | |
| //
 | |
| // The workflow is to read from one of the channels, handle that event, and
 | |
| // update the timestamp in the sync loop monitor.
 | |
| //
 | |
| // Here is an appropriate place to note that despite the syntactical
 | |
| // similarity to the switch statement, the case statements in a select are
 | |
| // evaluated in a pseudorandom order if there are multiple channels ready to
 | |
| // read from when the select is evaluated.  In other words, case statements
 | |
| // are evaluated in random order, and you cannot assume that the case
 | |
| // statements evaluate in order if multiple channels have events.
 | |
| //
 | |
| // With that in mind, in truly no particular order, the different channels
 | |
| // are handled as follows:
 | |
| //
 | |
| // * configCh: dispatch the pods for the config change to the appropriate
 | |
| //             handler callback for the event type
 | |
| // * plegCh: update the runtime cache; sync pod
 | |
| // * syncCh: sync all pods waiting for sync
 | |
| // * housekeepingCh: trigger cleanup of pods
 | |
| // * health manager: sync pods that have failed or in which one or more
 | |
| //                     containers have failed health checks
 | |
| func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
 | |
| 	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
 | |
| 	select {
 | |
| 	case u, open := <-configCh:
 | |
| 		// Update from a config source; dispatch it to the right handler
 | |
| 		// callback.
 | |
| 		if !open {
 | |
| 			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
 | |
| 			return false
 | |
| 		}
 | |
| 
 | |
| 		switch u.Op {
 | |
| 		case kubetypes.ADD:
 | |
| 			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
 | |
| 			// After restarting, kubelet will get all existing pods through
 | |
| 			// ADD as if they are new pods. These pods will then go through the
 | |
| 			// admission process and *may* be rejected. This can be resolved
 | |
| 			// once we have checkpointing.
 | |
| 			handler.HandlePodAdditions(u.Pods)
 | |
| 		case kubetypes.UPDATE:
 | |
| 			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
 | |
| 			handler.HandlePodUpdates(u.Pods)
 | |
| 		case kubetypes.REMOVE:
 | |
| 			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
 | |
| 			handler.HandlePodRemoves(u.Pods)
 | |
| 		case kubetypes.RECONCILE:
 | |
| 			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
 | |
| 			handler.HandlePodReconcile(u.Pods)
 | |
| 		case kubetypes.DELETE:
 | |
| 			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
 | |
| 			// DELETE is treated as an UPDATE because of graceful deletion.
 | |
| 			handler.HandlePodUpdates(u.Pods)
 | |
| 		case kubetypes.SET:
 | |
| 			// TODO: Do we want to support this?
 | |
| 			klog.ErrorS(nil, "Kubelet does not support snapshot update")
 | |
| 		default:
 | |
| 			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
 | |
| 		}
 | |
| 
 | |
| 		kl.sourcesReady.AddSource(u.Source)
 | |
| 
 | |
| 	case e := <-plegCh:
 | |
| 		if e.Type == pleg.ContainerStarted {
 | |
| 			// record the most recent time we observed a container start for this pod.
 | |
| 			// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
 | |
| 			// to make sure we don't miss handling graceful termination for containers we reported as having started.
 | |
| 			kl.lastContainerStartedTime.Add(e.ID, time.Now())
 | |
| 		}
 | |
| 		if isSyncPodWorthy(e) {
 | |
| 			// PLEG event for a pod; sync it.
 | |
| 			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
 | |
| 				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
 | |
| 				handler.HandlePodSyncs([]*v1.Pod{pod})
 | |
| 			} else {
 | |
| 				// If the pod no longer exists, ignore the event.
 | |
| 				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if e.Type == pleg.ContainerDied {
 | |
| 			if containerID, ok := e.Data.(string); ok {
 | |
| 				kl.cleanUpContainersInPod(e.ID, containerID)
 | |
| 			}
 | |
| 		}
 | |
| 	case <-syncCh:
 | |
| 		// Sync pods waiting for sync
 | |
| 		podsToSync := kl.getPodsToSync()
 | |
| 		if len(podsToSync) == 0 {
 | |
| 			break
 | |
| 		}
 | |
| 		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
 | |
| 		handler.HandlePodSyncs(podsToSync)
 | |
| 	case update := <-kl.livenessManager.Updates():
 | |
| 		if update.Result == proberesults.Failure {
 | |
| 			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
 | |
| 		}
 | |
| 	case update := <-kl.readinessManager.Updates():
 | |
| 		ready := update.Result == proberesults.Success
 | |
| 		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
 | |
| 
 | |
| 		status := ""
 | |
| 		if ready {
 | |
| 			status = "ready"
 | |
| 		}
 | |
| 		handleProbeSync(kl, update, handler, "readiness", status)
 | |
| 	case update := <-kl.startupManager.Updates():
 | |
| 		started := update.Result == proberesults.Success
 | |
| 		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
 | |
| 
 | |
| 		status := "unhealthy"
 | |
| 		if started {
 | |
| 			status = "started"
 | |
| 		}
 | |
| 		handleProbeSync(kl, update, handler, "startup", status)
 | |
| 	case <-housekeepingCh:
 | |
| 		if !kl.sourcesReady.AllReady() {
 | |
| 			// If the sources aren't ready, skip housekeeping, as we may accidentally
 | |
| 			// delete pods from unready sources.
 | |
| 			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
 | |
| 		} else {
 | |
| 			start := time.Now()
 | |
| 			klog.V(4).InfoS("SyncLoop (housekeeping)")
 | |
| 			if err := handler.HandlePodCleanups(); err != nil {
 | |
| 				klog.ErrorS(err, "Failed cleaning pods")
 | |
| 			}
 | |
| 			duration := time.Since(start)
 | |
| 			if duration > housekeepingWarningDuration {
 | |
| 				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
 | |
| 			}
 | |
| 			klog.V(4).InfoS("SyncLoop (housekeeping) end")
 | |
| 		}
 | |
| 	}
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
 | |
| 	// We should not use the pod from manager, because it is never updated after initialization.
 | |
| 	pod, ok := kl.podManager.GetPodByUID(update.PodUID)
 | |
| 	if !ok {
 | |
| 		// If the pod no longer exists, ignore the update.
 | |
| 		klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
 | |
| 		return
 | |
| 	}
 | |
| 	klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
 | |
| 	handler.HandlePodSyncs([]*v1.Pod{pod})
 | |
| }
 | |
| 
 | |
| // dispatchWork starts the asynchronous sync of the pod in a pod worker.
 | |
| // If the pod has completed termination, dispatchWork will perform no action.
 | |
| func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
 | |
| 	// Run the sync in an async worker.
 | |
| 	kl.podWorkers.UpdatePod(UpdatePodOptions{
 | |
| 		Pod:        pod,
 | |
| 		MirrorPod:  mirrorPod,
 | |
| 		UpdateType: syncType,
 | |
| 		StartTime:  start,
 | |
| 	})
 | |
| 	// Note the number of containers for new pods.
 | |
| 	if syncType == kubetypes.SyncPodCreate {
 | |
| 		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TODO: handle mirror pods in a separate component (issue #17251)
 | |
| func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
 | |
| 	// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
 | |
| 	// corresponding static pod. Send update to the pod worker if the static
 | |
| 	// pod exists.
 | |
| 	if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
 | |
| 		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodAdditions is the callback in SyncHandler for pods being added from
 | |
| // a config source.
 | |
| func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	sort.Sort(sliceutils.PodsByCreationTime(pods))
 | |
| 	for _, pod := range pods {
 | |
| 		existingPods := kl.podManager.GetPods()
 | |
| 		// Always add the pod to the pod manager. Kubelet relies on the pod
 | |
| 		// manager as the source of truth for the desired state. If a pod does
 | |
| 		// not exist in the pod manager, it means that it has been deleted in
 | |
| 		// the apiserver and no action (other than cleanup) is required.
 | |
| 		kl.podManager.AddPod(pod)
 | |
| 
 | |
| 		if kubetypes.IsMirrorPod(pod) {
 | |
| 			kl.handleMirrorPod(pod, start)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// Only go through the admission process if the pod is not requested
 | |
| 		// for termination by another part of the kubelet. If the pod is already
 | |
| 		// using resources (previously admitted), the pod worker is going to be
 | |
| 		// shutting it down. If the pod hasn't started yet, we know that when
 | |
| 		// the pod worker is invoked it will also avoid setting up the pod, so
 | |
| 		// we simply avoid doing any work.
 | |
| 		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
 | |
| 			// We failed pods that we rejected, so activePods include all admitted
 | |
| 			// pods that are alive.
 | |
| 			activePods := kl.filterOutInactivePods(existingPods)
 | |
| 
 | |
| 			// Check if we can admit the pod; if not, reject it.
 | |
| 			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
 | |
| 				kl.rejectPod(pod, reason, message)
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 | |
| 		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodUpdates is the callback in the SyncHandler interface for pods
 | |
| // being updated from a config source.
 | |
| func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	for _, pod := range pods {
 | |
| 		kl.podManager.UpdatePod(pod)
 | |
| 		if kubetypes.IsMirrorPod(pod) {
 | |
| 			kl.handleMirrorPod(pod, start)
 | |
| 			continue
 | |
| 		}
 | |
| 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 | |
| 		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodRemoves is the callback in the SyncHandler interface for pods
 | |
| // being removed from a config source.
 | |
| func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	for _, pod := range pods {
 | |
| 		kl.podManager.DeletePod(pod)
 | |
| 		if kubetypes.IsMirrorPod(pod) {
 | |
| 			kl.handleMirrorPod(pod, start)
 | |
| 			continue
 | |
| 		}
 | |
| 		// Deletion is allowed to fail because the periodic cleanup routine
 | |
| 		// will trigger deletion again.
 | |
| 		if err := kl.deletePod(pod); err != nil {
 | |
| 			klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodReconcile is the callback in the SyncHandler interface for pods
 | |
| // that should be reconciled.
 | |
| func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	for _, pod := range pods {
 | |
| 		// Update the pod in pod manager, status manager will do periodically reconcile according
 | |
| 		// to the pod manager.
 | |
| 		kl.podManager.UpdatePod(pod)
 | |
| 
 | |
| 		// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
 | |
| 		if status.NeedToReconcilePodReadiness(pod) {
 | |
| 			mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 | |
| 			kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
 | |
| 		}
 | |
| 
 | |
| 		// After an evicted pod is synced, all dead containers in the pod can be removed.
 | |
| 		if eviction.PodIsEvicted(pod.Status) {
 | |
| 			if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
 | |
| 				kl.containerDeletor.deleteContainersInPod("", podStatus, true)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HandlePodSyncs is the callback in the syncHandler interface for pods
 | |
| // that should be dispatched to pod workers for sync.
 | |
| func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
 | |
| 	start := kl.clock.Now()
 | |
| 	for _, pod := range pods {
 | |
| 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 | |
| 		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // LatestLoopEntryTime returns the last time in the sync loop monitor.
 | |
| func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 | |
| 	val := kl.syncLoopMonitor.Load()
 | |
| 	if val == nil {
 | |
| 		return time.Time{}
 | |
| 	}
 | |
| 	return val.(time.Time)
 | |
| }
 | |
| 
 | |
| // updateRuntimeUp calls the container runtime status callback, initializing
 | |
| // the runtime dependent modules when the container runtime first comes up,
 | |
| // and returns an error if the status check fails.  If the status check is OK,
 | |
| // update the container runtime uptime in the kubelet runtimeState.
 | |
| func (kl *Kubelet) updateRuntimeUp() {
 | |
| 	kl.updateRuntimeMux.Lock()
 | |
| 	defer kl.updateRuntimeMux.Unlock()
 | |
| 
 | |
| 	s, err := kl.containerRuntime.Status()
 | |
| 	if err != nil {
 | |
| 		klog.ErrorS(err, "Container runtime sanity check failed")
 | |
| 		return
 | |
| 	}
 | |
| 	if s == nil {
 | |
| 		klog.ErrorS(nil, "Container runtime status is nil")
 | |
| 		return
 | |
| 	}
 | |
| 	// Periodically log the whole runtime status for debugging.
 | |
| 	klog.V(4).InfoS("Container runtime status", "status", s)
 | |
| 	networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
 | |
| 	if networkReady == nil || !networkReady.Status {
 | |
| 		klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
 | |
| 		kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
 | |
| 	} else {
 | |
| 		// Set nil if the container runtime network is ready.
 | |
| 		kl.runtimeState.setNetworkState(nil)
 | |
| 	}
 | |
| 	// information in RuntimeReady condition will be propagated to NodeReady condition.
 | |
| 	runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
 | |
| 	// If RuntimeReady is not set or is false, report an error.
 | |
| 	if runtimeReady == nil || !runtimeReady.Status {
 | |
| 		klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
 | |
| 		kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
 | |
| 		return
 | |
| 	}
 | |
| 	kl.runtimeState.setRuntimeState(nil)
 | |
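| 	// On the first successful status check, start the modules that need a running container runtime (cAdvisor, container manager, eviction manager, ...).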
| 	kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
 | |
| 	kl.runtimeState.setRuntimeSync(kl.clock.Now())
 | |
| }
 | |
| 
 | |
| // GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
 | |
| func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration {
 | |
| 	return kl.kubeletConfiguration
 | |
| }
 | |
| 
 | |
| // BirthCry sends an event that the kubelet has started up.
 | |
| func (kl *Kubelet) BirthCry() {
 | |
| 	// Make an event that kubelet restarted.
 | |
| 	kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
 | |
| }
 | |
| 
 | |
| // ResyncInterval returns the interval used for periodic syncs.
 | |
| func (kl *Kubelet) ResyncInterval() time.Duration {
 | |
| 	return kl.resyncInterval
 | |
| }
 | |
| 
 | |
| // ListenAndServe runs the kubelet HTTP server.
 | |
| func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
 | |
| 	auth server.AuthInterface) {
 | |
| 	server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth)
 | |
| }
 | |
| 
 | |
| // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
 | |
| func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
 | |
| 	server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
 | |
| }
 | |
| 
 | |
| // ListenAndServePodResources runs the kubelet podresources grpc service
 | |
| func (kl *Kubelet) ListenAndServePodResources() {
 | |
| 	socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
 | |
| 	if err != nil {
 | |
| 		klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
 | |
| 		return
 | |
| 	}
 | |
| 	server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager, kl.containerManager)
 | |
| }
 | |
| 
 | |
| // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
 | |
| func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
 | |
| 	if podStatus, err := kl.podCache.Get(podID); err == nil {
 | |
| 		// When an evicted or deleted pod has already synced, all containers can be removed.
 | |
| 		removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID)
 | |
| 		kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
 | |
| // is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
 | |
| // a runtime update and a node status update. Function returns after one successful node status update.
 | |
| // Function is executed only during Kubelet start which improves latency to ready node by updating
 | |
| // pod CIDR, runtime status and node statuses ASAP.
 | |
| func (kl *Kubelet) fastStatusUpdateOnce() {
 | |
| 	for {
 | |
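| 		// Poll every 100ms until the node object has a pod CIDR assigned.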
| 		time.Sleep(100 * time.Millisecond)
 | |
| 		node, err := kl.GetNode()
 | |
| 		if err != nil {
 | |
| 			klog.ErrorS(err, "Error getting node")
 | |
| 			continue
 | |
| 		}
 | |
| 		if len(node.Spec.PodCIDRs) != 0 {
 | |
| 			podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
 | |
| 			if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
 | |
| 				klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
 | |
| 				continue
 | |
| 			}
 | |
| 			kl.updateRuntimeUp()
 | |
| 			kl.syncNodeStatus()
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // isSyncPodWorthy filters out events that are not worthy of pod syncing
 | |
| func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
 | |
| 	// ContainerRemoved doesn't affect pod state
 | |
| 	return event.Type != pleg.ContainerRemoved
 | |
| }
 |